Class: Google::Cloud::PubSub::Subscription

Inherits:
Object
  • Object
show all
Defined in:
lib/google/cloud/pubsub/subscription.rb,
lib/google/cloud/pubsub/subscription/list.rb,
lib/google/cloud/pubsub/subscription/push_config.rb

Overview

Subscription

A named resource representing the stream of messages from a single, specific Topic, to be delivered to the subscribing application.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  # process message
  received_message.acknowledge!
end

# Handle exceptions from listener
subscriber.on_error do |exception|
   puts "Exception: #{exception.class} #{exception.message}"
end

# Gracefully shut down the subscriber
at_exit do
  subscriber.stop!
end

# Start background threads that will call the block passed to listen.
subscriber.start
sleep

Defined Under Namespace

Classes: List, PushConfig

Instance Method Summary collapse

Instance Method Details

#acknowledge(*messages) ⇒ Object Also known as: ack

Acknowledges receipt of a message. After an ack, the Pub/Sub system can remove the message from the subscription. Acknowledging a message whose ack deadline has expired may succeed, although the message may have been sent again. Acknowledging a message more than once will not result in an error. This is only used for messages received via pull.

See also ReceivedMessage#acknowledge!.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull immediate: false
sub.acknowledge received_messages

Parameters:



1085
1086
1087
1088
1089
1090
1091
# File 'lib/google/cloud/pubsub/subscription.rb', line 1085

def acknowledge *messages
  ack_ids = coerce_ack_ids messages
  return true if ack_ids.empty?
  ensure_service!
  service.acknowledge name, *ack_ids
  true
end

#bigquery_config {|bigquery_config| ... } ⇒ Google::Cloud::PubSub::V1::BigQueryConfig

Inspect the Subscription's bigquery configuration settings. The configuration can be changed by modifying the values in the method's block.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.bigquery_config.table #=> "my-project:dataset-id.table-id"
sub.bigquery_config.use_topic_schema #=> true
sub.bigquery_config. #=> false

Update the bigquery configuration by passing a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"

sub.bigquery_config do |bc|
  bc. = true
  bc.use_topic_schema = false
end

Yields:

Yield Parameters:

  • bigquery_config (Google::Cloud::PubSub::V1::BigQueryConfig)

Returns:

  • (Google::Cloud::PubSub::V1::BigQueryConfig)


337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# File 'lib/google/cloud/pubsub/subscription.rb', line 337

def bigquery_config
  ensure_service!

  config = reference? ? Google::Cloud::PubSub::V1::BigQueryConfig.new : @grpc.bigquery_config

  if block_given?
    old_config = config.dup
    yield config
    new_config = config

    if old_config != new_config # has the object been changed?
      update_grpc = Google::Cloud::PubSub::V1::Subscription.new name: name, bigquery_config: new_config
      @grpc = service.update_subscription update_grpc, :bigquery_config
    end
  end

  config.freeze
end

#create_snapshot(snapshot_name = nil, labels: nil) ⇒ Google::Cloud::PubSub::Snapshot Also known as: new_snapshot

Creates a new Google::Cloud::PubSub::Snapshot from the subscription. The created snapshot is guaranteed to retain:

  • The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged upon the successful completion of the create_snapshot operation; as well as:
  • Any messages published to the subscription's topic following the successful completion of the create_snapshot operation.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot "my-snapshot"
snapshot.name #=> "projects/my-project/snapshots/my-snapshot"

Without providing a name:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot
snapshot.name #=> "projects/my-project/snapshots/gcr-analysis-..."

Parameters:

  • snapshot_name (String, nil) (defaults to: nil)

    Name of the new snapshot. Optional. If the name is not provided, the server will assign a random name for this snapshot on the same project as the subscription. The value can be a simple snapshot ID (relative name), in which case the current project ID will be supplied, or a fully-qualified snapshot name in the form projects/{project_id}/snapshots/{snapshot_id}.

    The snapshot ID (relative name) must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with goog.

  • labels (Hash) (defaults to: nil)

    A hash of user-provided labels associated with the snapshot. You can use these to organize and group your snapshots. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.

Returns:



1180
1181
1182
1183
1184
# File 'lib/google/cloud/pubsub/subscription.rb', line 1180

def create_snapshot snapshot_name = nil, labels: nil
  ensure_service!
  grpc = service.create_snapshot name, snapshot_name, labels: labels
  Snapshot.from_grpc grpc, service
end

#dead_letter_max_delivery_attemptsInteger?

Returns the maximum number of delivery attempts for any message in the subscription's dead letter policy if a dead letter policy is configured, otherwise nil. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The value must be between 5 and 100.

The number of delivery attempts is defined as 1 + (the sum of number of NACKs and number of times the acknowledgement deadline has been exceeded for the message). A NACK is any call to ModifyAckDeadline with a 0 deadline. Note that client libraries may automatically extend ack_deadlines.

