Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 5 additions & 72 deletions google/cloud/bigtable/internal/channel_usage.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@
#include "google/cloud/internal/clock.h"
#include "google/cloud/status_or.h"
#include "google/cloud/version.h"
#include <chrono>
#include <deque>
#include <memory>
#include <mutex>
#include <utility>
Expand All @@ -35,67 +33,15 @@ GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
template <typename T>
class ChannelUsage : public std::enable_shared_from_this<ChannelUsage<T>> {
public:
using Clock = ::google::cloud::internal::SteadyClock;
ChannelUsage() = default;
explicit ChannelUsage(std::shared_ptr<T> stub, std::shared_ptr<Clock> clock =
std::make_shared<Clock>())
: stub_(std::move(stub)), clock_(std::move(clock)) {}
explicit ChannelUsage(std::shared_ptr<T> stub) : stub_(std::move(stub)) {}

// This constructor is only used in testing.
ChannelUsage(std::shared_ptr<T> stub, std::shared_ptr<Clock> clock,
int initial_outstanding_rpcs)
ChannelUsage(std::shared_ptr<T> stub, int initial_outstanding_rpcs,
Status last_refresh_status = {})
: stub_(std::move(stub)),
clock_(std::move(clock)),
outstanding_rpcs_(initial_outstanding_rpcs) {}

// Computes the weighted average of outstanding RPCs on the channel over the
// past 60 seconds.
StatusOr<int> average_outstanding_rpcs() {
auto constexpr kWindowSeconds = 60;
auto constexpr kWindowDuration = std::chrono::seconds(kWindowSeconds);
std::scoped_lock lk(mu_);
if (!last_refresh_status_.ok()) return last_refresh_status_;
// If there are no measurements then the stub has never been used. In real
// use this will be 0. In testing we sometimes set an initial value.
if (measurements_.empty()) return outstanding_rpcs_;
auto now = clock_->Now();
auto last_time = now;
auto window_start = now - kWindowDuration;

double sum = 0.0;
double total_weight = 0.0;
auto iter = measurements_.rbegin();
while (iter != measurements_.rend() && iter->timestamp >= window_start) {
double weight =
std::chrono::duration<double>(last_time - iter->timestamp).count();
last_time = iter->timestamp;
sum += iter->outstanding_rpcs * weight;
total_weight += weight;
++iter;
}

// It's unlikely we will have a measurement at precisely the beginning of
// the window. So, we need to use the first measurement outside the window
// to compute a measurement for the missing part of the window using a
// weight equal to the missing time.
if (iter != measurements_.rend()) {
double weight = std::max(0.0, kWindowSeconds - total_weight);
sum += iter->outstanding_rpcs * weight;
total_weight += weight;
// We want to keep one measurement that's at least 60s old to provide a
// starting value for the next window.
++iter;
}

if (measurements_.size() > 1) {
measurements_.erase(measurements_.begin(), iter.base());
}
// After iterating through the measurements if the total_weight is zero,
// then all of the measurements occurred at time == now, and returning the
// current number of outstanding RPCs is most correct.
return total_weight == 0.0 ? outstanding_rpcs_
: static_cast<int>(sum / total_weight);
}
outstanding_rpcs_(initial_outstanding_rpcs),
last_refresh_status_(std::move(last_refresh_status)) {}

StatusOr<int> instant_outstanding_rpcs() {
std::scoped_lock lk(mu_);
Expand Down Expand Up @@ -123,32 +69,19 @@ class ChannelUsage : public std::enable_shared_from_this<ChannelUsage<T>> {
std::shared_ptr<T> AcquireStub() {
std::scoped_lock lk(mu_);
++outstanding_rpcs_;
auto time = clock_->Now();
measurements_.emplace_back(outstanding_rpcs_, time);
return stub_;
}

void ReleaseStub() {
std::scoped_lock lk(mu_);
--outstanding_rpcs_;
measurements_.emplace_back(outstanding_rpcs_, clock_->Now());
}

private:
mutable std::mutex mu_;
std::shared_ptr<T> stub_;
std::shared_ptr<Clock> clock_ = std::make_shared<Clock>();
int outstanding_rpcs_ = 0;
Status last_refresh_status_;
struct Measurement {
Measurement(int outstanding_rpcs, std::chrono::steady_clock::time_point p)
: outstanding_rpcs(outstanding_rpcs), timestamp(p) {}
int outstanding_rpcs;
std::chrono::steady_clock::time_point timestamp;
};
// Older measurements are removed as part of the average_outstanding_rpcs
// method.
std::deque<Measurement> measurements_;
};

GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Expand Down
50 changes: 0 additions & 50 deletions google/cloud/bigtable/internal/channel_usage_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,56 +69,6 @@ TEST(ChannelUsageTest, SetLastRefreshStatus) {
channel->set_last_refresh_status(expected_status);
EXPECT_THAT(channel->instant_outstanding_rpcs(),
StatusIs(expected_status.code()));
EXPECT_THAT(channel->average_outstanding_rpcs(),
StatusIs(expected_status.code()));
}

TEST(ChannelUsageTest, AverageOutstandingRpcs) {
auto clock = std::make_shared<testing_util::FakeSteadyClock>();
auto mock = std::make_shared<MockBigtableStub>();
auto channel = std::make_shared<ChannelUsage<BigtableStub>>(mock, clock);
EXPECT_THAT(channel->instant_outstanding_rpcs(), IsOkAndHolds(0));

auto start = std::chrono::steady_clock::now();
clock->SetTime(start);

for (int i = 0; i < 10; ++i) (void)channel->AcquireStub();
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10));

clock->AdvanceTime(std::chrono::seconds(1));
// sum=10 total_weight=1
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10));

