Google Cloud Pub/Sub C++ Client  2.1.0
A C++ Client Library for Google Cloud Pub/Sub
subscriber.h
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_H

#include "google/cloud/pubsub/ack_handler.h"
#include "google/cloud/pubsub/exactly_once_ack_handler.h"
#include "google/cloud/pubsub/message.h"
#include "google/cloud/pubsub/subscriber_connection.h"
#include "google/cloud/pubsub/subscription.h"
#include "google/cloud/pubsub/version.h"
#include "google/cloud/status.h"
#include <functional>
#include <memory>
#include <utility>

namespace google {
namespace cloud {
namespace pubsub {
/**
 * Receive messages from the Cloud Pub/Sub service.
 *
 * This class is used to receive messages from a given subscription, with a
 * fixed configuration such as credentials and background threads. Applications
 * that receive messages from multiple subscriptions need to create separate
 * instances of this class. Applications that want to receive messages with
 * different configuration parameters also need to create separate instances.
 *
 * @see https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub
 * service.
 *
 * @par Example: subscriber quickstart
 * @snippet samples.cc subscribe
 *
 * @par Performance
 * `Subscriber` objects are relatively cheap to create, copy, and move. However,
 * each `Subscriber` object must be created with a
 * `std::shared_ptr<SubscriberConnection>`, which itself is relatively expensive
 * to create. Therefore, connection instances should be shared when possible.
 * See the `MakeSubscriberConnection()` function and the `SubscriberConnection`
 * interface for more details.
 *
 * @par Thread Safety
 * Instances of this class created via copy-construction or copy-assignment
 * share the underlying pool of connections. Access to these copies via multiple
 * threads is guaranteed to work. Two threads operating concurrently on the
 * same instance of this class are not guaranteed to work.
 *
 * @par Background Threads
 * This class uses the background threads configured via the `Options` from
 * `GrpcOptionList`. Applications can create their own pool of background
 * threads by (a) creating their own #google::cloud::CompletionQueue, (b)
 * passing this completion queue as a `GrpcCompletionQueueOption`, and (c)
 * attaching any number of threads to the completion queue.
 *
 * @par Example: using a custom thread pool
 * @snippet samples.cc custom-thread-pool-subscriber
 *
 * @par Asynchronous Functions
 * Some of the member functions in this class return a `future<T>` (or
 * `future<StatusOr<T>>`) object. Readers are probably familiar with
 * [`std::future<T>`][std-future-link]. Our version adds a `.then()` function to
 * attach a callback to the future, which is invoked when the future is
 * satisfied. This function returns a `future<U>` where `U` is the return type
 * of the attached function. See the #google::cloud::future documentation for
 * more details.
 *
 * @par Error Handling
 * This class uses `StatusOr<T>` to report errors. When an operation fails to
 * perform its work, the returned `StatusOr<T>` contains the error details. If
 * the `ok()` member function in the `StatusOr<T>` returns `true`, then it
 * contains the expected result. Please consult the #google::cloud::StatusOr
 * documentation for more details.
 *
 * @par Changing Retry Parameters Example
 * @snippet samples.cc subscriber-retry-settings
 *
 * [std-future-link]: https://en.cppreference.com/w/cpp/thread/future
 */
class Subscriber {
 public:
  explicit Subscriber(std::shared_ptr<SubscriberConnection> connection)
      : connection_(std::move(connection)) {}

  /**
   * Creates a new session to receive messages from the subscription
   * associated with this `Subscriber`.
   *
   * @par Idempotency
   * @parblock
   * This is an idempotent operation; it only reads messages from the service.
   * The library makes multiple attempts to start a connection to the service,
   * subject to the retry policies configured in the `SubscriberConnection`.
   * Once a successful connection is established, the library will try to
   * resume the connection even if the connection fails with a permanent error.
   * Resuming the connection is subject to the retry policies as described
   * earlier.
   *
   * Note that calling `AckHandler::ack()` and/or `AckHandler::nack()` is
   * handled differently with respect to retrying. Check the documentation of
   * these functions for details.
   * @endparblock
   *
   * @par Example
   * @snippet samples.cc subscribe
   *
   * @param cb the callable invoked when messages are received.
   * @return a future that is satisfied when the session will no longer receive
   *     messages, for example, because of an unrecoverable error while trying
   *     to receive data. Calling `.cancel()` on this object will (eventually)
   *     terminate the session and satisfy the future.
   */
  future<Status> Subscribe(ApplicationCallback cb) {
    return connection_->Subscribe({std::move(cb)});
  }

  /**
   * Creates a new session to receive messages, using exactly-once delivery,
   * from the subscription associated with this `Subscriber`.
   *
   * @par Idempotency
   * @parblock
   * This is an idempotent operation; it only reads messages from the service.
   * The library makes multiple attempts to start a connection to the service,
   * subject to the retry policies configured in the `SubscriberConnection`.
   * Once a successful connection is established, the library will try to
   * resume the connection even if the connection fails with a permanent error.
   * Resuming the connection is subject to the retry policies as described
   * earlier.
   *
   * Note that `ExactlyOnceAckHandler::ack()` and
   * `ExactlyOnceAckHandler::nack()` have their own rules with respect to
   * retrying. Check the documentation of these functions for details.
   * @endparblock
   *
   * @par Example
   * @snippet samples.cc exactly-once-subscribe
   *
   * @param cb the callable invoked when messages are received.
   * @return a future that is satisfied when the session will no longer receive
   *     messages, for example, because of an unrecoverable error while trying
   *     to receive data. Calling `.cancel()` on this object will (eventually)
   *     terminate the session and satisfy the future.
   */
  future<Status> Subscribe(ExactlyOnceApplicationCallback cb) {
    return connection_->ExactlyOnceSubscribe({std::move(cb)});
  }

 private:
  std::shared_ptr<SubscriberConnection> connection_;
};

}  // namespace pubsub
}  // namespace cloud
}  // namespace google

#endif  // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_H
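
The Performance and Thread Safety notes in the class comment suggest a usage pattern: create the expensive connection once, then give each thread its own cheap copy of the wrapper. The following is a minimal, library-free sketch of that pattern. `FakeConnection`, `FakeSubscriber`, and `MakeWorkers()` are hypothetical stand-ins invented for illustration; they are not part of the Pub/Sub client library and do not talk to any service.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for an expensive-to-create connection. This is NOT
// the real google::cloud::pubsub::SubscriberConnection.
class FakeConnection {
 public:
  explicit FakeConnection(std::string subscription)
      : subscription_(std::move(subscription)) {}

  std::string const& subscription() const { return subscription_; }

 private:
  std::string subscription_;
};

// Hypothetical stand-in with the same shape as `Subscriber`: a thin wrapper
// that only stores a shared pointer, so copies are cheap.
class FakeSubscriber {
 public:
  explicit FakeSubscriber(std::shared_ptr<FakeConnection> connection)
      : connection_(std::move(connection)) {}

  FakeConnection const* connection() const { return connection_.get(); }

 private:
  std::shared_ptr<FakeConnection> connection_;
};

// Hands out `count` wrappers that all share one connection, for example one
// wrapper per worker thread.
std::vector<FakeSubscriber> MakeWorkers(
    std::shared_ptr<FakeConnection> const& connection, std::size_t count) {
  assert(connection != nullptr);
  std::vector<FakeSubscriber> workers;
  workers.reserve(count);
  for (std::size_t i = 0; i != count; ++i) workers.emplace_back(connection);
  return workers;
}
```

In this sketch a caller would create the connection once, call `MakeWorkers()`, and let each thread touch only its own `FakeSubscriber`, so no two threads operate on the same wrapper instance.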