This field will be honored on a best effort basis. If this parameter is nil or 0, a default value of 5 is used.

See also #dead_letter_max_delivery_attempts=, #dead_letter_topic=, #dead_letter_topic and #remove_dead_letter_policy.

Makes an API call to retrieve the dead_letter_policy when called on a reference object. See #reference?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.dead_letter_topic.name #=> "projects/my-project/topics/my-dead-letter-topic"
sub.dead_letter_max_delivery_attempts #=> 10

Returns:

  • (Integer, nil)

    A value between 5 and 100, or nil if no dead letter policy is configured.



539
540
541
542
# File 'lib/google/cloud/pubsub/subscription.rb', line 539

def dead_letter_max_delivery_attempts
  ensure_grpc!
  @grpc.dead_letter_policy&.max_delivery_attempts
end

#dead_letter_max_delivery_attempts=(new_dead_letter_max_delivery_attempts) ⇒ Object

Sets the maximum number of delivery attempts for any message in the subscription's dead letter policy. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The value must be between 5 and 100.

The number of delivery attempts is defined as 1 + (the sum of number of NACKs and number of times the acknowledgement deadline has been exceeded for the message). A NACK is any call to ModifyAckDeadline with a 0 deadline. Note that client libraries may automatically extend ack_deadlines.

This field will be honored on a best effort basis. If this parameter is 0, a default value of 5 is used.

Makes an API call to retrieve the dead_letter_policy when called on a reference object. See #reference?.

The dead letter topic must be set first. See #dead_letter_topic=, #dead_letter_topic and #remove_dead_letter_policy.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.dead_letter_topic.name #=> "projects/my-project/topics/my-dead-letter-topic"

sub.dead_letter_max_delivery_attempts = 20

Parameters:

  • new_dead_letter_max_delivery_attempts (Integer, nil)

    A value between 5 and 100. If this parameter is nil or 0, a default value of 5 is used.

Raises:



575
576
577
578
579
580
581
582
583
584
585
586
# File 'lib/google/cloud/pubsub/subscription.rb', line 575

def dead_letter_max_delivery_attempts= new_dead_letter_max_delivery_attempts
  ensure_grpc!
  unless @grpc.dead_letter_policy&.dead_letter_topic
    # Service error message "3:Invalid resource name given (name=)." does not identify param.
    raise ArgumentError, "dead_letter_topic is required with dead_letter_max_delivery_attempts"
  end
  dead_letter_policy = @grpc.dead_letter_policy || Google::Cloud::PubSub::V1::DeadLetterPolicy.new
  dead_letter_policy.max_delivery_attempts = new_dead_letter_max_delivery_attempts
  update_grpc = Google::Cloud::PubSub::V1::Subscription.new name: name, dead_letter_policy: dead_letter_policy
  @grpc = service.update_subscription update_grpc, :dead_letter_policy
  @resource_name = nil
end

#dead_letter_topicTopic?

Returns the Topic to which dead letter messages should be published if a dead letter policy is configured, otherwise nil. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times.

See also #dead_letter_topic=, #dead_letter_max_delivery_attempts=, #dead_letter_max_delivery_attempts and #remove_dead_letter_policy.

Makes an API call to retrieve the topic name when called on a reference object. See #reference?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.dead_letter_topic.name #=> "projects/my-project/topics/my-dead-letter-topic"
sub.dead_letter_max_delivery_attempts #=> 10

Returns:



468
469
470
471
472
# File 'lib/google/cloud/pubsub/subscription.rb', line 468

def dead_letter_topic
  ensure_grpc!
  return nil unless @grpc.dead_letter_policy
  Topic.from_name @grpc.dead_letter_policy.dead_letter_topic, service
end

#dead_letter_topic=(new_dead_letter_topic) ⇒ Object

Sets the Topic to which dead letter messages for the subscription should be published. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The Cloud Pub/Sub service account associated with the enclosing subscription's parent project (i.e., service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Publish() to this topic.

The operation will fail if the topic does not exist. Users should ensure that there is a subscription attached to this topic since messages published to a topic with no subscriptions are lost.

Makes an API call to retrieve the dead_letter_policy value when called on a reference object. See #reference?.

See also #dead_letter_topic, #dead_letter_max_delivery_attempts=, #dead_letter_max_delivery_attempts and #remove_dead_letter_policy.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
dead_letter_topic = pubsub.topic "my-dead-letter-topic", skip_lookup: true
sub.dead_letter_topic = dead_letter_topic

Parameters:

  • new_dead_letter_topic (Topic)

    The topic to which dead letter messages for the subscription should be published.



502
503
504
505
506
507
508
509
# File 'lib/google/cloud/pubsub/subscription.rb', line 502

