15 #include "google/cloud/pubsub/subscription_admin_connection.h"
16 #include "google/cloud/pubsub/internal/defaults.h"
17 #include "google/cloud/pubsub/internal/subscriber_auth.h"
18 #include "google/cloud/pubsub/internal/subscriber_logging.h"
19 #include "google/cloud/pubsub/internal/subscriber_metadata.h"
20 #include "google/cloud/pubsub/internal/subscriber_stub.h"
21 #include "google/cloud/pubsub/options.h"
22 #include "google/cloud/internal/retry_loop.h"
23 #include "google/cloud/log.h"
// Connection implementation that forwards each admin operation to a
// `pubsub_internal::SubscriberStub`. Only part of the declaration is visible
// in this excerpt (the base-class clause and the first constructor parameter
// are on lines that are not shown).
35 class SubscriptionAdminConnectionImpl
// Constructor: stores the stub and the retry / backoff policy prototypes by
// moving each argument into the member of the same name.
38 explicit SubscriptionAdminConnectionImpl(
40 std::shared_ptr<pubsub_internal::SubscriberStub> stub,
41 std::unique_ptr<
pubsub::RetryPolicy
const> retry_policy,
42 std::unique_ptr<
pubsub::BackoffPolicy
const> backoff_policy)
// NOTE(review): `background` is a parameter declared on a line not shown here.
43 : background_(std::move(background)),
44 stub_(std::move(stub)),
45 retry_policy_(std::move(retry_policy)),
46 backoff_policy_(std::move(backoff_policy)) {}
// Defaulted destructor. The `override` specifier means the base class
// declares a virtual destructor.
48 ~SubscriptionAdminConnectionImpl()
override =
default;
// CreateSubscription: forwards the request to `stub_->CreateSubscription()`
// and returns a `StatusOr<google::pubsub::v1::Subscription>`.
//
// The visible arguments (fresh clones of the retry and backoff policies,
// `Idempotency::kIdempotent`, and the callable) are presumably passed to a
// retry-loop helper whose call line is not shown in this excerpt -- TODO
// confirm.
50 StatusOr<
google::pubsub::v1::Subscription> CreateSubscription(
53 retry_policy_->clone(), backoff_policy_->clone(),
54 Idempotency::kIdempotent,
// The lambda captures `this` so that it can reach `stub_`.
55 [
this](grpc::ClientContext& context,
56 google::pubsub::v1::Subscription
const& request) {
57 return stub_->CreateSubscription(context, request);
// GetSubscription: declares a local `GetSubscriptionRequest` (the lines that
// populate it are not shown) and forwards it to `stub_->GetSubscription()`.
// Returns a `StatusOr<google::pubsub::v1::Subscription>`.
62 StatusOr<
google::pubsub::v1::Subscription> GetSubscription(
64 google::pubsub::v1::GetSubscriptionRequest request;
// Each call works on its own clones of the stored policy prototypes, and the
// operation is marked as idempotent.
67 retry_policy_->clone(), backoff_policy_->clone(),
68 Idempotency::kIdempotent,
69 [
this](grpc::ClientContext& context,
70 google::pubsub::v1::GetSubscriptionRequest
const& request) {
71 return stub_->GetSubscription(context, request);
// UpdateSubscription: forwards an `UpdateSubscriptionRequest` to
// `stub_->UpdateSubscription()` and returns a
// `StatusOr<google::pubsub::v1::Subscription>`.
76 StatusOr<
google::pubsub::v1::Subscription> UpdateSubscription(
// Fresh policy clones per call; the operation is marked as idempotent.
79 retry_policy_->clone(), backoff_policy_->clone(),
80 Idempotency::kIdempotent,
81 [
this](grpc::ClientContext& context,
82 google::pubsub::v1::UpdateSubscriptionRequest
const& request) {
83 return stub_->UpdateSubscription(context, request);
// ListSubscriptions: returns a `pubsub::ListSubscriptionsRange` built by
// `internal::MakePaginationRange` from three pieces: the request, a functor
// that fetches one page through the stub, and a functor that extracts the
// subscriptions from each response.
88 pubsub::ListSubscriptionsRange ListSubscriptions(
90 google::pubsub::v1::ListSubscriptionsRequest request;
// The policy clones are converted to `shared_ptr` and captured by value in
// the page-fetching lambda below. The lambda's capture list does not include
// `this`; presumably this lets the returned range be used independently of
// this object -- TODO confirm.
96 std::shared_ptr<
pubsub::RetryPolicy
const>(retry_policy_->clone());
98 std::shared_ptr<
pubsub::BackoffPolicy
const>(backoff_policy_->clone());
// `__func__` is read here, in the member function, rather than inside the
// lambda, where it would name the lambda's call operator instead.
99 char const* function_name =
__func__;
101 [stub, retry, backoff, function_name](
102 google::pubsub::v1::ListSubscriptionsRequest
const& request) {
// Every page fetch clones the policies again and is marked as idempotent.
104 retry->clone(), backoff->clone(), Idempotency::kIdempotent,
105 [stub](grpc::ClientContext& c,
106 google::pubsub::v1::ListSubscriptionsRequest
const& r) {
107 return stub->ListSubscriptions(c, r);
109 request, function_name);
112 return internal::MakePaginationRange<
pubsub::ListSubscriptionsRange>(
113 std::move(request), std::move(list_functor),
// The response is taken by value, so its subscriptions can be moved out into
// a vector that is reserved to the response's item count.
114 [](
google::pubsub::v1::ListSubscriptionsResponse response) {
115 std::vector<
google::pubsub::v1::Subscription> items;
116 items.reserve(response.subscriptions_size());
117 for (
auto& item : *response.mutable_subscriptions()) {
118 items.push_back(std::move(item));
// Fragment of the subscription-delete operation (its signature line is not
// shown in this excerpt). Declares a local `DeleteSubscriptionRequest` and
// forwards it to `stub_->DeleteSubscription()`.
125 google::pubsub::v1::DeleteSubscriptionRequest request;
// Fresh policy clones per call; the operation is marked as idempotent.
128 retry_policy_->clone(), backoff_policy_->clone(),
129 Idempotency::kIdempotent,
130 [
this](grpc::ClientContext& context,
131 google::pubsub::v1::DeleteSubscriptionRequest
const& request) {
132 return stub_->DeleteSubscription(context, request);
// Fragment of the push-config modification operation (its signature line is
// not shown in this excerpt). Forwards a `ModifyPushConfigRequest` to
// `stub_->ModifyPushConfig()` with fresh policy clones; the operation is
// marked as idempotent.
139 retry_policy_->clone(), backoff_policy_->clone(),
140 Idempotency::kIdempotent,
141 [
this](grpc::ClientContext& context,
142 google::pubsub::v1::ModifyPushConfigRequest
const& request) {
143 return stub_->ModifyPushConfig(context, request);
// CreateSnapshot: forwards a `CreateSnapshotRequest` to
// `stub_->CreateSnapshot()` and returns a
// `StatusOr<google::pubsub::v1::Snapshot>`.
148 StatusOr<
google::pubsub::v1::Snapshot> CreateSnapshot(
// Unlike the other operations here, idempotency depends on the request: an
// empty snapshot name makes the call non-idempotent, a non-empty name makes
// it idempotent.
// NOTE(review): presumably an unnamed request gets a service-assigned name,
// so a retried request could create a second snapshot -- confirm against the
// Pub/Sub API documentation.
150 auto const idempotency = p
.request.name().empty()
151 ? Idempotency::kNonIdempotent
152 : Idempotency::kIdempotent;
154 retry_policy_->clone(), backoff_policy_->clone(), idempotency,
155 [
this](grpc::ClientContext& context,
156 google::pubsub::v1::CreateSnapshotRequest
const& request) {
157 return stub_->CreateSnapshot(context, request);
// GetSnapshot: declares a local `GetSnapshotRequest` (the lines that populate
// it are not shown) and forwards it to `stub_->GetSnapshot()`. Returns a
// `StatusOr<google::pubsub::v1::Snapshot>`.
162 StatusOr<
google::pubsub::v1::Snapshot> GetSnapshot(
164 google::pubsub::v1::GetSnapshotRequest request;
// Fresh policy clones per call; the operation is marked as idempotent.
167 retry_policy_->clone(), backoff_policy_->clone(),
168 Idempotency::kIdempotent,
169 [
this](grpc::ClientContext& context,
170 google::pubsub::v1::GetSnapshotRequest
const& request) {
171 return stub_->GetSnapshot(context, request);
// Fragment of the snapshot-listing operation (its signature line is not shown
// in this excerpt). Returns a `pubsub::ListSnapshotsRange` built by
// `internal::MakePaginationRange` from the request, a page-fetching functor,
// and a functor that extracts the snapshots from each response.
177 google::pubsub::v1::ListSnapshotsRequest request;
// The policy clones are converted to `shared_ptr` and captured by value in
// the page-fetching lambda below, whose capture list does not include `this`.
183 std::shared_ptr<
pubsub::RetryPolicy
const>(retry_policy_->clone());
185 std::shared_ptr<
pubsub::BackoffPolicy
const>(backoff_policy_->clone());
// `__func__` is read here, in the member function, rather than inside the
// lambda, where it would name the lambda's call operator instead.
186 char const* function_name =
__func__;
188 [stub, retry, backoff, function_name](
189 google::pubsub::v1::ListSnapshotsRequest
const& request) {
// Every page fetch clones the policies again and is marked as idempotent.
191 retry->clone(), backoff->clone(), Idempotency::kIdempotent,
192 [stub](grpc::ClientContext& c,
193 google::pubsub::v1::ListSnapshotsRequest
const& r) {
194 return stub->ListSnapshots(c, r);
196 request, function_name);
199 return internal::MakePaginationRange<
pubsub::ListSnapshotsRange>(
200 std::move(request), std::move(list_functor),
// The response is taken by value, so its snapshots can be moved out into a
// vector that is reserved to the response's item count.
201 [](
google::pubsub::v1::ListSnapshotsResponse response) {
202 std::vector<
google::pubsub::v1::Snapshot> items;
203 items.reserve(response.snapshots_size());
204 for (
auto& item : *response.mutable_snapshots()) {
205 items.push_back(std::move(item));
// UpdateSnapshot: forwards an `UpdateSnapshotRequest` to
// `stub_->UpdateSnapshot()` and returns a
// `StatusOr<google::pubsub::v1::Snapshot>`.
211 StatusOr<
google::pubsub::v1::Snapshot> UpdateSnapshot(
// Fresh policy clones per call; the operation is marked as idempotent.
214 retry_policy_->clone(), backoff_policy_->clone(),
215 Idempotency::kIdempotent,
216 [
this](grpc::ClientContext& context,
217 google::pubsub::v1::UpdateSnapshotRequest
const& request) {
218 return stub_->UpdateSnapshot(context, request);
// Fragment of the snapshot-delete operation (its signature line is not shown
// in this excerpt). Declares a local `DeleteSnapshotRequest` and forwards it
// to `stub_->DeleteSnapshot()`.
224 google::pubsub::v1::DeleteSnapshotRequest request;
// Fresh policy clones per call; the operation is marked as idempotent.
227 retry_policy_->clone(), backoff_policy_->clone(),
228 Idempotency::kIdempotent,
229 [
this](grpc::ClientContext& context,
230 google::pubsub::v1::DeleteSnapshotRequest
const& request) {
231 return stub_->DeleteSnapshot(context, request);
// Fragment of the seek operation (its signature line is not shown in this
// excerpt). Forwards a `SeekRequest` to `stub_->Seek()` with fresh policy
// clones; the operation is marked as idempotent.
238 retry_policy_->clone(), backoff_policy_->clone(),
239 Idempotency::kIdempotent,
240 [
this](grpc::ClientContext& context,
241 google::pubsub::v1::SeekRequest
const& request) {
242 return stub_->Seek(context, request);
// Stub that every operation in this class forwards its RPC to.
249 std::shared_ptr<pubsub_internal::SubscriberStub> stub_;
// Policy prototypes. They are const-qualified and never used directly: each
// operation calls `clone()` to obtain its own instance.
250 std::unique_ptr<
pubsub::RetryPolicy
const> retry_policy_;
251 std::unique_ptr<
pubsub::BackoffPolicy
const> backoff_policy_;
// Wraps `stub` in a series of decorators and returns the result. Each
// decorator is constructed around the previous value of `stub`, so later
// wrappers sit outside earlier ones. The first parameter (apparently the
// options, judging by the call sites) is declared on a line not shown here.
255 std::shared_ptr<pubsub_internal::SubscriberStub> DecorateSubscriptionAdminStub(
257 std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
258 std::shared_ptr<pubsub_internal::SubscriberStub> stub) {
// The authentication decorator is added only when the strategy reports that
// it needs to configure the client context.
259 if (auth->RequiresConfigureContext()) {
260 stub = std::make_shared<pubsub_internal::SubscriberAuth>(std::move(auth),
// NOTE(review): the metadata decorator appears to be applied unconditionally
// (the brace closing the `if` above is on a line not shown) -- confirm.
263 stub = std::make_shared<pubsub_internal::SubscriberMetadata>(std::move(stub));
// The logging decorator is added only when `tracing` contains "rpc".
// `tracing` is defined on a line not shown in this excerpt.
265 if (internal::Contains(tracing,
"rpc")) {
266 GCP_LOG(INFO) <<
"Enabled logging for gRPC calls";
267 stub = std::make_shared<pubsub_internal::SubscriberLogging>(
// Whether `tracing` also contains "rpc-streams" is passed to the logging
// decorator as an argument.
269 internal::Contains(tracing,
"rpc-streams"));
// Fragments of several member-function definitions. Only the return types and
// two `return` statements are visible; the function names and the remaining
// bodies are on lines not shown in this excerpt.
// NOTE(review): these look like the default implementations of the connection
// base class -- confirm against the full file.
278 StatusOr<
google::pubsub::v1::Subscription>
284 StatusOr<
google::pubsub::v1::Subscription>
290 StatusOr<
google::pubsub::v1::Subscription>
// Returns the "unimplemented" pagination range for subscriptions.
298 return internal::MakeUnimplementedPaginationRange<ListSubscriptionsRange>();
311 StatusOr<
google::pubsub::v1::Snapshot>
322 StatusOr<
google::pubsub::v1::Snapshot>
// Returns the "unimplemented" pagination range for snapshots.
330 return internal::MakeUnimplementedPaginationRange<ListSnapshotsRange>();
// Tail of the parameter list of a function taking an unnamed
// `std::initializer_list<pubsub_internal::NonConstructible>`; its name and
// body are on lines not shown in this excerpt.
344 std::initializer_list<pubsub_internal::NonConstructible>) {
// Body of a factory that builds a `SubscriptionAdminConnectionImpl` from
// `opts` (its signature is on lines not shown). Steps, in order:
//   1. check `opts` against the common, gRPC and policy option lists;
//   2. replace `opts` with the result of `DefaultCommonOptions`;
//   3. create the background threads and the authentication strategy;
//   4. create the default stub on a channel obtained from the auth strategy;
//   5. decorate the stub and construct the connection.
350 internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
351 PolicyOptionList>(opts,
__func__);
352 opts
= pubsub_internal::DefaultCommonOptions(std::move(opts));
353 auto background = internal::MakeBackgroundThreadsFactory(opts)();
// The authentication strategy is created with the background threads'
// completion queue.
354 auto auth =
google::
cloud::internal::CreateAuthenticationStrategy(
355 background->
cq(), opts);
356 auto stub = pubsub_internal::CreateDefaultSubscriberStub(auth->CreateChannel(
// `auth` is moved here, so it must not be used after this statement.
358 stub = DecorateSubscriptionAdminStub(opts, std::move(auth), std::move(stub));
359 return std::make_shared<SubscriptionAdminConnectionImpl>(
360 std::move(background), std::move(stub),
// Parameter list and first statement of a function that accepts
// `pubsub::ConnectionOptions` plus retry and backoff policies; its name and
// the rest of its body are on lines not shown in this excerpt.
366 pubsub::ConnectionOptions
const& options,
367 std::unique_ptr<
pubsub::RetryPolicy
const> retry_policy,
368 std::unique_ptr<
pubsub::BackoffPolicy
const> backoff_policy) {
// Converts the `ConnectionOptions` argument into `opts` via
// `internal::MakeOptions`.
369 auto opts = internal::MakeOptions(options);
378 namespace pubsub_internal {
382 MakeTestSubscriptionAdminConnection(
Options const& opts,
383 std::shared_ptr<SubscriberStub> stub) {
384 auto background = internal::MakeBackgroundThreadsFactory(opts)();
385 auto auth =
google::
cloud::internal::CreateAuthenticationStrategy(
386 background->
cq(), opts);
387 stub =
pubsub::DecorateSubscriptionAdminStub(opts, std::move(auth),
389 return std::make_shared<
pubsub::SubscriptionAdminConnectionImpl>(
390 std::move(background), std::move(stub),