Google Cloud Pub/Sub C++ Client  2.1.0
A C++ Client Library for Google Cloud Pub/Sub
publisher.h
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PUBLISHER_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PUBLISHER_H

#include "google/cloud/pubsub/connection_options.h"
#include "google/cloud/pubsub/publisher_connection.h"
#include "google/cloud/pubsub/publisher_options.h"
#include "google/cloud/pubsub/version.h"
#include <memory>
#include <string>
#include <utility>

namespace google {
namespace cloud {
namespace pubsub {
/**
 * Publish messages to the Cloud Pub/Sub service.
 *
 * This class is used to publish messages to a given topic, with a fixed
 * configuration such as credentials, batching, background threads, etc.
 * Applications that publish messages to multiple topics need to create separate
 * instances of this class. Applications that want to publish messages with
 * different batching configurations also need to create separate instances.
 *
 * @see https://cloud.google.com/pubsub for an overview of the Cloud Pub/Sub
 * service.
 *
 * @par Example
 * @snippet samples.cc publish
 *
 * @par Message Ordering
 * A `Publisher` configured to preserve message ordering will sequence the
 * messages that share a common ordering key (see
 * `MessageBuilder::SetOrderingKey()`). Messages will be batched by ordering
 * key, and new batches will wait until the status of the previous batch is
 * known. On an error, all pending and queued messages are discarded, and the
 * publisher rejects any new messages for the ordering key that experienced
 * problems. The application must call `Publisher::ResumePublish()` to
 * restore publishing.
 *
 * @par Performance
 * `Publisher` objects are relatively cheap to create, copy, and move. However,
 * each `Publisher` object must be created with a
 * `std::shared_ptr<PublisherConnection>`, which itself is relatively expensive
 * to create. Therefore, connection instances should be shared when possible.
 * See the `MakePublisherConnection()` function and the `PublisherConnection`
 * interface for more details.
 *
 * @par Thread Safety
 * Instances of this class created via copy-construction or copy-assignment
 * share the underlying pool of connections. Access to these copies via multiple
 * threads is guaranteed to work. Concurrent access to a single instance from
 * multiple threads is not guaranteed to work.
 *
 * @par Background Threads
 * This class uses the background threads configured via the `Options` from
 * `GrpcOptionList`. Applications can create their own pool of background
 * threads by (a) creating their own #google::cloud::CompletionQueue, (b)
 * passing this completion queue as a `GrpcCompletionQueueOption`, and (c)
 * attaching any number of threads to the completion queue.
 *
 * @par Example: using a custom thread pool
 * @snippet samples.cc custom-thread-pool-publisher
 *
 * @par Asynchronous Functions
 * Some of the member functions in this class return a `future<T>` (or
 * `future<StatusOr<T>>`) object. Readers are probably familiar with
 * [`std::future<T>`][std-future-link]. Our version adds a `.then()` function to
 * attach a callback to the future, which is invoked when the future is
 * satisfied. This function returns a `future<U>` where `U` is the return type
 * of the attached function. See the #google::cloud::future documentation for
 * more details.
 *
 * @par Error Handling
 * This class uses `StatusOr<T>` to report errors. When an operation fails to
 * perform its work, the returned `StatusOr<T>` contains the error details. If
 * the `ok()` member function in the `StatusOr<T>` returns `true`, then it
 * contains the expected result. Please consult the #google::cloud::StatusOr
 * documentation for more details.
 *
 * @par Batching Configuration Example
 * @snippet samples.cc publisher-options
 *
 * [std-future-link]: https://en.cppreference.com/w/cpp/thread/future
 */
class Publisher {
 public:
  explicit Publisher(std::shared_ptr<PublisherConnection> connection)
      : connection_(std::move(connection)) {}

  Publisher(Publisher const&) = default;
  Publisher& operator=(Publisher const&) = default;
  Publisher(Publisher&&) = default;
  Publisher& operator=(Publisher&&) = default;

  friend bool operator==(Publisher const& a, Publisher const& b) {
    return a.connection_ == b.connection_;
  }
  friend bool operator!=(Publisher const& a, Publisher const& b) {
    return !(a == b);
  }

  /**
   * Publishes a message to this publisher's topic.
   *
   * Note that the message may be batched, depending on the Publisher's
   * configuration. It could be delayed until the batch has enough messages,
   * or enough data, or enough time has elapsed. See the `PublisherOptionList`
   * documentation for more details.
   *
   * @par Idempotency
   * This is a non-idempotent operation, but the client library will
   * automatically retry RPCs that fail with transient errors. As Cloud Pub/Sub
   * has "at least once" delivery semantics, applications are expected to handle
   * duplicate messages without problems. The application can disable retries
   * by changing the retry policy; see the example below.
   *
   * @par Example
   * @snippet samples.cc publish
   *
   * @par Disabling Retries Example
   * @snippet samples.cc publisher-disable-retries
   *
   * @par Changing Retry Parameters Example
   * @snippet samples.cc publisher-retry-settings
   *
   * @return a future that becomes satisfied when the message is published or on
   *     an unrecoverable error. On success, the future is satisfied with the
   *     server-assigned ID of the message. IDs are guaranteed to be unique
   *     within the topic.
   */
  future<StatusOr<std::string>> Publish(Message m) {
    return connection_->Publish({std::move(m)});
  }

  /**
   * Forcibly publishes any batched messages.
   *
   * As applications can configure a `Publisher` to buffer messages, it is
   * sometimes useful to flush them before any of the normal criteria to send
   * the RPCs is met.
   *
   * @par Idempotency
   * See the description in `Publish()`.
   *
   * @par Example
   * @snippet samples.cc publish-custom-attributes
   *
   * @note This function does not return any status or error codes. The
   *     application can use the `future<StatusOr<std::string>>` returned by
   *     each `Publish()` call to find out what the results are.
   */
  void Flush() { connection_->Flush({}); }

  /**
   * Resumes publishing after an error.
   *
   * If the publisher options have message ordering enabled (see
   * `MessageOrderingOption`), all messages for a key that experiences a failure
   * are rejected until the application calls this function.
   *
   * @par Idempotency
   * This function never initiates a remote RPC, so there are no considerations
   * around retrying it. Note, however, that more than one `Publish()` request
   * may fail for the same ordering key. The application needs to call this
   * function after **each** error before it can resume publishing messages
   * with the same ordering key.
   *
   * @par Example
   * @snippet samples.cc resume-publish
   */
  void ResumePublish(std::string ordering_key) {
    connection_->ResumePublish({std::move(ordering_key)});
  }

  /// @deprecated Use `Publisher(connection)` and provide any configuration
  /// options when initializing the @p connection object.
  GOOGLE_CLOUD_CPP_DEPRECATED("use `Publisher(connection)` instead")
  explicit Publisher(std::shared_ptr<PublisherConnection> connection,
                     PublisherOptions const& /*options*/)
      : Publisher(std::move(connection)) {}

 private:
  std::shared_ptr<PublisherConnection> connection_;
};

}  // namespace pubsub
}  // namespace cloud
}  // namespace google

#endif  // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_PUBSUB_PUBLISHER_H
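
The per-key behavior described under "Message Ordering" and `ResumePublish()` (discard on error, reject new messages for that key, accept again after resuming) can be modeled without the library. What follows is a minimal sketch using only the standard library. `OrderingGate`, `OnError()`, `QueuedCount()`, and `OrderingDemo()` are hypothetical names introduced for illustration; the sketch performs no I/O and makes no claim about how the client library implements ordering internally.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <map>
#include <set>
#include <string>
#include <utility>

// Hypothetical stand-in for a publisher with message ordering enabled.
// It models only the accept/reject bookkeeping per ordering key.
class OrderingGate {
 public:
  // Queues `payload` under `key`. Returns false when `key` is in the failed
  // state, mirroring how new messages are rejected after an error.
  bool Publish(std::string const& key, std::string payload) {
    if (failed_.count(key) != 0) return false;
    queued_[key].push_back(std::move(payload));
    return true;
  }

  // Simulates a failed batch: discards everything queued for `key` and
  // rejects later messages for it until `ResumePublish(key)` is called.
  void OnError(std::string const& key) {
    queued_.erase(key);
    failed_.insert(key);
  }

  // Clears the failed state for `key`. Other keys are unaffected.
  void ResumePublish(std::string const& key) { failed_.erase(key); }

  // Returns the number of messages currently queued for `key`.
  std::size_t QueuedCount(std::string const& key) const {
    auto it = queued_.find(key);
    return it == queued_.end() ? 0 : it->second.size();
  }

 private:
  std::map<std::string, std::deque<std::string>> queued_;
  std::set<std::string> failed_;
};

// Walks through the reject-then-resume sequence for two ordering keys and
// returns true if every step behaved as the model intends.
bool OrderingDemo() {
  OrderingGate gate;
  bool ok = gate.Publish("key-a", "m1") && gate.Publish("key-b", "m2");
  gate.OnError("key-a");                         // "m1" is discarded
  ok = ok && gate.QueuedCount("key-a") == 0;
  ok = ok && !gate.Publish("key-a", "m3");       // rejected until resumed
  ok = ok && gate.Publish("key-b", "m4");        // other keys keep working
  ok = ok && gate.QueuedCount("key-b") == 2;
  gate.ResumePublish("key-a");
  ok = ok && gate.Publish("key-a", "m5");        // accepted again
  ok = ok && gate.QueuedCount("key-a") == 1;
  assert(ok);
  return ok;
}
```

With the real class, the application would learn about failures from the `future<StatusOr<std::string>>` returned by each `Publish()` call, and would then call `Publisher::ResumePublish(ordering_key)` for the affected key before publishing to it again.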