def dead_letter_topic= new_dead_letter_topic
  ensure_grpc!
  dead_letter_policy = @grpc.dead_letter_policy || Google::Cloud::PubSub::V1::DeadLetterPolicy.new
  dead_letter_policy.dead_letter_topic = new_dead_letter_topic.name
  update_grpc = Google::Cloud::PubSub::V1::Subscription.new name: name, dead_letter_policy: dead_letter_policy
  @grpc = service.update_subscription update_grpc, :dead_letter_policy
  @resource_name = nil
end

#deadlineInteger

This value is the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.

Makes an API call to retrieve the deadline value when called on a reference object. See #reference?.

Returns:

  • (Integer)


118
119
120
121
# File 'lib/google/cloud/pubsub/subscription.rb', line 118

def deadline
  ensure_grpc!
  @grpc.ack_deadline_seconds
end

#deadline=(new_deadline) ⇒ Object

Sets the maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.

Parameters:

  • new_deadline (Integer)

    The new deadline value.



130
131
132
133
134
# File 'lib/google/cloud/pubsub/subscription.rb', line 130

def deadline= new_deadline
  update_grpc = Google::Cloud::PubSub::V1::Subscription.new name: name, ack_deadline_seconds: new_deadline
  @grpc = service.update_subscription update_grpc, :ack_deadline_seconds
  @resource_name = nil
end

#deleteBoolean

Deletes an existing subscription. All pending messages in the subscription are immediately dropped.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.delete

Returns:

  • (Boolean)

    Returns true if the subscription was deleted.



770
771
772
773
774
# File 'lib/google/cloud/pubsub/subscription.rb', line 770

def delete
  ensure_service!
  service.delete_subscription name
  true
end

#detachBoolean

Detaches a subscription from its topic. All messages retained in the subscription are dropped. Detached subscriptions don't receive messages from their topic and don't retain any backlog. Subsequent #pull and #listen (pull and streaming pull) operations will raise FAILED_PRECONDITION. If the subscription is a push subscription (see #push_config), pushes to the endpoint will stop. It may take a few minutes for the subscription's detached state to be reflected in subsequent calls to #detached?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.detach

# sleep 120
sub.detached? #=> true

Returns:

  • (Boolean)

    Returns true if the detach operation was successful.



796
797
798
799
800
# File 'lib/google/cloud/pubsub/subscription.rb', line 796

def detach
  ensure_service!
  service.detach_subscription name
  true
end

#detached?Boolean

Whether the subscription is detached from its topic. Detached subscriptions don't receive messages from their topic and don't retain any backlog. #pull and #listen (pull and streaming pull) operations will raise FAILED_PRECONDITION. If the subscription is a push subscription (see #push_config), pushes to the endpoint will not be made. The default value is false.

See Topic#subscribe and #detach.

Makes an API call to retrieve the detached value when called on a reference object. See #reference?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.detach

# sleep 120
sub.detached? #=> true

Returns:

  • (Boolean)


724
725
726
727
# File 'lib/google/cloud/pubsub/subscription.rb', line 724

def detached?
  ensure_grpc!
  @grpc.detached
end

#endpointString

Returns the URL locating the endpoint to which messages should be pushed. For example, a Webhook endpoint might use https://example.com/push.

Makes an API call to retrieve the endpoint value when called on a reference object. See #reference?.

Returns:

  • (String)


225
226
227
228
# File 'lib/google/cloud/pubsub/subscription.rb', line 225

def endpoint
  ensure_grpc!
  @grpc.push_config&.push_endpoint
end

#endpoint=(new_endpoint) ⇒ Object

Sets the URL locating the endpoint to which messages should be pushed. For example, a Webhook endpoint might use https://example.com/push.

Parameters:

  • new_endpoint (String)

    The new endpoint value.



236
237
238
239
240
241
242
243
244
245
246
# File 'lib/google/cloud/pubsub/subscription.rb', line 236

def endpoint= new_endpoint
  ensure_service!
  service.modify_push_config name, new_endpoint, {}

  return if reference?

  @grpc.push_config = Google::Cloud::PubSub::V1::PushConfig.new(
    push_endpoint: new_endpoint,
    attributes:    {}
  )
end

#exists?Boolean

Determines whether the subscription exists in the Pub/Sub service.

Makes an API call to determine whether the subscription resource exists when called on a reference object. See #reference?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.exists? #=> true

Returns:

  • (Boolean)


745
746
747
748
749
750
751
752
753
754
# File 'lib/google/cloud/pubsub/subscription.rb', line 745

def exists?
  # Always true if the object is not set as reference
  return true unless reference?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = true
rescue Google::Cloud::NotFoundError
  @exists = false
end

#expires_inNumeric?

The duration (in seconds) for when a subscription expires after the subscription goes inactive. A subscription is considered active as long as any connected subscriber is successfully consuming messages from the subscription or is issuing operations on the subscription.

If #expires_in= is not set, a default value of of 31 days will be used. The minimum allowed value is 1 day.

Makes an API call to retrieve the expires_in value when called on a reference object. See #reference?.

