Google Cloud Pub/Sub C++ Client  2.5.0
A C++ Client Library for Google Cloud Pub/Sub
google::cloud::pubsub::Subscriber Class Reference

Receive messages from the Cloud Pub/Sub service.

#include <google/cloud/pubsub/subscriber.h>

Public Member Functions

 Subscriber (std::shared_ptr< SubscriberConnection > connection, Options opts={})
 
future< Status > Subscribe (ApplicationCallback cb, Options opts={})
 Creates a new session to receive messages from the subscription.
 
future< Status > Subscribe (ExactlyOnceApplicationCallback cb, Options opts={})
 Creates a new session to receive messages from the subscription using exactly-once delivery.
 
StatusOr< PullResponse > Pull (Options opts={})
 Pulls one message from the subscription.
 

Detailed Description

Receive messages from the Cloud Pub/Sub service.

This class is used to receive messages from a given subscription, with a fixed configuration such as credentials and background threads. Applications that receive messages from multiple subscriptions need to create separate instances of this class. Applications that want to receive messages with different configuration parameters also need to create separate instances.

See also
https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub service.
Example: subscriber quickstart
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};
Performance
Subscriber objects are relatively cheap to create, copy, and move. However, each Subscriber object must be created with a std::shared_ptr<SubscriberConnection>, which itself is relatively expensive to create. Therefore, connection instances should be shared when possible. See the MakeSubscriberConnection() function and the SubscriberConnection interface for more details.
Thread Safety
Instances of this class created via copy-construction or copy-assignment share the underlying pool of connections. Accessing these copies from multiple threads is guaranteed to work. Using the same instance of this class from two threads at once is not guaranteed to work.
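The copy-per-thread rule can be sketched with stand-in types. `FakeConnection` and `Handle` below are hypothetical and are not part of the library; they only model a cheap handle that shares one expensive connection through a `std::shared_ptr`, with each thread working on its own copy of the handle.

```cpp
#include <atomic>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for an expensive, shareable connection.
struct FakeConnection {
  std::atomic<int> calls{0};
};

// Hypothetical stand-in for a cheap, copyable handle such as Subscriber.
class Handle {
 public:
  explicit Handle(std::shared_ptr<FakeConnection> c)
      : connection_(std::move(c)) {}
  void Use() { ++connection_->calls; }

 private:
  std::shared_ptr<FakeConnection> connection_;
};

// Gives each thread its own copy of the handle. Returns the number of calls
// observed by the single shared connection.
int RunWithPerThreadCopies(int thread_count) {
  auto connection = std::make_shared<FakeConnection>();
  Handle original(connection);
  std::vector<std::thread> threads;
  for (int i = 0; i != thread_count; ++i) {
    // Capture by value: every thread owns a distinct copy of the handle.
    threads.emplace_back([copy = original]() mutable { copy.Use(); });
  }
  for (auto& t : threads) t.join();
  return connection->calls.load();
}
```

Copying the handle only copies a `std::shared_ptr`, which is why the copies are cheap while the connection is created once.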
Background Threads
This class uses the background threads configured via the Options from GrpcOptionList. Applications can create their own pool of background threads by (a) creating their own google::cloud::CompletionQueue, (b) passing this completion queue as a GrpcCompletionQueueOption, and (c) attaching any number of threads to the completion queue.
Example: using a custom thread pool
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcCompletionQueueOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Create our own completion queue to run the background activity.
  google::cloud::CompletionQueue cq;
  // Set up one or more threads to service this completion queue. These must
  // remain running until all the work is done.
  std::vector<std::thread> tasks;
  std::generate_n(std::back_inserter(tasks), 4, [&cq] {
    return std::thread([cq]() mutable { cq.Run(); });
  });
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}.set<GrpcCompletionQueueOption>(cq)));
  // Because this is an example we want to exit eventually, use a mutex and
  // condition variable to notify the current thread and stop the example.
  std::mutex mu;
  std::condition_variable cv;
  int count = 0;
  auto await_count = [&] {
    std::unique_lock<std::mutex> lk(mu);
    cv.wait(lk, [&] { return count >= 4; });
  };
  auto increase_count = [&] {
    std::unique_lock<std::mutex> lk(mu);
    if (++count >= 4) cv.notify_one();
  };
  // Receive messages in the previously allocated thread pool.
  auto session = subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        increase_count();
        std::move(h).ack();
      });
  await_count();
  session.cancel();
  // Report any final status, blocking until the subscription loop completes,
  // either with a failure or because it was canceled.
  auto status = session.get();
  std::cout << "Message count=" << count << ", status=" << status << "\n";
  // Shut down the completion queue and join the threads.
  cq.Shutdown();
  for (auto& t : tasks) t.join();
};
Asynchronous Functions
Some of the member functions in this class return a future<T> (or future<StatusOr<T>>) object. Readers are probably familiar with std::future<T>. Our version adds a .then() function to attach a callback to the future, which is invoked when the future is satisfied. This function returns a future<U> where U is the return value of the attached function. More details in the google::cloud::future documentation.
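To illustrate the idea of attaching a continuation, here is a minimal sketch built only on the standard library. `Then` is a hypothetical helper, not the library's implementation; it assumes, as in the `.then()` examples on this page, that the callback receives the satisfied future and that the result is a future of the callback's return type.

```cpp
#include <future>
#include <string>
#include <utility>

// Hypothetical helper: runs `callback` once `f` is satisfied and returns a
// future holding the callback's result. A separate task waits on `f`, which
// is simpler (and less efficient) than a real continuation mechanism.
template <typename T, typename F>
auto Then(std::future<T> f, F callback) {
  return std::async(std::launch::async,
                    [f = std::move(f), cb = std::move(callback)]() mutable {
                      f.wait();                 // block until satisfied
                      return cb(std::move(f));  // callback consumes the future
                    });
}

// Usage sketch: converts an int result into a message once it is available.
std::future<std::string> Describe(std::future<int> f) {
  return Then(std::move(f), [](std::future<int> done) {
    return "value=" + std::to_string(done.get());
  });
}
```

The callback takes the future rather than the bare value so that it can also observe errors when it calls `get()`.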
Error Handling
This class uses StatusOr<T> to report errors. When an operation fails to perform its work, the returned StatusOr<T> contains the error details. If the ok() member function of the StatusOr<T> returns true, then it contains the expected result. Please consult the google::cloud::StatusOr documentation for more details.
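The check-before-use pattern described above can be sketched with a simplified stand-in. `MiniStatusOr` and its `error()` accessor are hypothetical and much smaller than google::cloud::StatusOr; the sketch only shows that callers test `ok()` first and read the value afterwards.

```cpp
#include <string>
#include <utility>

// Hypothetical, simplified stand-in for a value-or-error type.
template <typename T>
class MiniStatusOr {
 public:
  MiniStatusOr(T value) : ok_(true), value_(std::move(value)) {}
  static MiniStatusOr Error(std::string message) {
    MiniStatusOr result;
    result.error_ = std::move(message);
    return result;
  }
  bool ok() const { return ok_; }
  T const& operator*() const { return value_; }
  std::string const& error() const { return error_; }

 private:
  MiniStatusOr() = default;
  bool ok_ = false;
  T value_{};
  std::string error_;
};

// Check ok() first, and only then read the contained value.
std::string Describe(MiniStatusOr<int> const& result) {
  if (!result.ok()) return "error: " + result.error();
  return "value: " + std::to_string(*result);
}
```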
Changing Retry Parameters Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // By default a subscriber will retry for 60 seconds, with an initial
  // backoff of 100ms, a maximum backoff of 60 seconds, and the backoff will
  // grow by 30% after each attempt. This changes those defaults.
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::RetryPolicyOption>(
              pubsub::LimitedTimeRetryPolicy(
                  /*maximum_duration=*/std::chrono::minutes(1))
                  .clone())
          .set<pubsub::BackoffPolicyOption>(
              pubsub::ExponentialBackoffPolicy(
                  /*initial_delay=*/std::chrono::milliseconds(200),
                  /*maximum_delay=*/std::chrono::seconds(10),
                  /*scaling=*/2.0)
                  .clone())));
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        std::move(h).ack();
        std::cout << "Received message " << m << "\n";
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};
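As a worked example of the backoff parameters used above (200 ms initial delay, 10 s maximum delay, scaling of 2.0), the sketch below computes the nominal delay ceiling for each attempt. `NominalBackoffCeilings` is a hypothetical helper. It assumes the ceiling is multiplied by the scaling factor after each attempt and truncated at the maximum, which is the usual definition of truncated exponential backoff; it ignores the randomization that the library's policy applies to each period.

```cpp
#include <algorithm>
#include <chrono>
#include <vector>

// Hypothetical helper: nominal (non-randomized) delay ceilings for the first
// `attempts` retries of a truncated exponential backoff.
std::vector<std::chrono::milliseconds> NominalBackoffCeilings(
    std::chrono::milliseconds initial, std::chrono::milliseconds maximum,
    double scaling, int attempts) {
  std::vector<std::chrono::milliseconds> ceilings;
  auto current = std::min(initial, maximum);
  for (int i = 0; i != attempts; ++i) {
    ceilings.push_back(current);
    // Grow by the scaling factor, then truncate at the maximum delay.
    auto const next = std::chrono::duration_cast<std::chrono::milliseconds>(
        current * scaling);
    current = std::min(next, maximum);
  }
  return ceilings;
}
```

With the values from the example, the ceilings are 200, 400, 800, 1600, 3200 and 6400 ms, and every later attempt is capped at 10000 ms.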

Definition at line 91 of file subscriber.h.

Constructor & Destructor Documentation

◆ Subscriber()

google::cloud::pubsub::Subscriber::Subscriber ( std::shared_ptr< SubscriberConnection > connection,
Options  opts = {} 
)
explicit

Member Function Documentation

◆ Pull()

StatusOr<PullResponse> google::cloud::pubsub::Subscriber::Pull ( Options  opts = {})

Pulls one message from the subscription.

Idempotency

This is an idempotent operation; it only reads messages from the service. It will make multiple attempts to pull a message from the service, subject to the retry policies configured in the SubscriberConnection.

Note that PullAckHandler::ack() and PullAckHandler::nack() have their own rules with respect to retrying.
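Retrying an idempotent operation under a time limit can be sketched as follows. `Outcome` and `RetryIdempotent` are hypothetical names, and the fixed delay is a simplification; in the library the behavior is controlled by the retry and backoff policies configured in the SubscriberConnection, not by a loop like this one.

```cpp
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical result classification for one attempt.
enum class Outcome { kOk, kTransient, kPermanent };

// Hypothetical helper: repeats `op` while it fails with a transient error and
// the time budget allows another attempt. Safe only because `op` is
// idempotent, so repeating it has no additional side effects.
Outcome RetryIdempotent(std::function<Outcome()> const& op,
                        std::chrono::milliseconds max_duration,
                        std::chrono::milliseconds delay) {
  auto const deadline = std::chrono::steady_clock::now() + max_duration;
  for (;;) {
    auto const result = op();
    // Success and permanent errors are returned immediately.
    if (result != Outcome::kTransient) return result;
    // Give up when the next attempt would start past the deadline.
    if (std::chrono::steady_clock::now() + delay >= deadline) return result;
    std::this_thread::sleep_for(delay);
  }
}
```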

Example
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  auto response = subscriber.Pull();
  if (!response) throw std::move(response).status();
  std::cout << "Received message " << response->message << "\n";
  std::move(response->handler).ack();
};
Parameters
opts: any option overrides to use in this call. These options take precedence over the options passed in the constructor, and over any options provided in the SubscriberConnection initialization.
Returns
a response including the message and a PullAckHandler to notify the library when the message has been successfully handled.
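The precedence order for options (per-call overrides first, then constructor options, then connection options) can be sketched with a plain map. `OptionMap`, `Effective`, and the option names used with them are hypothetical; the library uses the typed google::cloud::Options class, not string maps.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for a set of named options.
using OptionMap = std::map<std::string, std::string>;

// Returns the effective options: per-call values win, then constructor
// values, then values set when the connection was created.
OptionMap Effective(OptionMap const& call, OptionMap const& constructor,
                    OptionMap const& connection) {
  OptionMap result = call;
  // insert() never overwrites an existing key, so layers added earlier
  // (higher precedence) keep their values.
  result.insert(constructor.begin(), constructor.end());
  result.insert(connection.begin(), connection.end());
  return result;
}
```

An option set only at the connection level still applies to the call; it is replaced only when a higher layer sets the same option.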

◆ Subscribe() [1/2]

future<Status> google::cloud::pubsub::Subscriber::Subscribe ( ApplicationCallback  cb,
Options  opts = {} 
)

Creates a new session to receive messages from the subscription.

Idempotency

This is an idempotent operation; it only reads messages from the service. It will make multiple attempts to start a connection to the service, subject to the retry policies configured in the SubscriberConnection. Once a successful connection is established, the library will try to resume the connection even if the connection fails with a permanent error. Resuming the connection is subject to the same retry policies.

Note that calling AckHandler::ack() and/or AckHandler::nack() is handled differently with respect to retrying. Check the documentation of these functions for details.

Example
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::AckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack();
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};
Parameters
cb: the callable invoked when messages are received.
opts: any option overrides to use in this call. These options take precedence over the options passed in the constructor, and over any options provided in the SubscriberConnection initialization.
Returns
a future that is satisfied when the session will no longer receive messages, for example, because there was an unrecoverable error trying to receive data. Calling .cancel() on this object will (eventually) terminate the session and satisfy the future.

◆ Subscribe() [2/2]

future<Status> google::cloud::pubsub::Subscriber::Subscribe ( ExactlyOnceApplicationCallback  cb,
Options  opts = {} 
)

Creates a new session to receive messages from the subscription using exactly-once delivery.

Idempotency

This is an idempotent operation; it only reads messages from the service. It will make multiple attempts to start a connection to the service, subject to the retry policies configured in the SubscriberConnection. Once a successful connection is established, the library will try to resume the connection even if the connection fails with a permanent error. Resuming the connection is subject to the same retry policies.

Note that ExactlyOnceAckHandler::ack() and ExactlyOnceAckHandler::nack() have their own rules with respect to retrying. Check the documentation of these functions for details.

Example
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
  return subscriber.Subscribe(
      [&](pubsub::Message const& m, pubsub::ExactlyOnceAckHandler h) {
        std::cout << "Received message " << m << "\n";
        std::move(h).ack().then([id = m.message_id()](auto f) {
          auto status = f.get();
          std::cout << "Message id " << id
                    << " ack() completed with status=" << status << "\n";
        });
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
};
Parameters
cb: the callable invoked when messages are received.
opts: any option overrides to use in this call. These options take precedence over the options passed in the constructor, and over any options provided in the SubscriberConnection initialization.
Returns
a future that is satisfied when the session will no longer receive messages, for example, because there was an unrecoverable error trying to receive data. Calling .cancel() on this object will (eventually) terminate the session and satisfy the future.