Class: Google::Cloud::PubSub::Subscriber
- Inherits:
-
Object
- Object
- Google::Cloud::PubSub::Subscriber
- Includes:
- MonitorMixin
- Defined in:
- lib/google/cloud/pubsub/subscriber.rb,
lib/google/cloud/pubsub/subscriber/stream.rb,
lib/google/cloud/pubsub/subscriber/inventory.rb,
lib/google/cloud/pubsub/subscriber/sequencer.rb,
lib/google/cloud/pubsub/subscriber/enumerator_queue.rb,
lib/google/cloud/pubsub/subscriber/timed_unary_buffer.rb
Overview
Subscriber object used to stream and process messages from a Subscription. See Google::Cloud::PubSub::Subscription#listen
Instance Attribute Summary collapse
-
#callback ⇒ Proc
readonly
The procedure that will handle the messages received from the subscription.
-
#callback_threads ⇒ Integer
readonly
The number of threads used to handle the received messages.
-
#deadline ⇒ Numeric
readonly
The default number of seconds the stream will hold received messages before modifying the message's ack deadline.
-
#message_ordering ⇒ Boolean
readonly
Whether message ordering has been enabled.
-
#push_threads ⇒ Integer
readonly
The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!).
-
#streams ⇒ Integer
readonly
The number of concurrent streams to open to pull messages from the subscription.
-
#subscription_name ⇒ String
readonly
The name of the subscription the messages are pulled from.
Instance Method Summary collapse
-
#last_error ⇒ Exception?
The most recent unhandled error to occur while listening to messages on the subscriber.
-
#max_duration_per_lease_extension ⇒ Integer
The maximum amount of time in seconds for a single lease extension attempt.
-
#max_outstanding_bytes ⇒ Integer
(also: #inventory_bytesize)
The total byte size of received messages to be collected by subscriber.
-
#max_outstanding_messages ⇒ Integer
(also: #inventory_limit, #inventory)
The number of received messages to be collected by subscriber.
-
#max_total_lease_duration ⇒ Integer
(also: #inventory_extension)
The number of seconds that received messages can be held awaiting processing.
-
#min_duration_per_lease_extension ⇒ Integer
The minimum amount of time in seconds for a single lease extension attempt.
-
#on_error {|callback| ... } ⇒ Object
Register to be notified of errors when raised.
-
#start ⇒ Subscriber
Starts the subscriber pulling from the subscription and processing the received messages.
-
#started? ⇒ boolean
Whether the subscriber has been started.
-
#stop ⇒ Subscriber
Immediately stops the subscriber.
-
#stop!(timeout = nil) ⇒ Subscriber
Stop this subscriber and block until the subscriber is fully stopped and all received messages have been processed or released, or until
timeout
seconds have passed. -
#stopped? ⇒ boolean
Whether the subscriber has been stopped.
-
#use_legacy_flow_control? ⇒ Boolean
Whether to enforce flow control at the client side only or to enforce it at both the client and the server.
-
#wait!(timeout = nil) ⇒ Subscriber
Blocks until the subscriber is fully stopped and all received messages have been processed or released, or until
timeout
seconds have passed.
Instance Attribute Details
#callback ⇒ Proc (readonly)
The procedure that will handle the messages received from the subscription.
64 65 66 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64 def callback @callback end |
#callback_threads ⇒ Integer (readonly)
The number of threads used to handle the received messages. Default is 8.
64 65 66 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64 def callback_threads @callback_threads end |
#deadline ⇒ Numeric (readonly)
The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is 60.
64 65 66 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64 def deadline @deadline end |
#message_ordering ⇒ Boolean (readonly)
Whether message ordering has been enabled.
64 65 66 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64 def @message_ordering end |
#push_threads ⇒ Integer (readonly)
The number of threads to handle acknowledgement (ReceivedMessage#ack!) and delay messages (ReceivedMessage#nack!, ReceivedMessage#modify_ack_deadline!). Default is 4.
64 65 66 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64 def push_threads @push_threads end |
#streams ⇒ Integer (readonly)
The number of concurrent streams to open to pull messages from the subscription. Default is 2.
64 65 66 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64 def streams @streams end |
#subscription_name ⇒ String (readonly)
The name of the subscription the messages are pulled from.
64 65 66 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 64 def subscription_name @subscription_name end |
Instance Method Details
#last_error ⇒ Exception?
The most recent unhandled error to occur while listening to messages on the subscriber.
If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.
279 280 281 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 279 def last_error synchronize { @last_error } end |
#max_duration_per_lease_extension ⇒ Integer
The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).
335 336 337 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 335 def max_duration_per_lease_extension @inventory[:max_duration_per_lease_extension] end |
#max_outstanding_bytes ⇒ Integer Also known as: inventory_bytesize
The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB).
301 302 303 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 301 def max_outstanding_bytes @inventory[:max_outstanding_bytes] end |
#max_outstanding_messages ⇒ Integer Also known as: inventory_limit, inventory
The number of received messages to be collected by subscriber. Default is 1,000.
288 289 290 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 288 def @inventory[:max_outstanding_messages] end |
#max_total_lease_duration ⇒ Integer Also known as: inventory_extension
The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour).
323 324 325 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 323 def max_total_lease_duration @inventory[:max_total_lease_duration] end |
#min_duration_per_lease_extension ⇒ Integer
The minimum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).
345 346 347 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 345 def min_duration_per_lease_extension @inventory[:min_duration_per_lease_extension] end |
#on_error {|callback| ... } ⇒ Object
Register to be notified of errors when raised.
If an unhandled error has occurred the subscriber will attempt to recover from the error and resume listening.
Multiple error handlers can be added.
243 244 245 246 247 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 243 def on_error &block synchronize do @error_callbacks << block end end |
#start ⇒ Subscriber
Starts the subscriber pulling from the subscription and processing the received messages.
118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 118 def start start_pool = synchronize do @started = true @stopped = false # Start the buffer before the streams are all started @buffer.start @stream_pool.map do |stream| Thread.new { stream.start } end end start_pool.map(&:join) self end |
#started? ⇒ boolean
Whether the subscriber has been started.
195 196 197 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 195 def started? synchronize { @started } end |
#stop ⇒ Subscriber
Immediately stops the subscriber. No new messages will be pulled from the subscription. Use #wait! to block until all received messages have been processed or released: All actions taken on received messages that have not yet been sent to the API will be sent to the API. All received but unprocessed messages will be released back to the API and redelivered.
143 144 145 146 147 148 149 150 151 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 143 def stop synchronize do @started = false @stopped = true @stream_pool.map(&:stop) wait_stop_buffer_thread! self end end |
#stop!(timeout = nil) ⇒ Subscriber
185 186 187 188 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 185 def stop! timeout = nil stop wait! timeout end |
#stopped? ⇒ boolean
Whether the subscriber has been stopped.
204 205 206 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 204 def stopped? synchronize { @stopped } end |
#use_legacy_flow_control? ⇒ Boolean
Whether to enforce flow control at the client side only or to enforce it at both the client and the server. For more details about flow control see https://cloud.google.com/pubsub/docs/pull#config.
server side flow control are enforced.
314 315 316 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 314 def use_legacy_flow_control? @inventory[:use_legacy_flow_control] end |
#wait!(timeout = nil) ⇒ Subscriber
167 168 169 170 171 |
# File 'lib/google/cloud/pubsub/subscriber.rb', line 167 def wait! timeout = nil wait_stop_buffer_thread! @wait_stop_buffer_thread.join timeout self end |