Returns:

  • (Numeric, nil)

    The expiration duration, or nil if unset.



407
408
409
410
411
412
413
# File 'lib/google/cloud/pubsub/subscription.rb', line 407

def expires_in
  ensure_grpc!

  return nil if @grpc.expiration_policy.nil?

  Convert.duration_to_number @grpc.expiration_policy.ttl
end

#expires_in=(ttl) ⇒ Object

Sets the duration (in seconds) for when a subscription expires after the subscription goes inactive.

See also #expires_in.

Parameters:

  • ttl (Numeric, nil)

    The expiration duration in seconds, or nil to unset.



424
425
426
427
428
429
430
# File 'lib/google/cloud/pubsub/subscription.rb', line 424

def expires_in= ttl
  new_expiration_policy = Google::Cloud::PubSub::V1::ExpirationPolicy.new ttl: Convert.number_to_duration(ttl)

  update_grpc = Google::Cloud::PubSub::V1::Subscription.new name: name, expiration_policy: new_expiration_policy
  @grpc = service.update_subscription update_grpc, :expiration_policy
  @resource_name = nil
end

#filterString

An expression written in the Cloud Pub/Sub filter language. If non-empty, then only Message instances whose attributes field matches the filter are delivered on this subscription. If empty, then no messages are filtered out.

Makes an API call to retrieve the filter value when called on a reference object. See #reference?.

Returns:

  • (String)

    The frozen filter string.



442
443
444
445
# File 'lib/google/cloud/pubsub/subscription.rb', line 442

def filter
  ensure_grpc!
  @grpc.filter.freeze
end

#labelsHash

A hash of user-provided labels associated with this subscription. Labels can be used to organize and group subscriptions.See Creating and Managing Labels.

The returned hash is frozen and changes are not allowed. Use #labels= to update the labels for this subscription.

Makes an API call to retrieve the labels value when called on a reference object. See #reference?.

Returns:

  • (Hash)

    The frozen labels hash.



369
370
371
372
# File 'lib/google/cloud/pubsub/subscription.rb', line 369

def labels
  ensure_grpc!
  @grpc.labels.to_h.freeze
end

#labels=(new_labels) ⇒ Object

Sets the hash of user-provided labels associated with this subscription. Labels can be used to organize and group subscriptions. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.

Parameters:

  • new_labels (Hash)

    The new labels hash.

Raises:

  • (ArgumentError)


386
387
388
389
390
391
# File 'lib/google/cloud/pubsub/subscription.rb', line 386

def labels= new_labels
  raise ArgumentError, "Value must be a Hash" if new_labels.nil?
  update_grpc = Google::Cloud::PubSub::V1::Subscription.new name: name, labels: new_labels
  @grpc = service.update_subscription update_grpc, :labels
  @resource_name = nil
end

#listen(deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}) {|received_message| ... } ⇒ Subscriber

Note:

At the time of this release, ordering keys are not yet publicly enabled and requires special project enablements.

