Google Cloud Pub/Sub C++ Client  1.33.0
A C++ Client Library for Google Cloud Pub/Sub
schema_admin_connection.cc
Go to the documentation of this file.
1 // Copyright 2021 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/schema_admin_connection.h"
16 #include "google/cloud/pubsub/internal/defaults.h"
17 #include "google/cloud/pubsub/internal/schema_auth.h"
18 #include "google/cloud/pubsub/internal/schema_logging.h"
19 #include "google/cloud/pubsub/internal/schema_metadata.h"
20 #include "google/cloud/pubsub/internal/schema_stub.h"
21 #include "google/cloud/pubsub/options.h"
22 #include "google/cloud/pubsub/retry_policy.h"
23 #include "google/cloud/internal/retry_loop.h"
24 #include "google/cloud/log.h"
25 #include <memory>
26 
27 namespace google {
28 namespace cloud {
29 namespace pubsub {
31 namespace {
32 
33 using ::google::cloud::internal::Idempotency;
34 using ::google::cloud::internal::RetryLoop;
35 
36 class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection {
37  public:
38  explicit SchemaAdminConnectionImpl(
39  std::unique_ptr<google::cloud::BackgroundThreads> background,
40  std::shared_ptr<pubsub_internal::SchemaStub> stub,
41  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
42  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy)
43  : background_(std::move(background)),
44  stub_(std::move(stub)),
45  retry_policy_(std::move(retry_policy)),
46  backoff_policy_(std::move(backoff_policy)) {}
47 
48  ~SchemaAdminConnectionImpl() override = default;
49 
50  StatusOr<google::pubsub::v1::Schema> CreateSchema(
51  google::pubsub::v1::CreateSchemaRequest const& request) override {
52  return RetryLoop(
53  retry_policy_->clone(), backoff_policy_->clone(),
54  Idempotency::kIdempotent,
55  [this](grpc::ClientContext& context,
56  google::pubsub::v1::CreateSchemaRequest const& request) {
57  return stub_->CreateSchema(context, request);
58  },
59  request, __func__);
60  }
61  StatusOr<google::pubsub::v1::Schema> GetSchema(
62  google::pubsub::v1::GetSchemaRequest const& request) override {
63  return RetryLoop(
64  retry_policy_->clone(), backoff_policy_->clone(),
65  Idempotency::kIdempotent,
66  [this](grpc::ClientContext& context,
67  google::pubsub::v1::GetSchemaRequest const& request) {
68  return stub_->GetSchema(context, request);
69  },
70  request, __func__);
71  }
72 
73  pubsub::ListSchemasRange ListSchemas(
74  google::pubsub::v1::ListSchemasRequest const& request) override {
75  auto& stub = stub_;
76  // Because we do not have C++14 generalized lambda captures we cannot just
77  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
78  auto retry =
79  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
80  auto backoff =
81  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
82  char const* function_name = __func__;
83  auto list_functor =
84  [stub, retry, backoff,
85  function_name](google::pubsub::v1::ListSchemasRequest const& request) {
86  return RetryLoop(
87  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
88  [stub](grpc::ClientContext& c,
89  google::pubsub::v1::ListSchemasRequest const& r) {
90  return stub->ListSchemas(c, r);
91  },
92  request, function_name);
93  };
94  return internal::MakePaginationRange<pubsub::ListSchemasRange>(
95  std::move(request), list_functor,
96  [](google::pubsub::v1::ListSchemasResponse response) {
97  std::vector<google::pubsub::v1::Schema> items;
98  items.reserve(response.schemas_size());
99  for (auto& item : *response.mutable_schemas()) {
100  items.push_back(std::move(item));
101  }
102  return items;
103  });
104  }
105 
106  Status DeleteSchema(
107  google::pubsub::v1::DeleteSchemaRequest const& request) override {
108  return RetryLoop(
109  retry_policy_->clone(), backoff_policy_->clone(),
110  Idempotency::kIdempotent,
111  [this](grpc::ClientContext& context,
112  google::pubsub::v1::DeleteSchemaRequest const& request) {
113  return stub_->DeleteSchema(context, request);
114  },
115  request, __func__);
116  }
117  StatusOr<google::pubsub::v1::ValidateSchemaResponse> ValidateSchema(
118  google::pubsub::v1::ValidateSchemaRequest const& request) override {
119  return RetryLoop(
120  retry_policy_->clone(), backoff_policy_->clone(),
121  Idempotency::kIdempotent,
122  [this](grpc::ClientContext& context,
123  google::pubsub::v1::ValidateSchemaRequest const& request) {
124  return stub_->ValidateSchema(context, request);
125  },
126  request, __func__);
127  }
128  StatusOr<google::pubsub::v1::ValidateMessageResponse> ValidateMessage(
129  google::pubsub::v1::ValidateMessageRequest const& request) override {
130  return RetryLoop(
131  retry_policy_->clone(), backoff_policy_->clone(),
132  Idempotency::kIdempotent,
133  [this](grpc::ClientContext& context,
134  google::pubsub::v1::ValidateMessageRequest const& request) {
135  return stub_->ValidateMessage(context, request);
136  },
137  request, __func__);
138  }
139 
140  private:
141  std::unique_ptr<google::cloud::BackgroundThreads> background_;
142  std::shared_ptr<pubsub_internal::SchemaStub> stub_;
143  std::unique_ptr<pubsub::RetryPolicy const> retry_policy_;
144  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy_;
145 };
146 
147 // Decorates a SchemaAdminStub. This works for both mock and real stubs.
148 std::shared_ptr<pubsub_internal::SchemaStub> DecorateSchemaAdminStub(
149  Options const& opts,
150  std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
151  std::shared_ptr<pubsub_internal::SchemaStub> stub) {
152  if (auth->RequiresConfigureContext()) {
153  stub = std::make_shared<pubsub_internal::SchemaAuth>(std::move(auth),
154  std::move(stub));
155  }
156  stub = std::make_shared<pubsub_internal::SchemaMetadata>(std::move(stub));
157  if (internal::Contains(opts.get<TracingComponentsOption>(), "rpc")) {
158  GCP_LOG(INFO) << "Enabled logging for gRPC calls";
159  stub = std::make_shared<pubsub_internal::SchemaLogging>(
160  std::move(stub), opts.get<GrpcTracingOptionsOption>());
161  }
162  return stub;
163 }
164 
165 } // namespace
166 
168 
170  std::initializer_list<pubsub_internal::NonConstructible>) {
172 }
173 
175  internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
176  PolicyOptionList>(opts, __func__);
177  opts = pubsub_internal::DefaultCommonOptions(std::move(opts));
178 
179  auto background = internal::MakeBackgroundThreadsFactory(opts)();
180  auto auth = google::cloud::internal::CreateAuthenticationStrategy(
181  background->cq(), opts);
182 
183  auto stub = pubsub_internal::CreateDefaultSchemaStub(auth->CreateChannel(
184  opts.get<EndpointOption>(), internal::MakeChannelArguments(opts)));
185 
186  stub = DecorateSchemaAdminStub(opts, std::move(auth), std::move(stub));
187  return std::make_shared<SchemaAdminConnectionImpl>(
188  std::move(background), std::move(stub),
191 }
192 
194  pubsub::ConnectionOptions const& options,
195  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
196  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
197  auto opts = internal::MakeOptions(options);
198  if (retry_policy) opts.set<RetryPolicyOption>(retry_policy->clone());
199  if (backoff_policy) opts.set<BackoffPolicyOption>(backoff_policy->clone());
200  return MakeSchemaAdminConnection(std::move(opts));
201 }
202 
204 } // namespace pubsub
205 
206 namespace pubsub_internal {
208 
209 std::shared_ptr<pubsub::SchemaAdminConnection> MakeTestSchemaAdminConnection(
210  Options const& opts, std::shared_ptr<SchemaStub> stub) {
211  auto background = internal::MakeBackgroundThreadsFactory(opts)();
212  auto auth = google::cloud::internal::CreateAuthenticationStrategy(
213  background->cq(), opts);
214  stub =
215  pubsub::DecorateSchemaAdminStub(opts, std::move(auth), std::move(stub));
216  return std::make_shared<pubsub::SchemaAdminConnectionImpl>(
217  std::move(background), std::move(stub),
220 }
221 
223 } // namespace pubsub_internal
224 } // namespace cloud
225 } // namespace google