Google Cloud Pub/Sub C++ Client  1.31.1
A C++ Client Library for Google Cloud Pub/Sub
publisher_connection.cc
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/publisher_connection.h"
16 #include "google/cloud/pubsub/internal/batching_publisher_connection.h"
17 #include "google/cloud/pubsub/internal/default_batch_sink.h"
18 #include "google/cloud/pubsub/internal/default_retry_policies.h"
19 #include "google/cloud/pubsub/internal/flow_controlled_publisher_connection.h"
20 #include "google/cloud/pubsub/internal/ordering_key_publisher_connection.h"
21 #include "google/cloud/pubsub/internal/publisher_logging.h"
22 #include "google/cloud/pubsub/internal/publisher_metadata.h"
23 #include "google/cloud/pubsub/internal/publisher_round_robin.h"
24 #include "google/cloud/pubsub/internal/publisher_stub.h"
25 #include "google/cloud/pubsub/internal/rejects_with_ordering_key.h"
26 #include "google/cloud/pubsub/internal/sequential_batch_sink.h"
27 #include "google/cloud/future_void.h"
28 #include "google/cloud/log.h"
29 #include <memory>
30 
31 namespace google {
32 namespace cloud {
33 namespace pubsub {
34 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
35 namespace {
36 class ContainingPublisherConnection : public PublisherConnection {
37  public:
38  ContainingPublisherConnection(std::shared_ptr<BackgroundThreads> background,
39  std::shared_ptr<PublisherConnection> child)
40  : background_(std::move(background)), child_(std::move(child)) {}
41 
42  ~ContainingPublisherConnection() override = default;
43 
44  future<StatusOr<std::string>> Publish(PublishParams p) override {
45  return child_->Publish(std::move(p));
46  }
47  void Flush(FlushParams p) override { child_->Flush(std::move(p)); }
48  void ResumePublish(ResumePublishParams p) override {
49  child_->ResumePublish(std::move(p));
50  }
51 
52  private:
53  std::shared_ptr<BackgroundThreads> background_;
54  std::shared_ptr<PublisherConnection> child_;
55 };
56 } // namespace
57 
59 
// Base-class implementation of `Publish()`. The parameter is unnamed, i.e. the
// request is not inspected; the function returns the result of
// `make_ready_future()` applied to a `StatusOr<std::string>`.
// NOTE(review): source line 63, which holds the argument used to build that
// `StatusOr<std::string>`, is absent from this listing - confirm the value
// against the original file.
60 // NOLINTNEXTLINE(performance-unnecessary-value-param)
61 future<StatusOr<std::string>> PublisherConnection::Publish(PublishParams) {
62  return make_ready_future(StatusOr<std::string>(
64 }
65 
67 
68 // NOLINTNEXTLINE(performance-unnecessary-value-param)
70 
72  Topic topic, PublisherOptions options, ConnectionOptions connection_options,
73  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
74  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
    // NOTE(review): the line that opens this definition (source line 71) is
    // absent from this listing; only the parameter list and body are visible.
    //
    // Create one stub per configured channel. The vector is sized with
    // `num_channels()` and each slot is filled by a call that receives a
    // distinct, increasing `id` starting at 0.
75  std::vector<std::shared_ptr<pubsub_internal::PublisherStub>> children(
76  connection_options.num_channels());
77  int id = 0;
78  std::generate(children.begin(), children.end(), [&id, &connection_options] {
79  return pubsub_internal::CreateDefaultPublisherStub(connection_options,
80  id++);
81  });
    // Hand everything over to the internal factory that accepts a vector of
    // stubs; all arguments are moved, so none is usable afterwards.
82  return pubsub_internal::MakePublisherConnection(
83  std::move(topic), std::move(options), std::move(connection_options),
84  std::move(children), std::move(retry_policy), std::move(backoff_policy));
85 }
86 
87 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
88 } // namespace pubsub
89 
90 namespace pubsub_internal {
91 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
92 
93 std::shared_ptr<pubsub::PublisherConnection> MakePublisherConnection(
94  pubsub::Topic topic, pubsub::PublisherOptions options,
95  pubsub::ConnectionOptions connection_options,
96  std::shared_ptr<PublisherStub> stub,
97  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
98  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
99  std::vector<std::shared_ptr<PublisherStub>> children{std::move(stub)};
100  return MakePublisherConnection(
101  std::move(topic), std::move(options), std::move(connection_options),
102  std::move(children), std::move(retry_policy), std::move(backoff_policy));
103 }
104 
// Assembles a publisher connection on top of `stubs`.
//
// Returns nullptr when `stubs` is empty. Null `retry_policy` / `backoff_policy`
// arguments are replaced with `DefaultRetryPolicy()` / `DefaultBackoffPolicy()`.
// The returned object is a `ContainingPublisherConnection` that holds both the
// background threads created here and the assembled connection.
105 std::shared_ptr<pubsub::PublisherConnection> MakePublisherConnection(
106  pubsub::Topic topic, pubsub::PublisherOptions options,
107  pubsub::ConnectionOptions connection_options,
108  std::vector<std::shared_ptr<PublisherStub>> stubs,
109  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
110  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
111  if (stubs.empty()) return nullptr;
112  if (!retry_policy) retry_policy = DefaultRetryPolicy();
113  if (!backoff_policy) backoff_policy = DefaultBackoffPolicy();
    // Layer the stub decorators: `PublisherRoundRobin` over all `stubs`, then
    // `PublisherMetadata`, then `PublisherLogging` only when "rpc" tracing is
    // enabled in `connection_options`.
114  std::shared_ptr<PublisherStub> stub =
115  std::make_shared<PublisherRoundRobin>(std::move(stubs));
116  stub = std::make_shared<PublisherMetadata>(std::move(stub));
117  if (connection_options.tracing_enabled("rpc")) {
118  GCP_LOG(INFO) << "Enabled logging for gRPC calls";
119  stub = std::make_shared<PublisherLogging>(
120  std::move(stub), connection_options.tracing_options());
121  }
122 
    // When the configured background thread pool size is 0, replace it with
    // `std::thread::hardware_concurrency()`, or with 4 if that reports 0.
123  auto default_thread_pool_size = []() -> std::size_t {
124  auto constexpr kDefaultThreadPoolSize = 4;
125  auto const n = std::thread::hardware_concurrency();
126  return n == 0 ? kDefaultThreadPoolSize : n;
127  };
128  if (connection_options.background_thread_pool_size() == 0) {
129  connection_options.set_background_thread_pool_size(
130  default_thread_pool_size());
131  }
132 
    // `background` provides the completion queue (`cq`) passed to the sink and
    // to the batching connections created below.
133  auto background = connection_options.background_threads_factory()();
    // Captures everything by reference and is invoked exactly once, right after
    // its definition; it moves from `retry_policy`, `backoff_policy` and, on
    // the non-ordering path, from `topic` and `options`.
134  auto make_connection = [&]() -> std::shared_ptr<pubsub::PublisherConnection> {
135  auto cq = background->cq();
136  std::shared_ptr<BatchSink> sink = DefaultBatchSink::Create(
137  stub, cq, std::move(retry_policy), std::move(backoff_policy));
138  if (options.message_ordering()) {
    // Message ordering enabled: `factory` builds a `BatchingPublisherConnection`
    // for a given key, each with a new `SequentialBatchSink` around the shared
    // `sink` (presumably it is invoked once per ordering key - verify in
    // `OrderingKeyPublisherConnection`).
    // NOTE(review): the lambda is not `mutable`, so the captured `sink` is
    // const and `std::move(sink)` performs a copy, not a move.
139  auto factory = [topic, options, sink, cq](std::string const& key) {
140  return BatchingPublisherConnection::Create(
141  topic, options, key, SequentialBatchSink::Create(std::move(sink)),
142  cq);
143  };
144  return OrderingKeyPublisherConnection::Create(std::move(factory));
145  }
    // Message ordering disabled: one `BatchingPublisherConnection` created with
    // an empty ordering key (`{}`), wrapped in `RejectsWithOrderingKey`.
    // NOTE(review): `options` is captured by reference and moved from here, yet
    // it is read again after `make_connection()` returns (it is passed to
    // `FlowControlledPublisherConnection::Create`). That is a use-after-move;
    // consider passing a copy here instead of `std::move(options)`.
146  return RejectsWithOrderingKey::Create(BatchingPublisherConnection::Create(
147  std::move(topic), std::move(options), {}, sink, std::move(cq)));
148  };
149  auto connection = make_connection();
    // NOTE(review): source line 150 - the statement that opens the block closed
    // on line 153 - is absent from this listing, so the condition under which
    // the connection is wrapped in `FlowControlledPublisherConnection` cannot be
    // read here; confirm against the original file.
151  connection = FlowControlledPublisherConnection::Create(
152  options, std::move(connection));
153  }
    // Bundle the background threads with the connection in one owning object.
154  return std::make_shared<pubsub::ContainingPublisherConnection>(
155  std::move(background), std::move(connection));
156 }
157 
158 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
159 } // namespace pubsub_internal
160 } // namespace cloud
161 } // namespace google