Create a Google::Cloud::PubSub::Subscriber object that receives and processes messages using the code provided in the callback. Messages passed to the callback should acknowledge (ReceivedMessage#acknowledge!) or reject (ReceivedMessage#reject!) the message. If no action is taken, the message will be removed from the subscriber and made available for redelivery after the callback is completed.

Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.

To use ordering keys, the subscription must be created with message ordering enabled (See Topic#subscribe and #message_ordering?) before calling #listen. When enabled, the subscriber will deliver messages with the same ordering_key in the order they were published.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen do |received_message|
  # process message
  puts "Data: #{received_message.message.data}, published at #{received_message.message.published_at}"
  received_message.acknowledge!
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Configuring to increase concurrent callbacks:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen threads: { callback: 16 } do |rec_message|
  # store the message somewhere before acknowledging
  store_in_backend rec_message.data # takes a few seconds
  rec_message.acknowledge!
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Ordered messages are supported using ordering_key:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-ordered-topic-sub"
sub.message_ordering? #=> true

subscriber = sub.listen do |received_message|
  # messsages with the same ordering_key are received
  # in the order in which they were published.
  received_message.acknowledge!
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Set the maximum amount of time before redelivery if the subscriber fails to extend the deadline:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

subscriber = sub.listen inventory: { max_duration_per_lease_extension: 20 } do |received_message|
  # Process message very slowly with possibility of failure.
  process rec_message.data # takes minutes
  rec_message.acknowledge!
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Parameters:

  • deadline (Numeric) (defaults to: nil)

    The default number of seconds the stream will hold received messages before modifying the message's ack deadline. The minimum is 10, the maximum is 600. Default is #deadline. Optional.

    When using a reference object an API call will be made to retrieve the default deadline value for the subscription when this argument is not provided. See #reference?.

  • message_ordering (Boolean) (defaults to: nil)

    Whether message ordering has been enabled. The value provided must match the value set on the Pub/Sub service. See #message_ordering?. Optional.

    When using a reference object an API call will be made to retrieve the default message_ordering value for the subscription when this argument is not provided. See #reference?.

  • streams (Integer) (defaults to: nil)

    The number of concurrent streams to open to pull messages from the subscription. Default is 2. Optional.

  • inventory (Hash, Integer) (defaults to: nil)

    The settings to control how received messages are to be handled by the subscriber. When provided as an Integer instead of a Hash only max_outstanding_messages will be set. Optional.

    Hash keys and values may include the following:

    • :max_outstanding_messages [Integer] The number of received messages to be collected by subscriber. Default is 1,000. (Note: replaces :limit, which is deprecated.)
    • :max_outstanding_bytes [Integer] The total byte size of received messages to be collected by subscriber. Default is 100,000,000 (100MB). (Note: replaces :bytesize, which is deprecated.)
    • :use_legacy_flow_control [Boolean] Disables enforcing flow control settings at the Cloud PubSub server and the less accurate method of only enforcing flow control at the client side is used instead. Default is false.
    • :max_total_lease_duration [Integer] The number of seconds that received messages can be held awaiting processing. Default is 3,600 (1 hour). (Note: replaces :extension, which is deprecated.)
    • :max_duration_per_lease_extension [Integer] The maximum amount of time in seconds for a single lease extension attempt. Bounds the delay before a message redelivery if the subscriber fails to extend the deadline. Default is 0 (disabled).
  • threads (Hash) (defaults to: {})

    The number of threads to create to handle concurrent calls by each stream opened by the subscriber. Optional.

    Hash keys and values may include the following:

Yields:

  • (received_message)

    a block for processing new messages

Yield Parameters:

Returns:



1054
1055
1056
1057
1058
1059
1060
1061
# File 'lib/google/cloud/pubsub/subscription.rb', line 1054

def listen deadline: nil, message_ordering: nil, streams: nil, inventory: nil, threads: {}, &block
  ensure_service!
  deadline ||= self.deadline
  message_ordering = message_ordering? if message_ordering.nil?

  Subscriber.new name, block, deadline: deadline, streams: streams, inventory: inventory,
                              message_ordering: message_ordering, threads: threads, service: service
end

#message_ordering?Boolean

Note:

At the time of this release, ordering keys are not yet publicly enabled and requires special project enablements.

Whether message ordering has been enabled. When enabled, messages published with the same ordering_key will be delivered in the order they were published. When disabled, messages may be delivered in any order.

See Topic#publish_async, #listen, and Message#ordering_key.

Makes an API call to retrieve the enable_message_ordering value when called on a reference object. See #reference?.

Returns:

  • (Boolean)


695
696
697
698
# File 'lib/google/cloud/pubsub/subscription.rb', line 695

def message_ordering?
  ensure_grpc!
  @grpc.enable_message_ordering
end

#modify_ack_deadline(new_deadline, *messages) ⇒ Object

Modifies the acknowledge deadline for messages.

This indicates that more time is needed to process the messages, or to make the messages available for redelivery if the processing was interrupted.

See also ReceivedMessage#modify_ack_deadline!.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull immediate: false
sub.modify_ack_deadline 120, received_messages

Parameters:

  • new_deadline (Integer)

    The new ack deadline in seconds from the time this request is sent to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack deadline will expire 10 seconds after the call is made. Specifying 0 may immediately make the message available for another pull request.

  • messages (ReceivedMessage, String)

    One or more ReceivedMessage objects or ack_id values.



1120
1121
1122
1123
1124
1125
# File 'lib/google/cloud/pubsub/subscription.rb', line 1120

def modify_ack_deadline new_deadline, *messages
  ack_ids = coerce_ack_ids messages
  ensure_service!
  service.modify_ack_deadline name, ack_ids, new_deadline
  true
end

#nameString

The name of the subscription.

Returns:

  • (String)

    A fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}.



83
84
85
86
# File 'lib/google/cloud/pubsub/subscription.rb', line 83

def name
  return @resource_name if reference?
  @grpc.name
end

#policy {|policy| ... } ⇒ Policy

Gets the Cloud IAM access control policy for this subscription.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"

policy = sub.policy

Update the policy by passing a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"

sub.policy do |p|
  p.add "roles/owner", "user:owner@example.com"
end

Yields:

  • (policy)

    A block for updating the policy. The latest policy will be read from the Pub/Sub service and passed to the block. After the block completes, the modified policy will be written to the service.

Yield Parameters:

  • policy (Policy)

    the current Cloud IAM Policy for this subscription

Returns:

  • (Policy)

    the current Cloud IAM Policy for this subscription

See Also:



1333
1334
1335
1336
1337
1338
1339
1340
# File 'lib/google/cloud/pubsub/subscription.rb', line 1333

def policy
  ensure_service!
  grpc = service.get_subscription_policy name
  policy = Policy.from_grpc grpc
  return policy unless block_given?
  yield policy
  update_policy policy
end

#pull(immediate: true, max: 100) ⇒ Array<Google::Cloud::PubSub::ReceivedMessage>

Pulls messages from the server, blocking until messages are available when called with the immediate: false option, which is recommended to avoid adverse impacts on the performance of pull operations.

Raises an API error with status UNAVAILABLE if there are too many concurrent pull requests pending for the given subscription.

See also #listen for the preferred way to process messages as they become available.

Examples:

The immediate: false option is now recommended to avoid adverse impacts on pull operations:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull immediate: false
received_messages.each do |received_message|
  received_message.acknowledge!
end

A maximum number of messages returned can also be specified:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.pull immediate: false, max: 10
received_messages.each do |received_message|
  received_message.acknowledge!
end

Parameters:

  • immediate (Boolean) (defaults to: true)

    Whether to return immediately or block until messages are available.

    Warning: The default value of this field is true. However, sending true is discouraged because it adversely impacts the performance of pull operations. We recommend that users always explicitly set this field to false.

    If this field set to true, the system will respond immediately even if it there are no messages available to return in the pull response. Otherwise, the system may wait (for a bounded amount of time) until at least one message is available, rather than returning no messages.

    See also #listen for the preferred way to process messages as they become available.

  • max (Integer) (defaults to: 100)

    The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.

Returns:



856
857
858
859
860
861
862
863
864
865
# File 'lib/google/cloud/pubsub/subscription.rb', line 856

def pull immediate: true, max: 100
  ensure_service!
  options = { immediate: immediate, max: max }
  list_grpc = service.pull name, options
  Array(list_grpc.received_messages).map do |msg_grpc|
    ReceivedMessage.from_grpc msg_grpc, self
  end
rescue Google::Cloud::DeadlineExceededError
  []
end

#push_config {|push_config| ... } ⇒ Subscription::PushConfig

Inspect the Subscription's push configuration settings. The configuration can be changed by modifying the values in the method's block.

Subscription objects that are reference only will return an empty PushConfig object, which can be configured and saved using the method's block. Unlike #endpoint, which will retrieve the full resource from the API before returning. To get the actual values for a reference object, call #reload! before calling #push_config.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.push_config.endpoint #=> "http://example.com/callback"
sub.push_config.authentication.email #=> "user@example.com"
sub.push_config.authentication.audience #=> "client-12345"

Update the push configuration by passing a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"

sub.push_config do |pc|
  pc.endpoint = "http://example.net/callback"
  pc.set_oidc_token "user@example.net", "client-67890"
end

Yields:

  • (push_config)

    a block for modifying the push configuration

Yield Parameters:

Returns:



286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
# File 'lib/google/cloud/pubsub/subscription.rb', line 286

def push_config
  ensure_service!

  orig_config = reference? ? nil : @grpc.push_config
  config = PushConfig.from_grpc orig_config

  if block_given?
    old_config = config.to_grpc.dup
    yield config
    new_config = config.to_grpc

    if old_config != new_config # has the object been changed?
      update_grpc = Google::Cloud::PubSub::V1::Subscription.new name: name, push_config: new_config
      @grpc = service.update_subscription update_grpc, :push_config
    end
  end

  config.freeze
end

#reference?Boolean

Determines whether the subscription object was created without retrieving the resource representation from the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.get_subscription "my-topic-sub", skip_lookup: true
sub.reference? #=> true

Returns:

  • (Boolean)

    true when the subscription was created without a resource representation, false otherwise.



1254
1255
1256
# File 'lib/google/cloud/pubsub/subscription.rb', line 1254

def reference?
  @grpc.nil?
end

#reload!Google::Cloud::PubSub::Subscription Also known as: refresh!

Reloads the subscription with current data from the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.get_subscription "my-topic-sub"
sub.reload!

Returns:



1291
1292
1293
1294
1295
1296
# File 'lib/google/cloud/pubsub/subscription.rb', line 1291

def reload!
  ensure_service!
  @grpc = service.get_subscription name
  @resource_name = nil
  self
end

#remove_dead_letter_policyBoolean

Removes an existing dead letter policy. A dead letter policy specifies the conditions for dead lettering messages in the subscription. If a dead letter policy is not set, dead lettering is disabled.

Makes an API call to retrieve the dead_letter_policy when called on a reference object. See #reference?.

See #dead_letter_topic, #dead_letter_topic=, #dead_letter_max_delivery_attempts and #dead_letter_max_delivery_attempts=.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

sub.dead_letter_topic.name #=> "projects/my-project/topics/my-dead-letter-topic"
sub.dead_letter_max_delivery_attempts #=> 10

sub.remove_dead_letter_policy

sub.dead_letter_topic #=> nil
sub.dead_letter_max_delivery_attempts #=> nil

Returns:

  • (Boolean)

    true if an existing dead letter policy was removed, false if no existing dead letter policy was present.



615
616
617
618
619
620
621
# File 'lib/google/cloud/pubsub/subscription.rb', line 615

def remove_dead_letter_policy
  ensure_grpc!
  return false if @grpc.dead_letter_policy.nil?
  update_grpc = Google::Cloud::PubSub::V1::Subscription.new name: name, dead_letter_policy: nil
  @grpc = service.update_subscription update_grpc, :dead_letter_policy
  true
end

#resource?Boolean

Determines whether the subscription object was created with a resource representation from the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.get_subscription "my-topic-sub"
sub.resource? #=> true

Returns:

  • (Boolean)

    true when the subscription was created with a resource representation, false otherwise.



1273
1274
1275
# File 'lib/google/cloud/pubsub/subscription.rb', line 1273

def resource?
  !@grpc.nil?
end

#retain_ackedBoolean

Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the #retention window. Default is false.

Makes an API call to retrieve the retain_acked value when called on a reference object. See #reference?.

Returns:

  • (Boolean)

    Returns true if acknowledged messages are retained.



148
149
150
151
# File 'lib/google/cloud/pubsub/subscription.rb', line 148

def retain_acked
  ensure_grpc!
  @grpc.retain_acked_messages
end

#retain_acked=(new_retain_acked) ⇒ Object

Sets whether to retain acknowledged messages.

Parameters:

  • new_retain_acked (Boolean)

    The new retain acknowledged messages value.



159
160
161
162
163
164
# File 'lib/google/cloud/pubsub/subscription.rb', line 159

def retain_acked= new_retain_acked
  update_grpc = Google::Cloud::PubSub::V1::Subscription.new name:                  name,
                                                            retain_acked_messages: !(!new_retain_acked)
  @grpc = service.update_subscription update_grpc, :retain_acked_messages
  @resource_name = nil
end

#retentionNumeric

How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If #retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a #seek can be done. Cannot be less than 600 (10 minutes) or more than 604,800 (7 days). Default is 604,800 seconds (7 days).

Makes an API call to retrieve the retention value when called on a reference object. See #reference?.

Returns:

  • (Numeric)

    The message retention duration in seconds.



179
180
181
182
# File 'lib/google/cloud/pubsub/subscription.rb', line 179

def retention
  ensure_grpc!
  Convert.duration_to_number @grpc.message_retention_duration
end

#retention=(new_retention) ⇒ Object

Sets the message retention duration in seconds.

Parameters:

  • new_retention (Numeric)

    The new retention value.



189
190
191
192
193
194
195
# File 'lib/google/cloud/pubsub/subscription.rb', line 189

def retention= new_retention
  new_retention_duration = Convert.number_to_duration new_retention
  update_grpc = Google::Cloud::PubSub::V1::Subscription.new name:                       name,
                                                            message_retention_duration: new_retention_duration
  @grpc = service.update_subscription update_grpc, :message_retention_duration
  @resource_name = nil
end

#retry_policyRetryPolicy?

A policy that specifies how Cloud Pub/Sub retries message delivery for this subscription. If nil, the default retry policy is applied. This generally implies that messages will be retried as soon as possible for healthy subscribers. Retry Policy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.

Makes an API call to retrieve the retry_policy when called on a reference object. See #reference?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

sub.retry_policy = Google::Cloud::PubSub::RetryPolicy.new minimum_backoff: 5, maximum_backoff: 300

sub.retry_policy.minimum_backoff #=> 5
sub.retry_policy.maximum_backoff #=> 300

Returns:

  • (RetryPolicy, nil)

    The retry policy for the subscription, or nil.



645
646
647
648
649
# File 'lib/google/cloud/pubsub/subscription.rb', line 645

def retry_policy
  ensure_grpc!
  return nil unless @grpc.retry_policy
  RetryPolicy.from_grpc @grpc.retry_policy
end

#retry_policy=(new_retry_policy) ⇒ Object

Sets a policy that specifies how Cloud Pub/Sub retries message delivery for this subscription. If nil, the default retry policy is applied. This generally implies that messages will be retried as soon as possible for healthy subscribers. Retry Policy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"

sub.retry_policy = Google::Cloud::PubSub::RetryPolicy.new minimum_backoff: 5, maximum_backoff: 300

sub.retry_policy.minimum_backoff #=> 5
sub.retry_policy.maximum_backoff #=> 300

Parameters:

  • new_retry_policy (RetryPolicy, nil)

    A new retry policy for the subscription, or nil.



671
672
673
674
675
676
677
# File 'lib/google/cloud/pubsub/subscription.rb', line 671

def retry_policy= new_retry_policy
  ensure_service!
  new_retry_policy = new_retry_policy.to_grpc if new_retry_policy
  update_grpc = Google::Cloud::PubSub::V1::Subscription.new name: name, retry_policy: new_retry_policy
  @grpc = service.update_subscription update_grpc, :retry_policy
  @resource_name = nil
end

#seek(snapshot) ⇒ Boolean

Resets the subscription's backlog to a given Google::Cloud::PubSub::Snapshot or to a point in time, whichever is provided in the request.

Examples:

Using a snapshot

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-sub"

snapshot = sub.create_snapshot

received_messages = sub.pull immediate: false
sub.acknowledge received_messages

sub.seek snapshot

Using a time:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-sub"

time = Time.now

received_messages = sub.pull immediate: false
sub.acknowledge received_messages

sub.seek time

Parameters:

  • snapshot (Snapshot, String, Time)

    The Snapshot instance, snapshot name, or time to which to perform the seek. If the argument is a snapshot, the snapshot's topic must be the same as that of the subscription. If it is a time, messages retained in the subscription that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged. Note that this operation affects only those messages retained in the subscription. For example, if the time corresponds to a point before the message retention window (or to a point before the system's notion of the subscription creation time), only retained messages will be marked as unacknowledged, and already-expunged messages will not be restored.

