Google Cloud Pub/Sub C++ Client  1.31.1
A C++ Client Library for Google Cloud Pub/Sub
endurance.cc
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
#include "google/cloud/pubsub/publisher.h"
#include "google/cloud/pubsub/subscriber.h"
#include "google/cloud/pubsub/subscription_admin_client.h"
#include "google/cloud/pubsub/testing/random_names.h"
#include "google/cloud/pubsub/topic_admin_client.h"
#include "google/cloud/internal/format_time_point.h"
#include "google/cloud/internal/getenv.h"
#include "google/cloud/internal/random.h"
#include "google/cloud/testing_util/command_line_parsing.h"
#include "absl/memory/memory.h"
#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <numeric>
#include <random>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
32 
33 namespace {
34 namespace pubsub = ::google::cloud::pubsub;
35 using ::google::cloud::future;
36 using ::google::cloud::Status;
37 using ::google::cloud::StatusOr;
38 
// Long-form description of the experiment, printed by `--description`.
auto constexpr kDescription = R"""(
An endurance test for the Cloud Pub/Sub C++ client library.

This experiment is largely a torture test for the library. The objective is to
detect bugs that escape unit and integration tests. Such tests are typically
short-lived and predictable, so we write a test that is long-lived and
unpredictable to find problems that would otherwise go unnoticed.

The test creates a number of threads publishing messages and a number of
subscription sessions. Periodically these publishers and subscriptions are
replaced with new ones.

For flow control purposes, the benchmark keeps a limited number of messages in
flight.
)""";
54 
// Experiment configuration, filled in from the command-line by ParseArgs().
struct Config {
  // Project that owns the topic and subscriptions.
  std::string project_id;
  // Pre-existing topic to use; when empty a temporary topic is created.
  std::string topic_id;

  // Flow control thresholds on the number of messages not yet received.
  std::int64_t pending_lwm{10 * 1000};
  std::int64_t pending_hwm{100 * 1000};

  // Number of publishing threads, subscriptions, and subscription sessions.
  int publisher_count{4};
  int subscription_count{4};
  int session_count{8};

  // Stopping criteria, see ExperimentCompleted().
  std::int64_t minimum_samples{30 * 1000};
  std::int64_t maximum_samples{(std::numeric_limits<std::int64_t>::max)()};
  std::chrono::seconds minimum_runtime{5};
  std::chrono::seconds maximum_runtime{300};

  // Set when `--help` was requested; the program exits without running.
  bool show_help{false};
};
73 
74 StatusOr<Config> ParseArgs(std::vector<std::string> args);
75 
76 class ExperimentFlowControl {
77  public:
78  ExperimentFlowControl(int subscription_count, std::int64_t lwm,
79  std::int64_t hwm)
80  : subscription_count_(subscription_count), lwm_(lwm), hwm_(hwm) {}
81 
82  pubsub::Message GenerateMessage(int task);
83  void Published(bool success);
84  void Received(pubsub::Message const&);
85  void Shutdown();
86 
87  std::vector<std::chrono::microseconds> ClearSamples() {
88  std::lock_guard<std::mutex> lk(mu_);
89  std::vector<std::chrono::microseconds> tmp;
90  tmp.swap(samples_);
91  return tmp;
92  }
93 
94  void Debug(std::ostream& os) const {
95  std::lock_guard<std::mutex> lk(mu_);
96  os << "subscription_count=" << subscription_count_ << ", lwm=" << lwm_
97  << ", hwm=" << hwm_ << ", pending=" << pending_
98  << ", sent=" << sent_count_ << ", received=" << received_count_
99  << ", expected=" << expected_count_ << ", failures=" << failures_
100  << ", overflow=" << overflow_ << ", shutdown=" << shutdown_
101  << ", samples.size()=" << samples_.size();
102  }
103 
104  std::int64_t SentCount() const {
105  std::lock_guard<std::mutex> lk(mu_);
106  return sent_count_;
107  }
108 
109  std::int64_t ExpectedCount() const {
110  std::lock_guard<std::mutex> lk(mu_);
111  return expected_count_;
112  }
113 
114  std::int64_t ReceivedCount() const {
115  std::lock_guard<std::mutex> lk(mu_);
116  return received_count_;
117  }
118 
119  std::int64_t WaitReceivedCount(std::int64_t count) {
120  std::unique_lock<std::mutex> lk(mu_);
121  cv_.wait(lk, [&] { return received_count_ >= count; });
122  return received_count_;
123  }
124 
125  private:
126  int const subscription_count_;
127  std::int64_t const lwm_;
128  std::int64_t const hwm_;
129 
130  mutable std::mutex mu_;
131  std::condition_variable cv_;
132  std::int64_t pending_ = 0;
133  std::int64_t sent_count_ = 0;
134  std::int64_t expected_count_ = 0;
135  std::int64_t received_count_ = 0;
136  std::int64_t failures_ = 0;
137  bool overflow_ = false;
138  bool shutdown_ = false;
139  std::vector<std::chrono::microseconds> samples_;
140 };
141 
142 bool ExperimentCompleted(Config const& config,
143  ExperimentFlowControl const& flow_control,
144  std::chrono::steady_clock::time_point start);
145 
146 void PublisherTask(Config const& config, ExperimentFlowControl& flow_control,
147  int task);
148 
/**
 * Runs deferred actions when destroyed, most recently deferred first.
 *
 * Non-copyable: a copy would run every deferred action a second time.
 */
class Cleanup {
 public:
  Cleanup() = default;
  Cleanup(Cleanup const&) = delete;
  Cleanup& operator=(Cleanup const&) = delete;
  ~Cleanup() {
    for (auto i = actions_.rbegin(); i != actions_.rend(); ++i) (*i)();
  }

  /// Registers `f` to run at destruction. Empty functions are ignored, as
  /// invoking them from the destructor would throw std::bad_function_call.
  void Defer(std::function<void()> f) {
    if (f) actions_.push_back(std::move(f));
  }

 private:
  std::vector<std::function<void()>> actions_;
};
160 
161 } // namespace
162 
163 int main(int argc, char* argv[]) {
164  auto config = ParseArgs({argv, argv + argc});
165  if (!config) {
166  std::cerr << "Error parsing command-line arguments\n";
167  std::cerr << config.status() << "\n";
168  return 1;
169  }
170  if (config->show_help) return 0;
171 
172  pubsub::TopicAdminClient topic_admin(pubsub::MakeTopicAdminConnection());
173  pubsub::SubscriptionAdminClient subscription_admin(
175 
176  auto generator = google::cloud::internal::MakeDefaultPRNG();
177 
178  auto const configured_topic = config->topic_id;
179 
180  // If there is no pre-defined topic for this test, create one and
181  // automatically remove it at the end of the test.
182  std::function<void()> delete_topic = [] {};
183  if (config->topic_id.empty()) {
184  config->topic_id = google::cloud::pubsub_testing::RandomTopicId(generator);
185  auto topic = pubsub::Topic(config->project_id, config->topic_id);
186  auto create = topic_admin.CreateTopic(pubsub::TopicBuilder{topic});
187  if (!create) {
188  std::cout << "CreateTopic() failed: " << create.status() << "\n";
189  return 1;
190  }
191  delete_topic = [topic_admin, topic]() mutable {
192  (void)topic_admin.DeleteTopic(topic);
193  };
194  }
195 
196  std::cout << "# Running Cloud Pub/Sub experiment"
197  << "\n# Start time: "
198  << google::cloud::internal::FormatRfc3339(
199  std::chrono::system_clock::now())
200  << "\n# Configured topic: " << configured_topic
201  << "\n# Actual topic: " << config->topic_id
202  << "\n# Flow Control LWM: " << config->pending_lwm
203  << "\n# Flow Control HWM: " << config->pending_hwm
204  << "\n# Publisher Count: " << config->publisher_count
205  << "\n# Subscription Count: " << config->subscription_count
206  << "\n# Session Count: " << config->session_count
207  << "\n# Minimum Samples: " << config->minimum_samples
208  << "\n# Maximum Samples: " << config->maximum_samples
209  << "\n# Minimum Runtime: " << config->minimum_runtime.count() << "s"
210  << "\n# Maximum Runtime: " << config->maximum_runtime.count() << "s"
211  << std::endl;
212 
213  auto const topic = pubsub::Topic(config->project_id, config->topic_id);
214  auto const subscriptions = [&] {
215  std::vector<pubsub::Subscription> subscriptions;
216  for (int i = 0; i != config->subscription_count; ++i) {
217  auto sub = pubsub::Subscription(
218  config->project_id,
219  google::cloud::pubsub_testing::RandomSubscriptionId(generator));
220  auto create = subscription_admin.CreateSubscription(topic, sub);
221  if (!create) continue;
222  subscriptions.push_back(std::move(sub));
223  }
224  return subscriptions;
225  }();
226  if (subscriptions.empty()) {
227  std::cerr << "Could not create any subscriptions\n";
228  return 1;
229  }
230 
231  ExperimentFlowControl flow_control(config->subscription_count,
232  config->pending_lwm, config->pending_hwm);
233 
234  auto handler = [&](pubsub::Message const& m, pubsub::AckHandler h) {
235  std::move(h).ack();
236  flow_control.Received(m);
237  };
238 
239  std::vector<std::unique_ptr<pubsub::Subscriber>> subscribers;
240  std::vector<future<google::cloud::Status>> sessions;
241  for (auto i = 0; i != config->session_count; ++i) {
242  auto const& subscription = subscriptions[i % subscriptions.size()];
243  auto subscriber = absl::make_unique<pubsub::Subscriber>(
244  pubsub::MakeSubscriberConnection(subscription));
245  sessions.push_back(subscriber->Subscribe(handler));
246  subscribers.push_back(std::move(subscriber));
247  }
248  auto cleanup_sessions = [&] {
249  for (auto& s : sessions) s.cancel();
250  for (auto& s : sessions) s.wait_for(std::chrono::seconds(3));
251  sessions.clear();
252  };
253 
254  auto tasks = [&] {
255  std::vector<std::thread> tasks(config->publisher_count);
256  int task_id = 0;
257  std::generate(tasks.begin(), tasks.end(), [&] {
258  return std::thread{PublisherTask, std::cref(*config),
259  std::ref(flow_control), task_id++};
260  });
261  return tasks;
262  }();
263 
264  auto ts = [] {
265  return google::cloud::internal::FormatRfc3339(
266  std::chrono::system_clock::now());
267  };
268 
269  auto divide_duration = [](std::chrono::microseconds duration, int count,
270  std::chrono::microseconds min,
271  std::chrono::microseconds max) {
272  auto const d = duration / count;
273  if (d < min) return min;
274  if (d > max) return max;
275  return d;
276  };
277 
278  auto const cycle =
279  divide_duration(config->minimum_runtime, 50, std::chrono::seconds(10),
280  std::chrono::seconds(60));
281 
282  auto const report_interval =
283  divide_duration(config->minimum_runtime, 100, std::chrono::seconds(5),
284  std::chrono::seconds(15));
285 
286  auto const start = std::chrono::steady_clock::now();
287  auto report_deadline = start + report_interval;
288 
289  std::uniform_int_distribution<std::size_t> session_selector(
290  0, sessions.size() - 1);
291  std::cout << "Timestamp,RunningCount,Count,Min,Max,Average(us)\n";
292 
293  Cleanup cleanup;
294  cleanup.Defer(delete_topic);
295  for (auto const& sub : subscriptions) {
296  cleanup.Defer([subscription_admin, sub]() mutable {
297  (void)subscription_admin.DeleteSubscription(sub);
298  });
299  }
300 
301  while (!ExperimentCompleted(*config, flow_control, start)) {
302  std::this_thread::sleep_for(cycle);
303  auto const idx = session_selector(generator);
304  sessions[idx].cancel();
305  sessions[idx] = subscribers[idx]->Subscribe(handler);
306 
307  auto const now = std::chrono::steady_clock::now();
308  if (now >= report_deadline) {
309  report_deadline = now + report_interval;
310  auto const samples = flow_control.ClearSamples();
311  if (samples.empty()) {
312  std::cout << "# " << ts() << ',';
313  flow_control.Debug(std::cout);
314  std::cout << std::endl;
315  } else {
316  auto const p = std::minmax_element(samples.begin(), samples.end());
317  auto const sum = std::accumulate(samples.begin(), samples.end(),
318  std::chrono::microseconds{0});
319  auto const mean = sum / samples.size();
320  auto const received_count = flow_control.ReceivedCount();
321  std::cout << ts() << ',' << received_count << ',' << samples.size()
322  << ',' << p.first->count() << ',' << p.second->count() << ','
323  << mean.count() << std::endl;
324  }
325  }
326  }
327 
328  flow_control.Shutdown();
329  for (auto& t : tasks) t.join();
330  auto const sent_count = flow_control.SentCount();
331  std::cout << "# " << ts() << " - sent: " << sent_count << " messages"
332  << std::endl;
333 
334  auto const expected = flow_control.ExpectedCount();
335  // Wait until about 95% of the messages are received.
336  for (int p : {50, 60, 70, 80, 90, 95, 98, 99}) {
337  auto const received_count =
338  flow_control.WaitReceivedCount(expected * p / 100);
339  std::cout << "# " << ts() << " - received at least " << p << "% ["
340  << received_count << " / " << expected
341  << "] of the expected messages" << std::endl;
342  }
343 
344  std::cout << "# " << ts() << " - received: " << flow_control.ReceivedCount()
345  << " messages" << std::endl;
346  cleanup_sessions();
347  std::cout << "# " << ts() << " - received: " << flow_control.ReceivedCount()
348  << " messages" << std::endl;
349 
350  return 0;
351 }
352 
353 namespace {
354 
355 pubsub::Message ExperimentFlowControl::GenerateMessage(int task) {
356  std::unique_lock<std::mutex> lk(mu_);
357  cv_.wait(lk, [&] { return shutdown_ || !overflow_; });
358  ++pending_;
359  ++sent_count_;
360  if (pending_ + (expected_count_ - received_count_) >= hwm_) overflow_ = true;
361  auto const ts = std::to_string(
362  std::chrono::steady_clock::now().time_since_epoch().count());
363  if (shutdown_) {
364  return pubsub::MessageBuilder{}
365  .SetData("shutdown:" + std::to_string(task))
366  .SetAttributes({{"timestamp", ts}})
367  .Build();
368  }
369  return pubsub::MessageBuilder{}
370  .SetData("task:" + std::to_string(task))
371  .SetAttributes({{"timestamp", ts}})
372  .Build();
373 }
374 
375 void ExperimentFlowControl::Published(bool success) {
376  std::unique_lock<std::mutex> lk(mu_);
377  --pending_;
378  if (success) {
379  expected_count_ += subscription_count_;
380  } else {
381  ++failures_;
382  }
383  if ((expected_count_ - received_count_) >= hwm_) overflow_ = true;
384 }
385 
386 void ExperimentFlowControl::Received(pubsub::Message const& m) {
387  auto const now = std::chrono::steady_clock::now();
388  auto const message_timestamp = [&m] {
389  std::chrono::steady_clock::duration ts{0};
390  for (auto const& kv : m.attributes()) {
391  if (kv.first == "timestamp") {
392  ts = std::chrono::steady_clock::duration(std::stoll(kv.second));
393  break;
394  }
395  }
396  return std::chrono::steady_clock::time_point{} + ts;
397  }();
398  auto const elapsed = std::chrono::duration_cast<std::chrono::microseconds>(
399  now - message_timestamp);
400  std::unique_lock<std::mutex> lk(mu_);
401  ++received_count_;
402  samples_.push_back(elapsed);
403  if (expected_count_ - received_count_ > lwm_) return;
404  overflow_ = false;
405  cv_.notify_all();
406 }
407 
408 void ExperimentFlowControl::Shutdown() {
409  std::unique_lock<std::mutex> lk(mu_);
410  shutdown_ = true;
411  cv_.notify_all();
412 }
413 
414 bool ExperimentCompleted(Config const& config,
415  ExperimentFlowControl const& flow_control,
416  std::chrono::steady_clock::time_point start) {
417  auto const now = std::chrono::steady_clock::now();
418  auto const samples = flow_control.ReceivedCount();
419  if (now >= start + config.maximum_runtime) return true;
420  if (samples >= config.maximum_samples) return true;
421  if (now < start + config.minimum_runtime) return false;
422  return samples >= config.minimum_samples;
423 }
424 
425 void PublisherTask(Config const& config, ExperimentFlowControl& flow_control,
426  int task) {
427  auto make_publisher = [config, task] {
428  auto const topic = pubsub::Topic(config.project_id, config.topic_id);
429  return pubsub::Publisher(pubsub::MakePublisherConnection(
430  topic, {},
431  pubsub::ConnectionOptions{}.set_channel_pool_domain(
432  "publisher:" + std::to_string(task))));
433  };
434  auto publisher = make_publisher();
435 
436  using clock = std::chrono::steady_clock;
437  auto const start = clock::now();
438  auto next_refresh = start + std::chrono::seconds(30);
439  future<void> last_publish;
440  while (!ExperimentCompleted(config, flow_control, start)) {
441  auto ts = clock::now();
442  if (ts >= next_refresh) {
443  next_refresh = ts + std::chrono::seconds(30);
444  publisher.Flush();
445  auto d = std::move(last_publish);
446  if (d.valid()) d.get();
447  publisher = make_publisher();
448  }
449  auto message = flow_control.GenerateMessage(task);
450  auto const shutdown = message.data().rfind("shutdown:", 0) == 0;
451  last_publish = publisher.Publish(std::move(message))
452  .then([&flow_control](future<StatusOr<std::string>> f) {
453  flow_control.Published(f.get().ok());
454  });
455  if (shutdown) break;
456  }
457  publisher.Flush();
458  if (last_publish.valid()) last_publish.get();
459 }
460 
461 using ::google::cloud::internal::GetEnv;
462 using ::google::cloud::testing_util::OptionDescriptor;
463 using ::google::cloud::testing_util::ParseDuration;
464 
465 google::cloud::StatusOr<Config> ParseArgsImpl(std::vector<std::string> args,
466  std::string const& description) {
467  Config options;
468  options.project_id = GetEnv("GOOGLE_CLOUD_PROJECT").value_or("");
469  bool show_help = false;
470  bool show_description = false;
471 
472  std::vector<OptionDescriptor> desc{
473  {"--help", "print usage information",
474  [&show_help](std::string const&) { show_help = true; }},
475  {"--description", "print benchmark description",
476  [&show_description](std::string const&) { show_description = true; }},
477  {"--project-id", "use the given project id for the benchmark",
478  [&options](std::string const& val) { options.project_id = val; }},
479  {"--topic-id", "use an existing topic for the benchmark",
480  [&options](std::string const& val) { options.topic_id = val; }},
481  {"--pending-lwm", "flow control from publisher to subscriber",
482  [&options](std::string const& val) {
483  options.pending_lwm = std::stol(val);
484  }},
485  {"--pending-hwm", "flow control from publisher to subscriber",
486  [&options](std::string const& val) {
487  options.pending_hwm = std::stol(val);
488  }},
489  {"--publisher-count", "number of publishing threads",
490  [&options](std::string const& val) {
491  options.publisher_count = std::stoi(val);
492  }},
493  {"--subscription-count", "number of subscriptions",
494  [&options](std::string const& val) {
495  options.subscription_count = std::stoi(val);
496  }},
497  {"--session-count", "number of subscription sessions",
498  [&options](std::string const& val) {
499  options.session_count = std::stoi(val);
500  }},
501  {"--minimum-samples", "minimum number of samples to capture",
502  [&options](std::string const& val) {
503  options.minimum_samples = std::stoi(val);
504  }},
505  {"--maximum-samples", "maximum number of samples to capture",
506  [&options](std::string const& val) {
507  options.maximum_samples = std::stoi(val);
508  }},
509  {"--minimum-runtime", "run for at least this time",
510  [&options](std::string const& val) {
511  options.minimum_runtime = ParseDuration(val);
512  }},
513  {"--maximum-runtime", "run for at most this time",
514  [&options](std::string const& val) {
515  options.maximum_runtime = ParseDuration(val);
516  }},
517  };
518  auto const usage = BuildUsage(desc, args[0]);
519  auto unparsed = OptionsParse(desc, args);
520 
521  if (show_description) {
522  std::cout << description << "\n\n";
523  }
524 
525  if (show_help) {
526  std::cout << usage << "\n";
527  options.show_help = true;
528  return options;
529  }
530 
531  if (options.project_id.empty()) {
533  "missing or empty --project-id option");
534  }
535 
536  return options;
537 }
538 
539 google::cloud::StatusOr<Config> SelfTest(std::string const& cmd) {
540  auto error = [](std::string m) {
542  std::move(m));
543  };
544  for (auto const& var : {"GOOGLE_CLOUD_PROJECT"}) {
545  auto const value = GetEnv(var).value_or("");
546  if (!value.empty()) continue;
547  std::ostringstream os;
548  os << "The environment variable " << var << " is not set or empty";
549  return error(std::move(os).str());
550  }
551  auto config = ParseArgsImpl({cmd, "--help"}, kDescription);
552  if (!config || !config->show_help) return error("--help parsing");
553  config = ParseArgsImpl({cmd, "--description", "--help"}, kDescription);
554  if (!config || !config->show_help) return error("--description parsing");
555  config = ParseArgsImpl({cmd, "--project-id="}, kDescription);
556  if (config) return error("--project-id validation");
557  config = ParseArgsImpl({cmd, "--topic-id=test-topic"}, kDescription);
558  if (!config) return error("--topic-id");
559 
560  return ParseArgsImpl(
561  {
562  cmd,
563  "--project-id=" + GetEnv("GOOGLE_CLOUD_PROJECT").value_or(""),
564  "--publisher-count=1",
565  "--subscription-count=1",
566  "--pending-lwm=8000",
567  "--pending-hwm=10000",
568  "--session-count=1",
569  "--minimum-samples=1",
570  "--maximum-samples=10",
571  "--minimum-runtime=0s",
572  "--maximum-runtime=2s",
573  },
574  kDescription);
575 }
576 
577 google::cloud::StatusOr<Config> ParseArgs(std::vector<std::string> args) {
578  bool auto_run =
579  GetEnv("GOOGLE_CLOUD_CPP_AUTO_RUN_EXAMPLES").value_or("") == "yes";
580  if (auto_run) return SelfTest(args[0]);
581  return ParseArgsImpl(std::move(args), kDescription);
582 }
583 
584 } // namespace