Google Cloud Pub/Sub C++ Client  1.32.1
A C++ Client Library for Google Cloud Pub/Sub
subscriber_connection.cc
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/pubsub/subscriber_connection.h"
16 #include "google/cloud/pubsub/internal/defaults.h"
17 #include "google/cloud/pubsub/internal/subscriber_logging.h"
18 #include "google/cloud/pubsub/internal/subscriber_metadata.h"
19 #include "google/cloud/pubsub/internal/subscriber_round_robin.h"
20 #include "google/cloud/pubsub/internal/subscription_session.h"
21 #include "google/cloud/pubsub/options.h"
22 #include "google/cloud/pubsub/retry_policy.h"
23 #include "google/cloud/internal/random.h"
24 #include "google/cloud/log.h"
25 #include <algorithm>
26 #include <limits>
27 #include <memory>
28 
29 namespace google {
30 namespace cloud {
31 namespace pubsub {
32 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
33 
35 
// Placeholder implementation: performs no work and returns the future built
// by `make_ready_future()` around a `StatusCode::kUnimplemented` status whose
// message is "needs-override".
// NOTE(review): the signature line of this definition is absent from this
// listing (the embedded numbering jumps from 36 to 38). Presumably this is the
// base-class `SubscriberConnection::Subscribe()` that concrete connections are
// expected to override -- confirm against the header.
36 // NOLINTNEXTLINE(performance-unnecessary-value-param)
38  return make_ready_future(
39  Status{StatusCode::kUnimplemented, "needs-override"});
40 }
41 
// Overload that accepts an unnamed (and therefore unused)
// `std::initializer_list<pubsub_internal::NonConstructible>` second argument
// and simply forwards `subscription` to the one-argument
// `MakeSubscriberConnection(...)` call below.
// NOTE(review): the return-type/name line of this definition is missing from
// this listing (embedded line 42); the function name is inferred from the
// call it forwards to -- confirm against the header.
43  Subscription subscription,
44  std::initializer_list<pubsub_internal::NonConstructible>) {
45  return MakeSubscriberConnection(std::move(subscription));
46 }
47 
// Builds a subscriber connection from a `Subscription` and an `Options` bag.
//
// Steps performed by the visible code:
//  1. Passes `opts` and the current function name to
//     `internal::CheckExpectedOptions<...>` for the common, gRPC, policy and
//     subscriber option lists.
//  2. Replaces `opts` with the result of
//     `pubsub_internal::DefaultSubscriberOptions(std::move(opts))`.
//  3. Fills the `children` vector by calling
//     `pubsub_internal::CreateDefaultSubscriberStub(opts, id)` once per
//     element, with `id` starting at 0 and incremented after each call.
//  4. Delegates to `pubsub_internal::MakeSubscriberConnection(...)`, moving in
//     the subscription, the options and the stubs.
//
// NOTE(review): two lines of this definition are missing from this listing:
// the return-type/name line (embedded line 48) and the argument that sizes
// `children` (embedded line 55). Presumably the size comes from a channel-count
// option in `opts` -- confirm against the original source.
49  Subscription subscription, Options opts) {
50  internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
51  PolicyOptionList, SubscriberOptionList>(
52  opts, __func__);
53  opts = pubsub_internal::DefaultSubscriberOptions(std::move(opts));
54  std::vector<std::shared_ptr<pubsub_internal::SubscriberStub>> children(
56  int id = 0;
57  std::generate(children.begin(), children.end(), [&id, &opts] {
58  return pubsub_internal::CreateDefaultSubscriberStub(opts, id++);
59  });
60  return pubsub_internal::MakeSubscriberConnection(
61  std::move(subscription), std::move(opts), std::move(children));
62 }
63 
// Overload that takes the `SubscriberOptions` / `ConnectionOptions` structs
// plus optional retry and backoff policies.
//
// Both structs are converted with their respective `MakeOptions(...)` helpers
// and combined with `internal::MergeOptions(...)`. When `retry_policy` is
// non-null a clone of it is stored under `RetryPolicyOption`; likewise a
// non-null `backoff_policy` is cloned into `BackoffPolicyOption`. Null
// policies leave the corresponding option untouched. The resulting options
// are then forwarded, together with `subscription`, to the
// `MakeSubscriberConnection(Subscription, Options)` form.
//
// NOTE(review): the return-type/name line of this definition is missing from
// this listing (embedded line 64) -- confirm against the header.
65  Subscription subscription, SubscriberOptions options,
66  ConnectionOptions connection_options,
67  std::unique_ptr<pubsub::RetryPolicy const> retry_policy,
68  std::unique_ptr<pubsub::BackoffPolicy const> backoff_policy) {
69  auto opts = internal::MergeOptions(
70  pubsub_internal::MakeOptions(std::move(options)),
71  internal::MakeOptions(std::move(connection_options)));
72  if (retry_policy) opts.set<RetryPolicyOption>(retry_policy->clone());
73  if (backoff_policy) opts.set<BackoffPolicyOption>(backoff_policy->clone());
74  return MakeSubscriberConnection(std::move(subscription), std::move(opts));
75 }
76 
77 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
78 } // namespace pubsub
79 
80 namespace pubsub_internal {
81 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
82 namespace {
83 class SubscriberConnectionImpl : public pubsub::SubscriberConnection {
84  public:
85  explicit SubscriberConnectionImpl(
86  pubsub::Subscription subscription, Options opts,
87  std::shared_ptr<pubsub_internal::SubscriberStub> stub)
88  : subscription_(std::move(subscription)),
89  opts_(std::move(opts)),
90  stub_(std::move(stub)),
91  background_(internal::MakeBackgroundThreadsFactory(opts_)()),
92  generator_(internal::MakeDefaultPRNG()) {}
93 
94  ~SubscriberConnectionImpl() override = default;
95 
96  future<Status> Subscribe(SubscribeParams p) override {
97  auto client_id = [this] {
98  std::lock_guard<std::mutex> lk(mu_);
99  auto constexpr kLength = 32;
100  auto constexpr kChars = "abcdefghijklmnopqrstuvwxyz0123456789";
101  return internal::Sample(generator_, kLength, kChars);
102  }();
103  return CreateSubscriptionSession(subscription_, opts_, stub_,
104  background_->cq(), std::move(client_id),
105  std::move(p));
106  }
107 
108  private:
109  pubsub::Subscription const subscription_;
110  Options const opts_;
111  std::shared_ptr<pubsub_internal::SubscriberStub> stub_;
112  std::shared_ptr<BackgroundThreads> background_;
113  std::mutex mu_;
114  internal::DefaultPRNG generator_;
115 };
116 } // namespace
117 
118 std::shared_ptr<pubsub::SubscriberConnection> MakeSubscriberConnection(
119  pubsub::Subscription subscription, Options opts,
120  std::vector<std::shared_ptr<SubscriberStub>> stubs) {
121  if (stubs.empty()) return nullptr;
122  std::shared_ptr<SubscriberStub> stub =
123  std::make_shared<SubscriberRoundRobin>(std::move(stubs));
124  stub = std::make_shared<SubscriberMetadata>(std::move(stub));
125  auto const& tracing = opts.get<TracingComponentsOption>();
126  if (internal::Contains(tracing, "rpc")) {
127  GCP_LOG(INFO) << "Enabled logging for gRPC calls";
128  stub = std::make_shared<SubscriberLogging>(
129  std::move(stub), opts.get<GrpcTracingOptionsOption>(),
130  internal::Contains(tracing, "rpc-streams"));
131  }
132  return std::make_shared<SubscriberConnectionImpl>(
133  std::move(subscription), std::move(opts), std::move(stub));
134 }
135 
136 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
137 } // namespace pubsub_internal
138 } // namespace cloud
139 } // namespace google