Returns:

  • (Boolean)

    Returns true if the seek was successful.



1233
1234
1235
1236
1237
# File 'lib/google/cloud/pubsub/subscription.rb', line 1233

def seek snapshot
  ensure_service!
  service.seek name, snapshot
  true
end

#test_permissions(*permissions) ⇒ Array<String>

Tests the specified permissions against the Cloud IAM access control policy.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"
perms = sub.test_permissions "pubsub.subscriptions.get",
                             "pubsub.subscriptions.consume"
perms.include? "pubsub.subscriptions.get" #=> true
perms.include? "pubsub.subscriptions.consume" #=> false

Parameters:

  • permissions (String, Array<String>)

    The set of permissions to check access for. Permissions with wildcards (such as * or storage.*) are not allowed.

    The permissions that can be checked on a subscription are:

    • pubsub.subscriptions.consume
    • pubsub.subscriptions.get
    • pubsub.subscriptions.delete
    • pubsub.subscriptions.update
    • pubsub.subscriptions.getIamPolicy
    • pubsub.subscriptions.setIamPolicy

Returns:

  • (Array<String>)

    The permissions that have access.

See Also:



1410
1411
1412
1413
1414
1415
# File 'lib/google/cloud/pubsub/subscription.rb', line 1410

def test_permissions *permissions
  permissions = Array(permissions).flatten
  ensure_service!
  grpc = service.test_subscription_permissions name, permissions
  grpc.permissions
