Class: Google::Cloud::PubSub::Project

Inherits:
Object
Defined in:
lib/google/cloud/pubsub/project.rb

Overview

Project

Represents the project that Pub/Sub messages are pushed to and pulled from. A Topic is a named resource to which publishers send messages. A Subscription is a named resource representing the stream of messages from a single, specific topic, to be delivered to the subscribing application. A Message is a combination of data and attributes that a publisher sends to a topic and that is eventually delivered to subscribers.

See Google::Cloud#pubsub

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
topic.publish "task completed"

Instance Method Details

#create_schema(schema_id, type, definition, project: nil) ⇒ Google::Cloud::PubSub::Schema Also known as: new_schema

Creates a new schema.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

definition = "..."
schema = pubsub.create_schema "my-schema", :avro, definition
schema.name #=> "projects/my-project/schemas/my-schema"

Parameters:

  • schema_id (String)

    The ID to use for the schema, which will become the final component of the schema's resource name. Required.

    The schema ID (relative name) must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with goog.

  • type (String, Symbol)

    The type of the schema. Required. Possible values are case-insensitive and include:

    • PROTOCOL_BUFFER - A Protocol Buffer schema definition.
    • AVRO - An Avro schema definition.
  • definition (String)

    The definition of the schema. Required. This should be a string representing the full definition of the schema that is a valid schema definition of the type specified in type.

  • project (String) (defaults to: nil)

    If the schema belongs to a project other than the one currently connected to, the alternate project ID can be specified here. Optional.
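
The naming rules for schema_id can be checked locally before calling the service. The following is a minimal sketch, assuming a hypothetical helper named valid_resource_id? that is not part of the library; the service remains the authority on what it accepts.

```ruby
# Hypothetical helper (not part of google-cloud-pubsub): checks an ID
# against the documented rules: starts with a letter, uses only the
# allowed characters, is 3 to 255 characters long, and does not start
# with "goog".
RESOURCE_ID_PATTERN = /\A[A-Za-z][A-Za-z0-9\-_.~+%]{2,254}\z/

def valid_resource_id? id
  return false unless id.is_a? String
  return false if id.start_with? "goog"
  RESOURCE_ID_PATTERN.match? id
end

valid_resource_id? "my-schema"   #=> true
valid_resource_id? "goog-schema" #=> false
valid_resource_id? "ab"          #=> false
```

The same rules are documented for topic IDs, so the check could be reused there.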

Returns:

  • (Google::Cloud::PubSub::Schema)

# File 'lib/google/cloud/pubsub/project.rb', line 569

def create_schema schema_id, type, definition, project: nil
  ensure_service!
  type = type.to_s.upcase
  grpc = service.create_schema schema_id, type, definition, project: project
  Schema.from_grpc grpc, service
end

#create_topic(topic_name, labels: nil, kms_key: nil, persistence_regions: nil, async: nil, schema_name: nil, message_encoding: nil, retention: nil, ingestion_data_source_settings: nil) ⇒ Google::Cloud::PubSub::Topic Also known as: new_topic

Creates a new topic.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.create_topic "my-topic"

Parameters:

  • topic_name (String)

    Name of a topic. Required. The value can be a simple topic ID (relative name), in which case the current project ID will be supplied, or a fully-qualified topic name in the form projects/{project_id}/topics/{topic_id}.

    The topic ID (relative name) must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and it must not start with goog.

  • labels (Hash) (defaults to: nil)

    A hash of user-provided labels associated with the topic. You can use these to organize and group your topics. Label keys and values can be no longer than 63 characters, can only contain lowercase letters, numeric characters, underscores and dashes. International characters are allowed. Label values are optional. Label keys must start with a letter and each label in the list must have a different key. See Creating and Managing Labels.

  • kms_key (String) (defaults to: nil)

    The Cloud KMS encryption key that will be used to protect access to messages published on this topic. Optional. For example: projects/a/locations/b/keyRings/c/cryptoKeys/d

  • persistence_regions (Array<String>) (defaults to: nil)

    The list of GCP region IDs where messages that are published to the topic may be persisted in storage. Optional.

  • async (Hash) (defaults to: nil)

    A hash of values to configure the topic's AsyncPublisher that is created when Topic#publish_async is called. Optional.

    Hash keys and values may include the following:

    • :max_bytes (Integer) The maximum size of messages to be collected before the batch is published. Default is 1,000,000 (1MB).
    • :max_messages (Integer) The maximum number of messages to be collected before the batch is published. Default is 100.
    • :interval (Numeric) The number of seconds to collect messages before the batch is published. Default is 0.01.
    • :threads (Hash) The number of threads to create to handle concurrent calls by the publisher:
      • :publish (Integer) The number of threads used to publish messages. Default is 2.
      • :callback (Integer) The number of threads to handle the published messages' callbacks. Default is 4.
    • :compress (Boolean) The flag that enables publisher compression. Default is false.
    • :compression_bytes_threshold (Integer) The number of bytes above which compress should be enabled. Default is 240.
    • :flow_control (Hash) The client flow control settings for message publishing:
      • :message_limit (Integer) The maximum number of messages allowed to wait to be published. Default is 10 * max_messages.
      • :byte_limit (Integer) The maximum total size of messages allowed to wait to be published. Default is 10 * max_bytes.
      • :limit_exceeded_behavior (Symbol) The action to take when publish flow control limits are exceeded. The default value is :ignore. Possible values include:
        • :ignore - Flow control is disabled.
        • :error - Calls to Topic#publish_async will raise FlowControlLimitError when publish flow control limits are exceeded.
        • :block - Calls to Topic#publish_async will block until capacity is available when publish flow control limits are exceeded.
  • schema_name (String) (defaults to: nil)

    The name of the schema that messages published should be validated against. Optional. The value can be a simple schema ID (relative name), in which case the current project ID will be supplied, or a fully-qualified schema name in the form projects/{project_id}/schemas/{schema_id}. If provided, message_encoding must also be provided.

  • message_encoding (String, Symbol) (defaults to: nil)

    The encoding of messages validated against the schema identified by schema_name. Optional. Values include:

    • JSON - JSON encoding.
    • BINARY - Binary encoding, as defined by the schema type. For some schema types, binary encoding may not be available.
  • retention (Numeric) (defaults to: nil)

    Indicates the minimum number of seconds to retain a message after it is published to the topic. If this field is set, messages published to the topic within the retention number of seconds are always available to subscribers. For instance, it allows any attached subscription to seek to a timestamp that is up to retention number of seconds in the past. If this field is not set, message retention is controlled by settings on individual subscriptions. Cannot be less than 600 (10 minutes) or more than 604,800 (7 days).

  • ingestion_data_source_settings (::Google::Cloud::PubSub::V1::IngestionDataSourceSettings, ::Hash) (defaults to: nil)

    Settings for ingestion from a data source into this topic. Optional.
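
The async defaults listed above can be summarized in code. This is a sketch only: effective_async_settings is a hypothetical helper, not a library method, and it assumes the flow control limits are derived from the configured max_messages and max_bytes rather than from their defaults.

```ruby
# Hypothetical helper: fills in the documented AsyncPublisher defaults
# for any keys missing from a user-supplied async hash.
def effective_async_settings async = nil
  async ||= {}
  threads = async[:threads] || {}
  flow = async[:flow_control] || {}
  max_bytes = async[:max_bytes] || 1_000_000
  max_messages = async[:max_messages] || 100

  {
    max_bytes: max_bytes,
    max_messages: max_messages,
    interval: async[:interval] || 0.01,
    threads: {
      publish: threads[:publish] || 2,
      callback: threads[:callback] || 4
    },
    compress: async[:compress] || false,
    compression_bytes_threshold: async[:compression_bytes_threshold] || 240,
    flow_control: {
      # Assumption: limits scale with the configured batch settings.
      message_limit: flow[:message_limit] || 10 * max_messages,
      byte_limit: flow[:byte_limit] || 10 * max_bytes,
      limit_exceeded_behavior: flow[:limit_exceeded_behavior] || :ignore
    }
  }
end

settings = effective_async_settings({ max_messages: 50, threads: { callback: 16 } })
settings[:flow_control][:message_limit] #=> 500
settings[:threads][:publish]            #=> 2
```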

Returns:

  • (Google::Cloud::PubSub::Topic)

# File 'lib/google/cloud/pubsub/project.rb', line 277

def create_topic topic_name,
                 labels: nil,
                 kms_key: nil,
                 persistence_regions: nil,
                 async: nil,
                 schema_name: nil,
                 message_encoding: nil,
                 retention: nil,
                 ingestion_data_source_settings: nil
  ensure_service!
  grpc = service.create_topic topic_name,
                              labels: labels,
                              kms_key_name: kms_key,
                              persistence_regions: persistence_regions,
                              schema_name: schema_name,
                              message_encoding: message_encoding,
                              retention: retention,
                              ingestion_data_source_settings: ingestion_data_source_settings
  Topic.from_grpc grpc, service, async: async
end
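
The method body above forwards retention to the service unchanged, so an out-of-range value would presumably only fail once the request is made. A local pre-check for the documented bounds might look like the following sketch; valid_retention? is a hypothetical helper, not part of the library.

```ruby
# Hypothetical pre-check for the documented retention bounds.
RETENTION_RANGE = (600..604_800) # 10 minutes to 7 days, in seconds

def valid_retention? seconds
  # nil means the field is unset, so subscriptions control retention.
  return true if seconds.nil?
  seconds.is_a?(Numeric) && RETENTION_RANGE.cover?(seconds)
end

valid_retention? 3600    #=> true
valid_retention? 60      #=> false
valid_retention? 700_000 #=> false
```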

#project_id ⇒ Object Also known as: project

The ID of the Pub/Sub project the client is connected to.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new(
  project_id: "my-project",
  credentials: "/path/to/keyfile.json"
)

pubsub.project_id #=> "my-project"


# File 'lib/google/cloud/pubsub/project.rb', line 74

def project_id
  service.project
end

#schema(schema_name, view: nil, project: nil, skip_lookup: nil) ⇒ Google::Cloud::PubSub::Schema? Also known as: get_schema, find_schema

Retrieves a schema by name.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

schema = pubsub.schema "my-schema"
schema.name #=> "projects/my-project/schemas/my-schema"
schema.type #=> :PROTOCOL_BUFFER
schema.definition # The schema definition

Skip the lookup against the service with skip_lookup:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

# No API call is made to retrieve the schema information.
# The default project is used in the name.
schema = pubsub.schema "my-schema", skip_lookup: true
schema.name #=> "projects/my-project/schemas/my-schema"
schema.type #=> nil
schema.definition #=> nil

Omit the schema definition with view: :basic:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

schema = pubsub.schema "my-schema", view: :basic
schema.name #=> "projects/my-project/schemas/my-schema"
schema.type #=> :PROTOCOL_BUFFER
schema.definition #=> nil

Parameters:

  • schema_name (String)

    Name of a schema. The value can be a simple schema ID, in which case the current project ID will be supplied, or a fully-qualified schema name in the form projects/{project_id}/schemas/{schema_id}.

  • view (Symbol, String, nil) (defaults to: nil)

    Possible values:

    • BASIC - Include the name and type of the schema, but not the definition.
    • FULL - Include all Schema object fields.

    The default value is FULL.

  • project (String) (defaults to: nil)

    If the schema belongs to a project other than the one currently connected to, the alternate project ID can be specified here. Not used if a fully-qualified schema name is provided for schema_name.

  • skip_lookup (Boolean) (defaults to: nil)

    Optionally create a Schema object without verifying the schema resource exists on the Pub/Sub service. Calls made on this object will raise errors if the service resource does not exist. Default is false.

Returns:

  • (Google::Cloud::PubSub::Schema, nil)

    Returns nil if the schema does not exist.

# File 'lib/google/cloud/pubsub/project.rb', line 522

def schema schema_name, view: nil, project: nil, skip_lookup: nil
  ensure_service!
  options = { project: project }
  return Schema.from_name schema_name, view, service, options if skip_lookup
  view ||= :FULL
  grpc = service.get_schema schema_name, view, options
  Schema.from_grpc grpc, service
rescue Google::Cloud::NotFoundError
  nil
end
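
The control flow above (return early when skip_lookup is set, otherwise call the service and turn a not-found error into nil) can be tried without a network connection. The sketch below uses stand-in classes; NotFoundError, FakeService, and find_schema are illustrative names and not the library's own.

```ruby
# Stand-ins for illustration only; not the real library classes.
class NotFoundError < StandardError; end

class FakeService
  attr_reader :calls

  def initialize known
    @known = known
    @calls = 0
  end

  # Counts each call so that skipped lookups are observable.
  def get_schema name
    @calls += 1
    @known.fetch(name) { raise NotFoundError, name }
  end
end

def find_schema service, name, skip_lookup: nil
  # With skip_lookup, build a reference without calling the service.
  return { name: name, type: nil } if skip_lookup
  service.get_schema name
rescue NotFoundError
  nil
end

service = FakeService.new({ "my-schema" => { name: "my-schema", type: :AVRO } })
find_schema service, "my-schema"                    # one service call
find_schema service, "missing"                      #=> nil
find_schema service, "unchecked", skip_lookup: true # no service call
```

A caller that uses skip_lookup gives up the nil check, so any error surfaces later, when the returned object is first used against the service.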

#schemas(view: nil, token: nil, max: nil) ⇒ Array<Google::Cloud::PubSub::Schema> Also known as: find_schemas, list_schemas

Retrieves a list of schemas for the given project.

If view is not given, it defaults to FULL.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

schemas = pubsub.schemas
schemas.each do |schema|
  puts schema.name
end

Retrieve all schemas: (See Schema::List#all)

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

schemas = pubsub.schemas
schemas.all do |schema|
  puts schema.name
end

Parameters:

  • view (String, Symbol, nil) (defaults to: nil)

    The set of fields to return in the response. Possible values:

    • BASIC - Include the name and type of the schema, but not the definition.
    • FULL - Include all Schema object fields.
  • token (String) (defaults to: nil)

    A previously-returned page token representing part of the larger set of results to view.

  • max (Integer) (defaults to: nil)

    Maximum number of schemas to return.

Returns:

  • (Array<Google::Cloud::PubSub::Schema>)

    (See Schema::List)

# File 'lib/google/cloud/pubsub/project.rb', line 613

def schemas view: nil, token: nil, max: nil
  ensure_service!
  view ||= :FULL
  options = { token: token, max: max }
  grpc = service.list_schemas view, options
  Schema::List.from_grpc grpc, service, view, max
end

#snapshots(token: nil, max: nil) ⇒ Array<Google::Cloud::PubSub::Snapshot> Also known as: find_snapshots, list_snapshots

Retrieves a list of snapshots for the given project.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

snapshots = pubsub.snapshots
snapshots.each do |snapshot|
  puts snapshot.name
end

Retrieve all snapshots: (See Snapshot::List#all)

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

snapshots = pubsub.snapshots
snapshots.all do |snapshot|
  puts snapshot.name
end

Parameters:

  • token (String) (defaults to: nil)

    A previously-returned page token representing part of the larger set of results to view.

  • max (Integer) (defaults to: nil)

    Maximum number of snapshots to return.

Returns:

  • (Array<Google::Cloud::PubSub::Snapshot>)

    (See Snapshot::List)

# File 'lib/google/cloud/pubsub/project.rb', line 457

def snapshots token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_snapshots options
  Snapshot::List.from_grpc grpc, service, max
end

#subscription(subscription_name, project: nil, skip_lookup: nil) ⇒ Google::Cloud::PubSub::Subscription? Also known as: get_subscription, find_subscription

Retrieves a subscription by name.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-sub"
sub.name #=> "projects/my-project/subscriptions/my-sub"

Skip the lookup against the service with skip_lookup:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

# No API call is made to retrieve the subscription information.
sub = pubsub.subscription "my-sub", skip_lookup: true
sub.name #=> "projects/my-project/subscriptions/my-sub"

Parameters:

  • subscription_name (String)

    Name of a subscription. The value can be a simple subscription ID, in which case the current project ID will be supplied, or a fully-qualified subscription name in the form projects/{project_id}/subscriptions/{subscription_id}.

  • project (String) (defaults to: nil)

    If the subscription belongs to a project other than the one currently connected to, the alternate project ID can be specified here. Not used if a fully-qualified subscription name is provided for subscription_name.

  • skip_lookup (Boolean) (defaults to: nil)

    Optionally create a Subscription object without verifying the subscription resource exists on the Pub/Sub service. Calls made on this object will raise errors if the service resource does not exist. Default is false.

Returns:

  • (Google::Cloud::PubSub::Subscription, nil)

    Returns nil if the subscription does not exist.

# File 'lib/google/cloud/pubsub/project.rb', line 375

def subscription subscription_name, project: nil, skip_lookup: nil
  ensure_service!
  options = { project: project }
  return Subscription.from_name subscription_name, service, options if skip_lookup
  grpc = service.get_subscription subscription_name, options
  Subscription.from_grpc grpc, service
rescue Google::Cloud::NotFoundError
  nil
end

#subscriptions(token: nil, max: nil) ⇒ Array<Google::Cloud::PubSub::Subscription> Also known as: find_subscriptions, list_subscriptions

Retrieves a list of subscriptions for the given project.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subs = pubsub.subscriptions
subs.each do |sub|
  puts sub.name
end

Retrieve all subscriptions: (See Subscription::List#all)

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

subs = pubsub.subscriptions
subs.all do |sub|
  puts sub.name
end

Parameters:

  • token (String) (defaults to: nil)

    A previously-returned page token representing part of the larger set of results to view.

  • max (Integer) (defaults to: nil)

    Maximum number of subscriptions to return.

Returns:

  • (Array<Google::Cloud::PubSub::Subscription>)

    (See Subscription::List)

# File 'lib/google/cloud/pubsub/project.rb', line 417

def subscriptions token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_subscriptions options
  Subscription::List.from_grpc grpc, service, max
end

#topic(topic_name, project: nil, skip_lookup: nil, async: nil) ⇒ Google::Cloud::PubSub::Topic? Also known as: get_topic, find_topic

Retrieves a topic by name.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "existing-topic"

By default, nil is returned if the topic does not exist:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "non-existing-topic" # nil

Retrieve a topic in a different project with the project option:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "another-topic", project: "another-project"

Skip the lookup against the service with skip_lookup:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "another-topic", skip_lookup: true

Configuring AsyncPublisher to increase concurrent callbacks:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new
topic = pubsub.topic "my-topic",
                     async: { threads: { callback: 16 } }

topic.publish_async "task completed" do |result|
  if result.succeeded?
    log_publish_success result.data
  else
    log_publish_failure result.data, result.error
  end
end

topic.async_publisher.stop!

Parameters:

  • topic_name (String)

    Name of a topic. The value can be a simple topic ID (relative name), in which case the current project ID will be supplied, or a fully-qualified topic name in the form projects/{project_id}/topics/{topic_id}.

  • project (String) (defaults to: nil)

    If the topic belongs to a project other than the one currently connected to, the alternate project ID can be specified here. Optional. Not used if a fully-qualified topic name is provided for topic_name.

  • skip_lookup (Boolean) (defaults to: nil)

    Optionally create a Topic object without verifying the topic resource exists on the Pub/Sub service. Calls made on this object will raise errors if the topic resource does not exist. Default is false. Optional.

  • async (Hash) (defaults to: nil)

    A hash of values to configure the topic's AsyncPublisher that is created when Topic#publish_async is called. Optional.

    Hash keys and values may include the following:

    • :max_bytes (Integer) The maximum size of messages to be collected before the batch is published. Default is 1,000,000 (1MB).
    • :max_messages (Integer) The maximum number of messages to be collected before the batch is published. Default is 100.
    • :interval (Numeric) The number of seconds to collect messages before the batch is published. Default is 0.01.
    • :threads (Hash) The number of threads to create to handle concurrent calls by the publisher:
      • :publish (Integer) The number of threads used to publish messages. Default is 2.
      • :callback (Integer) The number of threads to handle the published messages' callbacks. Default is 4.
    • :compress (Boolean) The flag that enables publisher compression. Default is false.
    • :compression_bytes_threshold (Integer) The number of bytes above which compress should be enabled. Default is 240.
    • :flow_control (Hash) The client flow control settings for message publishing:
      • :message_limit (Integer) The maximum number of messages allowed to wait to be published. Default is 10 * max_messages.
      • :byte_limit (Integer) The maximum total size of messages allowed to wait to be published. Default is 10 * max_bytes.
      • :limit_exceeded_behavior (Symbol) The action to take when publish flow control limits are exceeded. The default value is :ignore. Possible values include:
        • :ignore - Flow control is disabled.
        • :error - Calls to Topic#publish_async will raise FlowControlLimitError when publish flow control limits are exceeded.
        • :block - Calls to Topic#publish_async will block until capacity is available when publish flow control limits are exceeded.
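
How topic_name and project combine into a resource name can be illustrated with a short sketch. The helper topic_path below is hypothetical and only mirrors the rules described for these parameters: a fully-qualified name is used as given, and otherwise the alternate project, if any, takes precedence over the connected project.

```ruby
# Hypothetical helper mirroring the documented naming rules.
def topic_path topic_name, default_project:, project: nil
  # A fully-qualified name is returned unchanged; project is not used.
  return topic_name if topic_name.to_s.include? "/"
  "projects/#{project || default_project}/topics/#{topic_name}"
end

topic_path "my-topic", default_project: "my-project"
#=> "projects/my-project/topics/my-topic"

topic_path "another-topic", default_project: "my-project", project: "another-project"
#=> "projects/another-project/topics/another-topic"
```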

Returns:

  • (Google::Cloud::PubSub::Topic, nil)

    Returns nil if the topic does not exist.

# File 'lib/google/cloud/pubsub/project.rb', line 176

def topic topic_name, project: nil, skip_lookup: nil, async: nil
  ensure_service!
  options = { project: project, async: async }
  return Topic.from_name topic_name, service, options if skip_lookup
  grpc = service.get_topic topic_name, options
  Topic.from_grpc grpc, service, async: async
rescue Google::Cloud::NotFoundError
  nil
end

#topics(token: nil, max: nil) ⇒ Array<Google::Cloud::PubSub::Topic> Also known as: find_topics, list_topics

Retrieves a list of topics for the given project.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topics = pubsub.topics
topics.each do |topic|
  puts topic.name
end

Retrieve all topics: (See Topic::List#all)

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topics = pubsub.topics
topics.all do |topic|
  puts topic.name
end

Parameters:

  • token (String) (defaults to: nil)

    The token value returned by the last call to topics; indicates that this is a continuation of a call, and that the system should return the next page of data.

  • max (Integer) (defaults to: nil)

    Maximum number of topics to return.

Returns:

  • (Array<Google::Cloud::PubSub::Topic>)

    (See Topic::List)

# File 'lib/google/cloud/pubsub/project.rb', line 330

def topics token: nil, max: nil
  ensure_service!
  options = { token: token, max: max }
  grpc = service.list_topics options
  Topic::List.from_grpc grpc, service, max
end
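
The token and max parameters follow the usual page-token pattern: each call returns up to max items plus a token for the next page, and a missing token marks the last page. The sketch below shows that loop over an in-memory array. list_page and all_items are hypothetical helpers, and encoding the token as a start index is an assumption made only for this illustration; real tokens are opaque strings issued by the service.

```ruby
# In-memory stand-in for a paged list call. The token is the next start
# index encoded as a string (an assumption for illustration only).
def list_page items, token: nil, max: nil
  start = token ? Integer(token) : 0
  size = (max && max.positive?) ? max : items.length
  page = items[start, size] || []
  next_index = start + page.length
  next_token = next_index < items.length ? next_index.to_s : nil
  [page, next_token]
end

# Follows tokens until the last page, collecting every item.
def all_items items, max: nil
  collected = []
  token = nil
  loop do
    page, token = list_page items, token: token, max: max
    collected.concat page
    break if token.nil?
  end
  collected
end

names = %w[topic-a topic-b topic-c topic-d topic-e]
list_page names, max: 2 #=> [["topic-a", "topic-b"], "2"]
all_items names, max: 2 # every name, fetched two at a time
```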

#universe_domain ⇒ String

The universe domain the client is connected to.

Returns:

  • (String)


# File 'lib/google/cloud/pubsub/project.rb', line 84

def universe_domain
  service.universe_domain
end

#valid_schema?(type, definition, project: nil) ⇒ Boolean Also known as: validate_schema

Validates a schema type and definition.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

definition = "..."
pubsub.validate_schema :avro, definition #=> true

Parameters:

  • type (String, Symbol)

    The type of the schema. Required. Possible values are case-insensitive and include:

    • PROTOCOL_BUFFER - A Protocol Buffer schema definition.
    • AVRO - An Avro schema definition.
  • definition (String)

    The definition of the schema. Required. This should be a string representing the full definition of the schema that is a valid schema definition of the type specified in type.

  • project (String) (defaults to: nil)

    If the schema belongs to a project other than the one currently connected to, the alternate project ID can be specified here. Optional.

Returns:

  • (Boolean)

    true if the schema is valid, false otherwise.



# File 'lib/google/cloud/pubsub/project.rb', line 648

def valid_schema? type, definition, project: nil
  ensure_service!
  type = type.to_s.upcase
  service.validate_schema type, definition, project: project # return type is empty
  true
rescue Google::Cloud::InvalidArgumentError
  false
end
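
The method above turns a service call that either raises or returns nothing into a Boolean, after normalizing the type to an upper-case string. The same pattern can be exercised against a stand-in service. In this sketch, InvalidArgumentError, FakeSchemaService, and schema_valid? are illustrative names, and the fake's validation rules are invented for the example.

```ruby
# Stand-ins for illustration only; these are not the library's classes.
class InvalidArgumentError < StandardError; end

class FakeSchemaService
  KNOWN_TYPES = %w[AVRO PROTOCOL_BUFFER].freeze

  # Raises on failure and returns nothing useful on success.
  def validate_schema type, definition
    raise InvalidArgumentError, "unknown type #{type}" unless KNOWN_TYPES.include? type
    raise InvalidArgumentError, "empty definition" if definition.to_s.strip.empty?
    nil
  end
end

def schema_valid? service, type, definition
  # Symbols and lower-case strings are normalized before the call.
  service.validate_schema type.to_s.upcase, definition
  true
rescue InvalidArgumentError
  false
end

service = FakeSchemaService.new
schema_valid? service, :avro, "{\"type\": \"string\"}" #=> true
schema_valid? service, :thrift, "..."                  #=> false
```

Only the invalid-argument error is rescued, so other failures, such as a permissions or connectivity error, would still propagate to the caller.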