Google Cloud Pub/Sub C++ Client  1.32.1
A C++ Client Library for Google Cloud Pub/Sub
Public Member Functions | List of all members
google::cloud::pubsub::v1::Subscriber Class Reference

Receive messages from the Cloud Pub/Sub service. More...

#include <google/cloud/pubsub/subscriber.h>

Public Member Functions

 Subscriber (std::shared_ptr< SubscriberConnection > connection)
 
template<typename Callable >
future< Status > Subscribe (Callable &&cb)
 Creates a new session to receive messages from subscription. More...
 

Detailed Description

Receive messages from the Cloud Pub/Sub service.

This class is used to receive message from a given subscription, with a fixed configuration such as credentials, and background threads. Applications that receive messages from multiple subscriptions need to create separate instances of this class. Applications wanting to receive events with configuration parameters also need to create separate instances.

See also
https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub service.
Example: subscriber quickstart
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
return subscriber.Subscribe(
std::cout << "Received message " << m << "\n";
std::move(h).ack();
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
};
Defines the interface to acknowledge and reject messages.
Definition: ack_handler.h:48
The C++ representation for a Cloud Pub/Sub messages.
Definition: message.h:78
Receive messages from the Cloud Pub/Sub service.
Definition: subscriber.h:89
Contains all the Cloud Pubsub C++ client types and functions.
Definition: ack_handler.cc:20
Performance
Subscriber objects are relatively cheap to create, copy, and move. However, each Subscriber object must be created with a std::shared_ptr<SubscriberConnection>, which itself is relatively expensive to create. Therefore, connection instances should be shared when possible. See the MakeSubscriberConnection() function and the SubscriberConnection interface for more details.
Thread Safety
Instances of this class created via copy-construction or copy-assignment share the underlying pool of connections. Access to these copies via multiple threads is guaranteed to work. Two threads operating on the same instance of this class is not guaranteed to work.
Background Threads
This class uses the background threads configured via the Options from GrpcOptionList. Applications can create their own pool of background threads by (a) creating their own google::cloud::v1::CompletionQueue, (b) passing this completion queue as a GrpcCompletionQueueOption, and (c) attaching any number of threads to the completion queue.
Example: using a custom thread pool
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcCompletionQueueOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string subscription_id) {
// Create our own completion queue to run the background activity.
google::cloud::CompletionQueue cq;
// Setup one or more of threads to service this completion queue. These must
// remain running until all the work is done.
std::vector<std::thread> tasks;
std::generate_n(std::back_inserter(tasks), 4, [&cq] {
return std::thread([cq]() mutable { cq.Run(); });
});
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
Options{}.set<GrpcCompletionQueueOption>(cq)));
// Because this is an example we want to exit eventually, use a mutex and
// condition variable to notify the current thread and stop the example.
std::mutex mu;
std::condition_variable cv;
int count = 0;
auto await_count = [&] {
std::unique_lock<std::mutex> lk(mu);
cv.wait(lk, [&] { return count >= 4; });
};
auto increase_count = [&] {
std::unique_lock<std::mutex> lk(mu);
if (++count >= 4) cv.notify_one();
};
// Receive messages in the previously allocated thread pool.
auto session = subscriber.Subscribe(
std::cout << "Received message " << m << "\n";
increase_count();
std::move(h).ack();
});
await_count();
session.cancel();
// Report any final status, blocking until the subscription loop completes,
// either with a failure or because it was canceled.
auto status = session.get();
std::cout << "Message count=" << count << ", status=" << status << "\n";
// Shutdown the completion queue and join the threads
cq.Shutdown();
for (auto& t : tasks) t.join();
}
Objects of this class identify a Cloud Pub/Sub subscription.
Definition: subscription.h:37
absl::Time t
std::shared_ptr< SubscriberConnection > MakeSubscriberConnection(Subscription subscription, std::initializer_list< pubsub_internal::NonConstructible >)
Creates a new SubscriberConnection object to work with Subscriber.
Asynchronous Functions
Some of the member functions in this class return a future<T> (or future<StatusOr<T>>) object. Readers are probably familiar with std::future<T>. Our version adds a .then() function to attach a callback to the future, which is invoked when the future is satisfied. This function returns a future<U> where U is the return value of the attached function. More details in the google::cloud::v1::future documentation.
Error Handling
This class uses StatusOr<T> to report errors. When an operation fails to perform its work the returned StatusOr<T> contains the error details. If the ok() member function in the StatusOr<T> returns true then it contains the expected result. Please consult the google::cloud::v1::StatusOr documentation for more details.
Changing Retry Parameters Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
// By default a subscriber will retry for 60 seconds, with an initial
// backoff of 100ms, a maximum backoff of 60 seconds, and the backoff will
// grow by 30% after each attempt. This changes those defaults.
pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
Options{}
/*maximum_duration=*/std::chrono::minutes(1))
.clone())
.set<pubsub::BackoffPolicyOption>(
/*initial_delay=*/std::chrono::milliseconds(200),
/*maximum_delay=*/std::chrono::seconds(10),
/*scaling=*/2.0)
.clone())));
auto session = subscriber.Subscribe(
std::move(h).ack();
std::cout << "Received message " << m << "\n";
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
return std::make_pair(subscriber, std::move(session));
};
future< Status > Subscribe(Callable &&cb)
Creates a new session to receive messages from subscription.
Definition: subscriber.h:129
::google::cloud::internal::LimitedTimeRetryPolicy< pubsub_internal::RetryTraits > LimitedTimeRetryPolicy
A retry policy that limits based on time.
Definition: retry_policy.h:51
google::cloud::internal::ExponentialBackoffPolicy ExponentialBackoffPolicy
A truncated exponential backoff policy with randomized periods.
std::chrono::seconds seconds

Definition at line 89 of file subscriber.h.

Constructor & Destructor Documentation

◆ Subscriber()

google::cloud::pubsub::v1::Subscriber::Subscriber ( std::shared_ptr< SubscriberConnection connection)
inlineexplicit

Definition at line 91 of file subscriber.h.

Member Function Documentation

◆ Subscribe()

template<typename Callable >
future<Status> google::cloud::pubsub::v1::Subscriber::Subscribe ( Callable &&  cb)
inline

Creates a new session to receive messages from subscription.

Note
Callable must be CopyConstructible, as cb will be stored in a std::function<>.
Idempotency

This is an idempotent operation; it only reads messages from the service. Will make multiple attempts to start a connection to the service, subject to the retry policies configured in the SubscriberConnection. Once a successful connection is established the library will try to resume the connection even if the connection fails with a permanent error. Resuming the connection is subject to the retry policies as described earlier.

Note that calling AckHandler::ack() and/or AckHandler::nack() is handled differently with respect to retrying. Check the documentation of these functions for details.

Example
namespace pubsub = ::google::cloud::pubsub;
auto sample = [](pubsub::Subscriber subscriber) {
return subscriber.Subscribe(
std::cout << "Received message " << m << "\n";
std::move(h).ack();
PleaseIgnoreThisSimplifiesTestingTheSamples();
});
};
Parameters
cbthe callable invoked when messages are received. This must be usable to construct a std::function<void(pubsub::Message, pubsub::AckHandler)>.
Returns
a future that is satisfied when the session will no longer receive messages. For example, because there was an unrecoverable error trying to receive data. Calling .cancel() in this object will (eventually) terminate the session and satisfy the future.

Definition at line 129 of file subscriber.h.