end

#topicTopic

The Topic from which this subscription receives messages.

Makes an API call to retrieve the topic information when called on a reference object. See #reference?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
sub.topic.name #=> "projects/my-project/topics/my-topic"

Returns:



104
105
106
107
# File 'lib/google/cloud/pubsub/subscription.rb', line 104

def topic
  ensure_grpc!
  Topic.from_name @grpc.topic, service
end

#topic_retentionNumeric?

Indicates the minimum duration for which a message is retained after it is published to the subscription's topic. If this field is set, messages published to the subscription's topic in the last topic_message_retention_duration are always available to subscribers. Output only. See Topic#retention.

Makes an API call to retrieve the retention value when called on a reference object. See #reference?.

Returns:

  • (Numeric, nil)

    The topic message retention duration in seconds, or nil if not set.



210
211
212
213
# File 'lib/google/cloud/pubsub/subscription.rb', line 210

def topic_retention
  ensure_grpc!
  Convert.duration_to_number @grpc.topic_message_retention_duration
end

#update_policy(new_policy) ⇒ Policy Also known as: policy=

Updates the Cloud IAM access control policy for this subscription. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.

You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
sub = pubsub.subscription "my-subscription"

policy = sub.policy # API call

policy.add "roles/owner", "user:owner@example.com"

sub.update_policy policy # API call

Parameters:

  • new_policy (Policy)

    a new or modified Cloud IAM Policy for this subscription

Returns:

  • (Policy)

    the policy returned by the API update operation

See Also:



1371
1372
1373
1374
1375
# File 'lib/google/cloud/pubsub/subscription.rb', line 1371

def update_policy new_policy
  ensure_service!
  grpc = service.set_subscription_policy name, new_policy.to_grpc
  Policy.from_grpc grpc
end

#wait_for_messages(max: 100) ⇒ Array<Google::Cloud::PubSub::ReceivedMessage>

Pulls from the server while waiting for messages to become available. This is the same as:

subscription.pull immediate: false

See also #listen for the preferred way to process messages as they become available.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
received_messages = sub.wait_for_messages
received_messages.each do |received_message|
  received_message.acknowledge!
end

Parameters:

  • max (Integer) (defaults to: 100)

    The maximum number of messages to return for this request. The Pub/Sub system may return fewer than the number specified. The default value is 100, the maximum value is 1000.

Returns:



893
894
895
# File 'lib/google/cloud/pubsub/subscription.rb', line 893

def wait_for_messages max: 100
  pull immediate: false, max: max
end