for (int i = 0; i < 10; ++i) (void)channel->AcquireStub();
clock->AdvanceTime(std::chrono::seconds(1));
// sum=30, total_weight=2
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(15));

for (int i = 0; i < 20; ++i) channel->ReleaseStub();
clock->AdvanceTime(std::chrono::seconds(1));
// sum=30, total_weight=3
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(10));

clock->AdvanceTime(std::chrono::seconds(2));
// sum=30, total_weight=5
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(6));

for (int i = 0; i < 100; ++i) (void)channel->AcquireStub();
clock->AdvanceTime(std::chrono::seconds(25));
// sum=2530, total_weight=84
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(84));

clock->AdvanceTime(std::chrono::seconds(35));
// First 5s of measurements have aged out.
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100));

clock->AdvanceTime(std::chrono::seconds(60));
// All measurements have aged out.
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100));

clock->AdvanceTime(std::chrono::seconds(3600));
// All measurements have aged out.
EXPECT_THAT(channel->average_outstanding_rpcs(), IsOkAndHolds(100));
}

TEST(ChannelUsageTest, MakeWeak) {
Expand Down
13 changes: 3 additions & 10 deletions google/cloud/bigtable/internal/dynamic_channel_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,6 @@ class DynamicChannelPool
// performing this work. We might as well cancel those timer futures now.
refresh_state_->timers().CancelAll();
if (remove_channel_poll_timer_.valid()) remove_channel_poll_timer_.cancel();
if (pool_size_increase_cooldown_timer_.valid()) {
pool_size_increase_cooldown_timer_.cancel();
}
if (pool_size_decrease_cooldown_timer_.valid()) {
pool_size_decrease_cooldown_timer_.cancel();
}
Expand Down Expand Up @@ -216,6 +213,7 @@ class DynamicChannelPool
absl::visit(ChannelAddVisitor(channels_.size()),
sizing_policy_.channels_to_add_per_resize));
}
num_pending_channels_ += num_channels_to_add;
std::vector<int> new_channel_ids;
new_channel_ids.reserve(num_channels_to_add);
for (std::size_t i = 0; i < num_channels_to_add; ++i) {
Expand Down Expand Up @@ -244,6 +242,7 @@ class DynamicChannelPool
if (new_stub.ok()) new_stubs.push_back(*std::move(new_stub));
}
std::scoped_lock lk(mu_);
num_pending_channels_ -= new_channel_ids.size();
channels_.insert(channels_.end(),
std::make_move_iterator(new_stubs.begin()),
std::make_move_iterator(new_stubs.end()));
Expand Down Expand Up @@ -302,11 +301,6 @@ class DynamicChannelPool
// Not yet implemented.
}

void SetSizeIncreaseCooldownTimer(std::scoped_lock<std::mutex> const&) {
pool_size_increase_cooldown_timer_ = cq_.MakeRelativeTimer(
sizing_policy_.pool_size_increase_cooldown_interval);
}

void SetSizeDecreaseCooldownTimer(std::scoped_lock<std::mutex> const&) {
pool_size_decrease_cooldown_timer_ = cq_.MakeRelativeTimer(
sizing_policy_.pool_size_decrease_cooldown_interval);
Expand All @@ -328,11 +322,10 @@ class DynamicChannelPool
std::shared_ptr<ConnectionRefreshState> refresh_state_;
StubFactoryFn stub_factory_fn_;
std::vector<std::shared_ptr<ChannelUsage<T>>> channels_;
std::size_t num_pending_channels_ = 0;
DynamicChannelPoolSizingPolicy sizing_policy_;
std::vector<std::shared_ptr<ChannelUsage<T>>> draining_channels_;
future<void> remove_channel_poll_timer_;
future<StatusOr<std::chrono::system_clock::time_point>>
pool_size_increase_cooldown_timer_;
future<StatusOr<std::chrono::system_clock::time_point>>
pool_size_decrease_cooldown_timer_;
std::uint32_t next_channel_id_;
Expand Down
Loading
Loading