15 #include "google/cloud/pubsub/subscriber_connection.h"
16 #include "google/cloud/pubsub/internal/create_channel.h"
17 #include "google/cloud/pubsub/internal/defaults.h"
18 #include "google/cloud/pubsub/internal/subscriber_auth.h"
19 #include "google/cloud/pubsub/internal/subscriber_logging.h"
20 #include "google/cloud/pubsub/internal/subscriber_metadata.h"
21 #include "google/cloud/pubsub/internal/subscriber_round_robin.h"
22 #include "google/cloud/pubsub/internal/subscription_session.h"
23 #include "google/cloud/pubsub/options.h"
24 #include "google/cloud/pubsub/retry_policy.h"
25 #include "google/cloud/internal/random.h"
26 #include "google/cloud/log.h"
// Constructs the connection: each argument is moved into the matching member,
// then `background_` and `generator_` are initialized.
// NOTE(review): the line declaring the `subscription` and `opts` parameters
// is not visible in this excerpt.
38 explicit SubscriberConnectionImpl(
40 std::shared_ptr<pubsub_internal::SubscriberStub> stub)
41 : subscription_
(std::move(subscription)
),
42 opts_
(std::move(opts)
),
43 stub_(std::move(stub)),
// Reads the member `opts_`, not the parameter `opts`, which is passed to
// std::move above. NOTE(review): members are initialized in declaration
// order, so this relies on `opts_` being declared before `background_` --
// confirm against the member list.
// The factory returned by MakeBackgroundThreadsFactory is invoked immediately.
44 background_(internal::MakeBackgroundThreadsFactory(opts_)()),
45 generator_(internal::MakeDefaultPRNG()) {}
// Compiler-generated destructor. `override` means a base class declares a
// virtual destructor.
47 ~SubscriberConnectionImpl()
override =
default;
// Lambda that draws a client id from `generator_`.
// NOTE(review): the lambda's closing line is not visible in this excerpt, so
// it is unclear whether `client_id` holds the lambda or its result -- confirm.
50 auto client_id = [
this] {
// `mu_` is held for as long as `generator_` is being used.
51 std::lock_guard<std::mutex> lk(mu_);
52 auto constexpr kLength = 32;
// Alphabet: lowercase ASCII letters followed by the decimal digits.
53 auto constexpr kChars =
"abcdefghijklmnopqrstuvwxyz0123456789";
// Presumably yields kLength characters picked from kChars -- confirm against
// internal::Sample.
54 return internal::Sample(generator_, kLength, kChars);
// Forwards the stored subscription, options and stub, the result of
// `background_->cq()`, and the client id to CreateSubscriptionSession, and
// returns what that call returns.
// NOTE(review): the remaining arguments are on lines not visible here.
56 return CreateSubscriptionSession(subscription_, opts_, stub_,
57 background_->
cq(), std::move(client_id),
// Stub received by the constructor; passed to CreateSubscriptionSession.
64 std::shared_ptr<pubsub_internal::SubscriberStub> stub_;
// Random generator used to sample client ids; it is used while `mu_` is held.
67 internal::DefaultPRNG generator_;
// Assembles a SubscriberStub by stacking decorators. The order visible here:
//   1. SubscriberRoundRobin (presumably over `children` -- its arguments are
//      on a line not visible in this excerpt),
//   2. SubscriberAuth, only when `auth->RequiresConfigureContext()` is true,
//   3. SubscriberMetadata, always,
//   4. SubscriberLogging, only when `tracing` contains "rpc".
// NOTE(review): at least one parameter line, the definition of `tracing`,
// and the return statement are not visible in this excerpt.
70 std::shared_ptr<pubsub_internal::SubscriberStub> DecorateSubscriberStub(
72 std::shared_ptr<internal::GrpcAuthenticationStrategy> auth,
73 std::vector<std::shared_ptr<pubsub_internal::SubscriberStub>> children) {
74 std::shared_ptr<pubsub_internal::SubscriberStub> stub =
75 std::make_shared<pubsub_internal::SubscriberRoundRobin>(
// The auth layer is added only if the strategy reports that it needs to
// configure the context; `auth` is moved into that layer.
77 if (auth->RequiresConfigureContext()) {
78 stub = std::make_shared<pubsub_internal::SubscriberAuth>(std::move(auth),
// Each new layer takes ownership of the stub built so far.
81 stub = std::make_shared<pubsub_internal::SubscriberMetadata>(std::move(stub));
83 if (internal::Contains(tracing,
"rpc")) {
84 GCP_LOG(INFO) <<
"Enabled logging for gRPC calls";
85 stub = std::make_shared<pubsub_internal::SubscriberLogging>(
// Last visible argument: whether `tracing` also contains "rpc-streams".
87 internal::Contains(tracing,
"rpc-streams"));
// Unnamed parameter: with no name, the function body cannot refer to it.
// NOTE(review): the start of this signature and the body that follows it are
// not visible in this excerpt.
104 std::initializer_list<pubsub_internal::NonConstructible>) {
// Body of a factory that builds the stubs and returns a
// SubscriberConnectionImpl.
// NOTE(review): the signature of this function and several argument lines
// are not visible in this excerpt; `subscription` and `opts` are presumably
// its parameters -- confirm.
// Presumably validates `opts` against the four listed option lists; the call
// arguments are on a line not visible here.
110 internal::CheckExpectedOptions<CommonOptionList, GrpcOptionList,
111 PolicyOptionList, SubscriberOptionList>(
// Replace `opts` with the result of DefaultSubscriberOptions applied to it.
113 opts
= pubsub_internal::DefaultSubscriberOptions(std::move(opts));
// `background` is a local object; its `cq()` is handed to the authentication
// strategy. NOTE(review): confirm the strategy does not need `background`
// to outlive this function.
115 auto background = internal::MakeBackgroundThreadsFactory(opts)();
116 auto auth =
google::
cloud::internal::CreateAuthenticationStrategy(
117 background->
cq(), opts);
// NOTE(review): the size of `children` and the declaration of `id` are on
// lines not visible here.
118 std::vector<std::shared_ptr<pubsub_internal::SubscriberStub>> children(
// Fill every slot with a newly created stub. `id++` gives each call to
// MakeChannelArguments a different id.
121 std::generate(children.begin(), children.end(), [&id, &opts, &auth] {
122 return pubsub_internal::CreateDefaultSubscriberStub(
124 pubsub_internal::MakeChannelArguments(opts, id++)));
// `opts` is passed here without std::move and is moved only in the final
// make_shared call below.
127 DecorateSubscriberStub(opts, std::move(auth), std::move(children));
128 return std::make_shared<SubscriberConnectionImpl>(
129 std::move(subscription), std::move(opts), std::move(stub));
// Tail of a signature that takes connection options plus retry and backoff
// policies, each policy as a unique_ptr to a const policy object.
// NOTE(review): the start of this signature (including the `options`
// parameter used below) and the rest of the body are not visible in this
// excerpt.
134 ConnectionOptions connection_options,
135 std::unique_ptr<
pubsub::RetryPolicy
const> retry_policy,
136 std::unique_ptr<
pubsub::BackoffPolicy
const> backoff_policy) {
// Convert both inputs with their respective MakeOptions overloads and merge
// the results. NOTE(review): which argument wins on a conflict depends on
// internal::MergeOptions -- confirm.
137 auto opts = internal::MergeOptions(
138 pubsub_internal::MakeOptions(std::move(options)),
139 internal::MakeOptions(std::move(connection_options)));
148 namespace pubsub_internal {
// Factory variant that receives already-built stubs instead of creating
// them: it decorates `stubs` with pubsub::DecorateSubscriberStub and returns
// a pubsub::SubscriberConnectionImpl.
// NOTE(review): the start of this signature (presumably declaring
// `subscription` and `opts`) and the closing brace are not visible in this
// excerpt.
153 std::vector<std::shared_ptr<SubscriberStub>> stubs) {
// `background` is a local object; its `cq()` is handed to the authentication
// strategy. NOTE(review): confirm the strategy does not need `background`
// to outlive this function.
154 auto background = internal::MakeBackgroundThreadsFactory(opts)();
155 auto auth =
google::
cloud::internal::CreateAuthenticationStrategy(
156 background->
cq(), opts);
// `opts` is passed here without std::move and is moved only in the final
// make_shared call below.
158 pubsub::DecorateSubscriberStub(opts, std::move(auth), std::move(stubs));
159 return std::make_shared<
pubsub::SubscriberConnectionImpl>(
160 std::move(subscription), std::move(opts), std::move(stub));