Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 171 additions & 13 deletions google/cloud/bigtable/internal/dynamic_channel_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "google/cloud/bigtable/internal/stub_manager.h"
#include "google/cloud/bigtable/options.h"
#include "google/cloud/completion_queue.h"
#include "google/cloud/internal/clock.h"
#include "google/cloud/internal/random.h"
#include "google/cloud/version.h"
#include <cmath>
Expand Down Expand Up @@ -96,6 +95,8 @@ template <typename T>
class DynamicChannelPool
: public std::enable_shared_from_this<DynamicChannelPool<T>> {
public:
// This function should only return an error status if priming is attempted
// and it is unsuccessful.
using StubFactoryFn =
std::function<StatusOr<std::shared_ptr<ChannelUsage<T>>>(
std::uint32_t id, std::string const& instance_name,
Expand Down Expand Up @@ -140,8 +141,7 @@ class DynamicChannelPool
return channels_.empty();
}

// If the pool is not under a pool resize cooldown, call
// CheckPoolChannelHealth.
// Calls CheckPoolChannelHealth before picking a channel.
//
// Pick two random channels from channels_ and return the channel with the
// lower number of outstanding_rpcs. This is the "quick" path.
Expand All @@ -156,8 +156,46 @@ class DynamicChannelPool
// If there are no healthy channels in channels_, create a new channel and
// use that one. Also call ScheduleAddChannels to replenish channels_.
std::shared_ptr<ChannelUsage<T>> GetChannelRandomTwoLeastUsed() {
// Not yet implemented.
return {};
std::scoped_lock lk(mu_);
CheckPoolChannelHealth(lk);

ChannelSelectionData d;
d.iterators.reserve(channels_.size());
for (auto iter = channels_.begin(); iter != channels_.end(); ++iter) {
d.iterators.push_back(iter);
}
std::shuffle(d.iterators.begin(), d.iterators.end(), rng_);
d.shuffle_iter = d.iterators.begin();

if (d.shuffle_iter != d.iterators.end()) {
d.channel_1_iter = *d.shuffle_iter;
d.channel_1_rpcs = (*d.channel_1_iter)->instant_outstanding_rpcs();
++d.shuffle_iter;
}

if (d.shuffle_iter != d.iterators.end()) {
d.channel_2_iter = *d.shuffle_iter;
d.channel_2_rpcs = (*d.channel_2_iter)->instant_outstanding_rpcs();
}

// This is the most common case so we try it first.
if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) {
return *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter
: *d.channel_2_iter;
}
if (d.iterators.size() == 1 && d.channel_1_rpcs.ok()) {
// Pool contains exactly 1 good channel.
return *d.channel_1_iter;
}
if (d.iterators.empty()) {
// Pool is empty, create a channel immediately and return it. While the
// return value is a StatusOr<T>, it will only ever contain an error if
// priming is attempted.
channels_.push_back(*stub_factory_fn_(next_channel_id_++, instance_name_,
StubManager::Priming::kNoPriming));
return channels_.front();
}
return HandleBadChannels(lk, d);
}

private:
Expand All @@ -180,6 +218,71 @@ class DynamicChannelPool
SetSizeDecreaseCooldownTimer(lk);
}

// Scratch state for the two-random-choices channel selection.
//
// `iterators` holds (shuffled) iterators into channels_; `shuffle_iter`
// tracks how far the shuffled list has been consumed. channel_1/channel_2
// hold the two sampled candidates and their instant_outstanding_rpcs()
// results, pre-seeded with kNotFound so an unsampled slot reads as "bad".
struct ChannelSelectionData {
  using ChannelSelect =
      typename std::vector<std::shared_ptr<ChannelUsage<T>>>::iterator;
  // Shuffled iterators into channels_; elements before shuffle_iter have
  // already been examined.
  std::vector<ChannelSelect> iterators;
  // NOTE(review): channel_1_iter/channel_2_iter are default-constructed
  // (singular) until assigned by the caller; confirm callers never let
  // FindGoodChannel push an unassigned iterator into bad_channel_iters.
  ChannelSelect channel_1_iter;
  ChannelSelect channel_2_iter;
  // Seeded with an error so "never sampled" and "bad channel" both fail ok().
  StatusOr<int> channel_1_rpcs = Status{StatusCode::kNotFound, ""};
  StatusOr<int> channel_2_rpcs = Status{StatusCode::kNotFound, ""};
  typename std::vector<ChannelSelect>::iterator shuffle_iter;

