Google Cloud Pub/Sub C++ Client  1.32.1
A C++ Client Library for Google Cloud Pub/Sub
subscription_admin_connection.cc
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/subscription_admin_connection.h"
16 #include "google/cloud/pubsub/internal/defaults.h"
17 #include "google/cloud/pubsub/internal/subscriber_logging.h"
18 #include "google/cloud/pubsub/internal/subscriber_metadata.h"
19 #include "google/cloud/pubsub/internal/subscriber_stub.h"
20 #include "google/cloud/pubsub/options.h"
21 #include "google/cloud/internal/retry_loop.h"
22 #include "google/cloud/log.h"
23 #include <memory>
24 
25 namespace google {
26 namespace cloud {
27 namespace pubsub_internal {
28 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
29 namespace {
30 
31 using ::google::cloud::internal::Idempotency;
32 using ::google::cloud::internal::RetryLoop;
33 
34 class SubscriptionAdminConnectionImpl
36  public:
37  explicit SubscriptionAdminConnectionImpl(
38  std::shared_ptr<pubsub_internal::SubscriberStub> stub,
39  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
40  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy)
41  : stub_(std::move(stub)),
42  retry_policy_(std::move(retry_policy)),
43  backoff_policy_(std::move(backoff_policy)) {}
44 
45  ~SubscriptionAdminConnectionImpl() override = default;
46 
47  StatusOr<google::pubsub::v1::Subscription> CreateSubscription(
48  CreateSubscriptionParams p) override {
49  return RetryLoop(
50  retry_policy_->clone(), backoff_policy_->clone(),
51  Idempotency::kIdempotent,
52  [this](grpc::ClientContext& context,
53  google::pubsub::v1::Subscription const& request) {
54  return stub_->CreateSubscription(context, request);
55  },
56  p.subscription, __func__);
57  }
58 
59  StatusOr<google::pubsub::v1::Subscription> GetSubscription(
60  GetSubscriptionParams p) override {
61  google::pubsub::v1::GetSubscriptionRequest request;
62  request.set_subscription(p.subscription.FullName());
63  return RetryLoop(
64  retry_policy_->clone(), backoff_policy_->clone(),
65  Idempotency::kIdempotent,
66  [this](grpc::ClientContext& context,
67  google::pubsub::v1::GetSubscriptionRequest const& request) {
68  return stub_->GetSubscription(context, request);
69  },
70  request, __func__);
71  }
72 
73  StatusOr<google::pubsub::v1::Subscription> UpdateSubscription(
74  UpdateSubscriptionParams p) override {
75  return RetryLoop(
76  retry_policy_->clone(), backoff_policy_->clone(),
77  Idempotency::kIdempotent,
78  [this](grpc::ClientContext& context,
79  google::pubsub::v1::UpdateSubscriptionRequest const& request) {
80  return stub_->UpdateSubscription(context, request);
81  },
82  p.request, __func__);
83  }
84 
85  pubsub::ListSubscriptionsRange ListSubscriptions(
86  ListSubscriptionsParams p) override {
87  google::pubsub::v1::ListSubscriptionsRequest request;
88  request.set_project(std::move(p.project_id));
89  auto& stub = stub_;
90  // Because we do not have C++14 generalized lambda captures we cannot just
91  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
92  auto retry =
93  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
94  auto backoff =
95  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
96  char const* function_name = __func__;
97  auto list_functor =
98  [stub, retry, backoff, function_name](
99  google::pubsub::v1::ListSubscriptionsRequest const& request) {
100  return RetryLoop(
101  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
102  [stub](grpc::ClientContext& c,
103  google::pubsub::v1::ListSubscriptionsRequest const& r) {
104  return stub->ListSubscriptions(c, r);
105  },
106  request, function_name);
107  };
108 
109  return internal::MakePaginationRange<pubsub::ListSubscriptionsRange>(
110  std::move(request), std::move(list_functor),
111  [](google::pubsub::v1::ListSubscriptionsResponse response) {
112  std::vector<google::pubsub::v1::Subscription> items;
113  items.reserve(response.subscriptions_size());
114  for (auto& item : *response.mutable_subscriptions()) {
115  items.push_back(std::move(item));
116  }
117  return items;
118  });
119  }
120 
121  Status DeleteSubscription(DeleteSubscriptionParams p) override {
122  google::pubsub::v1::DeleteSubscriptionRequest request;
123  request.set_subscription(p.subscription.FullName());
124  return RetryLoop(
125  retry_policy_->clone(), backoff_policy_->clone(),
126  Idempotency::kIdempotent,
127  [this](grpc::ClientContext& context,
128  google::pubsub::v1::DeleteSubscriptionRequest const& request) {
129  return stub_->DeleteSubscription(context, request);
130  },
131  request, __func__);
132  }
133 
134  Status ModifyPushConfig(ModifyPushConfigParams p) override {
135  return RetryLoop(
136  retry_policy_->clone(), backoff_policy_->clone(),
137  Idempotency::kIdempotent,
138  [this](grpc::ClientContext& context,
139  google::pubsub::v1::ModifyPushConfigRequest const& request) {
140  return stub_->ModifyPushConfig(context, request);
141  },
142  p.request, __func__);
143  }
144 
145  StatusOr<google::pubsub::v1::Snapshot> CreateSnapshot(
146  CreateSnapshotParams p) override {
147  auto const idempotency = p.request.name().empty()
148  ? Idempotency::kNonIdempotent
149  : Idempotency::kIdempotent;
150  return RetryLoop(
151  retry_policy_->clone(), backoff_policy_->clone(), idempotency,
152  [this](grpc::ClientContext& context,
153  google::pubsub::v1::CreateSnapshotRequest const& request) {
154  return stub_->CreateSnapshot(context, request);
155  },
156  p.request, __func__);
157  }
158 
159  StatusOr<google::pubsub::v1::Snapshot> GetSnapshot(
160  GetSnapshotParams p) override {
161  google::pubsub::v1::GetSnapshotRequest request;
162  request.set_snapshot(p.snapshot.FullName());
163  return RetryLoop(
164  retry_policy_->clone(), backoff_policy_->clone(),
165  Idempotency::kIdempotent,
166  [this](grpc::ClientContext& context,
167  google::pubsub::v1::GetSnapshotRequest const& request) {
168  return stub_->GetSnapshot(context, request);
169  },
170  request, __func__);
171  }
172 
173  pubsub::ListSnapshotsRange ListSnapshots(ListSnapshotsParams p) override {
174  google::pubsub::v1::ListSnapshotsRequest request;
175  request.set_project(std::move(p.project_id));
176  auto& stub = stub_;
177  // Because we do not have C++14 generalized lambda captures we cannot just
178  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
179  auto retry =
180  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
181  auto backoff =
182  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
183  char const* function_name = __func__;
184  auto list_functor =
185  [stub, retry, backoff, function_name](
186  google::pubsub::v1::ListSnapshotsRequest const& request) {
187  return RetryLoop(
188  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
189  [stub](grpc::ClientContext& c,
190  google::pubsub::v1::ListSnapshotsRequest const& r) {
191  return stub->ListSnapshots(c, r);
192  },
193  request, function_name);
194  };
195 
196  return internal::MakePaginationRange<pubsub::ListSnapshotsRange>(
197  std::move(request), std::move(list_functor),
198  [](google::pubsub::v1::ListSnapshotsResponse response) {
199  std::vector<google::pubsub::v1::Snapshot> items;
200  items.reserve(response.snapshots_size());
201  for (auto& item : *response.mutable_snapshots()) {
202  items.push_back(std::move(item));
203  }
204  return items;
205  });
206  }
207 
208  StatusOr<google::pubsub::v1::Snapshot> UpdateSnapshot(
209  UpdateSnapshotParams p) override {
210  return RetryLoop(
211  retry_policy_->clone(), backoff_policy_->clone(),
212  Idempotency::kIdempotent,
213  [this](grpc::ClientContext& context,
214  google::pubsub::v1::UpdateSnapshotRequest const& request) {
215  return stub_->UpdateSnapshot(context, request);
216  },
217  p.request, __func__);
218  }
219 
220  Status DeleteSnapshot(DeleteSnapshotParams p) override {
221  google::pubsub::v1::DeleteSnapshotRequest request;
222  request.set_snapshot(p.snapshot.FullName());
223  return RetryLoop(
224  retry_policy_->clone(), backoff_policy_->clone(),
225  Idempotency::kIdempotent,
226  [this](grpc::ClientContext& context,
227  google::pubsub::v1::DeleteSnapshotRequest const& request) {
228  return stub_->DeleteSnapshot(context, request);
229  },
230  request, __func__);
231  }
232 
233  StatusOr<google::pubsub::v1::SeekResponse> Seek(SeekParams p) override {
234  return RetryLoop(
235  retry_policy_->clone(), backoff_policy_->clone(),
236  Idempotency::kIdempotent,
237  [this](grpc::ClientContext& context,
238  google::pubsub::v1::SeekRequest const& request) {
239  return stub_->Seek(context, request);
240  },
241  p.request, __func__);
242  }
243 
244  private:
245  std::shared_ptr<pubsub_internal::SubscriberStub> stub_;
246  std::unique_ptr<pubsub::RetryPolicy const> retry_policy_;
247  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy_;
248 };
249 } // namespace
250 
251 std::shared_ptr<pubsub::SubscriptionAdminConnection>
252 MakeSubscriptionAdminConnection(Options const& opts,
253  std::shared_ptr<SubscriberStub> stub) {
254  stub = std::make_shared<SubscriberMetadata>(std::move(stub));
255  auto const& tracing = opts.get<TracingComponentsOption>();
256  if (internal::Contains(tracing, "rpc")) {
257  GCP_LOG(INFO) << "Enabled logging for gRPC calls";
258  stub = std::make_shared<SubscriberLogging>(
259  std::move(stub), opts.get<GrpcTracingOptionsOption>(),
260  internal::Contains(tracing, "rpc-streams"));
261  }
262  return std::make_shared<SubscriptionAdminConnectionImpl>(
263  std::move(stub), opts.get<pubsub::RetryPolicyOption>()->clone(),
265 }
266 
267 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
268 } // namespace pubsub_internal
269 
270 namespace pubsub {
271 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
272 
274  std::initializer_list<pubsub_internal::NonConstructible>) {
276 }
277 
279  Options opts) {
280  internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
281  PolicyOptionList>(opts, __func__);
282  opts = pubsub_internal::DefaultCommonOptions(std::move(opts));
283  auto stub =
284  pubsub_internal::CreateDefaultSubscriberStub(opts, /*channel_id=*/0);
285  return pubsub_internal::MakeSubscriptionAdminConnection(std::move(opts),
286  std::move(stub));
287 }
288 
290  pubsub::ConnectionOptions const& options,
291  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
292  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
293  auto opts = internal::MakeOptions(options);
294  if (retry_policy) opts.set<RetryPolicyOption>(retry_policy->clone());
295  if (backoff_policy) opts.set<BackoffPolicyOption>(backoff_policy->clone());
296  return MakeSubscriptionAdminConnection(std::move(opts));
297 }
298 
300 
301 StatusOr<google::pubsub::v1::Subscription>
302 // NOLINTNEXTLINE(performance-unnecessary-value-param)
304  return Status{StatusCode::kUnimplemented, "needs-override"};
305 }
306 
307 StatusOr<google::pubsub::v1::Subscription>
308 // NOLINTNEXTLINE(performance-unnecessary-value-param)
310  return Status{StatusCode::kUnimplemented, "needs-override"};
311 }
312 
313 StatusOr<google::pubsub::v1::Subscription>
314 // NOLINTNEXTLINE(performance-unnecessary-value-param)
316  return Status{StatusCode::kUnimplemented, "needs-override"};
317 }
318 
320  ListSubscriptionsParams) { // NOLINT(performance-unnecessary-value-param)
321  return internal::MakeUnimplementedPaginationRange<ListSubscriptionsRange>();
322 }
323 
325  DeleteSubscriptionParams) { // NOLINT(performance-unnecessary-value-param)
326  return Status{StatusCode::kUnimplemented, "needs-override"};
327 }
328 
329 // NOLINTNEXTLINE(performance-unnecessary-value-param)
331  return Status{StatusCode::kUnimplemented, "needs-override"};
332 }
333 
334 StatusOr<google::pubsub::v1::Snapshot>
335 // NOLINTNEXTLINE(performance-unnecessary-value-param)
337  return Status{StatusCode::kUnimplemented, "needs-override"};
338 }
339 
340 StatusOr<google::pubsub::v1::Snapshot> SubscriptionAdminConnection::GetSnapshot(
341  GetSnapshotParams) { // NOLINT(performance-unnecessary-value-param)
342  return Status{StatusCode::kUnimplemented, "needs-override"};
343 }
344 
345 StatusOr<google::pubsub::v1::Snapshot>
346 // NOLINTNEXTLINE(performance-unnecessary-value-param)
348  return Status{StatusCode::kUnimplemented, "needs-override"};
349 }
350 
352  ListSnapshotsParams) { // NOLINT(performance-unnecessary-value-param)
353  return internal::MakeUnimplementedPaginationRange<ListSnapshotsRange>();
354 }
355 
356 // NOLINTNEXTLINE(performance-unnecessary-value-param)
358  return Status{StatusCode::kUnimplemented, "needs-override"};
359 }
360 
361 StatusOr<google::pubsub::v1::SeekResponse> SubscriptionAdminConnection::Seek(
362  SeekParams) { // NOLINT(performance-unnecessary-value-param)
363  return Status{StatusCode::kUnimplemented, "needs-override"};
364 }
365 
366 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
367 } // namespace pubsub
368 } // namespace cloud
369 } // namespace google