diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool.h b/google/cloud/bigtable/internal/dynamic_channel_pool.h index a27205ac90777..d7cabda6ada82 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool.h +++ b/google/cloud/bigtable/internal/dynamic_channel_pool.h @@ -21,7 +21,6 @@ #include "google/cloud/bigtable/internal/stub_manager.h" #include "google/cloud/bigtable/options.h" #include "google/cloud/completion_queue.h" -#include "google/cloud/internal/clock.h" #include "google/cloud/internal/random.h" #include "google/cloud/version.h" #include @@ -96,6 +95,8 @@ template class DynamicChannelPool : public std::enable_shared_from_this> { public: + // This function should only return an error status if priming is attempted + // and it is unsuccessful. using StubFactoryFn = std::function>>( std::uint32_t id, std::string const& instance_name, @@ -140,8 +141,7 @@ class DynamicChannelPool return channels_.empty(); } - // If the pool is not under a pool resize cooldown, call - // CheckPoolChannelHealth. + // Calls CheckPoolChannelHealth before picking a channel. // // Pick two random channels from channels_ and return the channel with the // lower number of outstanding_rpcs. This is the "quick" path. @@ -156,8 +156,47 @@ class DynamicChannelPool // If there are no healthy channels in channels_, create a new channel and // use that one. Also call ScheduleAddChannels to replenish channels_. std::shared_ptr> GetChannelRandomTwoLeastUsed() { - // Not yet implemented. 
- return {}; + std::scoped_lock lk(mu_); + CheckPoolChannelHealth(lk); + + ChannelSelectionData d; + d.iterators.reserve(channels_.size()); + for (auto iter = channels_.begin(); iter != channels_.end(); ++iter) { + d.iterators.push_back(iter); + } + std::shuffle(d.iterators.begin(), d.iterators.end(), rng_); + d.shuffle_iter = d.iterators.begin(); + + if (d.shuffle_iter != d.iterators.end()) { + d.channel_1_iter = *d.shuffle_iter; + d.channel_1_rpcs = (*d.channel_1_iter)->instant_outstanding_rpcs(); + ++d.shuffle_iter; + } + + if (d.shuffle_iter != d.iterators.end()) { + d.channel_2_iter = *d.shuffle_iter; + d.channel_2_rpcs = (*d.channel_2_iter)->instant_outstanding_rpcs(); + } + + // This is the most common case so we try it first. + if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) { + return *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter + : *d.channel_2_iter; + } + if (d.iterators.size() == 1 && d.channel_1_rpcs.ok()) { + // Pool contains exactly 1 good channel. + return *d.channel_1_iter; + } + if (d.iterators.empty()) { + // Pool is empty, create a channel immediately and return it. While the + // return value is a StatusOr, it will only ever contain an error if + // priming is attempted. 
+ channels_.push_back(stub_factory_fn_(next_channel_id_++, instance_name_, + StubManager::Priming::kNoPriming) + .value()); + return channels_.front(); + } + return HandleBadChannels(lk, d); } private: @@ -180,6 +219,76 @@ class DynamicChannelPool SetSizeDecreaseCooldownTimer(lk); } + struct ChannelSelectionData { + using ChannelSelect = + typename std::vector>>::iterator; + std::vector iterators; + ChannelSelect channel_1_iter; + ChannelSelect channel_2_iter; + StatusOr channel_1_rpcs = Status{StatusCode::kNotFound, ""}; + StatusOr channel_2_rpcs = Status{StatusCode::kNotFound, ""}; + typename std::vector::iterator shuffle_iter; + + static void FindGoodChannel( + std::vector& iterators, ChannelSelect& iter, + StatusOr& rpcs, + typename std::vector::iterator& shuffle_iter, + std::vector& bad_channel_iters) { + if (!rpcs.ok()) { + bad_channel_iters.push_back(iter); + while (shuffle_iter != iterators.end() && !rpcs.ok()) { + iter = *shuffle_iter; + rpcs = (*iter)->instant_outstanding_rpcs(); + if (!rpcs.ok()) bad_channel_iters.push_back(iter); + ++shuffle_iter; + } + } + } + }; + + // We have one or more bad channels. Spending time finding a good channel + // will be cheaper than trying to use a bad channel in the long run. + std::shared_ptr> HandleBadChannels( + std::scoped_lock const& lk, ChannelSelectionData& d) { + std::vector bad_channel_iters; + if (d.shuffle_iter != d.iterators.end()) ++d.shuffle_iter; + ChannelSelectionData::FindGoodChannel(d.iterators, d.channel_1_iter, + d.channel_1_rpcs, d.shuffle_iter, + bad_channel_iters); + ChannelSelectionData::FindGoodChannel(d.iterators, d.channel_2_iter, + d.channel_2_rpcs, d.shuffle_iter, + bad_channel_iters); + + std::shared_ptr> channel; + if (d.channel_1_rpcs.ok() || d.channel_2_rpcs.ok()) { + if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) { + channel = *d.channel_1_rpcs < *d.channel_2_rpcs ? 
*d.channel_1_iter + : *d.channel_2_iter; + } else if (d.channel_1_rpcs.ok()) { + channel = *d.channel_1_iter; + } else if (d.channel_2_rpcs.ok()) { + channel = *d.channel_2_iter; + } + // Wait until we no longer need valid iterators to call EvictBadChannels. + EvictBadChannels(lk, bad_channel_iters); + } else { + // Call EvictBadChannels before we channels_.push_back to avoid + // invalidating bad_channel_iters if there is a realloc of the vector. + EvictBadChannels(lk, bad_channel_iters); + // We have no usable channels in the entire pool; this is bad. + // Create a channel immediately to unblock application. While the + // return value is a StatusOr, it will only ever contain an error if + // priming is attempted. + channels_.push_back(stub_factory_fn_(next_channel_id_++, instance_name_, + StubManager::Priming::kNoPriming) + .value()); + std::swap(channels_.front(), channels_.back()); + channel = channels_.front(); + } + ScheduleRemoveChannels(lk); + return channel; + } + struct ChannelAddVisitor { std::size_t pool_size; explicit ChannelAddVisitor(std::size_t pool_size) : pool_size(pool_size) {} @@ -297,8 +406,24 @@ class DynamicChannelPool void EvictBadChannels( std::scoped_lock const&, std::vector< - typename std::vector>>::iterator>&) { - // Not yet implemented. 
+ typename std::vector>>::iterator>& + bad_channel_iters) { + auto back_iter = channels_.rbegin(); + for (auto& bad_channel_iter : bad_channel_iters) { + bool swapped = false; + while (!swapped && back_iter != channels_.rend()) { + auto b = (*back_iter)->instant_outstanding_rpcs(); + if (b.ok()) { + std::swap(*back_iter, *bad_channel_iter); + draining_channels_.push_back(std::move(*back_iter)); + swapped = true; + } + ++back_iter; + } + } + for (std::size_t i = 0; i < bad_channel_iters.size(); ++i) { + channels_.pop_back(); + } } void SetSizeDecreaseCooldownTimer(std::scoped_lock const&) { @@ -306,13 +431,52 @@ class DynamicChannelPool sizing_policy_.pool_size_decrease_cooldown_interval); } - // Computes the average_rpcs_pre_channel across all channels in the pool, - // excluding any channels that are awaiting removal in draining_channels_. + // Computes the average RPCs per channel across all channels in the pool, + // by summing the outstanding_rpc from each channel and dividing by the + // number of active channels plus the num_pending_channels_. + // Any channels that are awaiting removal in draining_channels_ are excluded + // from this calculation. // The computed average is compared to the thresholds in the sizing policy - // and calls either ScheduleRemoveChannels or ScheduleAddChannels as - // appropriate. If either is called the resize_cooldown_timer is also set. - void CheckPoolChannelHealth(std::scoped_lock const&) { - // Not yet implemented + // and calls either ScheduleRemoveChannels or ScheduleAddChannels as + // appropriate. If ScheduleRemoveChannels is called, the pool size decrease + // cooldown timer is also set via SetSizeDecreaseCooldownTimer. + void CheckPoolChannelHealth(std::scoped_lock const& lk) { + int average_rpcs_per_channel = + channels_.empty() + ? 0 + : std::accumulate(channels_.begin(), channels_.end(), 0, + [](int a, auto const& b) { + auto rpcs_b = b->instant_outstanding_rpcs(); + return a + (rpcs_b.ok() ? 
*rpcs_b : 0); + }) / + static_cast(channels_.size() + num_pending_channels_); + if (channels_.size() < sizing_policy_.minimum_channel_pool_size || + (average_rpcs_per_channel > + sizing_policy_.maximum_average_outstanding_rpcs_per_channel && + channels_.size() < sizing_policy_.maximum_channel_pool_size)) { + // Channel/stub creation is expensive, instead of making the current RPC + // wait on this, use an existing channel right now, and schedule a channel + // to be added. + ScheduleAddChannels(lk); + return; + } + + if ((!pool_size_decrease_cooldown_timer_.valid() || + pool_size_decrease_cooldown_timer_.is_ready()) && + average_rpcs_per_channel < + sizing_policy_.minimum_average_outstanding_rpcs_per_channel && + channels_.size() > sizing_policy_.minimum_channel_pool_size) { + if (pool_size_decrease_cooldown_timer_.is_ready()) { + pool_size_decrease_cooldown_timer_.get(); + } + auto random_channel = std::uniform_int_distribution( + 0, channels_.size() - 1)(rng_); + std::swap(channels_[random_channel], channels_.back()); + draining_channels_.push_back(std::move(channels_.back())); + channels_.pop_back(); + ScheduleRemoveChannels(lk); + SetSizeDecreaseCooldownTimer(lk); + } } mutable std::mutex mu_; diff --git a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc index b15bc119ce3df..73f04f583c8a0 100644 --- a/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc +++ b/google/cloud/bigtable/internal/dynamic_channel_pool_test.cc @@ -32,6 +32,15 @@ class DynamicChannelPoolTestWrapper { std::shared_ptr> pool) : pool_(std::move(pool)) {} + using ChannelSelectionData = + DynamicChannelPool::ChannelSelectionData; + + std::shared_ptr> HandleBadChannels( + std::scoped_lock const& lk, + DynamicChannelPool::ChannelSelectionData& d) { + return pool_->HandleBadChannels(lk, d); + } + void ScheduleAddChannels( std::scoped_lock const& lk, std::function const&)> const& test_fn) { @@ -48,10 +57,22 @@ class 
DynamicChannelPoolTestWrapper { void RemoveChannels() { pool_->RemoveChannels(); } + void EvictBadChannels( + std::scoped_lock const& lk, + std::vector>>::iterator>& + bad_channel_iters) { + pool_->EvictBadChannels(lk, bad_channel_iters); + } + void SetSizeDecreaseCooldownTimer(std::scoped_lock const& lk) { pool_->SetSizeDecreaseCooldownTimer(lk); } + void CheckPoolChannelHealth(std::scoped_lock const& lk) { + pool_->CheckPoolChannelHealth(lk); + } + std::scoped_lock CreateLock() { return std::scoped_lock(pool_->mu_); } @@ -108,6 +129,192 @@ class DynamicChannelPoolTest : public ::testing::Test { std::thread thread_; }; +TEST_F(DynamicChannelPoolTest, SelectLeastUsedFromTwoChannels) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + + // There should be no attempt to grow the pool. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + auto stub_factory_fn = [&](int, std::string const&, StubManager::Priming) { + return std::make_shared>( + std::make_shared(), 20); + }; + + std::vector>> channels; + auto mock_stub_0 = std::make_shared(); + EXPECT_CALL(*mock_stub_0, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + auto mock_stub_1 = std::make_shared(); + EXPECT_CALL(*mock_stub_1, CheckAndMutateRow).Times(0); + int initial_rpc_count = 0; + channels.push_back(std::make_shared>( + std::move(mock_stub_0), initial_rpc_count++)); + channels.push_back(std::make_shared>( + std::move(mock_stub_1), initial_rpc_count)); + + // Pool creation should set the pool size decrease cooldown timer. 
+ EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + sizing_policy.maximum_channel_pool_size = 2; + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn, sizing_policy); + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, OneInitialChannel) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + + // There should be no attempt to grow the pool after creation. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + { // Pool created with 1 channel. 
+ auto mock_stub = std::make_shared(); + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + EXPECT_CALL(stub_factory_fn, Call).Times(0); + // .WillOnce(::testing::Return( + // std::make_shared>(mock_stub))); + + std::vector>> channels; + auto mock_stub_0 = std::make_shared(); + EXPECT_CALL(*mock_stub_0, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + + int initial_rpc_count = 0; + channels.push_back(std::make_shared>( + std::move(mock_stub_0), initial_rpc_count)); + + // Pool creation should set the pool size decrease cooldown timer. + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer( + sizing_policy.pool_size_decrease_cooldown_interval); + }); + + sizing_policy.maximum_channel_pool_size = 1; + sizing_policy.minimum_channel_pool_size = 1; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + EXPECT_THAT(pool->size(), Eq(1)); + + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + } + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, EmptyInitialPool) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + DynamicChannelPoolSizingPolicy sizing_policy; + + // There should be no attempt to grow the pool after creation. 
+ EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + + { // Pool created with 0 channels. + auto mock_stub = std::make_shared(); + EXPECT_CALL(*mock_stub, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + EXPECT_CALL(stub_factory_fn, Call) + .Times(1) + .WillOnce(::testing::Return( + std::make_shared>(mock_stub))); + + std::vector>> channels; + + // Pool creation should set the pool size decrease cooldown timer. + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }); + + sizing_policy.maximum_channel_pool_size = 1; + sizing_policy.minimum_channel_pool_size = 0; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + + EXPECT_THAT(*pool, ::testing::IsEmpty()); + + auto selected_stub = pool->GetChannelRandomTwoLeastUsed(); + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + + EXPECT_THAT(pool->size(), Eq(1)); + } + fake_cq_impl_->SimulateCompletion(false); +} + TEST_F(DynamicChannelPoolTest, ScheduleAddChannelsPoolUndersized) { auto instance_name = bigtable::InstanceResource(Project("my-project"), "my-instance") @@ -549,6 +756,489 @@ 
TEST_F(DynamicChannelPoolTest, RemoveChannelsSomeChannelsDrained) { fake_cq_impl_->SimulateCompletion(false); } +TEST_F(DynamicChannelPoolTest, HandleBadChannelsTwoChannelsOneBad) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size decrease cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // HandleBadChannels will call ScheduleRemoveChannels. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel"))); + + auto mock_stub = std::make_shared(); + EXPECT_CALL(*mock_stub, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + + channels.push_back( + std::make_shared>(mock_stub, 1)); + + DynamicChannelPoolTestWrapper::ChannelSelectionData data; + for (auto iter = channels.begin(); iter != channels.end(); ++iter) { + data.iterators.push_back(iter); + } + data.shuffle_iter = data.iterators.begin(); + data.channel_1_iter = *data.shuffle_iter; + 
data.channel_1_rpcs = (*data.channel_1_iter)->instant_outstanding_rpcs(); + ++data.shuffle_iter; + data.channel_2_iter = *data.shuffle_iter; + data.channel_2_rpcs = (*data.channel_2_iter)->instant_outstanding_rpcs(); + + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + auto draining_channels = wrapper.SetDrainingChannels({}); + + std::shared_ptr> selected_stub; + { + auto lock = wrapper.CreateLock(); + selected_stub = wrapper.HandleBadChannels(lock, data); + } + EXPECT_THAT(draining_channels, IsEmpty()); + + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + EXPECT_THAT(pool->size(), Eq(1)); + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, HandleBadChannelsTwoChannelsOtherOneBad) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size decrease cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // HandleBadChannels will call ScheduleRemoveChannels. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + auto mock_stub = std::make_shared(); + EXPECT_CALL(*mock_stub, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + channels.push_back( + std::make_shared>(mock_stub, 1)); + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel"))); + + DynamicChannelPoolTestWrapper::ChannelSelectionData data; + for (auto iter = channels.begin(); iter != channels.end(); ++iter) { + data.iterators.push_back(iter); + } + data.shuffle_iter = data.iterators.begin(); + data.channel_1_iter = *data.shuffle_iter; + data.channel_1_rpcs = (*data.channel_1_iter)->instant_outstanding_rpcs(); + ++data.shuffle_iter; + data.channel_2_iter = *data.shuffle_iter; + data.channel_2_rpcs = (*data.channel_2_iter)->instant_outstanding_rpcs(); + + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + auto draining_channels = wrapper.SetDrainingChannels({}); + + std::shared_ptr> selected_stub; + { + auto lock = wrapper.CreateLock(); + selected_stub = wrapper.HandleBadChannels(lock, data); + } + EXPECT_THAT(draining_channels, IsEmpty()); + + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + EXPECT_THAT(pool->size(), Eq(1)); + + fake_cq_impl_->SimulateCompletion(false); +} + 
+TEST_F(DynamicChannelPoolTest, HandleBadChannelsThreeChannelsOneBad) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size decrease cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // HandleBadChannels will call ScheduleRemoveChannels. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + auto mock_stub_0 = std::make_shared(); + EXPECT_CALL(*mock_stub_0, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + channels.push_back( + std::make_shared>(mock_stub_0, 1)); + + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel"))); + + auto mock_stub_1 = std::make_shared(); + EXPECT_CALL(*mock_stub_1, CheckAndMutateRow).Times(0); + channels.push_back( + std::make_shared>(mock_stub_1, 2)); + + DynamicChannelPoolTestWrapper::ChannelSelectionData data; + for (auto iter = channels.begin(); iter != channels.end(); ++iter) { + data.iterators.push_back(iter); + } + data.shuffle_iter = data.iterators.begin(); + 
data.channel_1_iter = *data.shuffle_iter; + data.channel_1_rpcs = (*data.channel_1_iter)->instant_outstanding_rpcs(); + ++data.shuffle_iter; + data.channel_2_iter = *data.shuffle_iter; + data.channel_2_rpcs = (*data.channel_2_iter)->instant_outstanding_rpcs(); + + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + auto draining_channels = wrapper.SetDrainingChannels({}); + + std::shared_ptr> selected_stub; + { + auto lock = wrapper.CreateLock(); + selected_stub = wrapper.HandleBadChannels(lock, data); + } + EXPECT_THAT(draining_channels, IsEmpty()); + + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + EXPECT_THAT(pool->size(), Eq(2)); + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, HandleBadChannelsAllChannelsBad) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size decrease cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // HandleBadChannels will call ScheduleRemoveChannels. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + auto mock_stub = std::make_shared(); + EXPECT_CALL(*mock_stub, CheckAndMutateRow) + .WillOnce([](grpc::ClientContext&, Options const&, + google::bigtable::v2::CheckAndMutateRowRequest const&) { + google::bigtable::v2::CheckAndMutateRowResponse response; + response.set_predicate_matched(true); + return response; + }); + + EXPECT_CALL(stub_factory_fn, Call) + .WillOnce( + [&](std::uint32_t id, std::string const&, StubManager::Priming p) { + EXPECT_THAT(id, Eq(4)); + EXPECT_THAT(p, Eq(StubManager::Priming::kNoPriming)); + return std::make_shared>(mock_stub); + }); + + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel 0"))); + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel 1"))); + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel 2"))); + channels.push_back(std::make_shared>( + std::make_shared(), 0, + internal::InternalError("bad channel 3"))); + + DynamicChannelPoolTestWrapper::ChannelSelectionData data; + for (auto iter = channels.begin(); iter != channels.end(); ++iter) { + data.iterators.push_back(iter); + } + data.shuffle_iter = data.iterators.begin(); + data.channel_1_iter = *data.shuffle_iter; + data.channel_1_rpcs = (*data.channel_1_iter)->instant_outstanding_rpcs(); + ++data.shuffle_iter; + data.channel_2_iter = *data.shuffle_iter; + data.channel_2_rpcs = (*data.channel_2_iter)->instant_outstanding_rpcs(); + + sizing_policy.minimum_channel_pool_size = 2; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + auto 
draining_channels = wrapper.SetDrainingChannels({}); + + std::shared_ptr> selected_stub; + { + auto lock = wrapper.CreateLock(); + selected_stub = wrapper.HandleBadChannels(lock, data); + } + EXPECT_THAT(draining_channels, IsEmpty()); + + grpc::ClientContext context; + auto response = + selected_stub->AcquireStub()->CheckAndMutateRow(context, {}, {}); + ASSERT_STATUS_OK(response); + EXPECT_TRUE(response->predicate_matched()); + EXPECT_THAT(pool->size(), Eq(1)); + + fake_cq_impl_->SimulateCompletion(false); +} + +TEST_F(DynamicChannelPoolTest, CheckChannelPoolHealthNeedsIncrease) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size decrease cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // Pool creation should set the pool size decrease cooldown timer. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + channels.push_back(std::make_shared>( + std::make_shared(), 30)); + + { + sizing_policy.minimum_channel_pool_size = 1; + sizing_policy.maximum_channel_pool_size = 1; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + // ScheduleAddChannels will NOT be called as the pool has max channels. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + auto lock = wrapper.CreateLock(); + wrapper.CheckPoolChannelHealth(lock); + EXPECT_THAT(wrapper.num_pending_channels(), Eq(0)); + } + { + sizing_policy.minimum_channel_pool_size = 2; + sizing_policy.maximum_channel_pool_size = 10; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + // ScheduleAddChannels will be called. 
+ EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(1); + auto lock = wrapper.CreateLock(); + wrapper.CheckPoolChannelHealth(lock); + EXPECT_THAT(wrapper.num_pending_channels(), Eq(1)); + } +} + +TEST_F(DynamicChannelPoolTest, CheckChannelPoolHealthNeedsDecrease) { + auto instance_name = + bigtable::InstanceResource(Project("my-project"), "my-instance") + .FullName(); + auto refresh_state = std::make_shared( + fake_cq_impl_, std::chrono::milliseconds(1), + std::chrono::milliseconds(10)); + std::vector>> channels; + DynamicChannelPoolSizingPolicy sizing_policy; + + MockFunction>>( + std::uint32_t, std::string const&, StubManager::Priming)> + stub_factory_fn; + + EXPECT_CALL(*mock_cq_impl_, MakeRelativeTimer) + // Pool creation should set the pool size decrease cooldown timer. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return make_ready_future(StatusOr(std::chrono::system_clock::now())); + }) + // ScheduleRemoveChannels should start polling. + .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.remove_channel_polling_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }) + // CheckPoolChannelHealth should then set the pool size decrease cooldown + // timer. 
+ .WillOnce([&](std::chrono::nanoseconds ns) { + EXPECT_THAT(ns.count(), + Eq(std::chrono::nanoseconds( + sizing_policy.pool_size_decrease_cooldown_interval) + .count())); + return cq_.MakeRelativeTimer(std::chrono::seconds(600)); + }); + + channels.push_back(std::make_shared>( + std::make_shared(), 2)); + channels.push_back(std::make_shared>( + std::make_shared(), 2)); + channels.push_back(std::make_shared>( + std::make_shared(), 2)); + channels.push_back(std::make_shared>( + std::make_shared(), 2)); + + sizing_policy.minimum_channel_pool_size = 2; + sizing_policy.maximum_channel_pool_size = 10; + sizing_policy.minimum_average_outstanding_rpcs_per_channel = 5; + auto pool = DynamicChannelPool::Create( + instance_name, CompletionQueue(mock_cq_impl_), channels, refresh_state, + stub_factory_fn.AsStdFunction(), sizing_policy); + DynamicChannelPoolTestWrapper wrapper(pool); + + // ScheduleAddChannels will NOT be called. + EXPECT_CALL(*mock_cq_impl_, RunAsync).Times(0); + + { + auto lock = wrapper.CreateLock(); + wrapper.CheckPoolChannelHealth(lock); + } + + EXPECT_THAT(wrapper.num_pending_channels(), Eq(0)); + EXPECT_THAT(pool->size(), Eq(3)); +} + } // namespace GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END } // namespace bigtable_internal