Class: Google::Cloud::PubSub::Topic

Inherits:

  • Object
Defined in:
lib/google/cloud/pubsub/topic.rb,
lib/google/cloud/pubsub/topic/list.rb

Overview

Topic

A named resource to which messages are published.

See Project#create_topic and Project#topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.publish "task completed"

Defined Under Namespace

Classes: List


Instance Method Details

#async_publisher ⇒ AsyncPublisher

AsyncPublisher object used to publish multiple messages in batches.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

topic.async_publisher.stop!

Returns:

  • (AsyncPublisher)

# File 'lib/google/cloud/pubsub/topic.rb', line 83

def async_publisher
  @async_publisher
end

#delete ⇒ Boolean

Permanently deletes the topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.delete

Returns:

  • (Boolean)

    Returns true if the topic was deleted.



# File 'lib/google/cloud/pubsub/topic.rb', line 358

def delete
  ensure_service!
  service.delete_topic name
  true
end

#enable_message_ordering! ⇒ Object

Note:

At the time of this release, ordering keys are not yet publicly enabled and require special project enablement.

Enables message ordering for messages with ordering keys on the #async_publisher. When enabled, messages published with the same ordering_key will be delivered in the order they were published.

See #message_ordering?, #publish_async, Subscription#listen, and Message#ordering_key.



# File 'lib/google/cloud/pubsub/topic.rb', line 826

def enable_message_ordering!
  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.enable_message_ordering!
end

#exists? ⇒ Boolean

Determines whether the topic exists in the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.exists? #=> true

Returns:

  • (Boolean)


# File 'lib/google/cloud/pubsub/topic.rb', line 991

def exists?
  # Always true if the object is not set as reference
  return true unless reference?
  # If we have a value, return it
  return @exists unless @exists.nil?
  ensure_grpc!
  @exists = true
rescue Google::Cloud::NotFoundError
  @exists = false
end
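
The caching in the method above can be illustrated without the Pub/Sub service. The following is a minimal sketch, assuming a hypothetical ExistenceCache class and a lookup block that stands in for the API call and raises KeyError when the resource is missing; neither is part of the library.

```ruby
# Hypothetical stand-in for the memoization used by Topic#exists?.
# The lookup block is an assumption: it raises KeyError when nothing is found.
class ExistenceCache
  attr_reader :calls

  def initialize &lookup
    @lookup = lookup
    @exists = nil
    @calls = 0
  end

  def exists?
    # Return the cached answer, including a cached false.
    return @exists unless @exists.nil?
    @calls += 1
    @lookup.call
    @exists = true
  rescue KeyError
    @exists = false
  end
end

missing = ExistenceCache.new { raise KeyError, "not found" }
missing.exists? # the first call performs the lookup
missing.exists? # the second call is served from the cache
```

Checking for nil rather than truthiness is what allows a negative result to be cached as well.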

#kms_key ⇒ String

The Cloud KMS encryption key that will be used to protect access to messages published on this topic. For example: projects/a/locations/b/keyRings/c/cryptoKeys/d. The default value is nil, which means default encryption is used.

Makes an API call to retrieve the KMS encryption key when called on a reference object. See #reference?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

topic.kms_key #=> "projects/a/locations/b/keyRings/c/cryptoKeys/d"

Returns:

  • (String)


# File 'lib/google/cloud/pubsub/topic.rb', line 155

def kms_key
  ensure_grpc!
  @grpc.kms_key_name
end

#kms_key=(new_kms_key_name) ⇒ Object

Sets the Cloud KMS encryption key that will be used to protect access to messages published on this topic. For example: projects/a/locations/b/keyRings/c/cryptoKeys/d. The default value is nil, which means default encryption is used.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

key_name = "projects/a/locations/b/keyRings/c/cryptoKeys/d"
topic.kms_key = key_name

Parameters:

  • new_kms_key_name (String)

    New Cloud KMS key name



# File 'lib/google/cloud/pubsub/topic.rb', line 178

def kms_key= new_kms_key_name
  update_grpc = Google::Cloud::PubSub::V1::Topic.new name: name, kms_key_name: new_kms_key_name
  @grpc = service.update_topic update_grpc, :kms_key_name
  @resource_name = nil
end

#labels ⇒ Hash

A hash of user-provided labels associated with this topic. Labels can be used to organize and group topics. See Creating and Managing Labels.

The returned hash is frozen and changes are not allowed. Use #labels= to update the labels for this topic.

Makes an API call to retrieve the labels values when called on a reference object. See #reference?.
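
Because the returned hash is frozen, an update has to build a new hash. A minimal sketch in plain Ruby, using a literal frozen hash in place of the value returned by #labels (the label names are made up for illustration):

```ruby
# Stand-in for the frozen hash returned by Topic#labels.
current = { "env" => "production" }.freeze

# In-place mutation of a frozen hash raises FrozenError.
begin
  current["team"] = "billing"
  mutated = true
rescue FrozenError
  mutated = false
end

# Hash#merge returns a new, unfrozen hash that could be passed to #labels=.
updated = current.merge "team" => "billing"
```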

Returns:

  • (Hash)

    The frozen labels hash.



# File 'lib/google/cloud/pubsub/topic.rb', line 111

def labels
  ensure_grpc!
  @grpc.labels.to_h.freeze
end

#labels=(new_labels) ⇒ Object

Sets the hash of user-provided labels associated with this topic. Labels can be used to organize and group topics. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
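
The label rules above can be approximated with a client-side check before calling the setter. This is a rough sketch only: valid_labels? and both constants are hypothetical names, the treatment of international characters is an assumption, and the service performs the authoritative validation.

```ruby
# Hypothetical client-side check of the label rules described above.
# Keys: start with a lowercase or caseless letter, 1 to 63 characters.
LABEL_KEY   = /\A[\p{Ll}\p{Lo}][\p{Ll}\p{Lo}\p{N}_-]{0,62}\z/
# Values: optional (may be empty), at most 63 characters.
LABEL_VALUE = /\A[\p{Ll}\p{Lo}\p{N}_-]{0,63}\z/

def valid_labels? labels
  return false unless labels.is_a? Hash
  labels.all? do |key, value|
    LABEL_KEY.match?(key.to_s) && LABEL_VALUE.match?(value.to_s)
  end
end
```

Uniqueness of keys needs no separate check here, since a Ruby Hash cannot hold duplicate keys.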

Parameters:

  • new_labels (Hash)

    The new labels hash.

Raises:

  • (ArgumentError)


# File 'lib/google/cloud/pubsub/topic.rb', line 128

def labels= new_labels
  raise ArgumentError, "Value must be a Hash" if new_labels.nil?
  update_grpc = Google::Cloud::PubSub::V1::Topic.new name: name, labels: new_labels
  @grpc = service.update_topic update_grpc, :labels
  @resource_name = nil
end

#message_encoding ⇒ Symbol?

The encoding of messages validated against the schema identified by #schema_name. If present, #schema_name should also be present. Values include:

  • JSON - JSON encoding.
  • BINARY - Binary encoding, as defined by the schema type. For some schema types, binary encoding may not be available.

Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

topic.message_encoding #=> :JSON

Returns:

  • (Symbol, nil)

    The schema encoding, or nil if schema settings are not configured for the topic.



# File 'lib/google/cloud/pubsub/topic.rb', line 281

def message_encoding
  ensure_grpc!
  @grpc.schema_settings&.encoding
end

#message_encoding_binary? ⇒ Boolean

Checks if the encoding of messages in the schema settings is BINARY. See #message_encoding.

Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.

Returns:

  • (Boolean)

    true when BINARY, false if not BINARY or schema settings is not set.



# File 'lib/google/cloud/pubsub/topic.rb', line 293

def message_encoding_binary?
  message_encoding.to_s.upcase == "BINARY"
end

#message_encoding_json? ⇒ Boolean

Checks if the encoding of messages in the schema settings is JSON. See #message_encoding.

Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.

Returns:

  • (Boolean)

    true when JSON, false if not JSON or schema settings is not set.



# File 'lib/google/cloud/pubsub/topic.rb', line 304

def message_encoding_json?
  message_encoding.to_s.upcase == "JSON"
end
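
Both encoding predicates return false rather than raising when no schema settings exist, because nil.to_s is an empty string. A small sketch of that comparison on plain values, using a hypothetical helper name:

```ruby
# Mirrors the comparison used by the two predicates above on plain values.
def encoding_matches? encoding, expected
  encoding.to_s.upcase == expected
end

encoding_matches? :JSON, "JSON" # a symbol, as returned by #message_encoding
encoding_matches? nil, "JSON"   # nil.to_s is "", so nothing is raised
```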

#message_ordering? ⇒ Boolean

Whether message ordering for messages with ordering keys has been enabled on the #async_publisher. When enabled, messages published with the same ordering_key will be delivered in the order they were published. When disabled, messages may be delivered in any order.

See #enable_message_ordering!, #publish_async, Subscription#listen, and Message#ordering_key.

Returns:

  • (Boolean)


# File 'lib/google/cloud/pubsub/topic.rb', line 842

def message_ordering?
  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.message_ordering?
end

#name ⇒ String

The name of the topic.

Returns:

  • (String)

    A fully-qualified topic name in the form projects/{project_id}/topics/{topic_id}.
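
For illustration, the fully-qualified form can be built and parsed with plain string handling. The helper names below are hypothetical and are not provided by the library:

```ruby
# Hypothetical helpers for the projects/{project_id}/topics/{topic_id} form.
TOPIC_PATH = %r{\Aprojects/(?<project_id>[^/]+)/topics/(?<topic_id>[^/]+)\z}

def topic_path project_id, topic_id
  "projects/#{project_id}/topics/#{topic_id}"
end

def parse_topic_path name
  match = TOPIC_PATH.match name
  return nil unless match
  { project_id: match[:project_id], topic_id: match[:topic_id] }
end
```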



# File 'lib/google/cloud/pubsub/topic.rb', line 93

def name
  return @resource_name if reference?
  @grpc.name
end

#persistence_regions ⇒ Array<String>

The list of GCP region IDs where messages that are published to the topic may be persisted in storage.

Messages published by publishers running in non-allowed GCP regions (or running outside of GCP altogether) will be routed for storage in one of the allowed regions. An empty list indicates a misconfiguration at the project or organization level, which will result in all publish operations failing.

Makes an API call to retrieve the list of GCP region IDs when called on a reference object. See #reference?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

topic.persistence_regions #=> ["us-central1", "us-central2"]

Returns:

  • (Array<String>)


# File 'lib/google/cloud/pubsub/topic.rb', line 208

def persistence_regions
  ensure_grpc!
  return [] if @grpc.message_storage_policy.nil?
  Array @grpc.message_storage_policy.allowed_persistence_regions
end

#persistence_regions=(new_persistence_regions) ⇒ Object

Sets the list of GCP region IDs where messages that are published to the topic may be persisted in storage.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

topic.persistence_regions = ["us-central1", "us-central2"]

Parameters:

  • new_persistence_regions (Array<String>)


# File 'lib/google/cloud/pubsub/topic.rb', line 229

def persistence_regions= new_persistence_regions
  update_grpc = Google::Cloud::PubSub::V1::Topic.new \
    name: name, message_storage_policy: { allowed_persistence_regions: Array(new_persistence_regions) }
  @grpc = service.update_topic update_grpc, :message_storage_policy
  @resource_name = nil
end
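
The setter above passes its argument through Array(), so a single region string should be accepted as well as an array. The conversion itself is plain Ruby and can be checked in isolation (the region names are only examples):

```ruby
# Kernel#Array wraps a single value, passes arrays through, and turns nil into [].
single = Array("us-central1")
many   = Array(["us-central1", "us-east1"])
none   = Array(nil)
```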

#policy {|policy| ... } ⇒ Policy

Gets the Cloud IAM access control policy for this topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"

policy = topic.policy

Update the policy by passing a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"

topic.policy do |p|
  p.add "roles/owner", "user:owner@example.com"
end

Yields:

  • (policy)

    A block for updating the policy. The latest policy will be read from the Pub/Sub service and passed to the block. After the block completes, the modified policy will be written to the service.

Yield Parameters:

  • policy (Policy)

    the current Cloud IAM Policy for this topic

Returns:

  • (Policy)

    the current Cloud IAM Policy for this topic

See Also:



# File 'lib/google/cloud/pubsub/topic.rb', line 894

def policy
  ensure_service!
  grpc = service.get_topic_policy name
  policy = Policy.from_grpc grpc
  return policy unless block_given?
  yield policy
  update_policy policy
end

#publish(data = nil, attributes = nil, ordering_key: nil, compress: nil, compression_bytes_threshold: nil, **extra_attrs) {|batch| ... } ⇒ Message+

Publishes one or more messages to the topic.

The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
msg = topic.publish "task completed"

A message can be published using a File object:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
file = File.open "message.txt", mode: "rb"
msg = topic.publish file

Additionally, a message can be published with attributes:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
msg = topic.publish "task completed",
                    foo: :bar,
                    this: :that

Multiple messages can be sent at the same time using a block:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
msgs = topic.publish do |t|
  t.publish "task 1 completed", foo: :bar
  t.publish "task 2 completed", foo: :baz
  t.publish "task 3 completed", foo: :bif
end

Ordered messages are supported using ordering_key:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-ordered-topic"

# Ensure that message ordering is enabled.
topic.enable_message_ordering!

# Publish an ordered message with an ordering key.
topic.publish "task completed",
              ordering_key: "task-key"

Parameters:

  • data (String, File) (defaults to: nil)

    The message payload. This will be converted to bytes encoded as ASCII-8BIT.

  • attributes (Hash) (defaults to: nil)

    Optional attributes for the message.

  • ordering_key (String) (defaults to: nil)

    Identifies related messages for which publish order should be respected.

Yields:

  • (batch)

    a block for publishing multiple messages in one request

Yield Parameters:

  • batch (BatchPublisher)

    the batch publisher object passed to the block

Returns:

  • (Message, Array<Message>)

    Returns the published message when called without a block, or an array of messages when called with a block.



# File 'lib/google/cloud/pubsub/topic.rb', line 679

def publish data = nil, attributes = nil, ordering_key: nil, compress: nil, compression_bytes_threshold: nil,
            **extra_attrs, &block
  ensure_service!
  batch = BatchPublisher.new data,
                             attributes,
                             ordering_key,
                             extra_attrs,
                             compress: compress,
                             compression_bytes_threshold: compression_bytes_threshold

  block&.call batch
  return nil if batch.messages.count.zero?
  batch.publish_batch_messages name, service
end

#publish_async(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object

Note:

At the time of this release, ordering keys are not yet publicly enabled and require special project enablement.

Publishes a message asynchronously to the topic using #async_publisher.

The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.

Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.

To use ordering keys, specify ordering_key. Before specifying ordering_key on a message a call to #enable_message_ordering! must be made or an error will be raised.
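
The rule described above (ordering must be enabled before an ordering key is used, and messages sharing a key keep their publish order) can be modeled in memory. This is a hypothetical sketch for illustration only; OrderedOutbox and its error class are invented names and do not reflect how AsyncPublisher is implemented.

```ruby
# Hypothetical in-memory model of ordering keys; not the library's AsyncPublisher.
class OrderedOutbox
  class OrderingDisabledError < StandardError; end

  def initialize
    @ordering_enabled = false
    @queues = Hash.new { |hash, key| hash[key] = [] }
  end

  def enable_message_ordering!
    @ordering_enabled = true
  end

  def publish data, ordering_key: nil
    if ordering_key && !@ordering_enabled
      raise OrderingDisabledError, "call enable_message_ordering! first"
    end
    # Messages that share a key go to the same queue, which preserves their order.
    @queues[ordering_key.to_s] << data
    data
  end

  def queued_for ordering_key
    @queues[ordering_key.to_s].dup
  end
end
```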

Publisher flow control limits the number of outstanding messages that are allowed to wait to be published. See the flow_control key in the async parameter in Project#topic for more information about publisher flow control settings.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

# Shut down the publisher when ready to stop publishing messages.
topic.async_publisher.stop!

A message can be published using a File object:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
file = File.open "message.txt", mode: "rb"
topic.publish_async file

# Shut down the publisher when ready to stop publishing messages.
topic.async_publisher.stop!

Additionally, a message can be published with attributes:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.publish_async "task completed",
                    foo: :bar, this: :that

# Shut down the publisher when ready to stop publishing messages.
topic.async_publisher.stop!

Ordered messages are supported using ordering_key:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-ordered-topic"

# Ensure that message ordering is enabled.
topic.enable_message_ordering!

# Publish an ordered message with an ordering key.
topic.publish_async "task completed",
                    ordering_key: "task-key"

# Shut down the publisher when ready to stop publishing messages.
topic.async_publisher.stop!

Parameters:

  • data (String, File) (defaults to: nil)

    The message payload. This will be converted to bytes encoded as ASCII-8BIT.

  • attributes (Hash) (defaults to: nil)

    Optional attributes for the message.

  • ordering_key (String) (defaults to: nil)

    Identifies related messages for which publish order should be respected.

Yields:

  • (result)

    the callback for when the message has been published

Yield Parameters:

  • result (PublishResult)

    the result of the asynchronous publish

Raises:



# File 'lib/google/cloud/pubsub/topic.rb', line 808

def publish_async data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback
  ensure_service!

  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.publish data, attributes, ordering_key: ordering_key, **extra_attrs, &callback
end

#reference? ⇒ Boolean

Determines whether the topic object was created without retrieving the resource representation from the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic", skip_lookup: true
topic.reference? #=> true

Returns:

  • (Boolean)

    true when the topic was created without a resource representation, false otherwise.



# File 'lib/google/cloud/pubsub/topic.rb', line 1017

def reference?
  @grpc.nil?
end

#reload! ⇒ Google::Cloud::PubSub::Topic Also known as: refresh!

Reloads the topic with current data from the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.reload!

Returns:

  • (Google::Cloud::PubSub::Topic)

    Returns the reloaded topic.

# File 'lib/google/cloud/pubsub/topic.rb', line 1053

def reload!
  ensure_service!
  @grpc = service.get_topic name
  @resource_name = nil
  self
end

#resource? ⇒ Boolean

Determines whether the topic object was created with a resource representation from the Pub/Sub service.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.resource? #=> true

Returns:

  • (Boolean)

    true when the topic was created with a resource representation, false otherwise.



# File 'lib/google/cloud/pubsub/topic.rb', line 1036

def resource?
  !@grpc.nil?
end

#resume_publish(ordering_key) ⇒ Boolean

Resume publishing ordered messages for the provided ordering key.

Parameters:

  • ordering_key (String)

    Identifies related messages for which publish order should be respected.

Returns:

  • (Boolean)

    true when resumed, false otherwise.



# File 'lib/google/cloud/pubsub/topic.rb', line 855

def resume_publish ordering_key
  @async_publisher ||= AsyncPublisher.new name, service, **@async_opts
  @async_publisher.resume_publish ordering_key
end

#retention ⇒ Numeric?

Indicates the minimum number of seconds to retain a message after it is published to the topic. If this field is set, messages published to the topic within the retention number of seconds are always available to subscribers. For instance, it allows any attached subscription to seek to a timestamp that is up to retention number of seconds in the past. If this field is not set, message retention is controlled by settings on individual subscriptions. Cannot be less than 600 (10 minutes) or more than 604,800 (7 days). See #retention=.

Makes an API call to retrieve the retention value when called on a reference object. See #reference?.

Returns:

  • (Numeric, nil)

    The message retention duration in seconds, or nil if not set.



# File 'lib/google/cloud/pubsub/topic.rb', line 324

def retention
  ensure_grpc!
  Convert.duration_to_number @grpc.message_retention_duration
end

#retention=(new_retention) ⇒ Object

Sets the message retention duration in seconds. If set to a positive duration between 600 (10 minutes) and 604,800 (7 days), inclusive, the message retention duration is changed. If set to nil, this clears message retention duration from the topic. See #retention.
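
The documented range can be checked before calling the setter. A minimal sketch, assuming a hypothetical valid_retention? helper; the service remains the authority on which values it accepts.

```ruby
# Hypothetical pre-flight check for the range described above.
RETENTION_RANGE = (600..604_800).freeze

def valid_retention? seconds
  return true if seconds.nil? # nil clears the retention setting
  seconds.is_a?(Numeric) && RETENTION_RANGE.cover?(seconds)
end
```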

Parameters:

  • new_retention (Numeric, nil)

    The new message retention duration value.



# File 'lib/google/cloud/pubsub/topic.rb', line 337

def retention= new_retention
  new_retention_duration = Convert.number_to_duration new_retention
  update_grpc = Google::Cloud::PubSub::V1::Topic.new name: name,
                                                     message_retention_duration: new_retention_duration
  @grpc = service.update_topic update_grpc, :message_retention_duration
  @resource_name = nil
end

#schema_name ⇒ String?

The name of the schema that messages published should be validated against, if schema settings are configured for the topic. The value is a fully-qualified schema name in the form projects/{project_id}/schemas/{schema_id}. If present, #message_encoding should also be present. The value of this field will be _deleted-schema_ if the schema has been deleted.

Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

topic.schema_name #=> "projects/my-project/schemas/my-schema"

Returns:

  • (String, nil)

    The schema name, or nil if schema settings are not configured for the topic.



# File 'lib/google/cloud/pubsub/topic.rb', line 255

def schema_name
  ensure_grpc!
  @grpc.schema_settings&.schema
end

#subscribe(subscription_name, **options) ⇒ Google::Cloud::PubSub::Subscription Also known as: create_subscription, new_subscription

Creates a new Subscription object on the current Topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"

Wait 2 minutes for acknowledgement:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      deadline: 120

Configure a push endpoint:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"

push_config = Google::Cloud::PubSub::Subscription::PushConfig.new endpoint: "http://example.net/callback"
push_config.set_oidc_token "service-account@example.net", "audience-header-value"

sub = topic.subscribe "my-subscription", push_config: push_config

Configure a dead letter policy:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

# Dead Letter Queue (DLQ) testing requires IAM bindings to the Cloud Pub/Sub service account that is
# automatically created and managed by the service team in a private project.
my_project_number = "000000000000"
service_account_email = "serviceAccount:service-#{my_project_number}@gcp-sa-pubsub.iam.gserviceaccount.com"

dead_letter_topic = pubsub.topic "my-dead-letter-topic"
dead_letter_subscription = dead_letter_topic.subscribe "my-dead-letter-sub"

dead_letter_topic.policy { |p| p.add "roles/pubsub.publisher", service_account_email }
dead_letter_subscription.policy { |p| p.add "roles/pubsub.subscriber", service_account_email }

topic = pubsub.topic "my-topic"
sub = topic.subscribe "my-topic-sub",
                      dead_letter_topic: dead_letter_topic,
                      dead_letter_max_delivery_attempts: 10

Configure a Retry Policy:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

retry_policy = Google::Cloud::PubSub::RetryPolicy.new minimum_backoff: 5, maximum_backoff: 300
sub = topic.subscribe "my-topic-sub", retry_policy: retry_policy

Parameters:

  • options (Hash)

    a customizable set of options

Options Hash (**options):

  • subscription_name (String)

    Name of the new subscription. Required. The value can be a simple subscription ID (relative name), in which case the current project ID will be supplied, or a fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}.

    The subscription ID (relative name) must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with goog.

  • deadline (Integer)

    The maximum number of seconds after a subscriber receives a message before the subscriber should acknowledge the message.

  • retain_acked (Boolean)

    Indicates whether to retain acknowledged messages. If true, then messages are not expunged from the subscription's backlog, even if they are acknowledged, until they fall out of the retention window. Default is false.

  • retention (Numeric)

    How long to retain unacknowledged messages in the subscription's backlog, from the moment a message is published. If retain_acked is true, then this also configures the retention of acknowledged messages, and thus configures how far back in time a Subscription#seek can be done. Cannot be more than 604,800 seconds (7 days) or less than 600 seconds (10 minutes). Default is 604,800 seconds (7 days).

  • endpoint (String)

    A URL locating the endpoint to which messages should be pushed. The parameters push_config and endpoint should not both be provided.

  • push_config (Google::Cloud::PubSub::Subscription::PushConfig)

    The configuration for a push delivery endpoint that should contain the endpoint, and can contain authentication data (OIDC token authentication). The parameters push_config and endpoint should not both be provided.

  • labels (Hash)

    A hash of user-provided labels associated with the subscription. You can use these to organize and group your subscriptions. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.

  • message_ordering (Boolean)

    Whether to enable message ordering on the subscription.

  • filter (String)

    An expression written in the Cloud Pub/Sub filter language. If non-empty, then only Message instances whose attributes field matches the filter are delivered on this subscription. If empty, then no messages are filtered out. Optional.

  • dead_letter_topic (Topic)

    The Google::Cloud::PubSub::Topic to which dead letter messages for the subscription should be published. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The Cloud Pub/Sub service account associated with the enclosing subscription's parent project (i.e., service-{project_number}@gcp-sa-pubsub.iam.gserviceaccount.com) must have permission to Publish() to this topic.

    The operation will fail if the topic does not exist. Users should ensure that there is a subscription attached to this topic since messages published to a topic with no subscriptions are lost.

  • dead_letter_max_delivery_attempts (Integer)

    The maximum number of delivery attempts for any message in the subscription's dead letter policy. Dead lettering is done on a best effort basis. The same message might be dead lettered multiple times. The value must be between 5 and 100. If this parameter is 0, a default value of 5 is used. The dead_letter_topic must also be set.

  • retry_policy (RetryPolicy)

    A policy that specifies how Cloud Pub/Sub retries message delivery for this subscription. If not set, the default retry policy is applied. This generally implies that messages will be retried as soon as possible for healthy subscribers. Retry Policy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.
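
The naming rules given for subscription_name in the list above can be expressed as a single pattern plus a prefix check. This is a sketch under the stated rules only; valid_subscription_id? is a hypothetical helper, and it covers relative IDs, not fully-qualified names.

```ruby
# Hypothetical check of the subscription ID (relative name) rules listed above:
# starts with a letter, allowed characters only, 3 to 255 characters in total.
SUBSCRIPTION_ID = /\A[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}\z/

def valid_subscription_id? id
  SUBSCRIPTION_ID.match?(id) && !id.start_with?("goog")
end
```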

Returns:

  • (Google::Cloud::PubSub::Subscription)

# File 'lib/google/cloud/pubsub/topic.rb', line 494

def subscribe subscription_name, **options
  ensure_service!
  if options[:push_config] && options[:endpoint]
    raise ArgumentError, "endpoint and push_config were both provided. Please provide only one."
  end
  if options[:endpoint]
    options[:push_config] =
      Google::Cloud::PubSub::Subscription::PushConfig.new endpoint: options[:endpoint]
  end

  options[:dead_letter_topic_name] = options[:dead_letter_topic].name if options[:dead_letter_topic]
  if options[:dead_letter_max_delivery_attempts] && !options[:dead_letter_topic_name]
    # Service error message "3:Invalid resource name given (name=)." does not identify param.
    raise ArgumentError, "dead_letter_topic is required with dead_letter_max_delivery_attempts"
  end
  options[:push_config] = options[:push_config].to_grpc if options[:push_config]
  options[:retry_policy] = options[:retry_policy].to_grpc if options[:retry_policy]
  grpc = service.create_subscription name, subscription_name, options
  Subscription.from_grpc grpc, service
end
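
The two argument checks in the method above can be exercised without a service connection. A minimal sketch, assuming a hypothetical check_subscribe_options helper that reproduces only those checks and performs no API call:

```ruby
# Hypothetical, service-free version of the option checks in the method above.
def check_subscribe_options options
  if options[:push_config] && options[:endpoint]
    raise ArgumentError, "endpoint and push_config were both provided. Please provide only one."
  end
  if options[:dead_letter_max_delivery_attempts] && !options[:dead_letter_topic]
    raise ArgumentError, "dead_letter_topic is required with dead_letter_max_delivery_attempts"
  end
  true
end
```

Raising locally gives a clearer message than the service error quoted in the source comment, which does not name the offending parameter.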

#subscription(subscription_name, skip_lookup: nil) ⇒ Google::Cloud::PubSub::Subscription? Also known as: get_subscription, find_subscription

Retrieves subscription by name.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

sub = topic.subscription "my-topic-sub"
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"

Skip the lookup against the service with skip_lookup:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"

# No API call is made to retrieve the subscription information.
sub = topic.subscription "my-topic-sub", skip_lookup: true
sub.name #=> "projects/my-project/subscriptions/my-topic-sub"

Parameters:

  • subscription_name (String)

    Name of a subscription. The value can be a simple subscription ID (relative name), in which case the current project ID will be supplied, or a fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}.

  • skip_lookup (Boolean) (defaults to: nil)

    Optionally create a Subscription object without verifying the subscription resource exists on the Pub/Sub service. Calls made on this object will raise errors if the service resource does not exist. Default is false.

Returns:

  • (Google::Cloud::PubSub::Subscription, nil)

    Returns nil if the subscription does not exist.

# File 'lib/google/cloud/pubsub/topic.rb', line 554

def subscription subscription_name, skip_lookup: nil
  ensure_service!
  return Subscription.from_name subscription_name, service if skip_lookup
  grpc = service.get_subscription subscription_name
  Subscription.from_grpc grpc, service
rescue Google::Cloud::NotFoundError
  nil
end

#subscriptions(token: nil, max: nil) ⇒ Array<Subscription> Also known as: find_subscriptions, list_subscriptions

Retrieves a list of subscriptions attached to this topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.each do |subscription|
  puts subscription.name
end

Retrieve all subscriptions: (See Subscription::List#all)

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
subscriptions = topic.subscriptions
subscriptions.all do |subscription|
  puts subscription.name
end

Parameters:

  • token (String) (defaults to: nil)

    The token value returned by the last call to subscriptions; indicates that this is a continuation of a call, and that the system should return the next page of data.

  • max (Integer) (defaults to: nil)

    Maximum number of subscriptions to return.
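
Token-based paging with token and max generally follows the loop sketched below. The data source here is a hypothetical in-memory array, and fetch_page and fetch_all are invented names standing in for repeated calls that pass the previous page's token.

```ruby
# Hypothetical paged source standing in for repeated calls with token: and max:.
SUBSCRIPTION_NAMES = %w[sub-a sub-b sub-c sub-d sub-e].freeze

def fetch_page token: nil, max: 2
  start = token.to_i # a nil token means the first page
  items = SUBSCRIPTION_NAMES[start, max] || []
  next_start = start + items.size
  next_token = next_start < SUBSCRIPTION_NAMES.size ? next_start.to_s : nil
  { items: items, token: next_token }
end

def fetch_all max: 2
  all = []
  token = nil
  loop do
    page = fetch_page token: token, max: max
    all.concat page[:items]
    token = page[:token]
    break if token.nil? # no token means there are no more pages
  end
  all
end
```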

Returns:

  • (Array<Subscription>)

    (See Subscription::List)

# File 'lib/google/cloud/pubsub/topic.rb', line 597

def subscriptions token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_topics_subscriptions name, options
  Subscription::List.from_topic_grpc grpc, service, name, max
end

#test_permissions(*permissions) ⇒ Array<String>

Tests the specified permissions against the Cloud IAM access control policy.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"
perms = topic.test_permissions "pubsub.topics.get",
                               "pubsub.topics.publish"
perms.include? "pubsub.topics.get" #=> true
perms.include? "pubsub.topics.publish" #=> false

Parameters:

  • permissions (String, Array<String>)

    The set of permissions to check access for. Permissions with wildcards (such as * or storage.*) are not allowed.

    The permissions that can be checked on a topic are:

    • pubsub.topics.publish
    • pubsub.topics.attachSubscription
    • pubsub.topics.get
    • pubsub.topics.delete
    • pubsub.topics.update
    • pubsub.topics.getIamPolicy
    • pubsub.topics.setIamPolicy

Returns:

  • (Array<String>)

    The permissions that have access.

See Also:



# File 'lib/google/cloud/pubsub/topic.rb', line 972

def test_permissions *permissions
  permissions = Array(permissions).flatten
  ensure_service!
  grpc = service.test_topic_permissions name, permissions
  grpc.permissions
end
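
The splat combined with flatten in the method above means permissions can be given either as separate arguments or as an array. A sketch of that normalization follows; normalize_permissions is a hypothetical helper, and its wildcard check is an added assumption based on the parameter description, not something the method above does on the client side.

```ruby
# Mirrors the argument normalization in the method above, plus a hypothetical
# wildcard check based on the parameter description.
def normalize_permissions *permissions
  list = Array(permissions).flatten
  wildcard = list.find { |permission| permission.include? "*" }
  raise ArgumentError, "wildcards are not allowed: #{wildcard}" if wildcard
  list
end
```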

#update_policy(new_policy) ⇒ Policy Also known as: policy=

Updates the Cloud IAM access control policy for this topic. The policy should be read from #policy. See Policy for an explanation of the policy etag property and how to modify policies.

You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic"

policy = topic.policy # API call

policy.add "roles/owner", "user:owner@example.com"

topic.update_policy policy # API call

Parameters:

  • new_policy (Policy)

    a new or modified Cloud IAM Policy for this topic

Returns:

  • (Policy)

    the policy returned by the API update operation

See Also:



# File 'lib/google/cloud/pubsub/topic.rb', line 932

def update_policy new_policy
  ensure_service!
  grpc = service.set_topic_policy name, new_policy.to_grpc
  @policy = Policy.from_grpc grpc
end