diff --git a/google/cloud/bigtable/internal/channel_usage.h b/google/cloud/bigtable/internal/channel_usage.h index 89598174379dd..2a2e6b583c4bc 100644 --- a/google/cloud/bigtable/internal/channel_usage.h +++ b/google/cloud/bigtable/internal/channel_usage.h @@ -18,8 +18,6 @@ #include "google/cloud/internal/clock.h" #include "google/cloud/status_or.h" #include "google/cloud/version.h" -#include -#include #include #include #include @@ -35,67 +33,15 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN template class ChannelUsage : public std::enable_shared_from_this> { public: - using Clock = ::google::cloud::internal::SteadyClock; ChannelUsage() = default; - explicit ChannelUsage(std::shared_ptr stub, std::shared_ptr clock = - std::make_shared()) - : stub_(std::move(stub)), clock_(std::move(clock)) {} + explicit ChannelUsage(std::shared_ptr stub) : stub_(std::move(stub)) {} // This constructor is only used in testing. - ChannelUsage(std::shared_ptr stub, std::shared_ptr clock, - int initial_outstanding_rpcs) + ChannelUsage(std::shared_ptr stub, int initial_outstanding_rpcs, + Status last_refresh_status = {}) : stub_(std::move(stub)), - clock_(std::move(clock)), - outstanding_rpcs_(initial_outstanding_rpcs) {} - - // Computes the weighted average of outstanding RPCs on the channel over the - // past 60 seconds. - StatusOr average_outstanding_rpcs() { - auto constexpr kWindowSeconds = 60; - auto constexpr kWindowDuration = std::chrono::seconds(kWindowSeconds); - std::scoped_lock lk(mu_); - if (!last_refresh_status_.ok()) return last_refresh_status_; - // If there are no measurements then the stub has never been used. In real - // use this will be 0. In testing we sometimes set an initial value. - if (measurements_.empty()) return outstanding_rpcs_; - auto now = clock_->Now(); - auto last_time = now; - auto window_start = now - kWindowDuration; - - double sum = 0.0; - double total_weight = 0.0; - auto iter = measurements_.rbegin(); - while (iter != measurements_.rend() && iter->timestamp >= window_start) { - double weight = - std::chrono::duration(last_time - iter->timestamp).count(); - last_time = iter->timestamp; - sum += iter->outstanding_rpcs * weight; - total_weight += weight; - ++iter; - } - - // It's unlikely we will have a measurement at precisely the beginning of - // the window. So, we need to use the first measurement outside the window - // to compute a measurement for the missing part of the window using a - // weight equal to the missing time. - if (iter != measurements_.rend()) { - double weight = std::max(0.0, kWindowSeconds - total_weight); - sum += iter->outstanding_rpcs * weight; - total_weight += weight; - // We want to keep one measurement that's at least 60s old to provide a - // starting value for the next window. - ++iter; - } - - if (measurements_.size() > 1) { - measurements_.erase(measurements_.begin(), iter.base()); - } - // After iterating through the measurements if the total_weight is zero, - // then all of the measurements occurred at time == now, and returning the - // current number of outstanding RPCs is most correct. - return total_weight == 0.0 ? outstanding_rpcs_ - : static_cast(sum / total_weight); - } + outstanding_rpcs_(initial_outstanding_rpcs), + last_refresh_status_(std::move(last_refresh_status)) {} StatusOr instant_outstanding_rpcs() { std::scoped_lock lk(mu_); @@ -123,32 +69,19 @@ class ChannelUsage : public std::enable_shared_from_this> { std::shared_ptr AcquireStub() { std::scoped_lock lk(mu_); ++outstanding_rpcs_; - auto time = clock_->Now(); - measurements_.emplace_back(outstanding_rpcs_, time); return stub_; } void ReleaseStub() { std::scoped_lock lk(mu_); --outstanding_rpcs_; - measurements_.emplace_back(outstanding_rpcs_, clock_->Now()); } private: mutable std::mutex mu_; std::shared_ptr stub_; - std::shared_ptr clock_ = std::make_shared(); int outstanding_rpcs_ = 0; Status last_refresh_status_; - struct Measurement { - Measurement(int outstanding_rpcs, std::chrono::steady_clock::time_point p) - : outstanding_rpcs(outstanding_rpcs), timestamp(p) {} - int outstanding_rpcs; - std::chrono::steady_clock::time_point timestamp; - }; - // Older measurements are removed as part of the average_outstanding_rpcs - // method. - std::deque measurements_; }; GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END diff --git a/google/cloud/bigtable/internal/channel_usage_test.cc b/google/cloud/bigtable/internal/channel_usage_test.cc index 2553cb3f89db3..baeccf1119198 100644 --- a/google/cloud/bigtable/internal/channel_usage_test.cc +++ b/google/cloud/bigtable/internal/channel_usage_test.cc @@ -69,56 +69,6 @@ TEST(ChannelUsageTest, SetLastRefreshStatus) { channel->set_last_refresh_status(expected_status); EXPECT_THAT(channel->instant_outstanding_rpcs(), StatusIs(expected_status.code())); - EXPECT_THAT(channel->average_outstanding_rpcs(), - StatusIs(expected_status.code())); -} - -TEST(ChannelUsageTest, AverageOutstandingRpcs) { - auto clock = std::make_shared(); - auto mock = std::make_shared(); - auto channel = std::make_shared>(mock, clock); - EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(0)); - - auto start = std::chrono::steady_clock::now(); - clock->SetTime(start); - - for (int i = 0; i < 10; ++i) (void)channel->AcquireStub(); - EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10)); - - clock->AdvanceTime(std::chrono::seconds(1)); - // sum=10 total_weight=1 - EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10)); - - for (int i = 0; i < 10; ++i) (void)channel->AcquireStub(); - clock->AdvanceTime(std::chrono::seconds(1)); - // sum=30, total_weight=2 - EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(15)); - - for (int i = 0; i < 20; ++i) channel->ReleaseStub(); - clock->AdvanceTime(std::chrono::seconds(1)); - // sum=30, total_weight=3 - EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10)); - - clock->AdvanceTime(std::chrono::seconds(2)); - // sum=30, total_weight=5 - EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(6)); - - for (int i = 0; i < 100; ++i) (void)channel->AcquireStub(); - clock->AdvanceTime(std::chrono::seconds(25)); - // sum=2530, total_weight=84 - EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(84)); - - clock->AdvanceTime(std::chrono::seconds(35)); - // First 5s of measurements have aged out. - EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100)); - - clock->AdvanceTime(std::chrono::seconds(60)); - // All measurements have aged out. - EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100)); - - clock->AdvanceTime(std::chrono::seconds(3600)); - // All measurements have aged out. - EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100)); } TEST(ChannelUsageTest, MakeWeak) { diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool.h b/google/cloud/bigtable/internal/dynamic_channel_pool.h index f8428cdd1ac4e..a27205ac90777 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool.h +++ b/google/cloud/bigtable/internal/dynamic_channel_pool.h @@ -121,9 +121,6 @@ class DynamicChannelPool // performing this work. We might as well cancel those timer futures now. refresh_state_->timers().CancelAll(); if (remove_channel_poll_timer_.valid()) remove_channel_poll_timer_.cancel(); - if (pool_size_increase_cooldown_timer_.valid()) { - pool_size_increase_cooldown_timer_.cancel(); - } if (pool_size_decrease_cooldown_timer_.valid()) { pool_size_decrease_cooldown_timer_.cancel(); } @@ -216,6 +213,7 @@ class DynamicChannelPool absl::visit(ChannelAddVisitor(channels_.size()), sizing_policy_.channels_to_add_per_resize)); } + num_pending_channels_ += num_channels_to_add; std::vector new_channel_ids; new_channel_ids.reserve(num_channels_to_add); for (std::size_t i = 0; i < num_channels_to_add; ++i) { @@ -244,6 +242,7 @@ class DynamicChannelPool if (new_stub.ok()) new_stubs.push_back(*std::move(new_stub)); } std::scoped_lock lk(mu_); + num_pending_channels_ -= new_channel_ids.size(); channels_.insert(channels_.end(), std::make_move_iterator(new_stubs.begin()), std::make_move_iterator(new_stubs.end())); @@ -302,11 +301,6 @@ class DynamicChannelPool // Not yet implemented. } - void SetSizeIncreaseCooldownTimer(std::scoped_lock const&) { - pool_size_increase_cooldown_timer_ = cq_.MakeRelativeTimer( - sizing_policy_.pool_size_increase_cooldown_interval); - } - void SetSizeDecreaseCooldownTimer(std::scoped_lock const&) { pool_size_decrease_cooldown_timer_ = cq_.MakeRelativeTimer( sizing_policy_.pool_size_decrease_cooldown_interval); @@ -328,11 +322,10 @@ class DynamicChannelPool std::shared_ptr refresh_state_; StubFactoryFn stub_factory_fn_; std::vector>> channels_; + std::size_t num_pending_channels_ = 0; DynamicChannelPoolSizingPolicy sizing_policy_; std::vector>> draining_channels_; future remove_channel_poll_timer_; - future> - pool_size_increase_cooldown_timer_; future> pool_size_decrease_cooldown_timer_; std::uint32_t next_channel_id_; diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc index b1c0710191653..b15bc119ce3df 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc +++ b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc @@ -15,7 +15,6 @@ #include "google/cloud/bigtable/internal/dynamic_channel_pool.h" #include "google/cloud/bigtable/testing/mock_bigtable_stub.h" #include "google/cloud/internal/make_status.h" -#include "google/cloud/testing_util/fake_clock.h" #include "google/cloud/testing_util/fake_completion_queue_impl.h" #include "google/cloud/testing_util/mock_completion_queue_impl.h" #include "google/cloud/testing_util/status_matchers.h" @@ -49,10 +48,6 @@ class DynamicChannelPoolTestWrapper { void RemoveChannels() { pool_->RemoveChannels(); } - void SetSizeIncreaseCooldownTimer(std::scoped_lock const& lk) { - pool_->SetSizeIncreaseCooldownTimer(lk); - } - void SetSizeDecreaseCooldownTimer(std::scoped_lock const& lk) { pool_->SetSizeDecreaseCooldownTimer(lk); } @@ -72,6 +67,15 @@ class DynamicChannelPoolTestWrapper { return pool_->draining_channels_; } + std::size_t num_pending_channels() const { + return pool_->num_pending_channels_; + } + + DynamicChannelPoolTestWrapper& set_num_pending_channels(std::size_t n) { + pool_->num_pending_channels_ = n; + return *this; + } + protected: std::shared_ptr> pool_; }; @@ -80,7 +84,6 @@ namespace { using ::google::cloud::bigtable::testing::MockBigtableStub; using ::google::cloud::testing_util::FakeCompletionQueueImpl; -using ::google::cloud::testing_util::FakeSteadyClock; using ::google::cloud::testing_util::MockCompletionQueueImpl; using ::testing::Eq; using ::testing::IsEmpty; @@ -91,7 +94,6 @@ class DynamicChannelPoolTest : public ::testing::Test { DynamicChannelPoolTest() : fake_cq_impl_(std::make_shared()), mock_cq_impl_(std::make_shared()), - fake_clock_impl_(std::make_shared()), thread_([this] { cq_.Run(); }) {} ~DynamicChannelPoolTest() override { @@ -102,7 +104,6 @@ class DynamicChannelPoolTest : public ::testing::Test { protected: std::shared_ptr fake_cq_impl_; std::shared_ptr mock_cq_impl_; - std::shared_ptr fake_clock_impl_; CompletionQueue cq_; std::thread thread_; }; @@ -138,14 +139,27 @@ TEST_F(DynamicChannelPoolTest, ScheduleAddChannelsPoolUndersized) { stub_factory_fn.AsStdFunction(), sizing_policy); DynamicChannelPoolTestWrapper wrapper(pool); - EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); - auto test_fn = [](std::vector const& new_channel_ids) { - EXPECT_THAT(new_channel_ids, - ::testing::ElementsAreArray({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); - }; { + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + auto test_fn = [](std::vector const& new_channel_ids) { + EXPECT_THAT(new_channel_ids, + ::testing::ElementsAreArray({0, 1, 2, 3, 4, 5, 6, 7, 8, 9})); + }; + auto lk = wrapper.CreateLock(); + wrapper.ScheduleAddChannels(lk, test_fn); + EXPECT_THAT(wrapper.num_pending_channels(), Eq(10)); + } + + { + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + auto test_fn = [](std::vector const& new_channel_ids) { + EXPECT_THAT(new_channel_ids, + ::testing::ElementsAreArray( + {10, 11, 12, 13, 14, 15, 16, 17, 18, 19})); + }; auto lk = wrapper.CreateLock(); wrapper.ScheduleAddChannels(lk, test_fn); + EXPECT_THAT(wrapper.num_pending_channels(), Eq(20)); } } @@ -159,9 +173,9 @@ TEST_F(DynamicChannelPoolTest, ScheduleAddChannelsPoolNearMax) { std::vector>> channels; channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 0)); + std::make_shared(), 0)); channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 0)); + std::make_shared(), 0)); DynamicChannelPoolSizingPolicy sizing_policy; sizing_policy.minimum_channel_pool_size = 2; sizing_policy.maximum_channel_pool_size = 3; @@ -208,9 +222,9 @@ TEST_F(DynamicChannelPoolTest, ScheduleAddChannelsPoolNotNearMax) { std::vector>> channels; channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 0)); + std::make_shared(), 0)); channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 0)); + std::make_shared(), 0)); DynamicChannelPoolSizingPolicy sizing_policy; sizing_policy.minimum_channel_pool_size = 2; sizing_policy.maximum_channel_pool_size = 10; @@ -257,9 +271,9 @@ TEST_F(DynamicChannelPoolTest, ScheduleAddChannelsPoolNotNearMaxPercentage) { std::vector>> channels; channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 0)); + std::make_shared(), 0)); channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 0)); + std::make_shared(), 0)); DynamicChannelPoolSizingPolicy sizing_policy; sizing_policy.minimum_channel_pool_size = 2; sizing_policy.maximum_channel_pool_size = 10; @@ -316,16 +330,14 @@ TEST_F(DynamicChannelPoolTest, AddChannels) { EXPECT_THAT(id, Eq(0)); EXPECT_THAT(instance, Eq(instance_name)); EXPECT_THAT(priming, Eq(StubManager::Priming::kSynchronousPriming)); - return std::make_shared>( - mock_stub_0, fake_clock_impl_, 20); + return std::make_shared>(mock_stub_0, 20); }) .WillOnce([&](int id, std::string const& instance, StubManager::Priming priming) { EXPECT_THAT(id, Eq(1)); EXPECT_THAT(instance, Eq(instance_name)); EXPECT_THAT(priming, Eq(StubManager::Priming::kSynchronousPriming)); - return std::make_shared>( - mock_stub_1, fake_clock_impl_, 20); + return std::make_shared>(mock_stub_1, 20); }); std::vector>> channels; @@ -344,8 +356,10 @@ TEST_F(DynamicChannelPoolTest, AddChannels) { stub_factory_fn.AsStdFunction(), sizing_policy); DynamicChannelPoolTestWrapper wrapper(pool); std::vector new_channel_ids = {0, 1}; + wrapper.set_num_pending_channels(new_channel_ids.size()); wrapper.AddChannels(new_channel_ids); EXPECT_THAT(pool->size(), Eq(2)); + EXPECT_THAT(wrapper.num_pending_channels(), Eq(0)); fake_cq_impl_->SimulateCompletion(false); } @@ -467,7 +481,7 @@ TEST_F(DynamicChannelPoolTest, RemoveChannelsLoneChannelDrained) { std::vector>> draining_channels; draining_channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 0)); + std::make_shared(), 0)); auto const& d = wrapper.SetDrainingChannels(draining_channels); wrapper.RemoveChannels(); @@ -508,13 +522,13 @@ TEST_F(DynamicChannelPoolTest, RemoveChannelsSomeChannelsDrained) { std::vector>> draining_channels; draining_channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 1)); + std::make_shared(), 1)); draining_channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 0)); + std::make_shared(), 0)); draining_channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 0)); + std::make_shared(), 0)); draining_channels.push_back(std::make_shared>( - std::make_shared(), fake_clock_impl_, 2)); + std::make_shared(), 2)); auto const& d = wrapper.SetDrainingChannels(draining_channels); EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer)