From 01c4020460fe685247c22ec0403f3c6350f2a11a Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Tue, 17 Mar 2026 16:45:52 -0400 Subject: [PATCH 1/3] impl(bigtable): add remaining methods to DynamicChannelPool --- .../bigtable/internal/dynamic_channel_pool.h | 181 +++++- .../internal/dynamic_channel_pool_test.cc | 571 ++++++++++++++++++ 2 files changed, 739 insertions(+), 13 deletions(-) diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool.h b/google/cloud/bigtable/internal/dynamic_channel_pool.h index a27205ac90777..ce7a95610d736 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool.h +++ b/google/cloud/bigtable/internal/dynamic_channel_pool.h @@ -21,7 +21,6 @@ #include "google/cloud/bigtable/internal/stub_manager.h" #include "google/cloud/bigtable/options.h" #include "google/cloud/completion_queue.h" -#include "google/cloud/internal/clock.h" #include "google/cloud/internal/random.h" #include "google/cloud/version.h" #include @@ -140,8 +139,7 @@ class DynamicChannelPool return channels_.empty(); } - // If the pool is not under a pool resize cooldown, call - // CheckPoolChannelHealth. + // Calls CheckPoolChannelHealth before picking a channel. // // Pick two random channels from channels_ and return the channel with the // lower number of outstanding_rpcs. This is the "quick" path. @@ -156,8 +154,44 @@ class DynamicChannelPool // If there are no healthy channels in channels_, create a new channel and // use that one. Also call ScheduleAddChannels to replenish channels_. std::shared_ptr> GetChannelRandomTwoLeastUsed() { - // Not yet implemented. 
- return {}; + std::scoped_lock lk(mu_); + CheckPoolChannelHealth(lk); + + ChannelSelectionData d; + d.iterators.reserve(channels_.size()); + for (auto iter = channels_.begin(); iter != channels_.end(); ++iter) { + d.iterators.push_back(iter); + } + std::shuffle(d.iterators.begin(), d.iterators.end(), rng_); + d.shuffle_iter = d.iterators.begin(); + + if (d.shuffle_iter != d.iterators.end()) { + d.channel_1_iter = *d.shuffle_iter; + d.channel_1_rpcs = (*d.channel_1_iter)->instant_outstanding_rpcs(); + ++d.shuffle_iter; + } + + if (d.shuffle_iter != d.iterators.end()) { + d.channel_2_iter = *d.shuffle_iter; + d.channel_2_rpcs = (*d.channel_2_iter)->instant_outstanding_rpcs(); + } + + // This is the most common case so we try it first. + if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) { + return *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter + : *d.channel_2_iter; + } + if (d.iterators.size() == 1 && d.channel_1_rpcs.ok()) { + // Pool contains exactly 1 good channel. + return *d.channel_1_iter; + } + if (d.iterators.empty()) { + // Pool is empty, create a channel immediately and return it. 
+ channels_.push_back(*stub_factory_fn_(next_channel_id_++, instance_name_, + StubManager::Priming::kNoPriming)); + return channels_.front(); + } + return HandleBadChannels(lk, d); } private: @@ -180,6 +214,71 @@ class DynamicChannelPool SetSizeDecreaseCooldownTimer(lk); } + struct ChannelSelectionData { + using ChannelSelect = + typename std::vector>>::iterator; + std::vector iterators; + ChannelSelect channel_1_iter; + ChannelSelect channel_2_iter; + StatusOr channel_1_rpcs = Status{StatusCode::kNotFound, ""}; + StatusOr channel_2_rpcs = Status{StatusCode::kNotFound, ""}; + typename std::vector::iterator shuffle_iter; + + static void FindGoodChannel( + std::vector& iterators, ChannelSelect& iter, + StatusOr& rpcs, + typename std::vector::iterator& shuffle_iter, + std::vector& bad_channel_iters) { + if (!rpcs.ok()) { + bad_channel_iters.push_back(iter); + while (shuffle_iter != iterators.end() && !rpcs.ok()) { + iter = *shuffle_iter; + rpcs = (*iter)->instant_outstanding_rpcs(); + if (!rpcs.ok()) bad_channel_iters.push_back(iter); + ++shuffle_iter; + } + } + } + }; + + // We have one or more bad channels. Spending time finding a good channel + // will be cheaper than trying to use a bad channel in the long run. + std::shared_ptr> HandleBadChannels( + std::scoped_lock const& lk, ChannelSelectionData& d) { + std::vector bad_channel_iters; + if (d.shuffle_iter != d.iterators.end()) ++d.shuffle_iter; + ChannelSelectionData::FindGoodChannel(d.iterators, d.channel_1_iter, + d.channel_1_rpcs, d.shuffle_iter, + bad_channel_iters); + ChannelSelectionData::FindGoodChannel(d.iterators, d.channel_2_iter, + d.channel_2_rpcs, d.shuffle_iter, + bad_channel_iters); + EvictBadChannels(lk, bad_channel_iters); + ScheduleRemoveChannels(lk); + + if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) { + return *d.channel_1_rpcs < *d.channel_2_rpcs ? 
*d.channel_1_iter + : *d.channel_2_iter; + } + + std::shared_ptr> channel; + if (d.channel_1_rpcs.ok()) { + channel = *d.channel_1_iter; + } else if (d.channel_2_rpcs.ok()) { + channel = *d.channel_2_iter; + } else { + // We have no usable channels in the entire pool; this is bad. + // Create a channel immediately to unblock application. + channels_.push_back(*stub_factory_fn_(next_channel_id_++, instance_name_, + StubManager::Priming::kNoPriming)); + std::swap(channels_.front(), channels_.back()); + channel = channels_.front(); + } + + ScheduleAddChannels(lk); + return channel; + } + struct ChannelAddVisitor { std::size_t pool_size; explicit ChannelAddVisitor(std::size_t pool_size) : pool_size(pool_size) {} @@ -297,8 +396,24 @@ class DynamicChannelPool void EvictBadChannels( std::scoped_lock const&, std::vector< - typename std::vector>>::iterator>&) { - // Not yet implemented. + typename std::vector>>::iterator>& + bad_channel_iters) { + auto back_iter = channels_.rbegin(); + for (auto& bad_channel_iter : bad_channel_iters) { + bool swapped = false; + while (!swapped && back_iter != channels_.rend()) { + auto b = (*back_iter)->instant_outstanding_rpcs(); + if (b.ok()) { + std::swap(*back_iter, *bad_channel_iter); + draining_channels_.push_back(std::move(*back_iter)); + swapped = true; + } + ++back_iter; + } + } + for (std::size_t i = 0; i < bad_channel_iters.size(); ++i) { + channels_.pop_back(); + } } void SetSizeDecreaseCooldownTimer(std::scoped_lock const&) { @@ -306,13 +421,53 @@ class DynamicChannelPool sizing_policy_.pool_size_decrease_cooldown_interval); } - // Computes the average_rpcs_pre_channel across all channels in the pool, - // excluding any channels that are awaiting removal in draining_channels_. + // Computes the average RPCs per channel across all channels in the pool, + // by summing the outstanding_rpc from each channel and dividing by the + // number of active channels plus the num_pending_channels_. 
+ // Any channels that are awaiting removal in draining_channels_ are excluded + // from this calculation. // The computed average is compared to the thresholds in the sizing policy - // and calls either ScheduleRemoveChannels or ScheduleAddChannels as - // appropriate. If either is called the resize_cooldown_timer is also set. - void CheckPoolChannelHealth(std::scoped_lock const&) { - // Not yet implemented + // and calls either ScheduleRemoveChannels or ScheduleAddChannels as + // appropriate. If ScheduleRemoveChannels is called the resize_cooldown_timer + // is also set. + void CheckPoolChannelHealth(std::scoped_lock const& lk) { + int average_rpcs_per_channel = + channels_.empty() + ? 0 + : std::accumulate(channels_.begin(), channels_.end(), 0, + [](int a, auto const& b) { + auto rpcs_b = b->instant_outstanding_rpcs(); + return a + (rpcs_b.ok() ? *rpcs_b : 0); + }) / + static_cast(channels_.size() + num_pending_channels_); + + if (channels_.size() < sizing_policy_.minimum_channel_pool_size || + (average_rpcs_per_channel > + sizing_policy_.maximum_average_outstanding_rpcs_per_channel && + channels_.size() < sizing_policy_.maximum_channel_pool_size)) { + // Channel/stub creation is expensive, instead of making the current RPC + // wait on this, use an existing channel right now, and schedule a channel + // to be added. 
+ ScheduleAddChannels(lk); + return; + } + + if ((!pool_size_decrease_cooldown_timer_.valid() || + pool_size_decrease_cooldown_timer_.is_ready()) && + average_rpcs_per_channel < + sizing_policy_.minimum_average_outstanding_rpcs_per_channel && + channels_.size() > sizing_policy_.minimum_channel_pool_size) { + if (pool_size_decrease_cooldown_timer_.is_ready()) { + pool_size_decrease_cooldown_timer_.get(); + } + auto random_channel = std::uniform_int_distribution( + 0, channels_.size() - 1)(rng_); + std::swap(channels_[random_channel], channels_.back()); + draining_channels_.push_back(std::move(channels_.back())); + channels_.pop_back(); + ScheduleRemoveChannels(lk); + SetSizeDecreaseCooldownTimer(lk); + } } mutable std::mutex mu_; diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc index b15bc119ce3df..5841e4c2750ab 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc +++ b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc @@ -32,6 +32,15 @@ class DynamicChannelPoolTestWrapper { std::shared_ptr> pool) : pool_(std::move(pool)) {} + using ChannelSelectionData = + DynamicChannelPool::ChannelSelectionData; + + std::shared_ptr> HandleBadChannels( + std::scoped_lock const& lk, + DynamicChannelPool::ChannelSelectionData& d) { + return pool_->HandleBadChannels(lk, d); + } + void ScheduleAddChannels( std::scoped_lock const& lk, std::function const&)> const& test_fn) { @@ -48,10 +57,22 @@ class DynamicChannelPoolTestWrapper { void RemoveChannels() { pool_->RemoveChannels(); } + void EvictBadChannels( + std::scoped_lock const& lk, + std::vector>>::iterator>& + bad_channel_iters) { + pool_->EvictBadChannels(lk, bad_channel_iters); + } + void SetSizeDecreaseCooldownTimer(std::scoped_lock const& lk) { pool_->SetSizeDecreaseCooldownTimer(lk); } + void CheckPoolChannelHealth(std::scoped_lock const& lk) { + pool_->CheckPoolChannelHealth(lk); + } + std::scoped_lock 
CreateLock() { return std::scoped_lock(pool_->mu_); } @@ -108,6 +129,192 @@ class DynamicChannelPoolTest : public ::testing::Test { std::thread thread_; }; +TEST_F(DynamicChannelPoolTest, SelectLeastUsedFromTwoChannels) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + + // There should be no attempt to grow the pool. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + auto stub_factory_fn = [&](int, std::string const&, StubManager::Priming) { + return std::make_shared>( + std::make_shared(), 20); + }; + + std::vector>> channels; + auto mock_stub_0 = std::make_shared(); + EXPECT_CALL(*mock_stub_0, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + auto mock_stub_1 = std::make_shared(); + EXPECT_CALL(*mock_stub_1, CheckAndMutateRow).Times(0); + int initial_rpc_count = 0; + channels.push_back(std::make_shared>( + std::move(mock_stub_0), initial_rpc_count++)); + channels.push_back(std::make_shared>( + std::move(mock_stub_1), initial_rpc_count)); + + // Pool creation should set the pool size decrease cooldown timer. 
+ EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + sizing_policy.maximum_channel_pool_size = 2; + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn, sizing_policy); + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, OneInitialChannel) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + + // There should be no attempt to grow the pool after creation. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + { // Pool created with 1 channel. 
+ auto mock_stub = std::make_shared(); + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + EXPECT_CALL(stub_factory_fn, Call).Times(0); + // .WillOnce(::testing::Return( + // std::make_shared>(mock_stub))); + + std::vector>> channels; + auto mock_stub_0 = std::make_shared(); + EXPECT_CALL(*mock_stub_0, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + + int initial_rpc_count = 0; + channels.push_back(std::make_shared>( + std::move(mock_stub_0), initial_rpc_count)); + + // Pool creation should set the pool size increase cooldown timer. + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer( + sizing_policy.pool_size_decrease_cooldown_interval); + }); + + sizing_policy.maximum_channel_pool_size = 1; + sizing_policy.minimum_channel_pool_size = 1; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + EXPECT_THAT(pool->size(), Eq(1)); + + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + } + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, EmptyInitialPool) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + + // There should be no attempt to grow the pool after creation. 
+ EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + { // Pool created with 0 channels. + auto mock_stub = std::make_shared(); + EXPECT_CALL(*mock_stub, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + EXPECT_CALL(stub_factory_fn, Call) + .Times(1) + .WillOnce(::testing::Return( + std::make_shared>(mock_stub))); + + std::vector>> channels; + + // Pool creation should set the pool size increase cooldown timer. + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + sizing_policy.maximum_channel_pool_size = 1; + sizing_policy.minimum_channel_pool_size = 0; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + + EXPECT_THAT(*pool, ::testing::IsEmpty()); + + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + + EXPECT_THAT(pool->size(), Eq(1)); + } + fake_cq_impl_->SimulateCompletion(false); +} + TEST_F(DynamicChannelPoolTest, ScheduleAddChannelsPoolUndersized) { auto instance_name = bigtable::InstanceResource(Project("my-project"), "my-instance") @@ -549,6 +756,370 @@ 
TEST_F(DynamicChannelPoolTest, RemoveChannelsSomeChannelsDrained) { fake_cq_impl_->SimulateCompletion(false); } +TEST_F(DynamicChannelPoolTest, HandleBadChannelsTwoChannelsOneBad) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size increase cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + // return make_ready_future(StatusOr(std::chrono::system_clock::now())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // HandleBadChannels will call ScheduleRemoveChannels. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel"))); + + auto mock_stub = std::make_shared(); + EXPECT_CALL(*mock_stub, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + + channels.push_back( + std::make_shared>(mock_stub, 1)); + + DynamicChannelPoolTestWrapper::ChannelSelectionData data; + for (auto iter = channels.begin(); iter != channels.end(); ++iter) { + data.iterators.push_back(iter); + } + data.shuffle_iter = data.iterators.begin(); + data.channel_1_iter = *data.shuffle_iter; + data.channel_1_rpcs = (*data.channel_1_iter)->instant_outstanding_rpcs(); + ++data.shuffle_iter; + data.channel_2_iter = *data.shuffle_iter; + data.channel_2_rpcs = (*data.channel_2_iter)->instant_outstanding_rpcs(); + + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + auto draining_channels = wrapper.SetDrainingChannels({}); + + // ScheduleAddChannels will be called. 
+ EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + + std::shared_ptr> selected_stub; + { + auto lock = wrapper.CreateLock(); + selected_stub = wrapper.HandleBadChannels(lock, data); + } + EXPECT_THAT(draining_channels, IsEmpty()); + + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + EXPECT_THAT(pool->size(), Eq(1)); + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, HandleBadChannelsTwoChannelsOtherOneBad) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size increase cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + // return make_ready_future(StatusOr(std::chrono::system_clock::now())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // HandleBadChannels will call ScheduleRemoveChannels. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + auto mock_stub = std::make_shared(); + EXPECT_CALL(*mock_stub, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + channels.push_back( + std::make_shared>(mock_stub, 1)); + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel"))); + + DynamicChannelPoolTestWrapper::ChannelSelectionData data; + for (auto iter = channels.begin(); iter != channels.end(); ++iter) { + data.iterators.push_back(iter); + } + data.shuffle_iter = data.iterators.begin(); + data.channel_1_iter = *data.shuffle_iter; + data.channel_1_rpcs = (*data.channel_1_iter)->instant_outstanding_rpcs(); + ++data.shuffle_iter; + data.channel_2_iter = *data.shuffle_iter; + data.channel_2_rpcs = (*data.channel_2_iter)->instant_outstanding_rpcs(); + + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + auto draining_channels = wrapper.SetDrainingChannels({}); + + // ScheduleAddChannels will be called. 
+ EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + + std::shared_ptr> selected_stub; + { + auto lock = wrapper.CreateLock(); + selected_stub = wrapper.HandleBadChannels(lock, data); + } + EXPECT_THAT(draining_channels, IsEmpty()); + + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + EXPECT_THAT(pool->size(), Eq(1)); + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, HandleBadChannelsThreeChannelsOneBad) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size increase cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + // return make_ready_future(StatusOr(std::chrono::system_clock::now())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // HandleBadChannels will call ScheduleRemoveChannels. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + auto mock_stub_0 = std::make_shared(); + EXPECT_CALL(*mock_stub_0, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + channels.push_back( + std::make_shared>(mock_stub_0, 1)); + + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel"))); + + auto mock_stub_1 = std::make_shared(); + EXPECT_CALL(*mock_stub_1, CheckAndMutateRow).Times(0); + channels.push_back( + std::make_shared>(mock_stub_1, 2)); + + DynamicChannelPoolTestWrapper::ChannelSelectionData data; + for (auto iter = channels.begin(); iter != channels.end(); ++iter) { + data.iterators.push_back(iter); + } + data.shuffle_iter = data.iterators.begin(); + data.channel_1_iter = *data.shuffle_iter; + data.channel_1_rpcs = (*data.channel_1_iter)->instant_outstanding_rpcs(); + ++data.shuffle_iter; + data.channel_2_iter = *data.shuffle_iter; + data.channel_2_rpcs = (*data.channel_2_iter)->instant_outstanding_rpcs(); + + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + auto draining_channels = wrapper.SetDrainingChannels({}); + + // ScheduleAddChannels will NOT be called. 
+ EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + + std::shared_ptr> selected_stub; + { + auto lock = wrapper.CreateLock(); + selected_stub = wrapper.HandleBadChannels(lock, data); + } + EXPECT_THAT(draining_channels, IsEmpty()); + + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + EXPECT_THAT(pool->size(), Eq(2)); + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, HandleBadChannelsAllChannelsBad) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size increase cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + // return make_ready_future(StatusOr(std::chrono::system_clock::now())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // HandleBadChannels will call ScheduleRemoveChannels. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + auto mock_stub = std::make_shared(); + EXPECT_CALL(*mock_stub, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + + EXPECT_CALL(stub_factory_fn, Call) + .WillOnce( + [&](std::uint32_t id, std::string const&, StubManager::Priming p) { + EXPECT_THAT(id, Eq(4)); + EXPECT_THAT(p, Eq(StubManager::Priming::kNoPriming)); + return std::make_shared>(mock_stub); + }); + + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel 0"))); + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel 1"))); + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel 2"))); + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel 3"))); + + DynamicChannelPoolTestWrapper::ChannelSelectionData data; + for (auto iter = channels.begin(); iter != channels.end(); ++iter) { + data.iterators.push_back(iter); + } + data.shuffle_iter = data.iterators.begin(); + data.channel_1_iter = *data.shuffle_iter; + data.channel_1_rpcs = (*data.channel_1_iter)->instant_outstanding_rpcs(); + ++data.shuffle_iter; + data.channel_2_iter = *data.shuffle_iter; + data.channel_2_rpcs = (*data.channel_2_iter)->instant_outstanding_rpcs(); + + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + auto 
draining_channels = wrapper.SetDrainingChannels({}); + + // ScheduleAddChannels will be called. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + + std::shared_ptr> selected_stub; + { + auto lock = wrapper.CreateLock(); + selected_stub = wrapper.HandleBadChannels(lock, data); + } + EXPECT_THAT(draining_channels, IsEmpty()); + + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + EXPECT_THAT(pool->size(), Eq(1)); + + fake_cq_impl_->SimulateCompletion(false); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal From 2194531a35e4a3d61adb8b7fa1117bafe32a8bd9 Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Tue, 17 Mar 2026 17:40:28 -0400 Subject: [PATCH 2/3] update logic to not invalidate iterators --- .../bigtable/internal/dynamic_channel_pool.h | 26 ++++++++++++------- .../internal/dynamic_channel_pool_test.cc | 4 +-- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool.h b/google/cloud/bigtable/internal/dynamic_channel_pool.h index ce7a95610d736..e58e18dcb460c 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool.h +++ b/google/cloud/bigtable/internal/dynamic_channel_pool.h @@ -95,6 +95,8 @@ template class DynamicChannelPool : public std::enable_shared_from_this> { public: + // This function should only return an error status if priming is attempted + // and it is unsuccessful. using StubFactoryFn = std::function>>( std::uint32_t id, std::string const& instance_name, @@ -186,7 +188,9 @@ class DynamicChannelPool return *d.channel_1_iter; } if (d.iterators.empty()) { - // Pool is empty, create a channel immediately and return it. + // Pool is empty, create a channel immediately and return it. While the + // return value is a StatusOr, it will only ever contain an error if + // priming is attempted. 
channels_.push_back(*stub_factory_fn_(next_channel_id_++, instance_name_, StubManager::Priming::kNoPriming)); return channels_.front(); @@ -253,28 +257,30 @@ class DynamicChannelPool ChannelSelectionData::FindGoodChannel(d.iterators, d.channel_2_iter, d.channel_2_rpcs, d.shuffle_iter, bad_channel_iters); - EvictBadChannels(lk, bad_channel_iters); - ScheduleRemoveChannels(lk); - - if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) { - return *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter - : *d.channel_2_iter; - } std::shared_ptr> channel; - if (d.channel_1_rpcs.ok()) { + + if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) { + channel = *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter + : *d.channel_2_iter; + } else if (d.channel_1_rpcs.ok()) { channel = *d.channel_1_iter; } else if (d.channel_2_rpcs.ok()) { channel = *d.channel_2_iter; } else { // We have no usable channels in the entire pool; this is bad. - // Create a channel immediately to unblock application. + // Create a channel immediately to unblock application. While the + // return value is a StatusOr, it will only ever contain an error if + // priming is attempted. channels_.push_back(*stub_factory_fn_(next_channel_id_++, instance_name_, StubManager::Priming::kNoPriming)); std::swap(channels_.front(), channels_.back()); channel = channels_.front(); } + // Wait until we no longer need valid iterators to call EvictBadChannels. 
+ EvictBadChannels(lk, bad_channel_iters); + ScheduleRemoveChannels(lk); ScheduleAddChannels(lk); return channel; } diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc index 5841e4c2750ab..d51925cc4bb68 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc +++ b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc @@ -999,8 +999,8 @@ TEST_F(DynamicChannelPoolTest, HandleBadChannelsThreeChannelsOneBad) { DynamicChannelPoolTestWrapper wrapper(pool); auto draining_channels = wrapper.SetDrainingChannels({}); - // ScheduleAddChannels will NOT be called. - EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + // ScheduleAddChannels will be called as the minimum pool size is 2. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); std::shared_ptr> selected_stub; { From b64f2e63a29b3b2909382e688972a7c7ea0e8f4f Mon Sep 17 00:00:00 2001 From: Scott Hart Date: Tue, 17 Mar 2026 19:12:40 -0400 Subject: [PATCH 3/3] add more tests --- .../bigtable/internal/dynamic_channel_pool.h | 3 - .../internal/dynamic_channel_pool_test.cc | 139 +++++++++++++++++- 2 files changed, 135 insertions(+), 7 deletions(-) diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool.h b/google/cloud/bigtable/internal/dynamic_channel_pool.h index e58e18dcb460c..fc1551faa962c 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool.h +++ b/google/cloud/bigtable/internal/dynamic_channel_pool.h @@ -259,7 +259,6 @@ class DynamicChannelPool bad_channel_iters); std::shared_ptr> channel; - if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) { channel = *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter : *d.channel_2_iter; @@ -277,7 +276,6 @@ class DynamicChannelPool std::swap(channels_.front(), channels_.back()); channel = channels_.front(); } - // Wait until we no longer need valid iterators to call EvictBadChannels. 
EvictBadChannels(lk, bad_channel_iters); ScheduleRemoveChannels(lk); @@ -446,7 +444,6 @@ class DynamicChannelPool return a + (rpcs_b.ok() ? *rpcs_b : 0); }) / static_cast(channels_.size() + num_pending_channels_); - if (channels_.size() < sizing_policy_.minimum_channel_pool_size || (average_rpcs_per_channel > sizing_policy_.maximum_average_outstanding_rpcs_per_channel && diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc index d51925cc4bb68..3891c184d44b8 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc +++ b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc @@ -777,7 +777,6 @@ TEST_F(DynamicChannelPoolTest, HandleBadChannelsTwoChannelsOneBad) { Eq(std::chrono::nanoseconds( sizing_policy.pool_size_decrease_cooldown_interval) .count())); - // return make_ready_future(StatusOr(std::chrono::system_clock::now())); return cq_.MakeRelativeTimer(std::chrono::seconds(600)); }) // HandleBadChannels will call ScheduleRemoveChannels. @@ -864,7 +863,6 @@ TEST_F(DynamicChannelPoolTest, HandleBadChannelsTwoChannelsOtherOneBad) { Eq(std::chrono::nanoseconds( sizing_policy.pool_size_decrease_cooldown_interval) .count())); - // return make_ready_future(StatusOr(std::chrono::system_clock::now())); return cq_.MakeRelativeTimer(std::chrono::seconds(600)); }) // HandleBadChannels will call ScheduleRemoveChannels. @@ -949,7 +947,6 @@ TEST_F(DynamicChannelPoolTest, HandleBadChannelsThreeChannelsOneBad) { Eq(std::chrono::nanoseconds( sizing_policy.pool_size_decrease_cooldown_interval) .count())); - // return make_ready_future(StatusOr(std::chrono::system_clock::now())); return cq_.MakeRelativeTimer(std::chrono::seconds(600)); }) // HandleBadChannels will call ScheduleRemoveChannels. 
@@ -1040,7 +1037,6 @@ TEST_F(DynamicChannelPoolTest, HandleBadChannelsAllChannelsBad) { Eq(std::chrono::nanoseconds( sizing_policy.pool_size_decrease_cooldown_interval) .count())); - // return make_ready_future(StatusOr(std::chrono::system_clock::now())); return cq_.MakeRelativeTimer(std::chrono::seconds(600)); }) // HandleBadChannels will call ScheduleRemoveChannels. @@ -1120,6 +1116,141 @@ TEST_F(DynamicChannelPoolTest, HandleBadChannelsAllChannelsBad) { fake_cq_impl_->SimulateCompletion(false); } +TEST_F(DynamicChannelPoolTest, CheckChannelPoolHealthNeedsIncrease) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size increase cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // Pool creation should set the pool size increase cooldown timer. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + channels.push_back(std::make_shared>( + std::make_shared(), 30)); + + { + sizing_policy.minimum_channel_pool_size = 1; + sizing_policy.maximum_channel_pool_size = 1; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + // ScheduleAddChannels will NOT be called as the pool has max channels. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + auto lock = wrapper.CreateLock(); + wrapper.CheckPoolChannelHealth(lock); + EXPECT_THAT(wrapper.num_pending_channels(), Eq(0)); + } + { + sizing_policy.minimum_channel_pool_size = 2; + sizing_policy.maximum_channel_pool_size = 10; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + // ScheduleAddChannels will be called. 
+ EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + auto lock = wrapper.CreateLock(); + wrapper.CheckPoolChannelHealth(lock); + EXPECT_THAT(wrapper.num_pending_channels(), Eq(1)); + } +} + +TEST_F(DynamicChannelPoolTest, CheckChannelPoolHealthNeedsDecrease) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size increase cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }) + // ScheduleRemoveChannels should start polling. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // ScheduleRemoveChannels should set the pool size increase cooldown + // timer. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + channels.push_back(std::make_shared>( + std::make_shared(), 2)); + channels.push_back(std::make_shared>( + std::make_shared(), 2)); + channels.push_back(std::make_shared>( + std::make_shared(), 2)); + channels.push_back(std::make_shared>( + std::make_shared(), 2)); + + sizing_policy.minimum_channel_pool_size = 2; + sizing_policy.maximum_channel_pool_size = 10; + sizing_policy.minimum_average_outstanding_rpcs_per_channel = 5; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + // ScheduleAddChannels will NOT be called. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + + { + auto lock = wrapper.CreateLock(); + wrapper.CheckPoolChannelHealth(lock); + } + + EXPECT_THAT(wrapper.num_pending_channels(), Eq(0)); + EXPECT_THAT(pool->size(), Eq(3)); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal