Google Cloud Pub/Sub C++ Client  1.33.0
A C++ Client Library for Google Cloud Pub/Sub
subscriber_connection.cc
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/subscriber_connection.h"
16 #include "google/cloud/pubsub/internal/create_channel.h"
17 #include "google/cloud/pubsub/internal/defaults.h"
18 #include "google/cloud/pubsub/internal/subscriber_auth.h"
19 #include "google/cloud/pubsub/internal/subscriber_logging.h"
20 #include "google/cloud/pubsub/internal/subscriber_metadata.h"
21 #include "google/cloud/pubsub/internal/subscriber_round_robin.h"
22 #include "google/cloud/pubsub/internal/subscription_session.h"
23 #include "google/cloud/pubsub/options.h"
24 #include "google/cloud/pubsub/retry_policy.h"
25 #include "google/cloud/internal/random.h"
26 #include "google/cloud/log.h"
27 #include <algorithm>
28 #include <memory>
29 
30 namespace google {
31 namespace cloud {
32 namespace pubsub {
34 namespace {
35 
36 class SubscriberConnectionImpl : public pubsub::SubscriberConnection {
37  public:
38  explicit SubscriberConnectionImpl(
39  pubsub::Subscription subscription, Options opts,
40  std::shared_ptr<pubsub_internal::SubscriberStub> stub)
41  : subscription_(std::move(subscription)),
42  opts_(std::move(opts)),
43  stub_(std::move(stub)),
44  background_(internal::MakeBackgroundThreadsFactory(opts_)()),
45  generator_(internal::MakeDefaultPRNG()) {}
46 
47  ~SubscriberConnectionImpl() override = default;
48 
49  future<Status> Subscribe(SubscribeParams p) override {
50  auto client_id = [this] {
51  std::lock_guard<std::mutex> lk(mu_);
52  auto constexpr kLength = 32;
53  auto constexpr kChars = "abcdefghijklmnopqrstuvwxyz0123456789";
54  return internal::Sample(generator_, kLength, kChars);
55  }();
56  return CreateSubscriptionSession(subscription_, opts_, stub_,
57  background_->cq(), std::move(client_id),
58  std::move(p));
59  }
60 
61  private:
62  pubsub::Subscription const subscription_;
63  Options const opts_;
64  std::shared_ptr<pubsub_internal::SubscriberStub> stub_;
65  std::shared_ptr<BackgroundThreads> background_;
66  std::mutex mu_;
67  internal::DefaultPRNG generator_;
68 };
69 
70 std::shared_ptr<pubsub_internal::SubscriberStub> DecorateSubscriberStub(
71  Options const& opts,
72  std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
73  std::vector<std::shared_ptr<pubsub_internal::SubscriberStub>> children) {
74  std::shared_ptr<pubsub_internal::SubscriberStub> stub =
75  std::make_shared<pubsub_internal::SubscriberRoundRobin>(
76  std::move(children));
77  if (auth->RequiresConfigureContext()) {
78  stub = std::make_shared<pubsub_internal::SubscriberAuth>(std::move(auth),
79  std::move(stub));
80  }
81  stub = std::make_shared<pubsub_internal::SubscriberMetadata>(std::move(stub));
82  auto const& tracing = opts.get<TracingComponentsOption>();
83  if (internal::Contains(tracing, "rpc")) {
84  GCP_LOG(INFO) << "Enabled logging for gRPC calls";
85  stub = std::make_shared<pubsub_internal::SubscriberLogging>(
86  std::move(stub), opts.get<GrpcTracingOptionsOption>(),
87  internal::Contains(tracing, "rpc-streams"));
88  }
89  return stub;
90 }
91 
92 } // namespace
93 
95 
96 // NOLINTNEXTLINE(performance-unnecessary-value-param)
98  return make_ready_future(
99  Status{StatusCode::kUnimplemented, "needs-override"});
100 }
101 
103  Subscription subscription,
104  std::initializer_list<pubsub_internal::NonConstructible>) {
105  return MakeSubscriberConnection(std::move(subscription));
106 }
107 
109  Subscription subscription, Options opts) {
110  internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
111  PolicyOptionList, SubscriberOptionList>(
112  opts, __func__);
113  opts = pubsub_internal::DefaultSubscriberOptions(std::move(opts));
114 
115  auto background = internal::MakeBackgroundThreadsFactory(opts)();
116  auto auth = google::cloud::internal::CreateAuthenticationStrategy(
117  background->cq(), opts);
118  std::vector<std::shared_ptr<pubsub_internal::SubscriberStub>> children(
120  int id = 0;
121  std::generate(children.begin(), children.end(), [&id, &opts, &auth] {
122  return pubsub_internal::CreateDefaultSubscriberStub(
123  auth->CreateChannel(opts.get<EndpointOption>(),
124  pubsub_internal::MakeChannelArguments(opts, id++)));
125  });
126  auto stub =
127  DecorateSubscriberStub(opts, std::move(auth), std::move(children));
128  return std::make_shared<SubscriberConnectionImpl>(
129  std::move(subscription), std::move(opts), std::move(stub));
130 }
131 
133  Subscription subscription, SubscriberOptions options,
134  ConnectionOptions connection_options,
135  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
136  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
137  auto opts = internal::MergeOptions(
138  pubsub_internal::MakeOptions(std::move(options)),
139  internal::MakeOptions(std::move(connection_options)));
140  if (retry_policy) opts.set<RetryPolicyOption>(retry_policy->clone());
141  if (backoff_policy) opts.set<BackoffPolicyOption>(backoff_policy->clone());
142  return MakeSubscriberConnection(std::move(subscription), std::move(opts));
143 }
144 
146 } // namespace pubsub
147 
148 namespace pubsub_internal {
150 
151 std::shared_ptr<pubsub::SubscriberConnection> MakeTestSubscriberConnection(
152  pubsub::Subscription subscription, Options opts,
153  std::vector<std::shared_ptr<SubscriberStub>> stubs) {
154  auto background = internal::MakeBackgroundThreadsFactory(opts)();
155  auto auth = google::cloud::internal::CreateAuthenticationStrategy(
156  background->cq(), opts);
157  auto stub =
158  pubsub::DecorateSubscriberStub(opts, std::move(auth), std::move(stubs));
159  return std::make_shared<pubsub::SubscriberConnectionImpl>(
160  std::move(subscription), std::move(opts), std::move(stub));
161 }
162 
164 } // namespace pubsub_internal
165 } // namespace cloud
166 } // namespace google