Google Cloud Pub/Sub C++ Client  1.32.1
A C++ Client Library for Google Cloud Pub/Sub
Public Member Functions | List of all members
google::cloud::pubsub::v1::SubscriberOptions Class Reference

Configure how a Subscriber handles incoming messages. More...

#include <google/cloud/pubsub/subscriber_options.h>

Public Member Functions

 SubscriberOptions ()
 
 SubscriberOptions (Options opts)
 Initialize the subscriber options. More...
 
std::chrono::seconds max_deadline_time () const
 The maximum deadline for each incoming message. More...
 
SubscriberOptionsset_max_deadline_time (std::chrono::seconds d)
 Set the maximum deadline for incoming messages. More...
 
SubscriberOptionsset_max_deadline_extension (std::chrono::seconds extension)
 Set the maximum time by which the deadline for each incoming message is extended. More...
 
std::chrono::seconds max_deadline_extension () const
 
SubscriberOptionsset_max_outstanding_messages (std::int64_t message_count)
 Set the maximum number of outstanding messages per streaming pull. More...
 
std::int64_t max_outstanding_messages () const
 
SubscriberOptionsset_max_outstanding_bytes (std::int64_t bytes)
 Set the maximum number of outstanding bytes per streaming pull. More...
 
std::int64_t max_outstanding_bytes () const
 
SubscriberOptionsset_max_concurrency (std::size_t v)
 Set the maximum callback concurrency. More...
 
std::size_t max_concurrency () const
 Maximum number of callbacks scheduled by the library at a time. More...
 
SubscriberOptionsset_shutdown_polling_period (std::chrono::milliseconds v)
 Control how often the session polls for automatic shutdowns. More...
 
std::chrono::milliseconds shutdown_polling_period () const
 

Detailed Description

Configure how a Subscriber handles incoming messages.

There are two main algorithms controlled by this function: the dispatching of application callbacks, and requesting more data from the service.

Callback Concurrency Control

The subscription configuration determines the upper limit (set set_concurrency_watermarks()) how many callbacks are scheduled at a time. As long as this limit is not reached the library will continue to schedule callbacks, once the limit is reached the library will wait until the number of executing callbacks goes below the low watermark.

A callback is "executing" until the AckHandler::ack() or AckHandler::nack() function is called on the associated AckHandler. Applications can use this to move long-running computations out of the library internal thread pool.

Note that callbacks are "scheduled", but they may not immediately execute. For example, callbacks may be sequenced if the concurrency control parameters are higher than the number of I/O threads configured in the SubscriberConnection.

The default value for the concurrency high watermarks is set to the value returned by std::thread::hardware_concurrency() (or 4 if your standard library returns 0 for this parameter).

Message Flow Control

The subscription will request more messages from the service as long as both the outstanding message count (see set_message_count_watermarks()) and the number of bytes in the outstanding messages (see set_message_size_watermarks()) are below the high watermarks for these values.

Once either of the high watermarks are breached the library will wait until both the values are below their low watermarks before requesting more messages from the service.

In this algorithm a message is outstanding until the AckHandler::ack() or AckHandler::nack() function is called on the associated AckHandler. Note that if the concurrency control algorithm has not scheduled a callback this can also put back pressure on the flow control algorithm.

Example: setting the concurrency control parameters
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
// Create a subscriber with 16 threads handling I/O work, by default the
// library creates `std::thread::hardware_concurrency()` threads.
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
Options{}
.set<GrpcBackgroundThreadPoolSizeOption>(16)));
// Create a subscription where up to 8 messages are handled concurrently. By
// default the library uses `std::thread::hardware_concurrency()` as the
// maximum number of concurrent callbacks.
auto session = subscriber.Subscribe(
// This handler executes in the I/O threads, applications could use,
// std::async(), a thread-pool, or any other mechanism to transfer the
// execution to other threads.
std::cout << "Received message " << m << "\n";
std::move(h).ack();
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
return std::make_pair(subscriber, std::move(session));
};
Defines the interface to acknowledge and reject messages.
Definition: ack_handler.h:48
The C++ representation for a Cloud Pub/Sub messages.
Definition: message.h:78
Receive messages from the Cloud Pub/Sub service.
Definition: subscriber.h:89
future< Status > Subscribe(Callable &&cb)
Creates a new session to receive messages from subscription.
Definition: subscriber.h:129
Objects of this class identify a Cloud Pub/Sub subscription.
Definition: subscription.h:37
std::shared_ptr< SubscriberConnection > MakeSubscriberConnection(Subscription subscription, std::initializer_list< pubsub_internal::NonConstructible >)
Creates a new SubscriberConnection object to work with Subscriber.
Contains all the Cloud Pubsub C++ client types and functions.
Definition: ack_handler.cc:20
The maximum callback concurrency.
Definition: options.h:264
Example: setting the flow control parameters
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
// Change the flow control watermarks, by default the client library uses
// 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
// size watermarks. Recall that the library stops requesting messages if
// any of the high watermarks are reached, and the library resumes
// requesting messages when *both* low watermarks are reached.
auto constexpr kMiB = 1024 * 1024L;
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
Options{}
.set<pubsub::MaxOutstandingBytesOption>(8 * kMiB)));
auto session = subscriber.Subscribe(
std::move(h).ack();
std::cout << "Received message " << m << "\n";
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
return std::make_pair(subscriber, std::move(session));
};
The maximum number of outstanding messages per streaming pull.
Definition: options.h:225

