Google Cloud Pub/Sub C++ Client  1.33.0
A C++ Client Library for Google Cloud Pub/Sub
topic_admin_connection.cc
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/topic_admin_connection.h"
16 #include "google/cloud/pubsub/internal/create_channel.h"
17 #include "google/cloud/pubsub/internal/defaults.h"
18 #include "google/cloud/pubsub/internal/publisher_auth.h"
19 #include "google/cloud/pubsub/internal/publisher_logging.h"
20 #include "google/cloud/pubsub/internal/publisher_metadata.h"
21 #include "google/cloud/pubsub/internal/publisher_stub.h"
22 #include "google/cloud/pubsub/options.h"
23 #include "google/cloud/internal/retry_loop.h"
24 #include "google/cloud/log.h"
25 #include "absl/strings/str_split.h"
26 #include <initializer_list>
27 #include <memory>
28 
29 namespace google {
30 namespace cloud {
31 namespace pubsub {
33 namespace {
34 
35 using ::google::cloud::internal::Idempotency;
36 using ::google::cloud::internal::RetryLoop;
37 
38 class TopicAdminConnectionImpl : public pubsub::TopicAdminConnection {
39  public:
40  explicit TopicAdminConnectionImpl(
41  std::unique_ptr<google::cloud::BackgroundThreads> background,
42  std::shared_ptr<pubsub_internal::PublisherStub> stub,
43  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
44  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy)
45  : background_(std::move(background)),
46  stub_(std::move(stub)),
47  retry_policy_(std::move(retry_policy)),
48  backoff_policy_(std::move(backoff_policy)) {}
49 
50  ~TopicAdminConnectionImpl() override = default;
51 
52  StatusOr<google::pubsub::v1::Topic> CreateTopic(
53  CreateTopicParams p) override {
54  return RetryLoop(
55  retry_policy_->clone(), backoff_policy_->clone(),
56  Idempotency::kIdempotent,
57  [this](grpc::ClientContext& context,
58  google::pubsub::v1::Topic const& request) {
59  return stub_->CreateTopic(context, request);
60  },
61  p.topic, __func__);
62  }
63 
64  StatusOr<google::pubsub::v1::Topic> GetTopic(GetTopicParams p) override {
65  google::pubsub::v1::GetTopicRequest request;
66  request.set_topic(p.topic.FullName());
67  return RetryLoop(
68  retry_policy_->clone(), backoff_policy_->clone(),
69  Idempotency::kIdempotent,
70  [this](grpc::ClientContext& context,
71  google::pubsub::v1::GetTopicRequest const& request) {
72  return stub_->GetTopic(context, request);
73  },
74  request, __func__);
75  }
76 
77  StatusOr<google::pubsub::v1::Topic> UpdateTopic(
78  UpdateTopicParams p) override {
79  return RetryLoop(
80  retry_policy_->clone(), backoff_policy_->clone(),
81  Idempotency::kIdempotent,
82  [this](grpc::ClientContext& context,
83  google::pubsub::v1::UpdateTopicRequest const& request) {
84  return stub_->UpdateTopic(context, request);
85  },
86  p.request, __func__);
87  }
88 
89  pubsub::ListTopicsRange ListTopics(ListTopicsParams p) override {
90  google::pubsub::v1::ListTopicsRequest request;
91  request.set_project(std::move(p.project_id));
92  auto& stub = stub_;
93  // Because we do not have C++14 generalized lambda captures we cannot just
94  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
95  auto retry =
96  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
97  auto backoff =
98  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
99  char const* function_name = __func__;
100  auto list_functor =
101  [stub, retry, backoff,
102  function_name](google::pubsub::v1::ListTopicsRequest const& request) {
103  return RetryLoop(
104  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
105  [stub](grpc::ClientContext& c,
106  google::pubsub::v1::ListTopicsRequest const& r) {
107  return stub->ListTopics(c, r);
108  },
109  request, function_name);
110  };
111  return internal::MakePaginationRange<pubsub::ListTopicsRange>(
112  std::move(request), list_functor,
113  [](google::pubsub::v1::ListTopicsResponse response) {
114  std::vector<google::pubsub::v1::Topic> items;
115  items.reserve(response.topics_size());
116  for (auto& item : *response.mutable_topics()) {
117  items.push_back(std::move(item));
118  }
119  return items;
120  });
121  }
122 
123  Status DeleteTopic(DeleteTopicParams p) override {
124  google::pubsub::v1::DeleteTopicRequest request;
125  request.set_topic(p.topic.FullName());
126  return RetryLoop(
127  retry_policy_->clone(), backoff_policy_->clone(),
128  Idempotency::kIdempotent,
129  [this](grpc::ClientContext& context,
130  google::pubsub::v1::DeleteTopicRequest const& request) {
131  return stub_->DeleteTopic(context, request);
132  },
133  request, __func__);
134  }
135 
136  StatusOr<google::pubsub::v1::DetachSubscriptionResponse> DetachSubscription(
137  DetachSubscriptionParams p) override {
138  google::pubsub::v1::DetachSubscriptionRequest request;
139  request.set_subscription(p.subscription.FullName());
140  grpc::ClientContext context;
141  return RetryLoop(
142  retry_policy_->clone(), backoff_policy_->clone(),
143  Idempotency::kIdempotent,
144  [this](grpc::ClientContext& context,
145  google::pubsub::v1::DetachSubscriptionRequest const& request) {
146  return stub_->DetachSubscription(context, request);
147  },
148  request, __func__);
149  }
150 
151  pubsub::ListTopicSubscriptionsRange ListTopicSubscriptions(
152  ListTopicSubscriptionsParams p) override {
153  google::pubsub::v1::ListTopicSubscriptionsRequest request;
154  request.set_topic(std::move(p.topic_full_name));
155  auto& stub = stub_;
156  // Because we do not have C++14 generalized lambda captures we cannot just
157  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
158  auto retry =
159  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
160  auto backoff =
161  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
162  char const* function_name = __func__;
163  auto list_functor =
164  [stub, retry, backoff, function_name](
165  google::pubsub::v1::ListTopicSubscriptionsRequest const& request) {
166  return RetryLoop(
167  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
168  [stub](
169  grpc::ClientContext& c,
170  google::pubsub::v1::ListTopicSubscriptionsRequest const& r) {
171  return stub->ListTopicSubscriptions(c, r);
172  },
173  request, function_name);
174  };
175  return internal::MakePaginationRange<pubsub::ListTopicSubscriptionsRange>(
176  std::move(request), std::move(list_functor),
177  [](google::pubsub::v1::ListTopicSubscriptionsResponse response) {
178  std::vector<std::string> items;
179  items.reserve(response.subscriptions_size());
180  for (auto& item : *response.mutable_subscriptions()) {
181  items.push_back(std::move(item));
182  }
183  return items;
184  });
185  }
186 
187  pubsub::ListTopicSnapshotsRange ListTopicSnapshots(
188  ListTopicSnapshotsParams p) override {
189  google::pubsub::v1::ListTopicSnapshotsRequest request;
190  request.set_topic(std::move(p.topic_full_name));
191  auto& stub = stub_;
192  // Because we do not have C++14 generalized lambda captures we cannot just
193  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
194  auto retry =
195  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
196  auto backoff =
197  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
198  char const* function_name = __func__;
199  auto list_functor =
200  [stub, retry, backoff, function_name](
201  google::pubsub::v1::ListTopicSnapshotsRequest const& request) {
202  return RetryLoop(
203  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
204  [stub](grpc::ClientContext& c,
205  google::pubsub::v1::ListTopicSnapshotsRequest const& r) {
206  return stub->ListTopicSnapshots(c, r);
207  },
208  request, function_name);
209  };
210  return internal::MakePaginationRange<pubsub::ListTopicSnapshotsRange>(
211  std::move(request), list_functor,
212  [](google::pubsub::v1::ListTopicSnapshotsResponse response) {
213  std::vector<std::string> items;
214  items.reserve(response.snapshots_size());
215  for (auto& item : *response.mutable_snapshots()) {
216  items.push_back(std::move(item));
217  }
218  return items;
219  });
220  }
221 
222  private:
223  std::unique_ptr<google::cloud::BackgroundThreads> background_;
224  std::shared_ptr<pubsub_internal::PublisherStub> stub_;
225  std::unique_ptr<pubsub::RetryPolicy const> retry_policy_;
226  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy_;
227 };
228 
229 // Decorates a TopicAdminStub. This works for both mock and real stubs.
230 std::shared_ptr<pubsub_internal::PublisherStub> DecorateTopicAdminStub(
231  Options const& opts,
232  std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
233  std::shared_ptr<pubsub_internal::PublisherStub> stub) {
234  if (auth->RequiresConfigureContext()) {
235  stub = std::make_shared<pubsub_internal::PublisherAuth>(std::move(auth),
236  std::move(stub));
237  }
238  stub = std::make_shared<pubsub_internal::PublisherMetadata>(std::move(stub));
239  if (internal::Contains(opts.get<TracingComponentsOption>(), "rpc")) {
240  GCP_LOG(INFO) << "Enabled logging for gRPC calls";
241  stub = std::make_shared<pubsub_internal::PublisherLogging>(
242  std::move(stub), opts.get<GrpcTracingOptionsOption>());
243  }
244  return stub;
245 }
246 
247 } // namespace
248 
250 
251 StatusOr<google::pubsub::v1::Topic> TopicAdminConnection::CreateTopic(
252  CreateTopicParams) { // NOLINT(performance-unnecessary-value-param)
253  return Status{StatusCode::kUnimplemented, "needs-override"};
254 }
255 
256 StatusOr<google::pubsub::v1::Topic> TopicAdminConnection::GetTopic(
257  GetTopicParams) { // NOLINT(performance-unnecessary-value-param)
258  return Status{StatusCode::kUnimplemented, "needs-override"};
259 }
260 
261 StatusOr<google::pubsub::v1::Topic> TopicAdminConnection::UpdateTopic(
262  UpdateTopicParams) { // NOLINT(performance-unnecessary-value-param)
263  return Status{StatusCode::kUnimplemented, "needs-override"};
264 }
265 
266 // NOLINTNEXTLINE(performance-unnecessary-value-param)
268  return internal::MakeUnimplementedPaginationRange<ListTopicsRange>();
269 }
270 
271 // NOLINTNEXTLINE(performance-unnecessary-value-param)
273  return Status{StatusCode::kUnimplemented, "needs-override"};
274 }
275 
276 StatusOr<google::pubsub::v1::DetachSubscriptionResponse>
277 // NOLINTNEXTLINE(performance-unnecessary-value-param)
279  return Status{StatusCode::kUnimplemented, "needs-override"};
280 }
281 
282 ListTopicSubscriptionsRange TopicAdminConnection::ListTopicSubscriptions(
283  ListTopicSubscriptionsParams) { // NOLINT(performance-unnecessary-value-param)
284  return internal::MakeUnimplementedPaginationRange<
285  ListTopicSubscriptionsRange>();
286 }
287 
288 ListTopicSnapshotsRange TopicAdminConnection::ListTopicSnapshots(
289  ListTopicSnapshotsParams) { // NOLINT(performance-unnecessary-value-param)
290  return internal::MakeUnimplementedPaginationRange<ListTopicSnapshotsRange>();
291 }
292 
294  std::initializer_list<pubsub_internal::NonConstructible>) {
296 }
297 
299  internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
300  PolicyOptionList>(opts, __func__);
301  opts = pubsub_internal::DefaultCommonOptions(std::move(opts));
302 
303  auto background = internal::MakeBackgroundThreadsFactory(opts)();
304  auto auth = google::cloud::internal::CreateAuthenticationStrategy(
305  background->cq(), opts);
306 
307  auto stub = pubsub_internal::CreateDefaultPublisherStub(auth->CreateChannel(
308  opts.get<EndpointOption>(), internal::MakeChannelArguments(opts)));
309 
310  stub = DecorateTopicAdminStub(opts, std::move(auth), std::move(stub));
311  return std::make_shared<TopicAdminConnectionImpl>(
312  std::move(background), std::move(stub),
315 }
316 
318  ConnectionOptions const& options,
319  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
320  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
321  auto opts = internal::MakeOptions(options);
322  if (retry_policy) opts.set<RetryPolicyOption>(retry_policy->clone());
323  if (backoff_policy) opts.set<BackoffPolicyOption>(backoff_policy->clone());
324  return MakeTopicAdminConnection(std::move(opts));
325 }
326 
328 } // namespace pubsub
329 
330 namespace pubsub_internal {
332 
333 std::shared_ptr<pubsub::TopicAdminConnection> MakeTestTopicAdminConnection(
334  Options const& opts, std::shared_ptr<PublisherStub> stub) {
335  auto background = internal::MakeBackgroundThreadsFactory(opts)();
336  auto auth = google::cloud::internal::CreateAuthenticationStrategy(
337  background->cq(), opts);
338  stub = pubsub::DecorateTopicAdminStub(opts, std::move(auth), std::move(stub));
339  return std::make_shared<pubsub::TopicAdminConnectionImpl>(
340  std::move(background), std::move(stub),
343 }
344 
346 } // namespace pubsub_internal
347 } // namespace cloud
348 } // namespace google