  // If `rpcs` holds an error, records `iter` as bad and advances
  // `shuffle_iter` through the remaining shuffled candidates until one
  // reports a usable rpcs count (updating `iter`/`rpcs` in place) or the
  // list is exhausted. Every bad candidate seen is appended to
  // `bad_channel_iters`. No-op when `rpcs` is already ok.
  static void FindGoodChannel(
      std::vector<ChannelSelect>& iterators, ChannelSelect& iter,
      StatusOr<int>& rpcs,
      typename std::vector<ChannelSelect>::iterator& shuffle_iter,
      std::vector<ChannelSelect>& bad_channel_iters) {
    if (!rpcs.ok()) {
      bad_channel_iters.push_back(iter);
      while (shuffle_iter != iterators.end() && !rpcs.ok()) {
        iter = *shuffle_iter;
        rpcs = (*iter)->instant_outstanding_rpcs();
        if (!rpcs.ok()) bad_channel_iters.push_back(iter);
        ++shuffle_iter;
      }
    }
  }
};

// We have one or more bad channels. Spending time finding a good channel
// will be cheaper than trying to use a bad channel in the long run.
//
// Scans the remaining shuffled candidates for up to two good channels,
// returns the less-loaded one (or creates a fresh channel if every channel
// in the pool is bad), evicts all bad channels seen, and schedules pool
// resizing work.
std::shared_ptr<ChannelUsage<T>> HandleBadChannels(
    std::scoped_lock<std::mutex> const& lk, ChannelSelectionData& d) {
  std::vector<typename ChannelSelectionData::ChannelSelect> bad_channel_iters;
  // Step past channel_2's slot so FindGoodChannel resumes the scan with
  // candidates neither sample has touched.
  if (d.shuffle_iter != d.iterators.end()) ++d.shuffle_iter;
  ChannelSelectionData::FindGoodChannel(d.iterators, d.channel_1_iter,
                                        d.channel_1_rpcs, d.shuffle_iter,
                                        bad_channel_iters);
  // Only consider channel_2 when it was actually sampled: with fewer than
  // two channels channel_2_iter is a singular iterator, and letting
  // FindGoodChannel push it into bad_channel_iters would be undefined
  // behavior when EvictBadChannels dereferences it.
  if (d.iterators.size() >= 2) {
    ChannelSelectionData::FindGoodChannel(d.iterators, d.channel_2_iter,
                                          d.channel_2_rpcs, d.shuffle_iter,
                                          bad_channel_iters);
  }

  std::shared_ptr<ChannelUsage<T>> channel;
  if (d.channel_1_rpcs.ok() && d.channel_2_rpcs.ok()) {
    channel = *d.channel_1_rpcs < *d.channel_2_rpcs ? *d.channel_1_iter
                                                    : *d.channel_2_iter;
  } else if (d.channel_1_rpcs.ok()) {
    channel = *d.channel_1_iter;
  } else if (d.channel_2_rpcs.ok()) {
    channel = *d.channel_2_iter;
  }
  // Evict now, while the iterators in bad_channel_iters are still valid:
  // any push_back into channels_ below may reallocate the vector and
  // invalidate every stored iterator.
  EvictBadChannels(lk, bad_channel_iters);
  if (!channel) {
    // We have no usable channels in the entire pool; this is bad.
    // Create a channel immediately to unblock the application. While the
    // return value is a StatusOr<T>, it will only ever contain an error if
    // priming is attempted, and kNoPriming is passed here.
    channels_.push_back(*stub_factory_fn_(next_channel_id_++, instance_name_,
                                          StubManager::Priming::kNoPriming));
    channel = channels_.back();
  }
  ScheduleRemoveChannels(lk);
  ScheduleAddChannels(lk);
  return channel;
}

