Class: Google::Cloud::PubSub::AsyncPublisher
- Inherits:
- Object
- Object
- Google::Cloud::PubSub::AsyncPublisher
- Includes:
- MonitorMixin
- Defined in:
- lib/google/cloud/pubsub/async_publisher.rb,
lib/google/cloud/pubsub/async_publisher/batch.rb
Overview
Used to publish multiple messages in batches to a topic. See Topic#async_publisher.
Instance Attribute Summary
-
#callback_threads ⇒ Numeric
readonly
The number of threads to handle the published messages' callbacks.
-
#flow_control ⇒ Object
readonly
Returns the value of attribute flow_control.
-
#interval ⇒ Numeric
readonly
The number of seconds to collect messages before the batch is published.
-
#max_bytes ⇒ Integer
readonly
The maximum size of messages to be collected before the batch is published.
-
#max_messages ⇒ Integer
readonly
The maximum number of messages to be collected before the batch is published.
-
#publish_threads ⇒ Numeric
readonly
The number of threads used to publish messages.
-
#topic_name ⇒ String
readonly
The name of the topic the messages are published to.
Instance Method Summary
-
#enable_message_ordering! ⇒ Object
Enables message ordering for messages with ordering keys.
-
#flush ⇒ AsyncPublisher
Forces all messages in the current batch to be published immediately.
-
#message_ordering? ⇒ Boolean
Whether message ordering for messages with ordering keys has been enabled.
-
#publish(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object
Add a message to the async publisher to be published to the topic.
-
#resume_publish(ordering_key) ⇒ boolean
Resume publishing ordered messages for the provided ordering key.
-
#started? ⇒ boolean
Whether the publisher has been started.
-
#stop ⇒ AsyncPublisher
Begins the process of stopping the publisher.
-
#stop!(timeout = nil) ⇒ AsyncPublisher
Stop this publisher and block until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until timeout seconds have passed.
-
#stopped? ⇒ boolean
Whether the publisher has been stopped.
-
#wait!(timeout = nil) ⇒ AsyncPublisher
Blocks until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until timeout seconds have passed.
Instance Attribute Details
#callback_threads ⇒ Numeric (readonly)
The number of threads to handle the published messages' callbacks. Default is 4.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60
def callback_threads
  @callback_threads
end
#flow_control ⇒ Object (readonly)
Returns the value of attribute flow_control.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 69
def flow_control
  @flow_control
end
#interval ⇒ Numeric (readonly)
The number of seconds to collect messages before the batch is published. Default is 0.01.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60
def interval
  @interval
end
#max_bytes ⇒ Integer (readonly)
The maximum size of messages to be collected before the batch is published. Default is 1,000,000 (1MB).
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60
def max_bytes
  @max_bytes
end
#max_messages ⇒ Integer (readonly)
The maximum number of messages to be collected before the batch is published. Default is 100.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60
def max_messages
  @max_messages
end
#publish_threads ⇒ Numeric (readonly)
The number of threads used to publish messages. Default is 2.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60
def publish_threads
  @publish_threads
end
#topic_name ⇒ String (readonly)
The name of the topic the messages are published to. The value is a
fully-qualified topic name in the form projects/{project_id}/topics/{topic_id}.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60
def topic_name
  @topic_name
end
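The max_messages and max_bytes attributes above both describe a limit that triggers publication of the current batch. The rule can be sketched roughly as follows. ToyBatch and its methods are hypothetical names invented for this illustration; this is not the library's Batch class, and the real limit check may differ in detail.

```ruby
# Toy model of a size-limited batch. ToyBatch is a hypothetical name used
# only for illustration; it is not the library's Batch class.
class ToyBatch
  attr_reader :messages, :total_bytes

  def initialize max_messages: 100, max_bytes: 1_000_000
    @max_messages = max_messages
    @max_bytes = max_bytes
    @messages = []
    @total_bytes = 0
  end

  # Adds a message and reports whether the batch should now be published.
  def add data
    @messages << data
    @total_bytes += data.bytesize
    full? ? :full : :added
  end

  # A batch is full once either limit has been reached.
  def full?
    @messages.size >= @max_messages || @total_bytes >= @max_bytes
  end
end

batch = ToyBatch.new max_messages: 3, max_bytes: 10
first = batch.add "abc"    # 1 message, 3 bytes
second = batch.add "defgh" # 2 messages, 8 bytes
third = batch.add "ij"     # 3 messages, 10 bytes: both limits reached
```

In this sketch the first two calls return :added and the third returns :full, which is the point at which a publisher would send the batch.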
Instance Method Details
#enable_message_ordering! ⇒ Object
Enables message ordering for messages with ordering keys. When
enabled, messages published with the same ordering_key will be
delivered in the order they were published.
See #message_ordering?. See Topic#publish_async, Subscription#listen, and Message#ordering_key.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 268
def enable_message_ordering!
  synchronize { @ordered = true }
end
#flush ⇒ AsyncPublisher
Forces all messages in the current batch to be published immediately.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 235
def flush
  synchronize do
    publish_batches!
    @cond.signal
  end
  self
end
#message_ordering? ⇒ Boolean
Whether message ordering for messages with ordering keys has been
enabled. When enabled, messages published with the same ordering_key
will be delivered in the order they were published. When disabled,
messages may be delivered in any order.
See #enable_message_ordering!. See Topic#publish_async, Subscription#listen, and Message#ordering_key.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 283
def message_ordering?
  synchronize { @ordered }
end
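The interaction between #enable_message_ordering!, #message_ordering?, and the ordering_key argument of #publish can be illustrated with a small stand-in. ToyOrderedPublisher and ToyOrderingDisabledError are hypothetical names for this sketch only; it mirrors the guard visible in the #publish source but does not contact Pub/Sub.

```ruby
require "monitor"

# Hypothetical error and class names, used only for this sketch.
class ToyOrderingDisabledError < StandardError; end

class ToyOrderedPublisher
  include MonitorMixin

  def initialize
    super() # sets up the monitor provided by MonitorMixin
    @ordered = false
    @accepted = []
  end

  def enable_message_ordering!
    synchronize { @ordered = true }
  end

  def message_ordering?
    synchronize { @ordered }
  end

  # Rejects keyed messages until ordering is enabled. An empty key means
  # the message is unordered.
  def publish data, ordering_key: ""
    synchronize do
      raise ToyOrderingDisabledError if !@ordered && !ordering_key.empty?
      @accepted << [ordering_key, data]
    end
    nil
  end

  def accepted
    synchronize { @accepted.dup }
  end
end

publisher = ToyOrderedPublisher.new
publisher.publish "unkeyed" # allowed: no ordering key
rejected = begin
  publisher.publish "keyed", ordering_key: "a"
  false
rescue ToyOrderingDisabledError
  true
end
publisher.enable_message_ordering!
publisher.publish "keyed", ordering_key: "a"
```

Here the first keyed publish is rejected and the second succeeds, because ordering was enabled in between.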
#publish(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object
Add a message to the async publisher to be published to the topic. Messages will be collected in batches and published together. See Topic#publish_async.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 140
def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
  msg = Convert.pubsub_message data, attributes, ordering_key, extra_attrs
  begin
    @flow_controller.acquire msg.to_proto.bytesize
  rescue FlowControlLimitError => e
    stop_publish ordering_key, e if ordering_key
    raise
  end
  synchronize do
    raise AsyncPublisherStopped if @stopped
    raise OrderedMessagesDisabled if !@ordered && !msg.ordering_key.empty? # default is empty string
    batch = resolve_batch_for_message msg
    if batch.canceled?
      @flow_controller.release msg.to_proto.bytesize
      raise OrderingKeyError, batch.ordering_key
    end
    batch_action = batch.add msg, callback
    if batch_action == :full
      publish_batches!
    elsif @published_at.nil?
      # Set initial time to now to start the background counter
      @published_at = Time.now
    end
    @cond.signal
  end
  nil
end
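The collect-then-publish shape of #publish, including the optional callback block, might be modeled as below. ToyPublisher and its Result struct are hypothetical and exist only for this sketch; the real publisher sends batches from background threads and yields a library-defined result object.

```ruby
require "monitor"

# Hypothetical stand-in for an async publisher. It only mimics the
# collect-then-flush shape and does not talk to Pub/Sub.
class ToyPublisher
  include MonitorMixin

  Result = Struct.new :data, :message_id

  def initialize
    super() # sets up the monitor provided by MonitorMixin
    @pending = []
    @next_id = 0
  end

  # Queues the message with its optional callback and returns nil.
  def publish data, &callback
    synchronize { @pending << [data, callback] }
    nil
  end

  # "Publishes" everything queued so far, runs the callbacks, returns self.
  def flush
    items = synchronize do
      taken = @pending
      @pending = []
      taken
    end
    items.each do |data, callback|
      @next_id += 1
      callback&.call Result.new(data, @next_id.to_s)
    end
    self
  end
end

publisher = ToyPublisher.new
ids = []
ret = publisher.publish("task 1") { |result| ids << result.message_id }
publisher.publish("task 2") { |result| ids << result.message_id }
publisher.publish "task 3" # no callback
before_flush = ids.dup
publisher.flush
```

No callback runs until the batch is flushed, which is why before_flush is empty while ids holds one entry per callback afterwards.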
#resume_publish(ordering_key) ⇒ boolean
Resume publishing ordered messages for the provided ordering key.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 295
def resume_publish ordering_key
  synchronize do
    batch = resolve_batch_for_ordering_key ordering_key
    return if batch.nil?
    batch.resume!
  end
end
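As the #publish source shows, a canceled batch for an ordering key causes later publishes with that key to raise until publishing is resumed. A rough model of that pause-and-resume cycle follows. ToyOrderingKeys, KeyPausedError, and fail_key are hypothetical names for this sketch, and the return values shown are those of the toy, not a statement about the library.

```ruby
# Hypothetical per-key state tracker; all names are illustrative only.
class ToyOrderingKeys
  class KeyPausedError < StandardError; end

  attr_reader :sent

  def initialize
    @paused = {}
    @sent = []
  end

  # Marks a key as failed so later messages for it are rejected.
  def fail_key key
    @paused[key] = true
  end

  def publish data, key
    raise KeyPausedError, key if @paused[key]
    @sent << [key, data]
    nil
  end

  # Returns true if the key was paused and is now resumed, nil otherwise.
  def resume_publish key
    return unless @paused.delete key
    true
  end
end

keys = ToyOrderingKeys.new
keys.publish "m1", "k"
keys.fail_key "k"
blocked = begin
  keys.publish "m2", "k"
  false
rescue ToyOrderingKeys::KeyPausedError
  true
end
resumed = keys.resume_publish "k"
keys.publish "m3", "k"
unknown = keys.resume_publish "other"
```

Only messages m1 and m3 are recorded; m2 is rejected while the key is paused.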
#started? ⇒ boolean
Whether the publisher has been started.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 248
def started?
  !stopped?
end
#stop ⇒ AsyncPublisher
Begins the process of stopping the publisher. Messages already in the queue will be published, but no new messages can be added. Use #wait! to block until the publisher is fully stopped and all pending messages have been published.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 178
def stop
  synchronize do
    break if @stopped
    @stopped = true
    publish_batches! stop: true
    @cond.signal
    @publish_thread_pool.shutdown
  end
  self
end
#stop!(timeout = nil) ⇒ AsyncPublisher
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 225
def stop! timeout = nil
  stop
  wait! timeout
end
#stopped? ⇒ boolean
Whether the publisher has been stopped.
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 256
def stopped?
  synchronize { @stopped }
end
#wait!(timeout = nil) ⇒ AsyncPublisher
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 203
def wait! timeout = nil
  synchronize do
    @publish_thread_pool.wait_for_termination timeout
    @callback_thread_pool.shutdown
    @callback_thread_pool.wait_for_termination timeout
  end
  self
end
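The relationship between #stop, #wait!, and #stop! (stop first, then block with an optional timeout) can be sketched with the standard library alone. ToyWorker is a hypothetical class for illustration; it uses a plain Thread and Queue instead of the thread pools seen in the source above.

```ruby
# Hypothetical worker that drains a queue on a background thread.
class ToyWorker
  attr_reader :processed

  def initialize
    @queue = Queue.new
    @processed = []
    @thread = Thread.new do
      # Queue#pop returns nil once the queue is closed and empty.
      while (item = @queue.pop)
        @processed << item
      end
    end
  end

  def push item
    @queue << item
    self
  end

  # Begins shutdown: queued items are still processed, new ones are refused.
  def stop
    @queue.close unless @queue.closed?
    self
  end

  def stopped?
    @queue.closed?
  end

  # Blocks until the worker thread exits or the timeout (seconds) elapses.
  def wait! timeout = nil
    @thread.join timeout
    self
  end

  # Convenience: stop, then wait.
  def stop! timeout = nil
    stop
    wait! timeout
  end
end

worker = ToyWorker.new
worker.push("a").push("b")
worker.stop! 5

refused = begin
  worker.push "c"
  false
rescue ClosedQueueError
  true
end
```

Items queued before the stop are still processed, while a push after the stop is refused, which parallels the behavior described for #stop.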