Google Cloud Pub/Sub C++ Client 2.13.0
A C++ Client Library for Google Cloud Pub/Sub
google::cloud::pubsub::Publisher Class Reference

Publish messages to the Cloud Pub/Sub service.

#include <google/cloud/pubsub/publisher.h>

Public Member Functions

 Publisher (std::shared_ptr< PublisherConnection > connection)
 
 Publisher (Publisher const &)=default
 
Publisher & operator= (Publisher const &)=default
 
 Publisher (Publisher &&)=default
 
Publisher & operator= (Publisher &&)=default
 
future< StatusOr< std::string > > Publish (Message m)
 Publishes a message to this publisher's topic.
 
void Flush ()
 Forcibly publishes any batched messages.
 
void ResumePublish (std::string ordering_key)
 Resumes publishing after an error.
 
 Publisher (std::shared_ptr< PublisherConnection > connection, PublisherOptions const &)
 

Friends

bool operator== (Publisher const &a, Publisher const &b)
 
bool operator!= (Publisher const &a, Publisher const &b)
 

Detailed Description

Publish messages to the Cloud Pub/Sub service.

This class is used to publish messages to a fixed topic, with a fixed configuration such as credentials, batching, background threads, etc. Applications that publish messages to multiple topics need to create separate instances of this class. Applications wanting to publish events with different batching configuration also need to create separate instances.

See also
https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub service.
Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto message_id = publisher.Publish(
      pubsub::MessageBuilder{}.SetData("Hello World!").Build());
  auto done = message_id.then([](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
    std::cout << "Hello World! published with id=" << *id << "\n";
  });
  // Block until the message is published
  done.get();
}
Message Ordering
A Publisher configured to preserve message ordering will sequence the messages that share a common ordering key (see MessageBuilder::SetOrderingKey()). Messages will be batched by ordering key, and new batches will wait until the status of the previous batch is known. On an error, all pending and queued messages are discarded, and the publisher rejects any new messages for the ordering key that experienced problems. The application must call Publisher::ResumePublish() to restore publishing.
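The per-key behavior described above can be modeled with a small state machine. The sketch below is a hypothetical, self-contained model: OrderedKeyModel and its member names are invented for illustration and are not part of the library. It only shows the rule that an error discards the queued messages for a key and blocks that key until it is resumed.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <string>
#include <utility>

// Hypothetical model (not the library's implementation) of per-key ordering.
class OrderedKeyModel {
 public:
  // Queues `data` under `ordering_key`; returns false if the key is blocked.
  bool Enqueue(std::string const& ordering_key, std::string data) {
    assert(!ordering_key.empty());  // this model only covers ordered messages
    auto& state = keys_[ordering_key];
    if (state.blocked) return false;
    state.queued.push_back(std::move(data));
    return true;
  }

  // Simulates a failed batch: discards the queued messages for the key and
  // blocks it. Returns the number of messages discarded.
  std::size_t OnError(std::string const& ordering_key) {
    auto& state = keys_[ordering_key];
    auto const discarded = state.queued.size();
    state.queued.clear();
    state.blocked = true;
    return discarded;
  }

  // Plays the role of Publisher::ResumePublish(): the key is accepted again.
  void Resume(std::string const& ordering_key) {
    keys_[ordering_key].blocked = false;
  }

 private:
  struct KeyState {
    bool blocked = false;
    std::deque<std::string> queued;
  };
  std::map<std::string, KeyState> keys_;
};
```

In this model a failure on "key1" has no effect on "key2", which matches the description that only the ordering key that experienced problems is rejected.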
Performance
Publisher objects are relatively cheap to create, copy, and move. However, each Publisher object must be created with a std::shared_ptr<PublisherConnection>, which itself is relatively expensive to create. Therefore, connection instances should be shared when possible. See the MakePublisherConnection() method and the PublisherConnection interface for more details.
Thread Safety
Instances of this class created via copy-construction or copy-assignment share the underlying pool of connections. Access to these copies from multiple threads is guaranteed to work. Concurrent access to the same instance from multiple threads is not guaranteed to work.
Background Threads
This class uses the background threads configured via the Options from GrpcOptionList. Applications can create their own pool of background threads by (a) creating their own google::cloud::CompletionQueue, (b) passing this completion queue as a GrpcCompletionQueueOption, and (c) attaching any number of threads to the completion queue.
Example: using a custom thread pool
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcCompletionQueueOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  // Create our own completion queue to run the background activity, such as
  // flushing the publisher.
  google::cloud::CompletionQueue cq;
  // Set up one or more threads to service this completion queue. These must
  // remain running until all the work is done.
  std::vector<std::thread> tasks;
  std::generate_n(std::back_inserter(tasks), 4, [&cq] {
    return std::thread([cq]() mutable { cq.Run(); });
  });
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic), Options{}.set<GrpcCompletionQueueOption>(cq)));
  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
  // Shutdown the completion queue and join the threads
  cq.Shutdown();
  for (auto& t : tasks) t.join();
}
Asynchronous Functions
Some of the member functions in this class return a future<T> (or future<StatusOr<T>>) object. Readers are probably familiar with std::future<T>. Our version adds a .then() function to attach a callback to the future, which is invoked when the future is satisfied. This function returns a future<U> where U is the return type of the attached function. See the google::cloud::future documentation for more details.
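The continuation pattern can be sketched with the standard library alone. The Then() helper below is hypothetical and does not reflect how google::cloud::future is implemented. It uses a deferred std::async task, so in this sketch the callback runs when the returned future is read, whereas the library invokes the callback when the input future is satisfied.

```cpp
#include <cassert>
#include <future>
#include <utility>

// Hypothetical helper: passes the satisfied future to `callback` and returns
// a future holding whatever the callback returns.
template <typename T, typename F>
auto Then(std::future<T> f, F callback)
    -> std::future<decltype(callback(std::move(f)))> {
  assert(f.valid());  // a continuation needs a future with shared state
  return std::async(
      std::launch::deferred,
      [](std::future<T> ready, F cb) {
        ready.wait();  // block until the input future is satisfied
        return cb(std::move(ready));
      },
      std::move(f), std::move(callback));
}
```

As with .then(), the callback receives the future itself rather than the value, so it decides how to handle an error stored in the future.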
Error Handling
This class uses StatusOr<T> to report errors. When an operation fails to perform its work the returned StatusOr<T> contains the error details. If the ok() member function in the StatusOr<T> returns true then it contains the expected result. Please consult the google::cloud::StatusOr documentation for more details.
Batching Configuration Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default, the publisher will flush a batch after 10ms, after it
  // contains more than 100 messages, or after it contains more than 1MiB of
  // data, whichever comes first. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::MaxHoldTimeOption>(std::chrono::milliseconds(20))
          .set<pubsub::MaxBatchBytesOption>(4 * 1024 * 1024L)
          .set<pubsub::MaxBatchMessagesOption>(200)));  // example value
  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Constructor & Destructor Documentation

◆ Publisher() [1/4]

google::cloud::pubsub::Publisher::Publisher ( std::shared_ptr< PublisherConnection > connection)
inline explicit

◆ Publisher() [2/4]

google::cloud::pubsub::Publisher::Publisher ( Publisher const &  )
default

◆ Publisher() [3/4]

google::cloud::pubsub::Publisher::Publisher ( Publisher &&  )
default

◆ Publisher() [4/4]

google::cloud::pubsub::Publisher::Publisher ( std::shared_ptr< PublisherConnection > connection,
PublisherOptions const &   
)
inline explicit
Deprecated:
Use Publisher(connection) and provide any configuration options when initializing the connection object.

Member Function Documentation

◆ Flush()

void google::cloud::pubsub::Publisher::Flush ( )
inline

Forcibly publishes any batched messages.

As applications can configure a Publisher to buffer messages, it is sometimes useful to flush them before any of the normal criteria to send the RPCs is met.

Idempotency
See the description in Publish().
Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  std::vector<future<void>> done;
  for (int i = 0; i != 10; ++i) {
    auto message_id = publisher.Publish(
        pubsub::MessageBuilder{}
            .SetData("Hello World! [" + std::to_string(i) + "]")
            .SetAttribute("origin", "cpp-sample")
            .SetAttribute("username", "gcp")
            .Build());
    done.push_back(message_id.then([i](future<StatusOr<std::string>> f) {
      auto id = f.get();
      if (!id) throw std::move(id).status();
      std::cout << "Message " << i << " published with id=" << *id << "\n";
    }));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}
Note
This function does not return any status or error codes. The application can use the future<StatusOr<std::string>> returned by each Publish() call to find out what the results are.

◆ operator=() [1/2]

Publisher & google::cloud::pubsub::Publisher::operator= ( Publisher &&  )
default

◆ operator=() [2/2]

Publisher & google::cloud::pubsub::Publisher::operator= ( Publisher const &  )
default

◆ Publish()

future< StatusOr< std::string > > google::cloud::pubsub::Publisher::Publish ( Message  m)
inline

Publishes a message to this publisher's topic.

Note that the message may be batched, depending on the Publisher's configuration. It could be delayed until the batch has enough messages, or enough data, or enough time has elapsed. See the PublisherOptionList documentation for more details.
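The three batching criteria can be expressed as a single predicate. The sketch below is a hypothetical model rather than the library's code: BatchLimits, BatchState, and ShouldFlush are invented names. The default values are taken from the comment in the batching configuration example (10ms, 100 messages, 1MiB), and the exact boundary behavior (">" versus ">=") is an assumption.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

// Hypothetical limits mirroring the three documented flush criteria.
struct BatchLimits {
  std::chrono::milliseconds max_hold_time{10};
  std::size_t max_messages = 100;
  std::size_t max_bytes = 1024 * 1024;
};

// Hypothetical snapshot of the batch currently being accumulated.
struct BatchState {
  std::chrono::milliseconds age{0};  // time since the first message arrived
  std::size_t messages = 0;
  std::size_t bytes = 0;
};

// Returns true when any one of the criteria is met, whichever comes first.
bool ShouldFlush(BatchState const& batch, BatchLimits const& limits) {
  assert(limits.max_messages > 0 && limits.max_bytes > 0);
  if (batch.messages == 0) return false;  // nothing to send
  return batch.age >= limits.max_hold_time ||
         batch.messages > limits.max_messages ||
         batch.bytes > limits.max_bytes;
}
```

Calling Flush() corresponds to sending the batch even while this predicate is still false.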

Idempotency
This is a non-idempotent operation, but the client library will automatically retry RPCs that fail with transient errors. As Cloud Pub/Sub has "at least once" delivery semantics, applications are expected to handle duplicate messages without problems. The application can disable retries by changing the retry policy; see the "Disabling Retries Example" below.
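One way an application might tolerate duplicates is to track a key of its own choosing and skip repeats. The sketch below is only an illustration under stated assumptions: DuplicateFilter is a hypothetical class, and the key is assumed to be an application-defined identifier (for example, one carried as a message attribute), since a retried publish may be stored under a different server-assigned ID. The set grows without bound, so a real application would also need an eviction strategy.

```cpp
#include <cassert>
#include <string>
#include <unordered_set>

// Hypothetical consumer-side helper: remembers application-defined keys.
class DuplicateFilter {
 public:
  // Returns true the first time `key` is seen and false for any repeat.
  bool FirstDelivery(std::string const& key) {
    assert(!key.empty());  // the sketch assumes every message carries a key
    return seen_.insert(key).second;
  }

 private:
  std::unordered_set<std::string> seen_;
};
```

A handler would call FirstDelivery() before processing a message and skip the work when it returns false.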
Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  auto message_id = publisher.Publish(
      pubsub::MessageBuilder{}.SetData("Hello World!").Build());
  auto done = message_id.then([](future<StatusOr<std::string>> f) {
    auto id = f.get();
    if (!id) throw std::move(id).status();
    std::cout << "Hello World! published with id=" << *id << "\n";
  });
  // Block until the message is published
  done.get();
}
Disabling Retries Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::RetryPolicyOption>(
              pubsub::LimitedErrorCountRetryPolicy(/*maximum_failures=*/0)
                  .clone())
          .set<pubsub::BackoffPolicyOption>(
              pubsub::ExponentialBackoffPolicy(
                  /*initial_delay=*/std::chrono::milliseconds(200),
                  /*maximum_delay=*/std::chrono::seconds(45),
                  /*scaling=*/2.0)
                  .clone())));
  std::vector<future<bool>> done;
  for (char const* data : {"1", "2", "3", "go!"}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([](future<StatusOr<std::string>> f) {
              return f.get().ok();
            }));
  }
  publisher.Flush();
  int count = 0;
  for (auto& f : done) {
    if (f.get()) ++count;
  }
  std::cout << count << " messages sent successfully\n";
}
Changing Retry Parameters Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // By default a publisher will retry for 60 seconds, with an initial backoff
  // of 100ms, a maximum backoff of 60 seconds, and the backoff will grow by
  // 30% after each attempt. This changes those defaults.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::RetryPolicyOption>(
              pubsub::LimitedTimeRetryPolicy(
                  /*maximum_duration=*/std::chrono::minutes(10))
                  .clone())
          .set<pubsub::BackoffPolicyOption>(
              pubsub::ExponentialBackoffPolicy(
                  /*initial_delay=*/std::chrono::milliseconds(200),
                  /*maximum_delay=*/std::chrono::seconds(45),
                  /*scaling=*/2.0)
                  .clone())));
  std::vector<future<bool>> done;
  for (char const* data : {"1", "2", "3", "go!"}) {
    done.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([](future<StatusOr<std::string>> f) {
              return f.get().ok();
            }));
  }
  publisher.Flush();
  int count = 0;
  for (auto& f : done) {
    if (f.get()) ++count;
  }
  std::cout << count << " messages sent successfully\n";
}
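To make the backoff parameters in the example above more concrete, the sketch below computes how the delay ceiling grows from initial_delay by the scaling factor until it is capped at maximum_delay. BackoffCeilings is a hypothetical helper, not a library function, and it does not reproduce the library's algorithm: the actual policy randomizes each period, so these values should be read only as approximate upper bounds.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <vector>

// Hypothetical helper: ceilings for successive delays of a truncated
// exponential backoff (start at `initial`, grow by `scaling`, cap at
// `maximum`). Randomization is deliberately left out.
std::vector<std::chrono::milliseconds> BackoffCeilings(
    std::chrono::milliseconds initial, std::chrono::milliseconds maximum,
    double scaling, int attempts) {
  assert(scaling >= 1.0 && initial <= maximum);
  std::vector<std::chrono::milliseconds> ceilings;
  double current = static_cast<double>(initial.count());
  double const cap = static_cast<double>(maximum.count());
  for (int i = 0; i < attempts; ++i) {
    ceilings.push_back(std::chrono::milliseconds(
        static_cast<std::chrono::milliseconds::rep>(current)));
    current = std::min(current * scaling, cap);
  }
  return ceilings;
}
```

With the values from the example (200ms, 45s, 2.0), the ceiling doubles on each attempt and reaches the 45 second cap on the ninth attempt.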
Returns
a future that becomes satisfied when the message is published or on an unrecoverable error. On success, the future is satisfied with the server-assigned ID of the message. IDs are guaranteed to be unique within the topic.

◆ ResumePublish()

void google::cloud::pubsub::Publisher::ResumePublish ( std::string  ordering_key)
inline

Resumes publishing after an error.

If the publisher options have message ordering enabled (see MessageOrderingOption), all messages for an ordering key that experiences a failure will be rejected until the application calls this function.

Idempotency
This function never initiates a remote RPC, so there are no considerations around retrying it. Note, however, that more than one Publish() request may fail for the same ordering key. The application needs to call this function after each error before it can resume publishing messages with the same ordering key.
Example
namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
[](pubsub::Publisher publisher) {
  struct SampleData {
    std::string ordering_key;
    std::string data;
  } data[] = {
      {"key1", "message1"}, {"key2", "message2"}, {"key1", "message3"},
      {"key1", "message4"}, {"key1", "message5"},
  };
  std::vector<future<void>> done;
  for (auto& datum : data) {
    auto const& da = datum;  // workaround MSVC lambda capture confusion
    auto handler = [da, publisher](future<StatusOr<std::string>> f) mutable {
      auto const msg = da.ordering_key + "#" + da.data;
      auto id = f.get();
      if (!id) {
        std::cout << "An error has occurred publishing " << msg << "\n";
        publisher.ResumePublish(da.ordering_key);
        return;
      }
      std::cout << "Message " << msg << " published as id=" << *id << "\n";
    };
    done.push_back(
        publisher
            .Publish(pubsub::MessageBuilder{}
                         .SetData("Hello World! [" + datum.data + "]")
                         .SetOrderingKey(datum.ordering_key)
                         .Build())
            .then(handler));
  }
  publisher.Flush();
  // Block until all the messages are published (optional)
  for (auto& f : done) f.get();
}

Friends And Related Function Documentation

◆ operator!=

bool operator!= ( Publisher const &  a,
Publisher const &  b 
)
friend

◆ operator==

bool operator== ( Publisher const &  a,
Publisher const &  b 
)
friend