struct ChannelAddVisitor {
std::size_t pool_size;
explicit ChannelAddVisitor(std::size_t pool_size) : pool_size(pool_size) {}
Expand Down Expand Up @@ -297,22 +400,77 @@ class DynamicChannelPool
// Moves every channel referenced by bad_channel_iters out of channels_ and
// into draining_channels_, then compacts channels_ to remove the vacated
// slots.
//
// Precondition (caller holds mu_, witnessed by the scoped_lock parameter):
// the entries of bad_channel_iters are valid, distinct iterators into
// channels_. The previous swap-toward-back + pop_back scheme could pop a
// good channel and leave moved-from null shared_ptrs in the pool when a
// bad channel sat behind the swap cursor; moving then compacting cannot.
void EvictBadChannels(
    std::scoped_lock<std::mutex> const&,
    std::vector<
        typename std::vector<std::shared_ptr<ChannelUsage<T>>>::iterator>&
        bad_channel_iters) {
  // Hand each bad channel to draining_channels_; its slot in channels_ is
  // left holding a (null) moved-from shared_ptr.
  for (auto& bad_channel_iter : bad_channel_iters) {
    draining_channels_.push_back(std::move(*bad_channel_iter));
  }
  // Compact channels_ in place, preserving the relative order of the
  // surviving channels, then drop the empty tail.
  std::size_t dst = 0;
  for (std::size_t src = 0; src != channels_.size(); ++src) {
    if (channels_[src]) {
      if (dst != src) channels_[dst] = std::move(channels_[src]);
      ++dst;
    }
  }
  channels_.resize(dst);
}

// (Re)arms the cooldown timer that rate-limits shrinking of the pool.
// The unused scoped_lock parameter documents that mu_ must be held.
void SetSizeDecreaseCooldownTimer(std::scoped_lock<std::mutex> const&) {
  pool_size_decrease_cooldown_timer_ = cq_.MakeRelativeTimer(
      sizing_policy_.pool_size_decrease_cooldown_interval);
}

// Computes the average_rpcs_pre_channel across all channels in the pool,
// excluding any channels that are awaiting removal in draining_channels_.
// Computes the average RPCs per channel across all channels in the pool,
// by summing the outstanding_rpc from each channel and dividing by the
// number of active channels plus the num_pending_channels_.
// Any channels that are awaiting removal in draining_channels_ are excluded
// from this calculation.
// The computed average is compared to the thresholds in the sizing policy
// and calls either ScheduleRemoveChannels or ScheduleAddChannels as
// appropriate. If either is called the resize_cooldown_timer is also set.
void CheckPoolChannelHealth(std::scoped_lock<std::mutex> const&) {
// Not yet implemented
// and calls either ScheduleRemoveChannel or ScheduleAddChannel as
// appropriate. If ScheduleRemoveChannel is called the resize_cooldown_timer
// is also set.
void CheckPoolChannelHealth(std::scoped_lock<std::mutex> const& lk) {
int average_rpcs_per_channel =
channels_.empty()
? 0
: std::accumulate(channels_.begin(), channels_.end(), 0,
[](int a, auto const& b) {
auto rpcs_b = b->instant_outstanding_rpcs();
return a + (rpcs_b.ok() ? *rpcs_b : 0);
}) /
static_cast<int>(channels_.size() + num_pending_channels_);
if (channels_.size() < sizing_policy_.minimum_channel_pool_size ||
(average_rpcs_per_channel >
sizing_policy_.maximum_average_outstanding_rpcs_per_channel &&
channels_.size() < sizing_policy_.maximum_channel_pool_size)) {
// Channel/stub creation is expensive, instead of making the current RPC
// wait on this, use an existing channel right now, and schedule a channel
// to be added.
ScheduleAddChannels(lk);
return;
}

if ((!pool_size_decrease_cooldown_timer_.valid() ||
pool_size_decrease_cooldown_timer_.is_ready()) &&
average_rpcs_per_channel <
sizing_policy_.minimum_average_outstanding_rpcs_per_channel &&
channels_.size() > sizing_policy_.minimum_channel_pool_size) {
if (pool_size_decrease_cooldown_timer_.is_ready()) {
pool_size_decrease_cooldown_timer_.get();
}
auto random_channel = std::uniform_int_distribution<std::size_t>(
0, channels_.size() - 1)(rng_);
std::swap(channels_[random_channel], channels_.back());
draining_channels_.push_back(std::move(channels_.back()));
channels_.pop_back();
ScheduleRemoveChannels(lk);
SetSizeDecreaseCooldownTimer(lk);
}
}

mutable std::mutex mu_;
Expand Down
Loading
Loading