Google Cloud Pub/Sub C++ Client  1.32.1
A C++ Client Library for Google Cloud Pub/Sub
subscriber.h
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_H
17 
18 #include "google/cloud/pubsub/message.h"
19 #include "google/cloud/pubsub/subscriber_connection.h"
20 #include "google/cloud/pubsub/subscription.h"
21 #include "google/cloud/pubsub/version.h"
22 #include "google/cloud/status.h"
23 #include <functional>
24 
25 namespace google {
26 namespace cloud {
27 namespace pubsub {
28 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
29 /**
30  * Receive messages from the Cloud Pub/Sub service.
31  *
32  * This class is used to receive messages from a given subscription, with a
33  * fixed configuration such as credentials, and background threads.
34  * Applications that receive messages from multiple subscriptions need to create
35  * separate instances of this class. Applications wanting to receive events with
36  * different configuration parameters also need to create separate instances.
37  *
38  * @see https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub
39  * service.
40  *
41  * @par Example: subscriber quickstart
42  * @snippet samples.cc subscribe
43  *
44  * @par Performance
45  * `Subscriber` objects are relatively cheap to create, copy, and move. However,
46  * each `Subscriber` object must be created with a
47  * `std::shared_ptr<SubscriberConnection>`, which itself is relatively expensive
48  * to create. Therefore, connection instances should be shared when possible.
49  * See the `MakeSubscriberConnection()` function and the `SubscriberConnection`
50  * interface for more details.
51  *
52  * @par Thread Safety
53  * Instances of this class created via copy-construction or copy-assignment
54  * share the underlying pool of connections. Access to these copies via multiple
55  * threads is guaranteed to work. Two threads operating on the same instance of
56  * this class is not guaranteed to work.
57  *
58  * @par Background Threads
59  * This class uses the background threads configured via the `Options` from
60  * `GrpcOptionList`. Applications can create their own pool of background
61  * threads by (a) creating their own #google::cloud::v1::CompletionQueue, (b)
62  * passing this completion queue as a `GrpcCompletionQueueOption`, and (c)
63  * attaching any number of threads to the completion queue.
64  *
65  * @par Example: using a custom thread pool
66  * @snippet samples.cc custom-thread-pool-subscriber
67  *
68  * @par Asynchronous Functions
69  * Some of the member functions in this class return a `future<T>` (or
70  * `future<StatusOr<T>>`) object. Readers are probably familiar with
71  * [`std::future<T>`][std-future-link]. Our version adds a `.then()` function to
72  * attach a callback to the future, which is invoked when the future is
73  * satisfied. This function returns a `future<U>` where `U` is the return value
74  * of the attached function. More details in the #google::cloud::v1::future
75  * documentation.
76  *
77  * @par Error Handling
78  * This class uses `StatusOr<T>` to report errors. When an operation fails to
79  * perform its work the returned `StatusOr<T>` contains the error details. If
80  * the `ok()` member function in the `StatusOr<T>` returns `true` then it
81  * contains the expected result. Please consult the #google::cloud::v1::StatusOr
82  * documentation for more details.
83  *
84  * @par Changing Retry Parameters Example
85  * @snippet samples.cc subscriber-retry-settings
86  *
87  * [std-future-link]: https://en.cppreference.com/w/cpp/thread/future
88  */
89 class Subscriber {
90  public:
91  explicit Subscriber(std::shared_ptr<SubscriberConnection> connection)
92  : connection_(std::move(connection)) {}
93 
94  /**
95  * Creates a new session to receive messages from @p subscription.
96  *
97  * @note Callable must be `CopyConstructible`, as @p cb will be stored in a
98  * [`std::function<>`][std-function-link].
99  *
100  * @par Idempotency
101  * @parblock
102  * This is an idempotent operation; it only reads messages from the service.
103  * Will make multiple attempts to start a connection to the service, subject
104  * to the retry policies configured in the `SubscriberConnection`. Once a
105  * successful connection is established the library will try to resume the
106  * connection even if the connection fails with a permanent error. Resuming
107  * the connection is subject to the retry policies as described earlier.
108  *
109  * Note that calling `AckHandler::ack()` and/or `AckHandler::nack()` is
110  * handled differently with respect to retrying. Check the documentation of
111  * these functions for details.
112  * @endparblock
113  *
114  * @par Example
115  * @snippet samples.cc subscribe
116  *
117  * @param cb the callable invoked when messages are received. This must be
118  * usable to construct a
119  * `std::function<void(pubsub::Message, pubsub::AckHandler)>`.
120  * @return a future that is satisfied when the session will no longer receive
121  * messages. For example, because there was an unrecoverable error trying
122  * to receive data. Calling `.cancel()` in this object will (eventually)
123  * terminate the session and satisfy the future.
124  *
125  * [std-function-link]:
126  * https://en.cppreference.com/w/cpp/utility/functional/function
127  */
128  template <typename Callable>
129  future<Status> Subscribe(Callable&& cb) {
130  std::function<void(Message, AckHandler)> f(std::forward<Callable>(cb));
131  return connection_->Subscribe({std::move(f)});
132  }
133 
134  private:
135  std::shared_ptr<SubscriberConnection> connection_;
136 };
137 
138 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
139 } // namespace pubsub
140 } // namespace cloud
141 } // namespace google
142 
143 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_H