Definition at line 94 of file subscriber_options.h.

Constructor & Destructor Documentation

◆ SubscriberOptions() [1/2]

google::cloud::pubsub::v1::SubscriberOptions::SubscriberOptions ( )
inline

Definition at line 96 of file subscriber_options.h.

◆ SubscriberOptions() [2/2]

google::cloud::pubsub::v1::SubscriberOptions::SubscriberOptions ( Options  opts)
explicit

Initialize the subscriber options.

Expected options are any of the types in the SubscriberOptionList

Note
Unrecognized options will be ignored. To debug issues with options set GOOGLE_CLOUD_CPP_ENABLE_CLOG=yes in the environment and unexpected options will be logged.
Parameters
optsconfiguration options

Definition at line 26 of file subscriber_options.cc.

Member Function Documentation

◆ max_concurrency()

std::size_t google::cloud::pubsub::v1::SubscriberOptions::max_concurrency ( ) const
inline

Maximum number of callbacks scheduled by the library at a time.

Definition at line 218 of file subscriber_options.h.

◆ max_deadline_extension()

std::chrono::seconds google::cloud::pubsub::v1::SubscriberOptions::max_deadline_extension ( ) const
inline

Definition at line 152 of file subscriber_options.h.

◆ max_deadline_time()

std::chrono::seconds google::cloud::pubsub::v1::SubscriberOptions::max_deadline_time ( ) const
inline

The maximum deadline for each incoming message.

Configure how long does the application have to respond (ACK or NACK) an incoming message. Note that this might be longer, or shorter, than the deadline configured in the server-side subscription.

The value 0 is reserved to leave the deadline unmodified and just use the server-side configuration.

Note
The deadline applies to each message as it is delivered to the application, thus, if the library receives a batch of N messages their deadline for all the messages is extended repeatedly. Only once the message is delivered to a callback does the deadline become immutable.

Definition at line 126 of file subscriber_options.h.

◆ max_outstanding_bytes()

std::int64_t google::cloud::pubsub::v1::SubscriberOptions::max_outstanding_bytes ( ) const
inline

Definition at line 190 of file subscriber_options.h.

◆ max_outstanding_messages()

std::int64_t google::cloud::pubsub::v1::SubscriberOptions::max_outstanding_messages ( ) const
inline

Definition at line 171 of file subscriber_options.h.

◆ set_max_concurrency()

SubscriberOptions & google::cloud::pubsub::v1::SubscriberOptions::set_max_concurrency ( std::size_t  v)

Set the maximum callback concurrency.

The Cloud Pub/Sub C++ client library will schedule parallel callbacks as long as the number of outstanding callbacks is less than this maximum.

Note that this controls the number of callbacks scheduled, not the number of callbacks actually executing at a time. The application needs to create (or configure) the background threads pool with enough parallelism to execute more than one callback at a time.

Some applications may want to share a thread pool across many subscriptions, the additional level of control (scheduled vs. running callbacks) allows applications, for example, to ensure that at most K threads in the pool are used by any given subscription.

Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
// Create a subscriber with 16 threads handling I/O work, by default the
// library creates `std::thread::hardware_concurrency()` threads.
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
Options{}
.set<GrpcBackgroundThreadPoolSizeOption>(16)));
// Create a subscription where up to 8 messages are handled concurrently. By
// default the library uses `std::thread::hardware_concurrency()` as the
// maximum number of concurrent callbacks.
auto session = subscriber.Subscribe(
// This handler executes in the I/O threads, applications could use,
// std::async(), a thread-pool, or any other mechanism to transfer the
// execution to other threads.
std::cout << "Received message " << m << "\n";
std::move(h).ack();
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
return std::make_pair(subscriber, std::move(session));
};
Parameters
vthe new value, 0 resets to the default

