Class: Google::Cloud::PubSub::AsyncPublisher

Inherits:
Object
  • Object
show all
Includes:
MonitorMixin
Defined in:
lib/google/cloud/pubsub/async_publisher.rb,
lib/google/cloud/pubsub/async_publisher/batch.rb

Overview

Used to publish multiple messages in batches to a topic. See Topic#async_publisher.

Examples:

require "google/cloud/pubsub"

# Create the Pub/Sub client.
pubsub = Google::Cloud::PubSub.new

# Look up the topic that messages will be published to.
topic = pubsub.topic "my-topic"
# Queue the message for asynchronous, batched publishing. The block is the
# callback invoked with the result once the message has been published.
topic.publish_async "task completed" do |result|
  if result.succeeded?
    # log_publish_success / log_publish_failure are presumably
    # application-defined helpers; they are not part of this class.
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

# Stop the publisher and block until pending messages have been published
# and all callbacks have completed.
topic.async_publisher.stop!

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#callback_threads ⇒ Numeric (readonly)

The number of threads to handle the published messages' callbacks. Default is 4.

Returns:

  • (Numeric)

    the current value of callback_threads



60
61
62
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60

# The number of threads to handle the published messages' callbacks.
# Default is 4.
#
# @return [Numeric] the current value of callback_threads
def callback_threads
  @callback_threads
end

#flow_control ⇒ Object (readonly)

Returns the value of attribute flow_control.



69
70
71
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 69

# Reader for the flow_control attribute.
#
# NOTE(review): the shape of this value is not visible here; presumably it
# holds the publisher's flow-control settings. Confirm against the
# initializer.
#
# @return [Object] the current value of flow_control
def flow_control
  @flow_control
end

#interval ⇒ Numeric (readonly)

The number of seconds to collect messages before the batch is published. Default is 0.01.

Returns:

  • (Numeric)

    the current value of interval



60
61
62
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60

# The number of seconds to collect messages before the batch is published.
# Default is 0.01.
#
# @return [Numeric] the current value of interval
def interval
  @interval
end

#max_bytes ⇒ Integer (readonly)

The maximum size of messages to be collected before the batch is published. Default is 1,000,000 (1MB).

Returns:

  • (Integer)

    the current value of max_bytes



60
61
62
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60

# The maximum size of messages to be collected before the batch is
# published. Default is 1,000,000 (1MB).
#
# @return [Integer] the current value of max_bytes
def max_bytes
  @max_bytes
end

#max_messages ⇒ Integer (readonly)

The maximum number of messages to be collected before the batch is published. Default is 100.

Returns:

  • (Integer)

    the current value of max_messages



60
61
62
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60

# The maximum number of messages to be collected before the batch is
# published. Default is 100.
#
# @return [Integer] the current value of max_messages
def max_messages
  @max_messages
end

#publish_threads ⇒ Numeric (readonly)

The number of threads used to publish messages. Default is 2.

Returns:

  • (Numeric)

    the current value of publish_threads



60
61
62
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60

# The number of threads used to publish messages. Default is 2.
#
# @return [Numeric] the current value of publish_threads
def publish_threads
  @publish_threads
end

#topic_name ⇒ String (readonly)

The name of the topic the messages are published to. The value is a fully-qualified topic name in the form projects/{project_id}/topics/{topic_id}.

Returns:

  • (String)

    the current value of topic_name



60
61
62
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 60

# The name of the topic the messages are published to. The value is a
# fully-qualified topic name in the form
# `projects/{project_id}/topics/{topic_id}`.
#
# @return [String] the current value of topic_name
def topic_name
  @topic_name
end

Instance Method Details

#enable_message_ordering! ⇒ Object

Enables message ordering for messages with ordering keys. When enabled, messages published with the same ordering_key will be delivered in the order they were published.

See #message_ordering?. See Topic#publish_async, Subscription#listen, and Message#ordering_key.



268
269
270
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 268

# Turns on message ordering for messages that carry an ordering key.
#
# The flag is flipped while holding the publisher's monitor so the change is
# made under the same lock the other publisher methods use.
#
# @return [true] the value assigned to the ordering flag
def enable_message_ordering!
  synchronize do
    @ordered = true
  end
end

#flush ⇒ AsyncPublisher

Forces all messages in the current batch to be published immediately.

Returns:

  • (AsyncPublisher)

    returns self so calls can be chained.



235
236
237
238
239
240
241
242
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 235

# Forces all messages in the current batch to be published immediately.
#
# While holding the monitor, the pending batches are handed off for
# publishing and the condition variable is signalled.
#
# @return [AsyncPublisher] self, so calls can be chained
def flush
  tap do
    synchronize do
      publish_batches!
      @cond.signal
    end
  end
end

#message_ordering? ⇒ Boolean

Whether message ordering for messages with ordering keys has been enabled. When enabled, messages published with the same ordering_key will be delivered in the order they were published. When disabled, messages may be delivered in any order.

See #enable_message_ordering!. See Topic#publish_async, Subscription#listen, and Message#ordering_key.

Returns:

  • (Boolean)


283
284
285
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 283

# Whether message ordering for messages with ordering keys has been enabled.
#
# The flag is read under the publisher's monitor and normalized, so the
# predicate always answers with a strict boolean even if the flag has never
# been assigned.
#
# @return [Boolean] true when ordering has been enabled, false otherwise
def message_ordering?
  synchronize { @ordered ? true : false }
end

#publish(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object

Add a message to the async publisher to be published to the topic. Messages will be collected in batches and published together. See Topic#publish_async.

Parameters:

  • data (String, File) (defaults to: nil)

    The message payload. This will be converted to bytes encoded as ASCII-8BIT.

  • attributes (Hash) (defaults to: nil)

    Optional attributes for the message.

  • ordering_key (String) (defaults to: nil)

    Identifies related messages for which publish order should be respected.

Yields:

  • (result)

    the callback for when the message has been published

Yield Parameters:

  • result (PublishResult)

    the result of the asynchronous publish

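Raises:

  • (AsyncPublisherStopped)

    when the publisher has been stopped.

  • (OrderedMessagesDisabled)

    when the message has an ordering key but message ordering has not been enabled.

  • (OrderingKeyError)

    when the batch for the message's ordering key has been canceled.

  • (FlowControlLimitError)

    re-raised from the flow controller when acquiring capacity for the message fails.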



140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 140

# Add a message to the async publisher to be published to the topic.
# Messages are collected in batches and published together.
#
# Flow-control capacity for the message is acquired before the message is
# queued. Every path that rejects the message after that point gives the
# capacity back, so a rejected publish never leaves bytes reserved.
#
# @param [String, File] data The message payload.
# @param [Hash] attributes Optional attributes for the message.
# @param [String] ordering_key Identifies related messages for which
#   publish order should be respected.
# @param [Hash] extra_attrs Extra keyword arguments, handed to
#   `Convert.pubsub_message` together with `attributes`.
#
# @yield [result] the callback for when the message has been published
# @yieldparam [PublishResult] result the result of the asynchronous publish
#
# @raise [FlowControlLimitError] re-raised when acquiring flow-control
#   capacity fails; publishing for `ordering_key` is stopped first when a
#   key was given.
# @raise [AsyncPublisherStopped] when the publisher has been stopped.
# @raise [OrderedMessagesDisabled] when the message has an ordering key but
#   message ordering has not been enabled.
# @raise [OrderingKeyError] when the batch for the message's ordering key
#   has been canceled.
#
# @return [nil]
def publish data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
  msg = Convert.pubsub_message data, attributes, ordering_key, extra_attrs
  # Computed once and reused for both acquire and release so the two always
  # agree on the amount.
  msg_bytes = msg.to_proto.bytesize
  begin
    @flow_controller.acquire msg_bytes
  rescue FlowControlLimitError => e
    stop_publish ordering_key, e if ordering_key
    raise
  end

  synchronize do
    if @stopped
      # The message will never be queued, so hand the capacity back.
      @flow_controller.release msg_bytes
      raise AsyncPublisherStopped
    end
    if !@ordered && !msg.ordering_key.empty? # default is empty string
      @flow_controller.release msg_bytes
      raise OrderedMessagesDisabled
    end

    batch = resolve_batch_for_message msg
    if batch.canceled?
      @flow_controller.release msg_bytes
      raise OrderingKeyError, batch.ordering_key
    end
    batch_action = batch.add msg, callback
    if batch_action == :full
      publish_batches!
    elsif @published_at.nil?
      # Set initial time to now to start the background counter
      @published_at = Time.now
    end
    @cond.signal
  end

  nil
end

#resume_publish(ordering_key) ⇒ boolean

Resume publishing ordered messages for the provided ordering key.

Parameters:

  • ordering_key (String)

    Identifies related messages for which publish order should be respected.

Returns:

  • (boolean)

    true when resumed, false otherwise.



295
296
297
298
299
300
301
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 295

# Resume publishing ordered messages for the provided ordering key.
#
# The batch lookup and the resume both happen while holding the publisher's
# monitor.
#
# @param [String] ordering_key Identifies related messages for which
#   publish order should be respected.
#
# @return [boolean] the result of resuming the batch found for the key, or
#   false (not nil) when no batch exists for the key.
def resume_publish ordering_key
  synchronize do
    batch = resolve_batch_for_ordering_key ordering_key
    batch.nil? ? false : batch.resume!
  end
end

#started? ⇒ boolean

Whether the publisher has been started.

Returns:

  • (boolean)

    true when started, false otherwise.



248
249
250
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 248

# Whether the publisher has been started. This is simply the negation of
# #stopped?.
#
# @return [boolean] true when started, false otherwise.
def started?
  !stopped?
end

#stop ⇒ AsyncPublisher

Begins the process of stopping the publisher. Messages already in the queue will be published, but no new messages can be added. Use #wait! to block until the publisher is fully stopped and all pending messages have been published.

Returns:

  • (AsyncPublisher)

    returns self so calls can be chained.



178
179
180
181
182
183
184
185
186
187
188
189
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 178

# Begins the process of stopping the publisher.
#
# On the first call the publisher is marked stopped, the pending batches are
# handed off with `stop: true`, the condition variable is signalled, and the
# publish thread pool is shut down. Later calls do nothing. Everything
# happens while holding the publisher's monitor.
#
# @return [AsyncPublisher] self, so calls can be chained
def stop
  synchronize do
    unless @stopped
      @stopped = true
      publish_batches! stop: true
      @cond.signal
      @publish_thread_pool.shutdown
    end
  end

  self
end

#stop!(timeout = nil) ⇒ AsyncPublisher

Stop this publisher and block until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until timeout seconds have passed.

The same as calling #stop and #wait!.

Parameters:

  • timeout (Number, nil) (defaults to: nil)

    The number of seconds to block until the publisher is fully stopped. Default will block indefinitely.

Returns:

  • (AsyncPublisher)

    returns self so calls can be chained.



225
226
227
228
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 225

# Stop this publisher and block until it is fully stopped or until
# `timeout` seconds have passed. Equivalent to calling #stop followed by
# #wait! with the same timeout.
#
# @param [Number, nil] timeout The number of seconds to block until the
#   publisher is fully stopped. Default will block indefinitely.
#
# @return the value returned by #wait!
def stop! timeout = nil
  stop
  wait! timeout
end

#stopped? ⇒ boolean

Whether the publisher has been stopped.

Returns:

  • (boolean)

    true when stopped, false otherwise.



256
257
258
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 256

# Whether the publisher has been stopped.
#
# The flag is read under the publisher's monitor and normalized, so the
# predicate always answers with a strict boolean even if the flag has never
# been assigned.
#
# @return [boolean] true when stopped, false otherwise.
def stopped?
  synchronize { @stopped ? true : false }
end

#wait!(timeout = nil) ⇒ AsyncPublisher

Blocks until the publisher is fully stopped, all pending messages have been published, and all callbacks have completed, or until timeout seconds have passed.

Does not stop the publisher. To stop the publisher, first call #stop and then call #wait! to block until the publisher is stopped.

Parameters:

  • timeout (Number, nil) (defaults to: nil)

    The number of seconds to block until the publisher is fully stopped. Default will block indefinitely.

Returns:

  • (AsyncPublisher)

    returns self so calls can be chained.



203
204
205
206
207
208
209
210
211
212
# File 'lib/google/cloud/pubsub/async_publisher.rb', line 203

# Blocks until the publish thread pool has terminated and the callback
# thread pool has been shut down and terminated, or until `timeout` seconds
# have passed.
#
# `timeout` is a budget for the whole call: the callback pool is only given
# whatever time is left after waiting on the publish pool, so the call does
# not block for up to twice the requested timeout. With a nil timeout both
# waits block indefinitely.
#
# This method does not call #stop. It does shut down the callback thread
# pool, and it holds the publisher's monitor while waiting.
#
# @param [Number, nil] timeout The number of seconds to block until the
#   publisher is fully stopped. Default will block indefinitely.
#
# @return [AsyncPublisher] self, so calls can be chained
def wait! timeout = nil
  synchronize do
    # Monotonic clock: unaffected by wall-clock adjustments while waiting.
    deadline = timeout && (Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout)
    @publish_thread_pool.wait_for_termination timeout

    @callback_thread_pool.shutdown
    # Never pass a negative timeout; 0 means "check and return immediately".
    remaining = deadline && [deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC), 0].max
    @callback_thread_pool.wait_for_termination remaining
  end

  self
end