Class: Google::Cloud::PubSub::ReceivedMessage

Inherits:
Object
  • Object
show all
Defined in:
lib/google/cloud/pubsub/received_message.rb

Overview

ReceivedMessage

Represents a Pub/Sub Message that can be acknowledged or delayed.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data
  received_message.acknowledge!
end

# Start background threads that will call the block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Instance Method Summary collapse

Instance Method Details

#ack_idObject

The acknowledgment ID for the message.



62
63
64
# File 'lib/google/cloud/pubsub/received_message.rb', line 62

def ack_id
  @grpc.ack_id
end

#acknowledge! {|callback| ... } ⇒ Object Also known as: ack!

Acknowledges receipt of the message.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data

  received_message.acknowledge! do |result|
    puts result.status
  end
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data

  received_message.acknowledge!
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Yields:

  • (callback)

    The block to be called when reject operation is done.

Yield Parameters:



205
206
207
208
209
210
211
212
213
# File 'lib/google/cloud/pubsub/received_message.rb', line 205

def acknowledge! &block
  ensure_subscription!
  if subscription.respond_to?(:exactly_once_delivery_enabled) && subscription.exactly_once_delivery_enabled
    subscription.acknowledge ack_id, &block
  else
    subscription.acknowledge ack_id
    yield AcknowledgeResult.new(AcknowledgeResult::SUCCESS) if block_given?
  end
end

#attributesObject

Optional attributes for the received message.



120
121
122
# File 'lib/google/cloud/pubsub/received_message.rb', line 120

def attributes
  message.attributes
end

#dataObject

The received message payload. This data is a list of bytes encoded as ASCII-8BIT.



114
115
116
# File 'lib/google/cloud/pubsub/received_message.rb', line 114

def data
  message.data
end

#delivery_attemptInteger?

Returns the delivery attempt counter for the message. If a dead letter policy is not set on the subscription, this will be nil. See Topic#subscribe, Subscription#dead_letter_topic= and Subscription#dead_letter_max_delivery_attempts=.

The delivery attempt counter is 1 + (the sum of number of NACKs and number of ack_deadline exceeds) for the message.

A NACK is any call to ModifyAckDeadline with a 0 deadline. An ack_deadline exceeds event is whenever a message is not acknowledged within ack_deadline. Note that ack_deadline is initially Subscription.ackDeadlineSeconds, but may get extended automatically by the client library.

The first delivery of a given message will have this value as 1. The value is calculated at best effort and is approximate.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

topic = pubsub.topic "my-topic"
dead_letter_topic = pubsub.topic "my-dead-letter-topic", skip_lookup: true
sub = topic.subscribe "my-topic-sub",
                      dead_letter_topic: dead_letter_topic,
                      dead_letter_max_delivery_attempts: 10

subscriber = sub.listen do |received_message|
  puts received_message.message.delivery_attempt
end

Returns:

  • (Integer, nil)

    A delivery attempt value of 1 or greater, or nil if a dead letter policy is not set on the subscription.



99
100
101
102
# File 'lib/google/cloud/pubsub/received_message.rb', line 99

def delivery_attempt
  return nil if @grpc.delivery_attempt && @grpc.delivery_attempt < 1
  @grpc.delivery_attempt
end

#messageObject Also known as: msg

The received message.



106
107
108
# File 'lib/google/cloud/pubsub/received_message.rb', line 106

def message
  Message.from_grpc @grpc.message
end

#message_idObject Also known as: msg_id

The ID of the received message, assigned by the server at publication time. Guaranteed to be unique within the topic.



127
128
129
# File 'lib/google/cloud/pubsub/received_message.rb', line 127

def message_id
  message.message_id
end

#modify_ack_deadline!(new_deadline) {|callback| ... } ⇒ Object

Modifies the acknowledge deadline for the message.

This indicates that more time is needed to process the message, or to make the message available for redelivery.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data

  # Delay for 2 minutes
  received_message.modify_ack_deadline! 120 do |result|
    puts result.status
  end
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data

  # Delay for 2 minutes
  received_message.modify_ack_deadline! 120
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Parameters:

  • new_deadline (Integer)

    The new ack deadline in seconds from the time this request is sent to the Pub/Sub system. Must be >= 0. For example, if the value is 10, the new ack deadline will expire 10 seconds after the call is made. Specifying 0 may immediately make the message available for another pull request.

Yields:

  • (callback)

    The block to be called when reject operation is done.

Yield Parameters:



271
272
273
274
275
276
277
278
279
# File 'lib/google/cloud/pubsub/received_message.rb', line 271

def modify_ack_deadline! new_deadline, &block
  ensure_subscription!
  if subscription.respond_to?(:exactly_once_delivery_enabled) && subscription.exactly_once_delivery_enabled
    subscription.modify_ack_deadline new_deadline, ack_id, &block
  else
    subscription.modify_ack_deadline new_deadline, ack_id
    yield AcknowledgeResult.new(AcknowledgeResult::SUCCESS) if block_given?
  end
end

#ordering_keyString

Identifies related messages for which publish order should be respected.

Google Cloud Pub/Sub ordering keys provide the ability to ensure related messages are sent to subscribers in the order in which they were published. Messages can be tagged with an ordering key, a string that identifies related messages for which publish order should be respected. The service guarantees that, for a given ordering key and publisher, messages are sent to subscribers in the order in which they were published. Ordering does not require sacrificing high throughput or scalability, as the service automatically distributes messages for different ordering keys across subscribers.

See Topic#publish_async and Subscription#listen.

Returns:

  • (String)


150
151
152
# File 'lib/google/cloud/pubsub/received_message.rb', line 150

def ordering_key
  message.ordering_key
end

#published_atObject Also known as: publish_time

The time at which the message was published.



156
157
158
# File 'lib/google/cloud/pubsub/received_message.rb', line 156

def published_at
  message.published_at
end

#reject! {|callback| ... } ⇒ Object Also known as: nack!, ignore!

Resets the acknowledge deadline for the message without acknowledging it.

This will make the message available for redelivery.

Examples:

require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data

  # Release message back to the API.
  received_message.reject! do |result|
    puts result.status
  end
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!
require "google/cloud/pubsub"

pubsub = Google::Cloud::PubSub.new

sub = pubsub.subscription "my-topic-sub"
subscriber = sub.listen do |received_message|
  puts received_message.message.data

  # Release message back to the API.
  received_message.reject!
end

# Start background threads that will call block passed to listen.
subscriber.start

# Shut down the subscriber when ready to stop receiving messages.
subscriber.stop!

Yields:

  • (callback)

    The block to be called when reject operation is done.

Yield Parameters:



330
331
332
# File 'lib/google/cloud/pubsub/received_message.rb', line 330

def reject! &block
  modify_ack_deadline! 0, &block
end