Class: Google::Cloud::PubSub::Subscriber

Inherits:
Object
Includes:
MonitorMixin
Defined in:
lib/google/cloud/pubsub/subscriber.rb,
lib/google/cloud/pubsub/subscriber/stream.rb,
lib/google/cloud/pubsub/subscriber/inventory.rb,
lib/google/cloud/pubsub/subscriber/sequencer.rb,
lib/google/cloud/pubsub/subscriber/enumerator_queue.rb,
lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb

Overview

Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::PubSub::Subscription#listen.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Instance Attribute Details

#callback ⇒ Proc (readonly)

The procedure that will handle the messages received from the subscription.

Returns:

  • (Proc)

    the current value of callback



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def callback
  @callback
end

#callback_threads ⇒ Integer (readonly)

The number of threads used to handle the received messages. Default is 8.

Returns:

  • (Integer)

    the current value of callback_threads



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def callback_threads
  @callback_threads
end

#deadline ⇒ Numeric (readonly)

The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.

Returns:

  • (Numeric)

    the current value of deadline



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def deadline
  @deadline
end

#message_ordering ⇒ Boolean (readonly)

Whether message ordering has been enabled.

Returns:

  • (Boolean)

    the current value of message_ordering



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def message_ordering
  @message_ordering
end

#push_threads ⇒ Integer (readonly)

The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!). Default is 4.

Returns:

  • (Integer)

    the current value of push_threads



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def push_threads
  @push_threads
end

#streams ⇒ Integer (readonly)

The number of concurrent streams to open to pull messages from the subscription. Default is 4.

Returns:

  • (Integer)

    the current value of streams



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def streams
  @streams
end

#subscription_name ⇒ String (readonly)

The name of the subscription the messages are pulled from.

Returns:

  • (String)

    the current value of subscription_name



# File 'lib/google/cloud/pubsub/subscriber.rb', line 64

def subscription_name
  @subscription_name
end
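
The attributes above are read-only, so their values are fixed when the subscriber is created. The sketch below gathers the documented defaults in one place and checks the documented deadline range. `build_listen_options` is a hypothetical helper, not part of the library, and the keyword names (`deadline:`, `streams:`, and `threads:` with `:callback` and `:push` keys) are assumed to match what `Subscription#listen` accepts in your installed version.

```ruby
# Documented defaults for the read-only attributes above.
DEFAULTS = {
  deadline: 60,
  streams:  4,
  threads:  { callback: 8, push: 4 }
}.freeze

# Documented bounds for the deadline, in seconds.
DEADLINE_RANGE = (10..600).freeze

# Hypothetical helper: merges overrides into the documented defaults and
# rejects a deadline outside the documented 10..600 second range.
def build_listen_options overrides = {}
  opts = DEFAULTS.merge overrides
  opts[:threads] = DEFAULTS[:threads].merge(overrides.fetch(:threads, {}))
  unless DEADLINE_RANGE.cover? opts[:deadline]
    raise ArgumentError, "deadline must be between 10 and 600 seconds"
  end
  opts
end

opts = build_listen_options deadline: 120, threads: { callback: 16 }
# Assumed usage: subscriber = sub.listen(**opts) { |msg| msg.acknowledge! }
```

Keeping the overrides separate from the defaults makes it easy to see which values differ from the documented ones.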

Instance Method Details

#last_error ⇒ Exception?

The most recent unhandled error to occur while listening to messages on the subscriber.

If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Start listening for messages and errors.
subscriber.start

# If an error was raised, it can be retrieved here:
subscriber.last_error #=> nil

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Returns:

  • (Exception, nil)

    The most recent error raised, or nil if none has occurred.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 286

def last_error
  synchronize { @last_error }
end

#max_duration_per_lease_extension ⇒ Integer

The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).

Returns:

  • (Integer)

    The maximum number of seconds.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 342

def max_duration_per_lease_extension
  @inventory[:max_duration_per_lease_extension]
end

#max_outstanding_bytes ⇒ Integer Also known as: inventory_bytesize

The maximum total byte size of received messages that the subscriber will hold. Default is 100,000,000 (100MB).

Returns:

  • (Integer)

    The maximum number of bytes.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 308

def max_outstanding_bytes
  @inventory[:max_outstanding_bytes]
end

#max_outstanding_messages ⇒ Integer Also known as: inventory_limit, inventory

The maximum number of received messages that the subscriber will hold. Default is 1,000.

Returns:

  • (Integer)

    The maximum number of messages.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 295

def max_outstanding_messages
  @inventory[:max_outstanding_messages]
end

#max_total_lease_duration ⇒ Integer Also known as: inventory_extension

The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).

Returns:

  • (Integer)

    The maximum number of seconds.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 330

def max_total_lease_duration
  @inventory[:max_total_lease_duration]
end
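
Taken together, the message and byte limits describe when the subscriber's inventory counts as full. A minimal sketch of that check, assuming the inventory is full once either limit is reached. `InventoryLimits` and `full?` are illustrative names, not library classes, and the real subscriber may apply the limits differently.

```ruby
# Illustrative only: models the two documented inventory limits.
InventoryLimits = Struct.new(:max_outstanding_messages, :max_outstanding_bytes) do
  # Full when either documented limit has been reached (assumed behavior).
  def full? message_count, total_bytes
    message_count >= max_outstanding_messages ||
      total_bytes >= max_outstanding_bytes
  end
end

# The documented defaults: 1,000 messages and 100,000,000 bytes.
limits = InventoryLimits.new 1_000, 100_000_000

below_both    = limits.full? 999, 50_000_000
message_limit = limits.full? 1_000, 50_000_000
byte_limit    = limits.full? 10, 100_000_000
```

A few very large messages can hit the byte limit long before the message count does, which is why both values are worth tuning.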

#on_error {|callback| ... } ⇒ Object

Register to be notified of errors when raised.

If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.

Multiple error handlers can be added.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Register to be notified when unhandled errors occur.
subscriber.on_error do |error|
  # log error
  puts error
end

# Start listening for messages and errors.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Yields:

  • (callback)

    The block to be called when an error is raised.

Yield Parameters:

  • error (Exception)

    The error raised.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 250

def on_error &block
  synchronize do
    @error_callbacks << block
  end
end
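
The method body above appends each block to a list while holding the monitor, which is why multiple handlers can be registered safely from different threads. A self-contained sketch of that pattern follows. `ErrorRegistry` and its `report` method are hypothetical: how the subscriber records and dispatches errors internally is assumed here, not taken from the library.

```ruby
require "monitor"

# Hypothetical stand-in for the error-handling part of a subscriber.
class ErrorRegistry
  include MonitorMixin

  def initialize
    super() # initializes the monitor provided by MonitorMixin
    @error_callbacks = []
    @last_error = nil
  end

  # Registers a handler while holding the monitor.
  def on_error &block
    synchronize { @error_callbacks << block }
  end

  # Reads the most recent error while holding the monitor.
  def last_error
    synchronize { @last_error }
  end

  # Assumed internal step: remember the error, then notify every handler.
  # Handlers run outside the monitor so a slow handler does not block readers.
  def report error
    callbacks = synchronize do
      @last_error = error
      @error_callbacks.dup
    end
    callbacks.each { |callback| callback.call error }
  end
end

registry = ErrorRegistry.new
seen = []
registry.on_error { |e| seen << "log: #{e.message}" }
registry.on_error { |e| seen << "metric: #{e.class}" }
registry.report RuntimeError.new("stream closed")
```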

#start ⇒ Subscriber

Starts the subscriber pulling from the subscription and processing the received messages.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 113

def start
  start_pool = synchronize do
    @started = true
    @stopped = false

    # Start the buffer before the streams are all started
    @buffer.start
    @stream_pool.map do |stream|
      Thread.new { stream.start }
    end
  end
  start_pool.map(&:join)

  self
end
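
The method body above starts each stream on its own thread and joins all of them before returning, so `start` returns only after every stream's `start` call has returned. The same fan-out-and-join pattern in isolation, with a hypothetical `FakeStream` standing in for the library's stream class:

```ruby
# Hypothetical stream that only records that it was started.
class FakeStream
  attr_reader :started

  def initialize
    @started = false
  end

  def start
    @started = true
  end
end

stream_pool = Array.new(4) { FakeStream.new }

# Start each stream on its own thread, then wait for every start call to return.
threads = stream_pool.map { |stream| Thread.new { stream.start } }
threads.each(&:join)

all_started = stream_pool.all?(&:started)
```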

#started? ⇒ boolean

Whether the subscriber has been started.

Returns:

  • (boolean)

    true when started, false otherwise.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 202

def started?
  synchronize { @started }
end

#stop ⇒ Subscriber

Immediately stops the subscriber. No new messages will be pulled from the subscription. All actions taken on received messages that have not yet been sent to the API will be sent to the API. All received but unprocessed messages will be released back to the API and redelivered. Use #wait! to block until the subscriber is fully stopped and all received messages have been processed or released.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 139

def stop
  stop_pool = synchronize do
    @started = false
    @stopped = true

    @stream_pool.map do |stream|
      Thread.new { stream.stop }
    end
  end
  stop_pool.map(&:join)
  # Stop the buffer after the streams are all stopped
  synchronize { @buffer.stop }

  self
end

#stop!(timeout = nil) ⇒ Subscriber

Stop this subscriber and block until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.

The same as calling #stop and #wait!.

Parameters:

  • timeout (Number, nil) (defaults to: nil)

    The number of seconds to block until the subscriber is fully stopped. Default will block indefinitely.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 192

def stop! timeout = nil
  stop
  wait! timeout
end

#stopped? ⇒ boolean

Whether the subscriber has been stopped.

Returns:

  • (boolean)

    true when stopped, false otherwise.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 211

def stopped?
  synchronize { @stopped }
end

#use_legacy_flow_control? ⇒ Boolean

Whether to enforce flow control at the client side only or to enforce it at both the client and the server. For more details about flow control see https://cloud.google.com/pubsub/docs/pull#config.

Returns:

  • (Boolean)

    true when only client side flow control is enforced, false when both client and server side flow control are enforced.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 321

def use_legacy_flow_control?
  @inventory[:use_legacy_flow_control]
end

#wait!(timeout = nil) ⇒ Subscriber

Blocks until the subscriber is fully stopped and all received messages have been processed or released, or until timeout seconds have passed.

Does not stop the subscriber. To stop the subscriber, first call #stop and then call #wait! to block until the subscriber is stopped.

Parameters:

  • timeout (Number, nil) (defaults to: nil)

    The number of seconds to block until the subscriber is fully stopped. Default will block indefinitely.

Returns:

  • (Subscriber)

    returns self so calls can be chained.



# File 'lib/google/cloud/pubsub/subscriber.rb', line 169

def wait! timeout = nil
  wait_pool = synchronize do
    @stream_pool.map do |stream|
      Thread.new { stream.wait! timeout }
    end
  end
  wait_pool.map(&:join)

  self
end
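
The method body above waits on every stream in parallel and then joins the waiting threads, so the timeout applies to the pool as a whole rather than once per stream. A sketch of that behavior, assuming each stream's wait reduces to `Thread#join` with a limit. `BlockingStream` is hypothetical and only models a worker that runs until released.

```ruby
# Hypothetical stream whose worker runs until it is released.
class BlockingStream
  def initialize
    @release = Queue.new
    @worker = Thread.new { @release.pop }
  end

  # Lets the worker thread finish.
  def finish
    @release << :done
  end

  # Waits up to timeout seconds (indefinitely when nil) for the worker to end.
  def wait! timeout = nil
    @worker.join timeout
    self
  end

  def finished?
    !@worker.alive?
  end
end

streams = Array.new(2) { BlockingStream.new }

# Wait on all streams in parallel so the timeout is not multiplied by pool size.
streams.map { |s| Thread.new { s.wait! 0.05 } }.each(&:join)
timed_out = streams.none?(&:finished?)

# Once the workers are released, an untimed wait returns when they end.
streams.each(&:finish)
streams.map { |s| Thread.new { s.wait! } }.each(&:join)
finished = streams.all?(&:finished?)
```

As in the documented method, a timed wait returning says nothing about whether the work finished, so callers that care should check state afterwards.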