Google Cloud Pub/Sub C++ Client  1.32.1
A C++ Client Library for Google Cloud Pub/Sub
schema_admin_connection.cc
Go to the documentation of this file.
1 // Copyright 2021 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/schema_admin_connection.h"
16 #include "google/cloud/pubsub/internal/defaults.h"
17 #include "google/cloud/pubsub/internal/schema_logging.h"
18 #include "google/cloud/pubsub/internal/schema_metadata.h"
19 #include "google/cloud/pubsub/internal/schema_stub.h"
20 #include "google/cloud/pubsub/options.h"
21 #include "google/cloud/pubsub/retry_policy.h"
22 #include "google/cloud/internal/retry_loop.h"
23 #include "google/cloud/log.h"
24 #include <memory>
25 
26 namespace google {
27 namespace cloud {
28 namespace pubsub_internal {
29 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
30 namespace {
31 
32 using ::google::cloud::internal::Idempotency;
33 using ::google::cloud::internal::RetryLoop;
34 
35 class SchemaAdminConnectionImpl : public pubsub::SchemaAdminConnection {
36  public:
37  explicit SchemaAdminConnectionImpl(
38  std::shared_ptr<pubsub_internal::SchemaStub> stub,
39  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
40  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy)
41  : stub_(std::move(stub)),
42  retry_policy_(std::move(retry_policy)),
43  backoff_policy_(std::move(backoff_policy)) {}
44 
45  ~SchemaAdminConnectionImpl() override = default;
46 
47  StatusOr<google::pubsub::v1::Schema> CreateSchema(
48  google::pubsub::v1::CreateSchemaRequest const& request) override {
49  return RetryLoop(
50  retry_policy_->clone(), backoff_policy_->clone(),
51  Idempotency::kIdempotent,
52  [this](grpc::ClientContext& context,
53  google::pubsub::v1::CreateSchemaRequest const& request) {
54  return stub_->CreateSchema(context, request);
55  },
56  request, __func__);
57  }
58  StatusOr<google::pubsub::v1::Schema> GetSchema(
59  google::pubsub::v1::GetSchemaRequest const& request) override {
60  return RetryLoop(
61  retry_policy_->clone(), backoff_policy_->clone(),
62  Idempotency::kIdempotent,
63  [this](grpc::ClientContext& context,
64  google::pubsub::v1::GetSchemaRequest const& request) {
65  return stub_->GetSchema(context, request);
66  },
67  request, __func__);
68  }
69 
70  pubsub::ListSchemasRange ListSchemas(
71  google::pubsub::v1::ListSchemasRequest const& request) override {
72  auto& stub = stub_;
73  // Because we do not have C++14 generalized lambda captures we cannot just
74  // use the unique_ptr<> here, so convert to shared_ptr<> instead.
75  auto retry =
76  std::shared_ptr<pubsub::RetryPolicy const>(retry_policy_->clone());
77  auto backoff =
78  std::shared_ptr<pubsub::BackoffPolicy const>(backoff_policy_->clone());
79  char const* function_name = __func__;
80  auto list_functor =
81  [stub, retry, backoff,
82  function_name](google::pubsub::v1::ListSchemasRequest const& request) {
83  return RetryLoop(
84  retry->clone(), backoff->clone(), Idempotency::kIdempotent,
85  [stub](grpc::ClientContext& c,
86  google::pubsub::v1::ListSchemasRequest const& r) {
87  return stub->ListSchemas(c, r);
88  },
89  request, function_name);
90  };
91  return internal::MakePaginationRange<pubsub::ListSchemasRange>(
92  std::move(request), list_functor,
93  [](google::pubsub::v1::ListSchemasResponse response) {
94  std::vector<google::pubsub::v1::Schema> items;
95  items.reserve(response.schemas_size());
96  for (auto& item : *response.mutable_schemas()) {
97  items.push_back(std::move(item));
98  }
99  return items;
100  });
101  }
102 
103  Status DeleteSchema(
104  google::pubsub::v1::DeleteSchemaRequest const& request) override {
105  return RetryLoop(
106  retry_policy_->clone(), backoff_policy_->clone(),
107  Idempotency::kIdempotent,
108  [this](grpc::ClientContext& context,
109  google::pubsub::v1::DeleteSchemaRequest const& request) {
110  return stub_->DeleteSchema(context, request);
111  },
112  request, __func__);
113  }
114  StatusOr<google::pubsub::v1::ValidateSchemaResponse> ValidateSchema(
115  google::pubsub::v1::ValidateSchemaRequest const& request) override {
116  return RetryLoop(
117  retry_policy_->clone(), backoff_policy_->clone(),
118  Idempotency::kIdempotent,
119  [this](grpc::ClientContext& context,
120  google::pubsub::v1::ValidateSchemaRequest const& request) {
121  return stub_->ValidateSchema(context, request);
122  },
123  request, __func__);
124  }
125  StatusOr<google::pubsub::v1::ValidateMessageResponse> ValidateMessage(
126  google::pubsub::v1::ValidateMessageRequest const& request) override {
127  return RetryLoop(
128  retry_policy_->clone(), backoff_policy_->clone(),
129  Idempotency::kIdempotent,
130  [this](grpc::ClientContext& context,
131  google::pubsub::v1::ValidateMessageRequest const& request) {
132  return stub_->ValidateMessage(context, request);
133  },
134  request, __func__);
135  }
136 
137  private:
138  std::shared_ptr<pubsub_internal::SchemaStub> stub_;
139  std::unique_ptr<pubsub::RetryPolicy const> retry_policy_;
140  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy_;
141 };
142 } // namespace
143 
144 std::shared_ptr<pubsub::SchemaAdminConnection> MakeSchemaAdminConnection(
145  Options const& opts, std::shared_ptr<SchemaStub> stub) {
146  stub = std::make_shared<SchemaMetadata>(std::move(stub));
147  if (internal::Contains(opts.get<TracingComponentsOption>(), "rpc")) {
148  GCP_LOG(INFO) << "Enabled logging for gRPC calls";
149  stub = std::make_shared<SchemaLogging>(
150  std::move(stub), opts.get<GrpcTracingOptionsOption>());
151  }
152  return std::make_shared<SchemaAdminConnectionImpl>(
153  std::move(stub), opts.get<pubsub::RetryPolicyOption>()->clone(),
155 }
156 
157 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
158 } // namespace pubsub_internal
159 
160 namespace pubsub {
161 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
162 
164 
166  std::initializer_list<pubsub_internal::NonConstructible>) {
168 }
169 
171  internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
172  PolicyOptionList>(opts, __func__);
173  opts = pubsub_internal::DefaultCommonOptions(std::move(opts));
174  auto stub = pubsub_internal::CreateDefaultSchemaStub(opts, /*channel_id=*/0);
175  return pubsub_internal::MakeSchemaAdminConnection(std::move(opts),
176  std::move(stub));
177 }
178 
180  pubsub::ConnectionOptions const& options,
181  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
182  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
183  auto opts = internal::MakeOptions(options);
184  if (retry_policy) opts.set<RetryPolicyOption>(retry_policy->clone());
185  if (backoff_policy) opts.set<BackoffPolicyOption>(backoff_policy->clone());
186  return MakeSchemaAdminConnection(std::move(opts));
187 }
188 
189 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
190 } // namespace pubsub
191 } // namespace cloud
192 } // namespace google