Class: Google::Cloud::PubSub::Subscriber

Inherits:
Object
Includes:
MonitorMixin
Defined in:
lib/google/cloud/pubsub/subscriber.rb,
lib/google/cloud/pubsub/subscriber/stream.rb,
lib/google/cloud/pubsub/subscriber/inventory.rb,
lib/google/cloud/pubsub/subscriber/sequencer.rb,
lib/google/cloud/pubsub/subscriber/enumerator_queue.rb,
lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb

Overview

Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::PubSub::Subscription#listen.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Instance Attribute Details

#callback ⇒ Proc (readonly)

The procedure that will handle the messages received from the subscription.

Returns:

  • (Proc)

    the current value of callback



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def callback
  @callback
end

#callback_threads ⇒ Integer (readonly)

The number of threads used to handle the received messages. Default is 8.

Returns:

  • (Integer)

    the current value of callback_threads



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def callback_threads
  @callback_threads
end

#deadline ⇒ Numeric (readonly)

The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.

Returns:

  • (Numeric)

    the current value of deadline



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def deadline
  @deadline
end
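
The bounds stated above (minimum 10, maximum 600) can be checked before a value is passed on. The following is a minimal sketch only: `checked_deadline` is a hypothetical helper, not part of the library, and whether the library itself rejects or adjusts out-of-range values is not stated here.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): returns the value
# unchanged when it lies within the documented 10..600 second range and
# raises ArgumentError otherwise.
def checked_deadline seconds
  return seconds if (10..600).cover? seconds

  raise ArgumentError,
        "deadline must be between 10 and 600 seconds, got #{seconds}"
end

deadline = checked_deadline 60
```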

#message_ordering ⇒ Boolean (readonly)

Whether message ordering has been enabled.

Returns:

  • (Boolean)

    the current value of message_ordering



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def message_ordering
  @message_ordering
end

#push_threads ⇒ Integer (readonly)

The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!). Default is 4.

Returns:

  • (Integer)

    the current value of push_threads



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def push_threads
  @push_threads
end

#streams ⇒ Integer (readonly)

The number of concurrent streams to open to pull messages from the subscription. Default is 2.

Returns:

  • (Integer)

    the current value of streams



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def streams
  @streams
end

#subscription_name ⇒ String (readonly)

The name of the subscription the messages are pulled from.

Returns:

  • (String)

    the current value of subscription_name



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def subscription_name
  @subscription_name
end

Instance Method Details

#last_error ⇒ Exception?

The most recent unhandled error to occur while listening to messages on the subscriber.

If an unhandled error occurs, the subscriber will attempt to recover from the error and resume listening.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start listening for messages and errors.
subscriber.start

# If an error was raised, it can be retrieved here:
subscriber.last_error #=> nil

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Returns:

  • (Exception, nil)

    The most recent error raised, or nil if no error has been raised.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 279

def last_error
  synchronize { @last_error }
end

#max_duration_per_lease_extension ⇒ Integer

The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).

Returns:

  • (Integer)

    The maximum number of seconds.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 335

def max_duration_per_lease_extension
  @inventory[:max_duration_per_lease_extension]
end

#max_outstanding_bytes ⇒ Integer

Also known as: inventory_bytesize

The maximum total byte size of received messages that the subscriber will hold at one time. Default is 100,000,000 (100MB).

Returns:

  • (Integer)

    The maximum number of bytes.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 301

def max_outstanding_bytes
  @inventory[:max_outstanding_bytes]
end

#max_outstanding_messages ⇒ Integer

Also known as: inventory_limit, inventory

The maximum number of received messages that the subscriber will hold at one time. Default is 1,000.

Returns:

  • (Integer)

    The maximum number of messages.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 288

def max_outstanding_messages
  @inventory[:max_outstanding_messages]
end

#max_total_lease_duration ⇒ Integer

Also known as: inventory_extension

The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).

Returns:

  • (Integer)

    The maximum number of seconds.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 323

def max_total_lease_duration
  @inventory[:max_total_lease_duration]
end

#min_duration_per_lease_extension ⇒ Integer

The minimum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).

Returns:

  • (Integer)

    The minimum number of seconds.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 345

def min_duration_per_lease_extension
  @inventory[:min_duration_per_lease_extension]
end

#on_error {|callback| ... } ⇒ Object

Register to be notified of errors when raised.

If an unhandled error occurs, the subscriber will attempt to recover from the error and resume listening.

Multiple error handlers can be added.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Register to be notified when unhandled errors occur.
subscriber.on_error do |error|
  # log error
  puts error
end

# Start listening for messages and errors.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Yields:

  • (callback)

    The block to be called when an error is raised.

Yield Parameters:

  • error (Exception)

    The error raised.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 243

def on_error &block
  synchronize do
    @error_callbacks << block
  end
end
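
The registration pattern shown above (blocks appended under the monitor lock, with several handlers allowed) can be tried without a Pub/Sub connection. The sketch below is a stand-in, not the library's implementation: the `ErrorHandlers` class and its `notify` method are hypothetical names, used only to illustrate that every registered block receives the error.

```ruby
require "monitor"

# Hypothetical stand-in that mirrors how Subscriber#on_error stores blocks.
class ErrorHandlers
  include MonitorMixin

  def initialize
    super() # sets up the monitor provided by MonitorMixin
    @error_callbacks = []
  end

  # Register a block. Multiple blocks can be added.
  def on_error &block
    synchronize { @error_callbacks << block }
    self
  end

  # Hypothetical dispatch step: call every registered block with the error.
  # The list is copied under the lock so handlers run outside of it.
  def notify error
    callbacks = synchronize { @error_callbacks.dup }
    callbacks.each { |callback| callback.call error }
  end
end

handlers = ErrorHandlers.new
seen = []
handlers.on_error { |error| seen << "log: #{error.message}" }
handlers.on_error { |error| seen << "metric: #{error.class}" }
handlers.notify StandardError.new("stream failed")
```

In this sketch the handlers run in registration order, so `seen` ends up with one entry per handler.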

#start ⇒ Subscriber

Starts the subscriber pulling from the subscription and processing the received messages.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 118

def start
  start_pool = synchronize do
    @started = true
    @stopped = false

    # Start the buffer before the streams are all started
    @buffer.start
    @stream_pool.map do |stream|
      Thread.new { stream.start }
    end
  end
  start_pool.map(&:join)

  self
end
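
The method above starts each stream on its own thread and then joins every thread before returning. A minimal sketch of that pattern follows, with a hypothetical `fake_stream` struct standing in for the real stream objects (which are not shown on this page).

```ruby
# Hypothetical stream type used only for illustration.
fake_stream = Struct.new(:name, :started) do
  def start
    self.started = true
  end
end

stream_pool = Array.new(2) { |i| fake_stream.new("stream-#{i}", false) }

# Start every stream on its own thread, then wait for all of them to
# finish starting before continuing.
start_pool = stream_pool.map { |stream| Thread.new { stream.start } }
start_pool.each(&:join)
```

Joining the threads means the caller only proceeds once every stream has finished its start step.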

#started? ⇒ boolean

Whether the subscriber has been started.

Returns:

  • (boolean)

    true when started, false otherwise.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 195

def started?
  synchronize { @started }
end

#stop ⇒ Subscriber

Immediately stops the subscriber. No new messages will be pulled from the subscription. Use #wait! to block until all received messages have been processed or released. During shutdown, all actions taken on received messages that have not yet been sent to the API will be sent to the API, and all received but unprocessed messages will be released back to the API and redelivered.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 143

def stop
  synchronize do
    @started = false
    @stopped = true
    @stream_pool.map(&:stop)
    wait_stop_buffer_thread!
    self
  end
end

#stop!(timeout = nil) ⇒ Subscriber

Stop this subscriber and block until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.

The same as calling #stop and #wait!.

Parameters:

  • timeout (Number, nil) (defaults to: nil)

    The number of seconds to block until the subscriber is fully stopped. Default will block indefinitely.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 185

def stop! timeout = nil
  stop
  wait! timeout
end

#stopped? ⇒ boolean

Whether the subscriber has been stopped.

Returns:

  • (boolean)

    true when stopped, false otherwise.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 204

def stopped?
  synchronize { @stopped }
end

#use_legacy_flow_control? ⇒ Boolean

Whether to enforce flow control at the client side only or to enforce it at both the client and the server. For more details about flow control see https://cloud.google.com/pubsub/docs/pull#config.

Returns:

  • (Boolean)

    true when only client side flow control is enforced, false when both client and server side flow control are enforced.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 314

def use_legacy_flow_control?
  @inventory[:use_legacy_flow_control]
end

#wait!(timeout = nil) ⇒ Subscriber

Blocks until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.

Does not stop the subscriber. To stop the subscriber, first call #stop and then call #wait! to block until the subscriber is stopped.

Parameters:

  • timeout (Number, nil) (defaults to: nil)

    The number of seconds to block until the subscriber is fully stopped. Default will block indefinitely.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 167

def wait! timeout = nil
  wait_stop_buffer_thread!
  @wait_stop_buffer_thread.join timeout
  self
end
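
The timeout behavior of #wait! comes from Thread#join, which the method body above calls with the timeout argument. The sketch below uses a plain sleeping thread as a hypothetical stand-in for the subscriber's shutdown thread, to show what Thread#join does when the timeout elapses first and when it does not.

```ruby
# A slow worker stands in for the shutdown thread (hypothetical).
worker = Thread.new { sleep 0.3 }

# Timeout elapses first: join returns nil and the thread keeps running.
timed_out = worker.join(0.05).nil?

# No timeout: join blocks until the thread finishes and returns the thread.
finished = worker.join.equal? worker
```

Because #wait! returns self in both cases, its return value alone does not indicate whether shutdown completed before the timeout.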