15 #include "google/cloud/pubsub/topic_admin_connection.h"
16 #include "google/cloud/pubsub/internal/create_channel.h"
17 #include "google/cloud/pubsub/internal/defaults.h"
18 #include "google/cloud/pubsub/internal/publisher_auth.h"
19 #include "google/cloud/pubsub/internal/publisher_logging.h"
20 #include "google/cloud/pubsub/internal/publisher_metadata.h"
21 #include "google/cloud/pubsub/internal/publisher_stub.h"
22 #include "google/cloud/pubsub/options.h"
23 #include "google/cloud/internal/retry_loop.h"
24 #include "google/cloud/log.h"
25 #include "absl/strings/str_split.h"
26 #include <initializer_list>
40 explicit TopicAdminConnectionImpl(
42 std::shared_ptr<pubsub_internal::PublisherStub> stub,
43 std::unique_ptr<
pubsub::RetryPolicy
const> retry_policy,
44 std::unique_ptr<
pubsub::BackoffPolicy
const> backoff_policy)
45 : background_(std::move(background)),
46 stub_(std::move(stub)),
47 retry_policy_(std::move(retry_policy)),
48 backoff_policy_(std::move(backoff_policy)) {}
// Destructor. It is marked `override` and explicitly defaulted, so this class
// adds no cleanup logic of its own beyond destroying its members.
50 ~TopicAdminConnectionImpl()
override =
default;
52 StatusOr<
google::pubsub::v1::Topic> CreateTopic(
55 retry_policy_->clone(), backoff_policy_->clone(),
56 Idempotency::kIdempotent,
57 [
this](grpc::ClientContext& context,
58 google::pubsub::v1::Topic
const& request) {
59 return stub_->CreateTopic(context, request);
65 google::pubsub::v1::GetTopicRequest request;
68 retry_policy_->clone(), backoff_policy_->clone(),
69 Idempotency::kIdempotent,
70 [
this](grpc::ClientContext& context,
71 google::pubsub::v1::GetTopicRequest
const& request) {
72 return stub_->GetTopic(context, request);
77 StatusOr<
google::pubsub::v1::Topic> UpdateTopic(
80 retry_policy_->clone(), backoff_policy_->clone(),
81 Idempotency::kIdempotent,
82 [
this](grpc::ClientContext& context,
83 google::pubsub::v1::UpdateTopicRequest
const& request) {
84 return stub_->UpdateTopic(context, request);
90 google::pubsub::v1::ListTopicsRequest request;
96 std::shared_ptr<
pubsub::RetryPolicy
const>(retry_policy_->clone());
98 std::shared_ptr<
pubsub::BackoffPolicy
const>(backoff_policy_->clone());
99 char const* function_name =
__func__;
101 [stub, retry, backoff,
102 function_name](
google::pubsub::v1::ListTopicsRequest
const& request) {
104 retry->clone(), backoff->clone(), Idempotency::kIdempotent,
105 [stub](grpc::ClientContext& c,
106 google::pubsub::v1::ListTopicsRequest
const& r) {
107 return stub->ListTopics(c, r);
109 request, function_name);
111 return internal::MakePaginationRange<
pubsub::ListTopicsRange>(
112 std::move(request), list_functor,
113 [](
google::pubsub::v1::ListTopicsResponse response) {
114 std::vector<
google::pubsub::v1::Topic> items;
115 items.reserve(response.topics_size());
116 for (
auto& item : *response.mutable_topics()) {
117 items.push_back(std::move(item));
124 google::pubsub::v1::DeleteTopicRequest request;
127 retry_policy_->clone(), backoff_policy_->clone(),
128 Idempotency::kIdempotent,
129 [
this](grpc::ClientContext& context,
130 google::pubsub::v1::DeleteTopicRequest
const& request) {
131 return stub_->DeleteTopic(context, request);
136 StatusOr<
google::pubsub::v1::DetachSubscriptionResponse> DetachSubscription(
138 google::pubsub::v1::DetachSubscriptionRequest request;
140 grpc::ClientContext context;
142 retry_policy_->clone(), backoff_policy_->clone(),
143 Idempotency::kIdempotent,
144 [
this](grpc::ClientContext& context,
145 google::pubsub::v1::DetachSubscriptionRequest
const& request) {
146 return stub_->DetachSubscription(context, request);
151 pubsub::ListTopicSubscriptionsRange ListTopicSubscriptions(
153 google::pubsub::v1::ListTopicSubscriptionsRequest request;
159 std::shared_ptr<
pubsub::RetryPolicy
const>(retry_policy_->clone());
161 std::shared_ptr<
pubsub::BackoffPolicy
const>(backoff_policy_->clone());
162 char const* function_name =
__func__;
164 [stub, retry, backoff, function_name](
165 google::pubsub::v1::ListTopicSubscriptionsRequest
const& request) {
167 retry->clone(), backoff->clone(), Idempotency::kIdempotent,
169 grpc::ClientContext& c,
170 google::pubsub::v1::ListTopicSubscriptionsRequest
const& r) {
171 return stub->ListTopicSubscriptions(c, r);
173 request, function_name);
175 return internal::MakePaginationRange<
pubsub::ListTopicSubscriptionsRange>(
176 std::move(request), std::move(list_functor),
177 [](
google::pubsub::v1::ListTopicSubscriptionsResponse response) {
178 std::vector<std::string> items;
179 items.reserve(response.subscriptions_size());
180 for (
auto& item : *response.mutable_subscriptions()) {
181 items.push_back(std::move(item));
187 pubsub::ListTopicSnapshotsRange ListTopicSnapshots(
189 google::pubsub::v1::ListTopicSnapshotsRequest request;
195 std::shared_ptr<
pubsub::RetryPolicy
const>(retry_policy_->clone());
197 std::shared_ptr<
pubsub::BackoffPolicy
const>(backoff_policy_->clone());
198 char const* function_name =
__func__;
200 [stub, retry, backoff, function_name](
201 google::pubsub::v1::ListTopicSnapshotsRequest
const& request) {
203 retry->clone(), backoff->clone(), Idempotency::kIdempotent,
204 [stub](grpc::ClientContext& c,
205 google::pubsub::v1::ListTopicSnapshotsRequest
const& r) {
206 return stub->ListTopicSnapshots(c, r);
208 request, function_name);
210 return internal::MakePaginationRange<
pubsub::ListTopicSnapshotsRange>(
211 std::move(request), list_functor,
212 [](
google::pubsub::v1::ListTopicSnapshotsResponse response) {
213 std::vector<std::string> items;
214 items.reserve(response.snapshots_size());
215 for (
auto& item : *response.mutable_snapshots()) {
216 items.push_back(std::move(item));
// Stub that performs the RPCs. It is initialized from the constructor's
// `stub` argument and called by the methods of this class, e.g.
// `stub_->CreateTopic(context, request)`.
224 std::shared_ptr<pubsub_internal::PublisherStub> stub_;
// Retry policy held as a pointer-to-const. The visible call sites never use
// it directly; they pass a copy obtained with `retry_policy_->clone()`.
225 std::unique_ptr<
pubsub::RetryPolicy
const> retry_policy_;
// Backoff policy held as a pointer-to-const. As with the retry policy, the
// visible call sites pass `backoff_policy_->clone()`.
226 std::unique_ptr<
pubsub::BackoffPolicy
const> backoff_policy_;
230 std::shared_ptr<pubsub_internal::PublisherStub> DecorateTopicAdminStub(
232 std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
233 std::shared_ptr<pubsub_internal::PublisherStub> stub) {
234 if (auth->RequiresConfigureContext()) {
235 stub = std::make_shared<pubsub_internal::PublisherAuth>(std::move(auth),
238 stub = std::make_shared<pubsub_internal::PublisherMetadata>(std::move(stub));
240 GCP_LOG(INFO) <<
"Enabled logging for gRPC calls";
241 stub = std::make_shared<pubsub_internal::PublisherLogging>(
268 return internal::MakeUnimplementedPaginationRange<ListTopicsRange>();
276 StatusOr<
google::pubsub::v1::DetachSubscriptionResponse>
284 return internal::MakeUnimplementedPaginationRange<
285 ListTopicSubscriptionsRange>();
290 return internal::MakeUnimplementedPaginationRange<ListTopicSnapshotsRange>();
294 std::initializer_list<pubsub_internal::NonConstructible>) {
299 internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
300 PolicyOptionList>(opts,
__func__);
301 opts
= pubsub_internal::DefaultCommonOptions(std::move(opts));
303 auto background = internal::MakeBackgroundThreadsFactory(opts)();
304 auto auth =
google::
cloud::internal::CreateAuthenticationStrategy(
305 background->
cq(), opts);
307 auto stub = pubsub_internal::CreateDefaultPublisherStub(auth->CreateChannel(
310 stub = DecorateTopicAdminStub(opts, std::move(auth), std::move(stub));
311 return std::make_shared<TopicAdminConnectionImpl>(
312 std::move(background), std::move(stub),
318 ConnectionOptions
const& options,
319 std::unique_ptr<
pubsub::RetryPolicy
const> retry_policy,
320 std::unique_ptr<
pubsub::BackoffPolicy
const> backoff_policy) {
321 auto opts = internal::MakeOptions(options);
330 namespace pubsub_internal {
334 Options const& opts, std::shared_ptr<PublisherStub> stub) {
335 auto background = internal::MakeBackgroundThreadsFactory(opts)();
336 auto auth =
google::
cloud::internal::CreateAuthenticationStrategy(
337 background->
cq(), opts);
338 stub =
pubsub::DecorateTopicAdminStub(opts, std::move(auth), std::move(stub));
339 return std::make_shared<
pubsub::TopicAdminConnectionImpl>(
340 std::move(background), std::move(stub),