Definition at line 51 of file subscriber_options.cc.

◆ set_max_deadline_extension()

SubscriberOptions & google::cloud::pubsub::v1::SubscriberOptions::set_max_deadline_extension ( std::chrono::seconds  extension)

Set the maximum time by which the deadline for each incoming message is extended.

The Cloud Pub/Sub C++ client library will extend the deadline by at most this amount, while waiting for an ack or nack. The default extension is 10 minutes. An application may wish to reduce this extension so that the Pub/Sub service will resend a message sooner when it does not hear back from a Subscriber.

The value is clamped between 10 seconds and 10 minutes.

Parameters
extensionthe maximum time that the deadline can be extended by, measured in seconds.

Definition at line 31 of file subscriber_options.cc.

◆ set_max_deadline_time()

SubscriberOptions& google::cloud::pubsub::v1::SubscriberOptions::set_max_deadline_time ( std::chrono::seconds  d)
inline

Set the maximum deadline for incoming messages.

Definition at line 131 of file subscriber_options.h.

◆ set_max_outstanding_bytes()

SubscriberOptions & google::cloud::pubsub::v1::SubscriberOptions::set_max_outstanding_bytes ( std::int64_t  bytes)

Set the maximum number of outstanding bytes per streaming pull.

The Cloud Pub/Sub C++ client library uses streaming pull requests to receive messages from the service. The service will stop delivering messages if bytes or more worth of messages have not been acknowledged nor rejected.

Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
// Change the flow control watermarks, by default the client library uses
// 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
// size watermarks. Recall that the library stops requesting messages if
// any of the high watermarks are reached, and the library resumes
// requesting messages when *both* low watermarks are reached.
auto constexpr kMiB = 1024 * 1024L;
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
Options{}
.set<pubsub::MaxOutstandingBytesOption>(8 * kMiB)));
auto session = subscriber.Subscribe(
std::move(h).ack();
std::cout << "Received message " << m << "\n";
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
return std::make_pair(subscriber, std::move(session));
};
Parameters
bytesthe maximum number of bytes outstanding, use 0 or negative numbers to make the number of bytes unlimited.

Definition at line 45 of file subscriber_options.cc.

◆ set_max_outstanding_messages()

SubscriberOptions & google::cloud::pubsub::v1::SubscriberOptions::set_max_outstanding_messages ( std::int64_t  message_count)

Set the maximum number of outstanding messages per streaming pull.

The Cloud Pub/Sub C++ client library uses streaming pull requests to receive messages from the service. The service will stop delivering messages if message_count or more messages have not been acknowledged nor rejected.

Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
// Change the flow control watermarks, by default the client library uses
// 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
// size watermarks. Recall that the library stops requesting messages if
// any of the high watermarks are reached, and the library resumes
// requesting messages when *both* low watermarks are reached.
auto constexpr kMiB = 1024 * 1024L;
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
Options{}
.set<pubsub::MaxOutstandingBytesOption>(8 * kMiB)));
auto session = subscriber.Subscribe(
std::move(h).ack();
std::cout << "Received message " << m << "\n";
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
return std::make_pair(subscriber, std::move(session));
};
Parameters
message_countthe maximum number of messages outstanding, use 0 or negative numbers to make the message count unlimited.

Definition at line 38 of file subscriber_options.cc.

◆ set_shutdown_polling_period()

SubscriberOptions& google::cloud::pubsub::v1::SubscriberOptions::set_shutdown_polling_period ( std::chrono::milliseconds  v)
inline

Control how often the session polls for automatic shutdowns.

Applications can shutdown a session by calling .cancel() on the returned future<Status>. In addition, applications can fire & forget a session, which is only shutdown once the completion queue servicing the session shuts down. In this latter case the session polls periodically to detect if the CQ has shutdown. This controls how often this polling happens.

Definition at line 231 of file subscriber_options.h.

◆ shutdown_polling_period()

std::chrono::milliseconds google::cloud::pubsub::v1::SubscriberOptions::shutdown_polling_period ( ) const
inline

Definition at line 235 of file subscriber_options.h.