Google Cloud Pub/Sub C++ Client  1.32.1
A C++ Client Library for Google Cloud Pub/Sub
subscriber_options.h
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_OPTIONS_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_OPTIONS_H
17 
18 #include "google/cloud/pubsub/options.h"
19 #include "google/cloud/pubsub/version.h"
20 #include "google/cloud/options.h"
21 #include <chrono>
22 #include <thread>
23 
24 namespace google {
25 namespace cloud {
26 namespace pubsub {
27 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
28 class SubscriberOptions;
29 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
30 } // namespace pubsub
31 
32 namespace pubsub_internal {
33 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
34 Options MakeOptions(pubsub::SubscriberOptions);
35 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
36 } // namespace pubsub_internal
37 
38 namespace pubsub {
39 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
40 
41 /**
42  * Configure how a `Subscriber` handles incoming messages.
43  *
44  * There are two main algorithms controlled by this function: the dispatching of
45  * application callbacks, and requesting more data from the service.
46  *
47  * @par Callback Concurrency Control
48  * @parblock
49  * The subscription configuration determines the upper limit (set
50  * `set_concurrency_watermarks()`) how many callbacks are *scheduled* at a time.
51  * As long as this limit is not reached the library will continue to schedule
52  * callbacks, once the limit is reached the library will wait until the number
53  * of executing callbacks goes below the low watermark.
54  *
55  * A callback is "executing" until the `AckHandler::ack()` or
56  * `AckHandler::nack()` function is called on the associated `AckHandler`.
57  * Applications can use this to move long-running computations out of the
58  * library internal thread pool.
59  *
60  * Note that callbacks are "scheduled", but they may not immediately execute.
61  * For example, callbacks may be sequenced if the concurrency control parameters
62  * are higher than the number of I/O threads configured in the
63  * `SubscriberConnection`.
64  *
65  * The default value for the concurrency high watermarks is set to the value
66  * returned by `std::thread::hardware_concurrency()` (or 4 if your standard
67  * library returns `0` for this parameter).
68  * @endparblock
69  *
70  * @par Message Flow Control
71  * @parblock
72  * The subscription will request more messages from the service as long as
73  * both the outstanding message count (see `set_message_count_watermarks()`)
74  * and the number of bytes in the outstanding messages (see
75  * `set_message_size_watermarks()`) are below the high watermarks for these
76  * values.
77  *
78  * Once either of the high watermarks are breached the library will wait until
79  * **both** the values are below their low watermarks before requesting more
80  * messages from the service.
81  *
82  * In this algorithm a message is outstanding until the `AckHandler::ack()` or
83  * `AckHandler::nack()` function is called on the associated `AckHandler`. Note
84  * that if the concurrency control algorithm has not scheduled a callback this
85  * can also put back pressure on the flow control algorithm.
86  * @endparblock
87  *
88  * @par Example: setting the concurrency control parameters
89  * @snippet samples.cc subscriber-concurrency
90  *
91  * @par Example: setting the flow control parameters
92  * @snippet samples.cc subscriber-flow-control
93  */
95  public:
97 
98  /**
99  * Initialize the subscriber options.
100  *
101  * Expected options are any of the types in the `SubscriberOptionList`
102  *
103  * @note Unrecognized options will be ignored. To debug issues with options
104  * set `GOOGLE_CLOUD_CPP_ENABLE_CLOG=yes` in the environment and
105  * unexpected options will be logged.
106  *
107  * @param opts configuration options
108  */
109  explicit SubscriberOptions(Options opts);
110 
111  /**
112  * The maximum deadline for each incoming message.
113  *
114  * Configure how long does the application have to respond (ACK or NACK) an
115  * incoming message. Note that this might be longer, or shorter, than the
116  * deadline configured in the server-side subscription.
117  *
118  * The value `0` is reserved to leave the deadline unmodified and just use the
119  * server-side configuration.
120  *
121  * @note The deadline applies to each message as it is delivered to the
122  * application, thus, if the library receives a batch of N messages their
123  * deadline for all the messages is extended repeatedly. Only once the
124  * message is delivered to a callback does the deadline become immutable.
125  */
126  std::chrono::seconds max_deadline_time() const {
127  return opts_.get<MaxDeadlineTimeOption>();
128  }
129 
130  /// Set the maximum deadline for incoming messages.
131  SubscriberOptions& set_max_deadline_time(std::chrono::seconds d) {
133  return *this;
134  }
135 
136  /**
137  * Set the maximum time by which the deadline for each incoming message is
138  * extended.
139  *
140  * The Cloud Pub/Sub C++ client library will extend the deadline by at most
141  * this amount, while waiting for an ack or nack. The default extension is 10
142  * minutes. An application may wish to reduce this extension so that the
143  * Pub/Sub service will resend a message sooner when it does not hear back
144  * from a Subscriber.
145  *
146  * The value is clamped between 10 seconds and 10 minutes.
147  *
148  * @param extension the maximum time that the deadline can be extended by,
149  * measured in seconds.
150  */
151  SubscriberOptions& set_max_deadline_extension(std::chrono::seconds extension);
152  std::chrono::seconds max_deadline_extension() const {
154  }
155 
156  /**
157  * Set the maximum number of outstanding messages per streaming pull.
158  *
159  * The Cloud Pub/Sub C++ client library uses streaming pull requests to
160  * receive messages from the service. The service will stop delivering
161  * messages if @p message_count or more messages have not been acknowledged
162  * nor rejected.
163  *
164  * @par Example
165  * @snippet samples.cc subscriber-flow-control
166  *
167  * @param message_count the maximum number of messages outstanding, use 0 or
168  * negative numbers to make the message count unlimited.
169  */
170  SubscriberOptions& set_max_outstanding_messages(std::int64_t message_count);
171  std::int64_t max_outstanding_messages() const {
173  }
174 
175  /**
176  * Set the maximum number of outstanding bytes per streaming pull.
177  *
178  * The Cloud Pub/Sub C++ client library uses streaming pull requests to
179  * receive messages from the service. The service will stop delivering
180  * messages if @p bytes or more worth of messages have not been
181  * acknowledged nor rejected.
182  *
183  * @par Example
184  * @snippet samples.cc subscriber-flow-control
185  *
186  * @param bytes the maximum number of bytes outstanding, use 0 or negative
187  * numbers to make the number of bytes unlimited.
188  */
189  SubscriberOptions& set_max_outstanding_bytes(std::int64_t bytes);
190  std::int64_t max_outstanding_bytes() const {
192  }
193 
194  /**
195  * Set the maximum callback concurrency.
196  *
197  * The Cloud Pub/Sub C++ client library will schedule parallel callbacks as
198  * long as the number of outstanding callbacks is less than this maximum.
199  *
200  * Note that this controls the number of callbacks *scheduled*, not the number
201  * of callbacks actually executing at a time. The application needs to create
202  * (or configure) the background threads pool with enough parallelism to
203  * execute more than one callback at a time.
204  *
205  * Some applications may want to share a thread pool across many
206  * subscriptions, the additional level of control (scheduled vs. running
207  * callbacks) allows applications, for example, to ensure that at most `K`
208  * threads in the pool are used by any given subscription.
209  *
210  * @par Example
211  * @snippet samples.cc subscriber-concurrency
212  *
213  * @param v the new value, 0 resets to the default
214  */
215  SubscriberOptions& set_max_concurrency(std::size_t v);
216 
217  /// Maximum number of callbacks scheduled by the library at a time.
218  std::size_t max_concurrency() const {
219  return opts_.get<MaxConcurrencyOption>();
220  }
221 
222  /**
223  * Control how often the session polls for automatic shutdowns.
224  *
225  * Applications can shutdown a session by calling `.cancel()` on the returned
226  * `future<Status>`. In addition, applications can fire & forget a session,
227  * which is only shutdown once the completion queue servicing the session
228  * shuts down. In this latter case the session polls periodically to detect
229  * if the CQ has shutdown. This controls how often this polling happens.
230  */
231  SubscriberOptions& set_shutdown_polling_period(std::chrono::milliseconds v) {
233  return *this;
234  }
235  std::chrono::milliseconds shutdown_polling_period() const {
237  }
238 
239  private:
240  friend Options pubsub_internal::MakeOptions(SubscriberOptions);
241  Options opts_;
242 };
243 
244 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
245 } // namespace pubsub
246 
247 namespace pubsub_internal {
248 inline namespace GOOGLE_CLOUD_CPP_PUBSUB_NS {
249 
250 inline Options MakeOptions(pubsub::SubscriberOptions o) {
251  return std::move(o.opts_);
252 }
253 
254 } // namespace GOOGLE_CLOUD_CPP_PUBSUB_NS
255 } // namespace pubsub_internal
256 } // namespace cloud
257 } // namespace google
258 
259 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_SUBSCRIBER_OPTIONS_H