Google Cloud Pub/Sub C++ Client  1.33.0
A C++ Client Library for Google Cloud Pub/Sub
subscription_admin_connection.cc
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/subscription_admin_connection.h"
16 #include "google/cloud/pubsub/internal/defaults.h"
17 #include "google/cloud/pubsub/internal/subscriber_auth.h"
18 #include "google/cloud/pubsub/internal/subscriber_logging.h"
19 #include "google/cloud/pubsub/internal/subscriber_metadata.h"
20 #include "google/cloud/pubsub/internal/subscriber_stub.h"
21 #include "google/cloud/pubsub/options.h"
22 #include "google/cloud/internal/retry_loop.h"
23 #include "google/cloud/log.h"
24 #include <memory>
25 
26 namespace google {
27 namespace cloud {
28 namespace pubsub {
30 namespace {
31 
32 using ::google::cloud::internal::Idempotency;
33 using ::google::cloud::internal::RetryLoop;
34 
35 class SubscriptionAdminConnectionImpl
37  public:
38  explicit SubscriptionAdminConnectionImpl(
39  std::unique_ptr<google::cloud::BackgroundThreads> background,
40  std::shared_ptr<pubsub_internal::SubscriberStub> stub,
41  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
42  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy)
43  : background_(std::move(background)),
44  stub_(std::move(stub)),
45  retry_policy_(std::move(retry_policy)),
46  backoff_policy_(std::move(backoff_policy)) {}
47 
// Defaulted destructor: the smart-pointer members release what they own.
48  ~SubscriptionAdminConnectionImpl() override = default;
49 
50  StatusOr<google::pubsub::v1::Subscription> CreateSubscription(
51  CreateSubscriptionParams p) override {
52  return RetryLoop(
53  retry_policy_->clone(), backoff_policy_->clone(),
54  Idempotency::kIdempotent,
55  [this](grpc::ClientContext& context,
56  google::pubsub::v1::Subscription const& request) {
57  return stub_->CreateSubscription(context, request);
58  },
59  p.subscription, __func__);
60  }
61 
62  StatusOr<google::pubsub::v1::Subscription> GetSubscription(
63  GetSubscriptionParams p) override {
64  google::pubsub::v1::GetSubscriptionRequest request;
65  request.set_subscription(p.subscription.FullName());
66  return RetryLoop(
67  retry_policy_->clone(), backoff_policy_->clone(),
68  Idempotency::kIdempotent,
69  [this](grpc::ClientContext& context,
70  google::pubsub::v1::GetSubscriptionRequest const& request) {
71  return stub_->GetSubscription(context, request);
72  },
73  request, __func__);
74  }
75 
76  StatusOr<google::pubsub::v1::Subscription> UpdateSubscription(
77  UpdateSubscriptionParams p) override {
78  return RetryLoop(
79  retry_policy_->clone(), backoff_policy_->clone(),
80  Idempotency::kIdempotent,
81  [this](grpc::ClientContext& context,
82  google::pubsub::v1::UpdateSubscriptionRequest const& request) {
83  return stub_->UpdateSubscription(context, request);
84  },
85  p.request, __func__);
86  }
87 
88  pubsub::ListSubscriptionsRange ListSubscriptions(
89  ListSubscriptionsParams p) override {
90  google::pubsub::v1::ListSubscriptionsRequest request;
91  request.set_project(std::move(p.project_id));
92  auto& stub = stub_;
93  // Because we do not have C++14 generalized lambda captures we cannot just
94  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
95  auto retry =
96  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
97  auto backoff =
98  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
99  char const* function_name = __func__;
100  auto list_functor =
101  [stub, retry, backoff, function_name](
102  google::pubsub::v1::ListSubscriptionsRequest const& request) {
103  return RetryLoop(
104  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
105  [stub](grpc::ClientContext& c,
106  google::pubsub::v1::ListSubscriptionsRequest const& r) {
107  return stub->ListSubscriptions(c, r);
108  },
109  request, function_name);
110  };
111 
112  return internal::MakePaginationRange<pubsub::ListSubscriptionsRange>(
113  std::move(request), std::move(list_functor),
114  [](google::pubsub::v1::ListSubscriptionsResponse response) {
115  std::vector<google::pubsub::v1::Subscription> items;
116  items.reserve(response.subscriptions_size());
117  for (auto& item : *response.mutable_subscriptions()) {
118  items.push_back(std::move(item));
119  }
120  return items;
121  });
122  }
123 
124  Status DeleteSubscription(DeleteSubscriptionParams p) override {
125  google::pubsub::v1::DeleteSubscriptionRequest request;
126  request.set_subscription(p.subscription.FullName());
127  return RetryLoop(
128  retry_policy_->clone(), backoff_policy_->clone(),
129  Idempotency::kIdempotent,
130  [this](grpc::ClientContext& context,
131  google::pubsub::v1::DeleteSubscriptionRequest const& request) {
132  return stub_->DeleteSubscription(context, request);
133  },
134  request, __func__);
135  }
136 
137  Status ModifyPushConfig(ModifyPushConfigParams p) override {
138  return RetryLoop(
139  retry_policy_->clone(), backoff_policy_->clone(),
140  Idempotency::kIdempotent,
141  [this](grpc::ClientContext& context,
142  google::pubsub::v1::ModifyPushConfigRequest const& request) {
143  return stub_->ModifyPushConfig(context, request);
144  },
145  p.request, __func__);
146  }
147 
148  StatusOr<google::pubsub::v1::Snapshot> CreateSnapshot(
149  CreateSnapshotParams p) override {
150  auto const idempotency = p.request.name().empty()
151  ? Idempotency::kNonIdempotent
152  : Idempotency::kIdempotent;
153  return RetryLoop(
154  retry_policy_->clone(), backoff_policy_->clone(), idempotency,
155  [this](grpc::ClientContext& context,
156  google::pubsub::v1::CreateSnapshotRequest const& request) {
157  return stub_->CreateSnapshot(context, request);
158  },
159  p.request, __func__);
160  }
161 
162  StatusOr<google::pubsub::v1::Snapshot> GetSnapshot(
163  GetSnapshotParams p) override {
164  google::pubsub::v1::GetSnapshotRequest request;
165  request.set_snapshot(p.snapshot.FullName());
166  return RetryLoop(
167  retry_policy_->clone(), backoff_policy_->clone(),
168  Idempotency::kIdempotent,
169  [this](grpc::ClientContext& context,
170  google::pubsub::v1::GetSnapshotRequest const& request) {
171  return stub_->GetSnapshot(context, request);
172  },
173  request, __func__);
174  }
175 
176  pubsub::ListSnapshotsRange ListSnapshots(ListSnapshotsParams p) override {
177  google::pubsub::v1::ListSnapshotsRequest request;
178  request.set_project(std::move(p.project_id));
179  auto& stub = stub_;
180  // Because we do not have C++14 generalized lambda captures we cannot just
181  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
182  auto retry =
183  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
184  auto backoff =
185  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
186  char const* function_name = __func__;
187  auto list_functor =
188  [stub, retry, backoff, function_name](
189  google::pubsub::v1::ListSnapshotsRequest const& request) {
190  return RetryLoop(
191  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
192  [stub](grpc::ClientContext& c,
193  google::pubsub::v1::ListSnapshotsRequest const& r) {
194  return stub->ListSnapshots(c, r);
195  },
196  request, function_name);
197  };
198 
199  return internal::MakePaginationRange<pubsub::ListSnapshotsRange>(
200  std::move(request), std::move(list_functor),
201  [](google::pubsub::v1::ListSnapshotsResponse response) {
202  std::vector<google::pubsub::v1::Snapshot> items;
203  items.reserve(response.snapshots_size());
204  for (auto& item : *response.mutable_snapshots()) {
205  items.push_back(std::move(item));
206  }
207  return items;
208  });
209  }
210 
211  StatusOr<google::pubsub::v1::Snapshot> UpdateSnapshot(
212  UpdateSnapshotParams p) override {
213  return RetryLoop(
214  retry_policy_->clone(), backoff_policy_->clone(),
215  Idempotency::kIdempotent,
216  [this](grpc::ClientContext& context,
217  google::pubsub::v1::UpdateSnapshotRequest const& request) {
218  return stub_->UpdateSnapshot(context, request);
219  },
220  p.request, __func__);
221  }
222 
223  Status DeleteSnapshot(DeleteSnapshotParams p) override {
224  google::pubsub::v1::DeleteSnapshotRequest request;
225  request.set_snapshot(p.snapshot.FullName());
226  return RetryLoop(
227  retry_policy_->clone(), backoff_policy_->clone(),
228  Idempotency::kIdempotent,
229  [this](grpc::ClientContext& context,
230  google::pubsub::v1::DeleteSnapshotRequest const& request) {
231  return stub_->DeleteSnapshot(context, request);
232  },
233  request, __func__);
234  }
235 
236  StatusOr<google::pubsub::v1::SeekResponse> Seek(SeekParams p) override {
237  return RetryLoop(
238  retry_policy_->clone(), backoff_policy_->clone(),
239  Idempotency::kIdempotent,
240  [this](grpc::ClientContext& context,
241  google::pubsub::v1::SeekRequest const& request) {
242  return stub_->Seek(context, request);
243  },
244  p.request, __func__);
245  }
246 
247  private:
// Received at construction and stored; no member function of this class reads
// it afterwards.
248  std::unique_ptr<google::cloud::BackgroundThreads> background_;
// The stub through which every RPC in this class is issued.
249  std::shared_ptr<pubsub_internal::SubscriberStub> stub_;
// Prototype policies: operations call clone() on these and pass the clones to
// RetryLoop() rather than using the stored objects directly.
250  std::unique_ptr<pubsub::RetryPolicy const> retry_policy_;
251  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy_;
252 };
253 
254 // Decorates a SubscriptionAdminStub. This works for both mock and real stubs.
255 std::shared_ptr<pubsub_internal::SubscriberStub> DecorateSubscriptionAdminStub(
256  Options const& opts,
257  std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
258  std::shared_ptr<pubsub_internal::SubscriberStub> stub) {
259  if (auth->RequiresConfigureContext()) {
260  stub = std::make_shared<pubsub_internal::SubscriberAuth>(std::move(auth),
261  std::move(stub));
262  }
263  stub = std::make_shared<pubsub_internal::SubscriberMetadata>(std::move(stub));
264  auto const& tracing = opts.get<TracingComponentsOption>();
265  if (internal::Contains(tracing, "rpc")) {
266  GCP_LOG(INFO) << "Enabled logging for gRPC calls";
267  stub = std::make_shared<pubsub_internal::SubscriberLogging>(
268  std::move(stub), opts.get<GrpcTracingOptionsOption>(),
269  internal::Contains(tracing, "rpc-streams"));
270  }
271  return stub;
272 }
273 
274 } // namespace
275 
277 
278 StatusOr<google::pubsub::v1::Subscription>
279 // NOLINTNEXTLINE(performance-unnecessary-value-param)
281  return Status{StatusCode::kUnimplemented, "needs-override"};
282 }
283 
284 StatusOr<google::pubsub::v1::Subscription>
285 // NOLINTNEXTLINE(performance-unnecessary-value-param)
287  return Status{StatusCode::kUnimplemented, "needs-override"};
288 }
289 
290 StatusOr<google::pubsub::v1::Subscription>
291 // NOLINTNEXTLINE(performance-unnecessary-value-param)
293  return Status{StatusCode::kUnimplemented, "needs-override"};
294 }
295 
297  ListSubscriptionsParams) { // NOLINT(performance-unnecessary-value-param)
298  return internal::MakeUnimplementedPaginationRange<ListSubscriptionsRange>();
299 }
300 
302  DeleteSubscriptionParams) { // NOLINT(performance-unnecessary-value-param)
303  return Status{StatusCode::kUnimplemented, "needs-override"};
304 }
305 
306 // NOLINTNEXTLINE(performance-unnecessary-value-param)
308  return Status{StatusCode::kUnimplemented, "needs-override"};
309 }
310 
311 StatusOr<google::pubsub::v1::Snapshot>
312 // NOLINTNEXTLINE(performance-unnecessary-value-param)
314  return Status{StatusCode::kUnimplemented, "needs-override"};
315 }
316 
317 StatusOr<google::pubsub::v1::Snapshot> SubscriptionAdminConnection::GetSnapshot(
318  GetSnapshotParams) { // NOLINT(performance-unnecessary-value-param)
319  return Status{StatusCode::kUnimplemented, "needs-override"};
320 }
321 
322 StatusOr<google::pubsub::v1::Snapshot>
323 // NOLINTNEXTLINE(performance-unnecessary-value-param)
325  return Status{StatusCode::kUnimplemented, "needs-override"};
326 }
327 
329  ListSnapshotsParams) { // NOLINT(performance-unnecessary-value-param)
330  return internal::MakeUnimplementedPaginationRange<ListSnapshotsRange>();
331 }
332 
333 // NOLINTNEXTLINE(performance-unnecessary-value-param)
335  return Status{StatusCode::kUnimplemented, "needs-override"};
336 }
337 
338 StatusOr<google::pubsub::v1::SeekResponse> SubscriptionAdminConnection::Seek(
339  SeekParams) { // NOLINT(performance-unnecessary-value-param)
340  return Status{StatusCode::kUnimplemented, "needs-override"};
341 }
342 
344  std::initializer_list<pubsub_internal::NonConstructible>) {
346 }
347 
349  Options opts) {
350  internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
351  PolicyOptionList>(opts, __func__);
352  opts = pubsub_internal::DefaultCommonOptions(std::move(opts));
353  auto background = internal::MakeBackgroundThreadsFactory(opts)();
354  auto auth = google::cloud::internal::CreateAuthenticationStrategy(
355  background->cq(), opts);
356  auto stub = pubsub_internal::CreateDefaultSubscriberStub(auth->CreateChannel(
357  opts.get<EndpointOption>(), internal::MakeChannelArguments(opts)));
358  stub = DecorateSubscriptionAdminStub(opts, std::move(auth), std::move(stub));
359  return std::make_shared<SubscriptionAdminConnectionImpl>(
360  std::move(background), std::move(stub),
363 }
364 
366  pubsub::ConnectionOptions const& options,
367  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
368  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
369  auto opts = internal::MakeOptions(options);
370  if (retry_policy) opts.set<RetryPolicyOption>(retry_policy->clone());
371  if (backoff_policy) opts.set<BackoffPolicyOption>(backoff_policy->clone());
372  return MakeSubscriptionAdminConnection(std::move(opts));
373 }
374 
376 } // namespace pubsub
377 
378 namespace pubsub_internal {
380 
381 std::shared_ptr<pubsub::SubscriptionAdminConnection>
382 MakeTestSubscriptionAdminConnection(Options const& opts,
383  std::shared_ptr<SubscriberStub> stub) {
384  auto background = internal::MakeBackgroundThreadsFactory(opts)();
385  auto auth = google::cloud::internal::CreateAuthenticationStrategy(
386  background->cq(), opts);
387  stub = pubsub::DecorateSubscriptionAdminStub(opts, std::move(auth),
388  std::move(stub));
389  return std::make_shared<pubsub::SubscriptionAdminConnectionImpl>(
390  std::move(background), std::move(stub),
393 }
394 
396 } // namespace pubsub_internal
397 } // namespace cloud
398 } // namespace google