Class: Google::Cloud::PubSub::Topic
- Inherits:
-
Object
- Object
- Google::Cloud::PubSub::Topic
- Defined in:
- lib/google/cloud/pubsub/topic.rb,
lib/google/cloud/pubsub/topic/list.rb
Overview
Defined Under Namespace
Classes: List
Instance Method Summary collapse
-
#async_publisher ⇒ AsyncPublisher
AsyncPublisher object used to publish multiple messages in batches.
-
#delete ⇒ Boolean
Permanently deletes the topic.
-
#enable_message_ordering! ⇒ Object
Enables message ordering for messages with ordering keys on the #async_publisher.
-
#exists? ⇒ Boolean
Determines whether the topic exists in the Pub/Sub service.
-
#kms_key ⇒ String
The Cloud KMS encryption key that will be used to protect access to messages published on this topic.
-
#kms_key=(new_kms_key_name) ⇒ Object
Set the Cloud KMS encryption key that will be used to protect access to messages published on this topic.
-
#labels ⇒ Hash
A hash of user-provided labels associated with this topic.
-
#labels=(new_labels) ⇒ Object
Sets the hash of user-provided labels associated with this topic.
-
#message_encoding ⇒ Symbol?
The encoding of messages validated against the schema identified by #schema_name.
-
#message_encoding_binary? ⇒ Boolean
Checks if the encoding of messages in the schema settings is
BINARY
. -
#message_encoding_json? ⇒ Boolean
Checks if the encoding of messages in the schema settings is
JSON
. -
#message_ordering? ⇒ Boolean
Whether message ordering for messages with ordering keys has been enabled on the #async_publisher.
-
#name ⇒ String
The name of the topic.
-
#persistence_regions ⇒ Array<String>
The list of GCP region IDs where messages that are published to the topic may be persisted in storage.
-
#persistence_regions=(new_persistence_regions) ⇒ Object
Sets the list of GCP region IDs where messages that are published to the topic may be persisted in storage.
-
#policy {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this topic.
-
#publish(data = nil, attributes = nil, ordering_key: nil, compress: nil, compression_bytes_threshold: nil, **extra_attrs) {|batch| ... } ⇒ Message+
Publishes one or more messages to the topic.
-
#publish_async(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object
Publishes a message asynchronously to the topic using #async_publisher.
-
#reference? ⇒ Boolean
Determines whether the topic object was created without retrieving the resource representation from the Pub/Sub service.
-
#reload! ⇒ Google::Cloud::PubSub::Topic
(also: #refresh!)
Reloads the topic with current data from the Pub/Sub service.
-
#resource? ⇒ Boolean
Determines whether the topic object was created with a resource representation from the Pub/Sub service.
-
#resume_publish(ordering_key) ⇒ boolean
Resume publishing ordered messages for the provided ordering key.
-
#retention ⇒ Numeric?
Indicates the minimum number of seconds to retain a message after it is published to the topic.
-
#retention=(new_retention) ⇒ Object
Sets the message retention duration in seconds.
-
#schema_name ⇒ String?
The name of the schema that messages published should be validated against, if schema settings are configured for the topic.
-
#subscribe(subscription_name, **options) ⇒ Google::Cloud::PubSub::Subscription
(also: #create_subscription, #new_subscription)
Creates a new Subscription object on the current Topic.
-
#subscription(subscription_name, skip_lookup: nil) ⇒ Google::Cloud::PubSub::Subscription?
(also: #get_subscription, #find_subscription)
Retrieves subscription by name.
-
#subscriptions(token: nil, max: nil) ⇒ Array<Subscription>
(also: #find_subscriptions, #list_subscriptions)
Retrieves a list of subscription names for the given project.
-
#test_permissions(*permissions) ⇒ Array<Strings>
Tests the specified permissions against the Cloud IAM access control policy.
-
#update_policy(new_policy) ⇒ Policy
(also: #policy=)
Updates the Cloud IAM access control policy for this topic.
Instance Method Details
#async_publisher ⇒ AsyncPublisher
AsyncPublisher object used to publish multiple messages in batches.
83 84 85 |
# File 'lib/google/cloud/pubsub/topic.rb', line 83 def async_publisher @async_publisher end |
#delete ⇒ Boolean
Permanently deletes the topic.
358 359 360 361 362 |
# File 'lib/google/cloud/pubsub/topic.rb', line 358 def delete ensure_service! service.delete_topic name true end |
#enable_message_ordering! ⇒ Object
At the time of this release, ordering keys are not yet publicly enabled and requires special project enablements.
Enables message ordering for messages with ordering keys on the
#async_publisher. When enabled, messages published with the same
ordering_key
will be delivered in the order they were published.
See #message_ordering?. See #publish_async, Subscription#listen, and Message#ordering_key.
826 827 828 829 |
# File 'lib/google/cloud/pubsub/topic.rb', line 826 def @async_publisher ||= AsyncPublisher.new name, service, **@async_opts @async_publisher. end |
#exists? ⇒ Boolean
Determines whether the topic exists in the Pub/Sub service.
991 992 993 994 995 996 997 998 999 1000 |
# File 'lib/google/cloud/pubsub/topic.rb', line 991 def exists? # Always true if the object is not set as reference return true unless reference? # If we have a value, return it return @exists unless @exists.nil? ensure_grpc! @exists = true rescue Google::Cloud::NotFoundError @exists = false end |
#kms_key ⇒ String
The Cloud KMS encryption key that will be used to protect access
to messages published on this topic.
For example: projects/a/locations/b/keyRings/c/cryptoKeys/d
The default value is nil
, which means default encryption is used.
Makes an API call to retrieve the KMS encryption key when called on a reference object. See #reference?.
155 156 157 158 |
# File 'lib/google/cloud/pubsub/topic.rb', line 155 def kms_key ensure_grpc! @grpc.kms_key_name end |
#kms_key=(new_kms_key_name) ⇒ Object
Set the Cloud KMS encryption key that will be used to protect access
to messages published on this topic.
For example: projects/a/locations/b/keyRings/c/cryptoKeys/d
The default value is nil
, which means default encryption is used.
178 179 180 181 182 |
# File 'lib/google/cloud/pubsub/topic.rb', line 178 def kms_key= new_kms_key_name update_grpc = Google::Cloud::PubSub::V1::Topic.new name: name, kms_key_name: new_kms_key_name @grpc = service.update_topic update_grpc, :kms_key_name @resource_name = nil end |
#labels ⇒ Hash
A hash of user-provided labels associated with this topic. Labels can be used to organize and group topics. See Creating and Managing Labels.
The returned hash is frozen and changes are not allowed. Use #labels= to update the labels for this topic.
Makes an API call to retrieve the labels values when called on a reference object. See #reference?.
111 112 113 114 |
# File 'lib/google/cloud/pubsub/topic.rb', line 111 def labels ensure_grpc! @grpc.labels.to_h.freeze end |
#labels=(new_labels) ⇒ Object
Sets the hash of user-provided labels associated with this topic. Labels can be used to organize and group topics. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.
128 129 130 131 132 133 |
# File 'lib/google/cloud/pubsub/topic.rb', line 128 def labels= new_labels raise ArgumentError, "Value must be a Hash" if new_labels.nil? update_grpc = Google::Cloud::PubSub::V1::Topic.new name: name, labels: new_labels @grpc = service.update_topic update_grpc, :labels @resource_name = nil end |
#message_encoding ⇒ Symbol?
The encoding of messages validated against the schema identified by #schema_name. If present, #schema_name should also be present. Values include:
JSON
- JSON encoding.BINARY
- Binary encoding, as defined by the schema type. For some schema types, binary encoding may not be available.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
281 282 283 284 |
# File 'lib/google/cloud/pubsub/topic.rb', line 281 def ensure_grpc! @grpc.schema_settings&.encoding end |
#message_encoding_binary? ⇒ Boolean
Checks if the encoding of messages in the schema settings is BINARY
. See #message_encoding.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
293 294 295 |
# File 'lib/google/cloud/pubsub/topic.rb', line 293 def .to_s.upcase == "BINARY" end |
#message_encoding_json? ⇒ Boolean
Checks if the encoding of messages in the schema settings is JSON
. See #message_encoding.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
304 305 306 |
# File 'lib/google/cloud/pubsub/topic.rb', line 304 def .to_s.upcase == "JSON" end |
#message_ordering? ⇒ Boolean
Whether message ordering for messages with ordering keys has been
enabled on the #async_publisher. When enabled, messages published
with the same ordering_key
will be delivered in the order they were
published. When disabled, messages may be delivered in any order.
See #enable_message_ordering!. See #publish_async, Subscription#listen, and Message#ordering_key.
842 843 844 845 |
# File 'lib/google/cloud/pubsub/topic.rb', line 842 def @async_publisher ||= AsyncPublisher.new name, service, **@async_opts @async_publisher. end |
#name ⇒ String
The name of the topic.
93 94 95 96 |
# File 'lib/google/cloud/pubsub/topic.rb', line 93 def name return @resource_name if reference? @grpc.name end |
#persistence_regions ⇒ Array<String>
The list of GCP region IDs where messages that are published to the topic may be persisted in storage.
Messages published by publishers running in non-allowed GCP regions (or running outside of GCP altogether) will be routed for storage in one of the allowed regions. An empty list indicates a misconfiguration at the project or organization level, which will result in all publish operations failing.
Makes an API call to retrieve the list of GCP region IDs values when called on a reference object. See #reference?.
208 209 210 211 212 |
# File 'lib/google/cloud/pubsub/topic.rb', line 208 def persistence_regions ensure_grpc! return [] if @grpc..nil? Array @grpc..allowed_persistence_regions end |
#persistence_regions=(new_persistence_regions) ⇒ Object
Sets the list of GCP region IDs where messages that are published to the topic may be persisted in storage.
229 230 231 232 233 234 |
# File 'lib/google/cloud/pubsub/topic.rb', line 229 def persistence_regions= new_persistence_regions update_grpc = Google::Cloud::PubSub::V1::Topic.new \ name: name, message_storage_policy: { allowed_persistence_regions: Array(new_persistence_regions) } @grpc = service.update_topic update_grpc, :message_storage_policy @resource_name = nil end |
#policy {|policy| ... } ⇒ Policy
Gets the Cloud IAM access control policy for this topic.
894 895 896 897 898 899 900 901 |
# File 'lib/google/cloud/pubsub/topic.rb', line 894 def policy ensure_service! grpc = service.get_topic_policy name policy = Policy.from_grpc grpc return policy unless block_given? yield policy update_policy policy end |
#publish(data = nil, attributes = nil, ordering_key: nil, compress: nil, compression_bytes_threshold: nil, **extra_attrs) {|batch| ... } ⇒ Message+
Publishes one or more messages to the topic.
The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.
679 680 681 682 683 684 685 686 687 688 689 690 691 692 |
# File 'lib/google/cloud/pubsub/topic.rb', line 679 def publish data = nil, attributes = nil, ordering_key: nil, compress: nil, compression_bytes_threshold: nil, **extra_attrs, &block ensure_service! batch = BatchPublisher.new data, attributes, ordering_key, extra_attrs, compress: compress, compression_bytes_threshold: compression_bytes_threshold block&.call batch return nil if batch..count.zero? batch. name, service end |
#publish_async(data = nil, attributes = nil, ordering_key: nil, **extra_attrs) {|result| ... } ⇒ Object
At the time of this release, ordering keys are not yet publicly enabled and requires special project enablements.
Publishes a message asynchronously to the topic using #async_publisher.
The message payload must not be empty; it must contain either a non-empty data field, or at least one attribute.
Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.
To use ordering keys, specify ordering_key
. Before specifying
ordering_key
on a message a call to #enable_message_ordering!
must
be made or an error will be raised.
Publisher flow control limits the number of outstanding messages that
are allowed to wait to be published. See the flow_control
key in the
async
parameter in Project#topic for more information about publisher
flow control settings.
808 809 810 811 812 813 |
# File 'lib/google/cloud/pubsub/topic.rb', line 808 def publish_async data = nil, attributes = nil, ordering_key: nil, **extra_attrs, &callback ensure_service! @async_publisher ||= AsyncPublisher.new name, service, **@async_opts @async_publisher.publish data, attributes, ordering_key: ordering_key, **extra_attrs, &callback end |
#reference? ⇒ Boolean
Determines whether the topic object was created without retrieving the resource representation from the Pub/Sub service.
1017 1018 1019 |
# File 'lib/google/cloud/pubsub/topic.rb', line 1017 def reference? @grpc.nil? end |
#reload! ⇒ Google::Cloud::PubSub::Topic Also known as: refresh!
Reloads the topic with current data from the Pub/Sub service.
1053 1054 1055 1056 1057 1058 |
# File 'lib/google/cloud/pubsub/topic.rb', line 1053 def reload! ensure_service! @grpc = service.get_topic name @resource_name = nil self end |
#resource? ⇒ Boolean
Determines whether the topic object was created with a resource representation from the Pub/Sub service.
1036 1037 1038 |
# File 'lib/google/cloud/pubsub/topic.rb', line 1036 def resource? !@grpc.nil? end |
#resume_publish(ordering_key) ⇒ boolean
Resume publishing ordered messages for the provided ordering key.
855 856 857 858 |
# File 'lib/google/cloud/pubsub/topic.rb', line 855 def resume_publish ordering_key @async_publisher ||= AsyncPublisher.new name, service, **@async_opts @async_publisher.resume_publish ordering_key end |
#retention ⇒ Numeric?
Indicates the minimum number of seconds to retain a message after it is
published to the topic. If this field is set, messages published to the topic
within the retention
number of seconds are always available to subscribers.
For instance, it allows any attached subscription to seek to a
timestamp
that is up to retention
number of seconds in the past. If this field is
not set, message retention is controlled by settings on individual
subscriptions. Cannot be less than 600 (10 minutes) or more than 604,800 (7 days).
See #retention=.
Makes an API call to retrieve the retention value when called on a reference object. See #reference?.
324 325 326 327 |
# File 'lib/google/cloud/pubsub/topic.rb', line 324 def retention ensure_grpc! Convert.duration_to_number @grpc. end |
#retention=(new_retention) ⇒ Object
Sets the message retention duration in seconds. If set to a positive duration
between 600 (10 minutes) and 604,800 (7 days), inclusive, the message retention
duration is changed. If set to nil
, this clears message retention duration
from the topic. See #retention.
337 338 339 340 341 342 343 |
# File 'lib/google/cloud/pubsub/topic.rb', line 337 def retention= new_retention new_retention_duration = Convert.number_to_duration new_retention update_grpc = Google::Cloud::PubSub::V1::Topic.new name: name, message_retention_duration: new_retention_duration @grpc = service.update_topic update_grpc, :message_retention_duration @resource_name = nil end |
#schema_name ⇒ String?
The name of the schema that messages published should be validated against, if schema settings are configured
for the topic. The value is a fully-qualified schema name in the form
projects/{project_id}/schemas/{schema_id}
. If present, #message_encoding should also be present. The value
of this field will be _deleted-schema_
if the schema has been deleted.
Makes an API call to retrieve the schema settings when called on a reference object. See #reference?.
255 256 257 258 |
# File 'lib/google/cloud/pubsub/topic.rb', line 255 def schema_name ensure_grpc! @grpc.schema_settings&.schema end |
#subscribe(subscription_name, **options) ⇒ Google::Cloud::PubSub::Subscription Also known as: create_subscription, new_subscription
Creates a new Subscription object on the current Topic.
494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 |
# File 'lib/google/cloud/pubsub/topic.rb', line 494 def subscribe subscription_name, ** ensure_service! if [:push_config] && [:endpoint] raise ArgumentError, "endpoint and push_config were both provided. Please provide only one." end if [:endpoint] [:push_config] = Google::Cloud::PubSub::Subscription::PushConfig.new endpoint: [:endpoint] end [:dead_letter_topic_name] = [:dead_letter_topic].name if [:dead_letter_topic] if [:dead_letter_max_delivery_attempts] && ![:dead_letter_topic_name] # Service error message "3:Invalid resource name given (name=)." does not identify param. raise ArgumentError, "dead_letter_topic is required with dead_letter_max_delivery_attempts" end [:push_config] = [:push_config].to_grpc if [:push_config] [:retry_policy] = [:retry_policy].to_grpc if [:retry_policy] grpc = service.create_subscription name, subscription_name, Subscription.from_grpc grpc, service end |
#subscription(subscription_name, skip_lookup: nil) ⇒ Google::Cloud::PubSub::Subscription? Also known as: get_subscription, find_subscription
Retrieves subscription by name.
554 555 556 557 558 559 560 561 |
# File 'lib/google/cloud/pubsub/topic.rb', line 554 def subscription subscription_name, skip_lookup: nil ensure_service! return Subscription.from_name subscription_name, service if skip_lookup grpc = service.get_subscription subscription_name Subscription.from_grpc grpc, service rescue Google::Cloud::NotFoundError nil end |
#subscriptions(token: nil, max: nil) ⇒ Array<Subscription> Also known as: find_subscriptions, list_subscriptions
Retrieves a list of subscription names for the given project.
597 598 599 600 601 602 |
# File 'lib/google/cloud/pubsub/topic.rb', line 597 def subscriptions token: nil, max: nil ensure_service! = { token: token, max: max } grpc = service.list_topics_subscriptions name, Subscription::List.from_topic_grpc grpc, service, name, max end |
#test_permissions(*permissions) ⇒ Array<Strings>
Tests the specified permissions against the Cloud IAM access control policy.
972 973 974 975 976 977 978 |
# File 'lib/google/cloud/pubsub/topic.rb', line 972 def * = Array().flatten = Array().flatten ensure_service! grpc = service. name, grpc. end |
#update_policy(new_policy) ⇒ Policy Also known as: policy=
Updates the Cloud IAM access control
policy for this topic. The policy should be read from #policy. See
Policy for an explanation of the policy
etag
property and how to modify policies.
You can also update the policy by passing a block to #policy, which will call this method internally after the block completes.
932 933 934 935 936 |
# File 'lib/google/cloud/pubsub/topic.rb', line 932 def update_policy new_policy ensure_service! grpc = service.set_topic_policy name, new_policy.to_grpc @policy = Policy.from_grpc grpc end |