Google Cloud Pub/Sub C++ Client  1.32.1
A C++ Client Library for Google Cloud Pub/Sub
topic_admin_connection.cc
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/topic_admin_connection.h"
16 #include "google/cloud/pubsub/internal/defaults.h"
17 #include "google/cloud/pubsub/internal/publisher_logging.h"
18 #include "google/cloud/pubsub/internal/publisher_metadata.h"
19 #include "google/cloud/pubsub/internal/publisher_stub.h"
20 #include "google/cloud/pubsub/options.h"
21 #include "google/cloud/internal/retry_loop.h"
22 #include "google/cloud/log.h"
23 #include "absl/strings/str_split.h"
24 #include <initializer_list>
25 #include <memory>
26 
27 namespace google {
28 namespace cloud {
29 namespace pubsub_internal {
30 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
31 namespace {
32 
33 using ::google::cloud::internal::Idempotency;
34 using ::google::cloud::internal::RetryLoop;
35 
36 class TopicAdminConnectionImpl : public pubsub::TopicAdminConnection {
37  public:
38  explicit TopicAdminConnectionImpl(
39  std::shared_ptr<pubsub_internal::PublisherStub> stub,
40  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
41  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy)
42  : stub_(std::move(stub)),
43  retry_policy_(std::move(retry_policy)),
44  backoff_policy_(std::move(backoff_policy)) {}
45 
46  ~TopicAdminConnectionImpl() override = default;
47 
48  StatusOr<google::pubsub::v1::Topic> CreateTopic(
49  CreateTopicParams p) override {
50  return RetryLoop(
51  retry_policy_->clone(), backoff_policy_->clone(),
52  Idempotency::kIdempotent,
53  [this](grpc::ClientContext& context,
54  google::pubsub::v1::Topic const& request) {
55  return stub_->CreateTopic(context, request);
56  },
57  p.topic, __func__);
58  }
59 
60  StatusOr<google::pubsub::v1::Topic> GetTopic(GetTopicParams p) override {
61  google::pubsub::v1::GetTopicRequest request;
62  request.set_topic(p.topic.FullName());
63  return RetryLoop(
64  retry_policy_->clone(), backoff_policy_->clone(),
65  Idempotency::kIdempotent,
66  [this](grpc::ClientContext& context,
67  google::pubsub::v1::GetTopicRequest const& request) {
68  return stub_->GetTopic(context, request);
69  },
70  request, __func__);
71  }
72 
73  StatusOr<google::pubsub::v1::Topic> UpdateTopic(
74  UpdateTopicParams p) override {
75  return RetryLoop(
76  retry_policy_->clone(), backoff_policy_->clone(),
77  Idempotency::kIdempotent,
78  [this](grpc::ClientContext& context,
79  google::pubsub::v1::UpdateTopicRequest const& request) {
80  return stub_->UpdateTopic(context, request);
81  },
82  p.request, __func__);
83  }
84 
85  pubsub::ListTopicsRange ListTopics(ListTopicsParams p) override {
86  google::pubsub::v1::ListTopicsRequest request;
87  request.set_project(std::move(p.project_id));
88  auto& stub = stub_;
89  // Because we do not have C++14 generalized lambda captures we cannot just
90  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
91  auto retry =
92  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
93  auto backoff =
94  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
95  char const* function_name = __func__;
96  auto list_functor =
97  [stub, retry, backoff,
98  function_name](google::pubsub::v1::ListTopicsRequest const& request) {
99  return RetryLoop(
100  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
101  [stub](grpc::ClientContext& c,
102  google::pubsub::v1::ListTopicsRequest const& r) {
103  return stub->ListTopics(c, r);
104  },
105  request, function_name);
106  };
107  return internal::MakePaginationRange<pubsub::ListTopicsRange>(
108  std::move(request), list_functor,
109  [](google::pubsub::v1::ListTopicsResponse response) {
110  std::vector<google::pubsub::v1::Topic> items;
111  items.reserve(response.topics_size());
112  for (auto& item : *response.mutable_topics()) {
113  items.push_back(std::move(item));
114  }
115  return items;
116  });
117  }
118 
119  Status DeleteTopic(DeleteTopicParams p) override {
120  google::pubsub::v1::DeleteTopicRequest request;
121  request.set_topic(p.topic.FullName());
122  return RetryLoop(
123  retry_policy_->clone(), backoff_policy_->clone(),
124  Idempotency::kIdempotent,
125  [this](grpc::ClientContext& context,
126  google::pubsub::v1::DeleteTopicRequest const& request) {
127  return stub_->DeleteTopic(context, request);
128  },
129  request, __func__);
130  }
131 
132  StatusOr<google::pubsub::v1::DetachSubscriptionResponse> DetachSubscription(
133  DetachSubscriptionParams p) override {
134  google::pubsub::v1::DetachSubscriptionRequest request;
135  request.set_subscription(p.subscription.FullName());
136  grpc::ClientContext context;
137  return RetryLoop(
138  retry_policy_->clone(), backoff_policy_->clone(),
139  Idempotency::kIdempotent,
140  [this](grpc::ClientContext& context,
141  google::pubsub::v1::DetachSubscriptionRequest const& request) {
142  return stub_->DetachSubscription(context, request);
143  },
144  request, __func__);
145  }
146 
147  pubsub::ListTopicSubscriptionsRange ListTopicSubscriptions(
148  ListTopicSubscriptionsParams p) override {
149  google::pubsub::v1::ListTopicSubscriptionsRequest request;
150  request.set_topic(std::move(p.topic_full_name));
151  auto& stub = stub_;
152  // Because we do not have C++14 generalized lambda captures we cannot just
153  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
154  auto retry =
155  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
156  auto backoff =
157  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
158  char const* function_name = __func__;
159  auto list_functor =
160  [stub, retry, backoff, function_name](
161  google::pubsub::v1::ListTopicSubscriptionsRequest const& request) {
162  return RetryLoop(
163  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
164  [stub](
165  grpc::ClientContext& c,
166  google::pubsub::v1::ListTopicSubscriptionsRequest const& r) {
167  return stub->ListTopicSubscriptions(c, r);
168  },
169  request, function_name);
170  };
171  return internal::MakePaginationRange<pubsub::ListTopicSubscriptionsRange>(
172  std::move(request), std::move(list_functor),
173  [](google::pubsub::v1::ListTopicSubscriptionsResponse response) {
174  std::vector<std::string> items;
175  items.reserve(response.subscriptions_size());
176  for (auto& item : *response.mutable_subscriptions()) {
177  items.push_back(std::move(item));
178  }
179  return items;
180  });
181  }
182 
183  pubsub::ListTopicSnapshotsRange ListTopicSnapshots(
184  ListTopicSnapshotsParams p) override {
185  google::pubsub::v1::ListTopicSnapshotsRequest request;
186  request.set_topic(std::move(p.topic_full_name));
187  auto& stub = stub_;
188  // Because we do not have C++14 generalized lambda captures we cannot just
189  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
190  auto retry =
191  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
192  auto backoff =
193  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
194  char const* function_name = __func__;
195  auto list_functor =
196  [stub, retry, backoff, function_name](
197  google::pubsub::v1::ListTopicSnapshotsRequest const& request) {
198  return RetryLoop(
199  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
200  [stub](grpc::ClientContext& c,
201  google::pubsub::v1::ListTopicSnapshotsRequest const& r) {
202  return stub->ListTopicSnapshots(c, r);
203  },
204  request, function_name);
205  };
206  return internal::MakePaginationRange<pubsub::ListTopicSnapshotsRange>(
207  std::move(request), list_functor,
208  [](google::pubsub::v1::ListTopicSnapshotsResponse response) {
209  std::vector<std::string> items;
210  items.reserve(response.snapshots_size());
211  for (auto& item : *response.mutable_snapshots()) {
212  items.push_back(std::move(item));
213  }
214  return items;
215  });
216  }
217 
218  private:
219  std::shared_ptr<pubsub_internal::PublisherStub> stub_;
220  std::unique_ptr<pubsub::RetryPolicy const> retry_policy_;
221  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy_;
222 };
223 } // namespace
224 
225 std::shared_ptr<pubsub::TopicAdminConnection> MakeTopicAdminConnection(
226  Options const& opts, std::shared_ptr<PublisherStub> stub) {
227  stub = std::make_shared<pubsub_internal::PublisherMetadata>(std::move(stub));
228  if (internal::Contains(opts.get<TracingComponentsOption>(), "rpc")) {
229  GCP_LOG(INFO) << "Enabled logging for gRPC calls";
230  stub = std::make_shared<pubsub_internal::PublisherLogging>(
231  std::move(stub), opts.get<GrpcTracingOptionsOption>());
232  }
233  return std::make_shared<TopicAdminConnectionImpl>(
234  std::move(stub), opts.get<pubsub::RetryPolicyOption>()->clone(),
236 }
237 
238 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
239 } // namespace pubsub_internal
240 
241 namespace pubsub {
242 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
243 
245 
246 StatusOr<google::pubsub::v1::Topic> TopicAdminConnection::CreateTopic(
247  CreateTopicParams) { // NOLINT(performance-unnecessary-value-param)
248  return Status{StatusCode::kUnimplemented, "needs-override"};
249 }
250 
251 StatusOr<google::pubsub::v1::Topic> TopicAdminConnection::GetTopic(
252  GetTopicParams) { // NOLINT(performance-unnecessary-value-param)
253  return Status{StatusCode::kUnimplemented, "needs-override"};
254 }
255 
256 StatusOr<google::pubsub::v1::Topic> TopicAdminConnection::UpdateTopic(
257  UpdateTopicParams) { // NOLINT(performance-unnecessary-value-param)
258  return Status{StatusCode::kUnimplemented, "needs-override"};
259 }
260 
261 // NOLINTNEXTLINE(performance-unnecessary-value-param)
263  return internal::MakeUnimplementedPaginationRange<ListTopicsRange>();
264 }
265 
266 // NOLINTNEXTLINE(performance-unnecessary-value-param)
268  return Status{StatusCode::kUnimplemented, "needs-override"};
269 }
270 
271 StatusOr<google::pubsub::v1::DetachSubscriptionResponse>
272 // NOLINTNEXTLINE(performance-unnecessary-value-param)
274  return Status{StatusCode::kUnimplemented, "needs-override"};
275 }
276 
277 ListTopicSubscriptionsRange TopicAdminConnection::ListTopicSubscriptions(
278  ListTopicSubscriptionsParams) { // NOLINT(performance-unnecessary-value-param)
279  return internal::MakeUnimplementedPaginationRange<
280  ListTopicSubscriptionsRange>();
281 }
282 
283 ListTopicSnapshotsRange TopicAdminConnection::ListTopicSnapshots(
284  ListTopicSnapshotsParams) { // NOLINT(performance-unnecessary-value-param)
285  return internal::MakeUnimplementedPaginationRange<ListTopicSnapshotsRange>();
286 }
287 
289  std::initializer_list<pubsub_internal::NonConstructible>) {
291 }
292 
294  internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
295  PolicyOptionList>(opts, __func__);
296  opts = pubsub_internal::DefaultCommonOptions(std::move(opts));
297  auto stub =
298  pubsub_internal::CreateDefaultPublisherStub(opts, /*channel_id=*/0);
299  return pubsub_internal::MakeTopicAdminConnection(std::move(opts),
300  std::move(stub));
301 }
302 
304  ConnectionOptions const& options,
305  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
306  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
307  auto opts = internal::MakeOptions(options);
308  if (retry_policy) opts.set<RetryPolicyOption>(retry_policy->clone());
309  if (backoff_policy) opts.set<BackoffPolicyOption>(backoff_policy->clone());
310  return MakeTopicAdminConnection(std::move(opts));
311 }
312 
313 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
314 } // namespace pubsub
315 } // namespace cloud
316 } // namespace google