Google Cloud Pub/Sub C++ Client  1.33.0
A C++ Client Library for Google Cloud Pub/Sub
publisher_connection.cc
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/publisher_connection.h"
16 #include "google/cloud/pubsub/internal/batching_publisher_connection.h"
17 #include "google/cloud/pubsub/internal/create_channel.h"
18 #include "google/cloud/pubsub/internal/default_batch_sink.h"
19 #include "google/cloud/pubsub/internal/defaults.h"
20 #include "google/cloud/pubsub/internal/flow_controlled_publisher_connection.h"
21 #include "google/cloud/pubsub/internal/non_constructible.h"
22 #include "google/cloud/pubsub/internal/ordering_key_publisher_connection.h"
23 #include "google/cloud/pubsub/internal/publisher_auth.h"
24 #include "google/cloud/pubsub/internal/publisher_logging.h"
25 #include "google/cloud/pubsub/internal/publisher_metadata.h"
26 #include "google/cloud/pubsub/internal/publisher_round_robin.h"
27 #include "google/cloud/pubsub/internal/publisher_stub.h"
28 #include "google/cloud/pubsub/internal/rejects_with_ordering_key.h"
29 #include "google/cloud/pubsub/internal/sequential_batch_sink.h"
30 #include "google/cloud/pubsub/options.h"
31 #include "google/cloud/future_void.h"
32 #include "google/cloud/log.h"
33 #include <memory>
34 
35 namespace google {
36 namespace cloud {
37 namespace pubsub {
39 namespace {
40 class ContainingPublisherConnection : public PublisherConnection {
41  public:
42  ContainingPublisherConnection(std::shared_ptr<BackgroundThreads> background,
43  std::shared_ptr<PublisherConnection> child)
44  : background_(std::move(background)), child_(std::move(child)) {}
45 
46  ~ContainingPublisherConnection() override = default;
47 
48  future<StatusOr<std::string>> Publish(PublishParams p) override {
49  return child_->Publish(std::move(p));
50  }
51  void Flush(FlushParams p) override { child_->Flush(std::move(p)); }
52  void ResumePublish(ResumePublishParams p) override {
53  child_->ResumePublish(std::move(p));
54  }
55 
56  private:
57  std::shared_ptr<BackgroundThreads> background_;
58  std::shared_ptr<PublisherConnection> child_;
59 };
60 
61 std::shared_ptr<pubsub_internal::PublisherStub> DecoratePublisherStub(
62  Options const& opts,
63  std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
64  std::vector<std::shared_ptr<pubsub_internal::PublisherStub>> children) {
65  std::shared_ptr<pubsub_internal::PublisherStub> stub =
66  std::make_shared<pubsub_internal::PublisherRoundRobin>(
67  std::move(children));
68  if (auth->RequiresConfigureContext()) {
69  stub = std::make_shared<pubsub_internal::PublisherAuth>(std::move(auth),
70  std::move(stub));
71  }
72  stub = std::make_shared<pubsub_internal::PublisherMetadata>(std::move(stub));
73  if (internal::Contains(opts.get<TracingComponentsOption>(), "rpc")) {
74  GCP_LOG(INFO) << "Enabled logging for gRPC calls";
75  stub = std::make_shared<pubsub_internal::PublisherLogging>(
76  std::move(stub), opts.get<GrpcTracingOptionsOption>());
77  }
78  return stub;
79 }
80 
// Assembles the publisher connection stack on top of an already decorated
// stub and returns it wrapped in a ContainingPublisherConnection that also
// holds `background`.
//
// NOTE(review): this listing skips its own lines 89 and 103-104. The `}` on
// listing lines 97 and 107 close statements whose opening lines are not
// visible here, so the conditions guarding those two sections cannot be
// confirmed from this extract.
81 std::shared_ptr<pubsub::PublisherConnection> ConnectionFromDecoratedStub(
82  pubsub::Topic topic, Options opts,
83  std::shared_ptr<BackgroundThreads> background,
84  std::shared_ptr<pubsub_internal::PublisherStub> stub) {
// The lambda captures everything by reference and is invoked exactly once,
// right after its definition, so moving from the captured `topic` inside it
// is safe as long as `topic` is not used afterwards.
85  auto make_connection = [&]() -> std::shared_ptr<pubsub::PublisherConnection> {
86  auto cq = background->cq();
87  std::shared_ptr<pubsub_internal::BatchSink> sink =
88  pubsub_internal::DefaultBatchSink::Create(stub, cq, opts);
// Given a key, the factory creates a BatchingPublisherConnection whose sink
// is a SequentialBatchSink layered over the shared `sink`. The factory holds
// its own copies of `topic`, `opts`, `sink` and `cq`.
90  auto factory = [topic, opts, sink, cq](std::string const& key) {
91  return pubsub_internal::BatchingPublisherConnection::Create(
92  topic, opts, key,
93  pubsub_internal::SequentialBatchSink::Create(sink), cq);
94  };
95  return pubsub_internal::OrderingKeyPublisherConnection::Create(
96  std::move(factory));
97  }
// Fallback path: one batching connection created with an empty key (`{}`),
// wrapped in RejectsWithOrderingKey.
98  return pubsub_internal::RejectsWithOrderingKey::Create(
99  pubsub_internal::BatchingPublisherConnection::Create(
100  std::move(topic), opts, {}, std::move(sink), std::move(cq)));
101  };
102  auto connection = make_connection();
// `opts` is moved from here; it must not be read after this statement.
105  connection = pubsub_internal::FlowControlledPublisherConnection::Create(
106  std::move(opts), std::move(connection));
107  }
// The wrapper stores `background` alongside the finished connection.
108  return std::make_shared<pubsub::ContainingPublisherConnection>(
109  std::move(background), std::move(connection));
110 }
111 } // namespace
112 
114 
// NOTE(review): the signature of this function (listing line 116) is missing
// from this extract; presumably it is the default PublisherConnection::Publish
// -- confirm against the header. The visible body returns an already
// satisfied future holding a kUnimplemented status with the message
// "needs-override".
115 // NOLINTNEXTLINE(performance-unnecessary-value-param)
117  return make_ready_future(StatusOr<std::string>(
118  Status{StatusCode::kUnimplemented, "needs-override"}));
119 }
120 
122 
123 // NOLINTNEXTLINE(performance-unnecessary-value-param)
125 
// NOTE(review): the line with this function's return type and name (listing
// line 126) is missing from this extract; presumably it is an overload of
// MakePublisherConnection. The initializer_list parameter is unnamed and
// unused: the body forwards only `topic` to MakePublisherConnection.
127  Topic topic, std::initializer_list<pubsub_internal::NonConstructible>) {
128  return MakePublisherConnection(std::move(topic));
129 }
130 
// NOTE(review): the line with this function's return type, name and first
// parameter (listing line 131) is missing from this extract, as is listing
// line 142, which supplies the constructor argument that sizes `children`.
// The body uses `topic`, so a `topic` parameter is presumably declared on the
// missing line.
//
// Visible behavior: checks `opts` against the accepted option lists, passes
// `opts` through DefaultPublisherOptions(), creates the background threads
// and the authentication strategy, creates one stub per element of
// `children`, decorates the stubs, and builds the connection from the result.
132  Options opts) {
133  internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
134  PolicyOptionList, PublisherOptionList>(
135  opts, __func__);
136  opts = pubsub_internal::DefaultPublisherOptions(std::move(opts));
137 
138  auto background = internal::MakeBackgroundThreadsFactory(opts)();
139  auto auth = google::cloud::internal::CreateAuthenticationStrategy(
140  background->cq(), opts);
141  std::vector<std::shared_ptr<pubsub_internal::PublisherStub>> children(
// `id` starts at 0 and is post-incremented on every generated element, so
// each call to MakeChannelArguments() receives the next integer.
143  int id = 0;
144  std::generate(children.begin(), children.end(), [&id, &opts, &auth] {
145  return pubsub_internal::CreateDefaultPublisherStub(
146  auth->CreateChannel(opts.get<EndpointOption>(),
147  pubsub_internal::MakeChannelArguments(opts, id++)));
148  });
// `auth` and `children` are consumed here; `opts` is still needed below and
// is therefore passed by reference.
149  auto stub = DecoratePublisherStub(opts, std::move(auth), std::move(children));
150  return ConnectionFromDecoratedStub(std::move(topic), std::move(opts),
151  std::move(background), std::move(stub));
152 }
153 
// NOTE(review): the line with this function's return type and name (listing
// line 154) is missing from this extract; presumably it is an overload of
// MakePublisherConnection.
//
// Visible behavior: converts `options` and `connection_options` to Options
// objects and merges them, stores a clone of each policy that is non-null,
// then delegates to MakePublisherConnection(topic, opts).
155  Topic topic, PublisherOptions options, ConnectionOptions connection_options,
156  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
157  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
158  auto opts = internal::MergeOptions(
159  pubsub_internal::MakeOptions(std::move(options)),
160  internal::MakeOptions(std::move(connection_options)));
// A null policy pointer leaves the corresponding option unset in `opts`.
161  if (retry_policy) opts.set<RetryPolicyOption>(retry_policy->clone());
162  if (backoff_policy) opts.set<BackoffPolicyOption>(backoff_policy->clone());
163  return MakePublisherConnection(std::move(topic), std::move(opts));
164 }
165 
167 } // namespace pubsub
168 
169 namespace pubsub_internal {
171 
172 std::shared_ptr<pubsub::PublisherConnection> MakeTestPublisherConnection(
173  pubsub::Topic topic, Options opts,
174  std::vector<std::shared_ptr<PublisherStub>> stubs) {
175  auto background = internal::MakeBackgroundThreadsFactory(opts)();
176  auto auth = google::cloud::internal::CreateAuthenticationStrategy(
177  background->cq(), opts);
178  auto stub =
179  pubsub::DecoratePublisherStub(opts, std::move(auth), std::move(stubs));
180  return pubsub::ConnectionFromDecoratedStub(std::move(topic), std::move(opts),
181  std::move(background),
182  std::move(stub));
183 }
184 
186 } // namespace pubsub_internal
187 } // namespace cloud
188 } // namespace google