As of January 1, 2020, the latest released version of this library no longer supports Python 2. Library versions released prior to that date will continue to be available. For more information please visit Python 2 support on Google Cloud.

Publishing Messages

Publishing messages is handled through the Client class (aliased as google.cloud.pubsub.PublisherClient). This class provides methods to create topics, and (most importantly) a publish() method that publishes messages to Pub/Sub.

Instantiating a publishing client is straightforward:

from google.cloud import pubsub
publish_client = pubsub.PublisherClient()

Publish a Message

To publish a message, use the publish() method. This method accepts two positional arguments: the topic to publish to, and the body of the message. It also accepts arbitrary keyword arguments, which are passed along as attributes of the message.

The topic is passed along as a string; all topics have the canonical form of projects/{project_name}/topics/{topic_name}.

Therefore, a very basic publishing call looks like:

topic = 'projects/{project}/topics/{topic}'
future = publish_client.publish(topic, b'This is my message.')
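The topic string in the call above still contains the {project} and {topic} placeholders. A minimal sketch of filling them in is shown here; build_topic_path is a hypothetical helper written for this example (not part of the library), and "my-project" and "my-topic" are made-up names.

```python
def build_topic_path(project: str, topic: str) -> str:
    """Return the canonical topic path for a project and a topic name.

    Hypothetical helper for illustration; it only formats the string and
    does not check that the topic exists.
    """
    if "/" in project or "/" in topic:
        raise ValueError("project and topic must not contain '/'")
    return f"projects/{project}/topics/{topic}"


# Assumed example names; substitute your own project and topic.
topic = build_topic_path("my-project", "my-topic")
print(topic)  # projects/my-project/topics/my-topic
```

The resulting string can be passed as the first argument to publish().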

Note

The message data in Pub/Sub is an opaque blob of bytes, and as such, you must send a bytes object in Python 3 (str object in Python 2). If you send a text string (str in Python 3, unicode in Python 2), the method will raise TypeError.

It works this way because there is no guarantee that the subscriber uses the same language or environment as the publisher, so it is the publisher's responsibility to encode the payload properly.
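As an illustration of encoding the payload on the publisher side, the sketch below turns a text string and a dictionary into bytes. It assumes UTF-8 and JSON as the agreed formats, which is a choice made for this example rather than something Pub/Sub requires; encode_text and encode_json are hypothetical helper names.

```python
import json


def encode_text(text: str) -> bytes:
    """Encode a text string as UTF-8 bytes."""
    return text.encode("utf-8")


def encode_json(payload: dict) -> bytes:
    """Serialize a dictionary to JSON, then encode it as UTF-8 bytes."""
    return json.dumps(payload, sort_keys=True).encode("utf-8")


data = encode_text("This is my message.")
event = encode_json({"id": 7, "kind": "signup"})

# A subscriber that knows the format reverses the steps.
decoded = json.loads(event.decode("utf-8"))
print(type(data).__name__, decoded)
```

Either data or event could then be passed as the message body to publish(), provided the subscriber decodes it with the same conventions.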

If you want to include attributes, simply add keyword arguments:

topic = 'projects/{project}/topics/{topic}'
future = publish_client.publish(topic, b'This is my message.', foo='bar')

Batching

Whenever you publish a message, the publisher will automatically batch the messages over a small time window to avoid making too many separate requests to the service. This helps increase throughput.

Note

By default, this uses threading, and you will need to be in an environment with threading enabled. It is possible to provide an alternative batch class that uses another concurrency strategy.

When you send the first message, a new batch is created automatically. Every subsequent message is added to that batch for as long as the batch is valid and still accepting messages. When a batch is created, it begins a countdown and is published once sufficient time has elapsed (by default, this is 0.05 seconds).
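The batching behavior described above can be modelled with a small toy class. This is a simplified sketch and not the library's actual batch implementation: ToyBatcher, its send callable, and its flush() method are hypothetical names, and size limits in bytes are left out.

```python
import threading


class ToyBatcher:
    """Toy model: collect messages and send them together after a countdown."""

    def __init__(self, send, max_latency=0.05, max_messages=1000):
        self._send = send              # callable that receives a list of messages
        self._max_latency = max_latency
        self._max_messages = max_messages
        self._lock = threading.Lock()
        self._batch = None             # no open batch until the first message
        self._timer = None

    def publish(self, message: bytes) -> None:
        full = None
        with self._lock:
            if self._batch is None:
                # First message: open a new batch and start its countdown.
                self._batch = []
                self._timer = threading.Timer(self._max_latency, self.flush)
                self._timer.daemon = True
                self._timer.start()
            self._batch.append(message)
            if len(self._batch) >= self._max_messages:
                # The batch is full, so close it without waiting for the timer.
                full = self._close_locked()
        if full:
            self._send(full)

    def flush(self) -> None:
        """Send the open batch, if any. Also called when the countdown expires."""
        with self._lock:
            batch = self._close_locked()
        if batch:
            self._send(batch)

    def _close_locked(self):
        # Caller must hold the lock. Detach the batch and stop its countdown.
        batch, self._batch = self._batch, None
        if self._timer is not None:
            self._timer.cancel()
            self._timer = None
        return batch


sent = []
batcher = ToyBatcher(sent.append, max_latency=0.05)
for i in range(3):
    batcher.publish(f"message {i}".encode("utf-8"))
batcher.flush()  # or simply wait for the countdown to expire
print(sent)
```

The point of the sketch is that the callers of publish() never manage batches themselves: the first message opens one, later messages join it, and the countdown (or a full batch) triggers the send.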

If you need different batching settings, simply provide a BatchSettings object when you instantiate the Client:

from google.cloud import pubsub
from google.cloud.pubsub import types

client = pubsub.PublisherClient(
    batch_settings=types.BatchSettings(max_messages=500),
)

Pub/Sub accepts a maximum of 1,000 messages in a batch, and the size of a batch cannot exceed 10 megabytes.

Futures

Every call to publish() returns an instance of Future.

Note

The returned future conforms for the most part to the interface of the standard library’s Future, but might not be usable in all cases that expect that exact implementation.

You can use this to ensure that the publish succeeded:

# The .result() method will block until the future is complete.
# If there is an error, it will raise an exception.
future = client.publish(topic, b'My awesome message.')
message_id = future.result()

You can also attach a callback to the future:

# Callbacks receive the future as their only argument, as defined in
# the Future interface.
def callback(future):
    message_id = future.result()
    do_something_with(message_id)

# The callback is added once you get the future. If you add a callback
# and the future is already done, it will simply be executed immediately.
future = client.publish(topic, b'My awesome message.')
future.add_done_callback(callback)
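A callback will often need to tell a successful publish from a failed one. The sketch below uses the standard library's concurrent.futures.Future as a stand-in for the publish future, on the assumption that the Pub/Sub future behaves the same way for result(), exception() and add_done_callback(); the message ID and the error are made up for the example.

```python
from concurrent.futures import Future

results = []


def callback(future):
    # exception() returns None when the future completed successfully.
    error = future.exception()
    if error is not None:
        results.append(("failed", str(error)))
    else:
        results.append(("ok", future.result()))


# Simulated successful publish: the future resolves to a message ID.
succeeded = Future()
succeeded.add_done_callback(callback)
succeeded.set_result("1234567890")

# Simulated failed publish. The future is already done when the callback
# is attached, so the callback runs immediately.
failed = Future()
failed.set_exception(RuntimeError("publish failed"))
failed.add_done_callback(callback)

print(results)
```

Checking exception() first avoids having result() raise inside the callback.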

Publish Flow Control

If you publish a large number of messages, or very large messages, in quick succession, some of the publish requests might time out, especially if the available bandwidth is limited. To mitigate this, the client can be configured with custom PublishFlowControl settings.

You can configure the maximum desired number of messages and their maximum total size, as well as the action that should be taken when the threshold is reached.

from google.cloud import pubsub_v1

client = pubsub_v1.PublisherClient(
    publisher_options=pubsub_v1.types.PublisherOptions(
        flow_control=pubsub_v1.types.PublishFlowControl(
            message_limit=500,
            byte_limit=2 * 1024 * 1024,
            limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
        ),
    ),
)

The action to be taken on overflow can be one of the following:

  • IGNORE (default): Ignore the overflow and continue publishing the messages as normal.

  • ERROR: Raise FlowControlLimitError and reject the message.

  • BLOCK: Temporarily block in the publish() method until there is enough capacity available.
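To make the three behaviors concrete, here is a toy model of a flow controller. It is a sketch of the semantics listed above and not the library's implementation: ToyFlowController, Behavior, LimitError, add() and release() are hypothetical names, with LimitError standing in for FlowControlLimitError.

```python
import enum
import threading


class Behavior(enum.Enum):
    IGNORE = "ignore"
    ERROR = "error"
    BLOCK = "block"


class LimitError(Exception):
    """Hypothetical stand-in for FlowControlLimitError."""


class ToyFlowController:
    """Toy model: track in-flight messages and bytes against two limits."""

    def __init__(self, message_limit, byte_limit, behavior):
        self._message_limit = message_limit
        self._byte_limit = byte_limit
        self._behavior = behavior
        self._messages = 0
        self._bytes = 0
        self._cond = threading.Condition()

    def _would_overflow(self, size):
        return (self._messages + 1 > self._message_limit
                or self._bytes + size > self._byte_limit)

    def add(self, size):
        """Reserve capacity for one message of `size` bytes before publishing."""
        with self._cond:
            if self._behavior is Behavior.ERROR and self._would_overflow(size):
                raise LimitError("flow control limits exceeded")
            if self._behavior is Behavior.BLOCK:
                if size > self._byte_limit:
                    # Such a message could never fit, so do not wait forever.
                    raise LimitError("message larger than byte limit")
                while self._would_overflow(size):
                    self._cond.wait()
            # IGNORE falls through: the message is always accepted.
            self._messages += 1
            self._bytes += size

    def release(self, size):
        """Free the capacity of one message once its publish has finished."""
        with self._cond:
            self._messages -= 1
            self._bytes -= size
            self._cond.notify_all()


controller = ToyFlowController(message_limit=2, byte_limit=100,
                               behavior=Behavior.ERROR)
controller.add(40)
controller.add(40)
try:
    controller.add(10)  # a third in-flight message exceeds message_limit
except LimitError as exc:
    print("rejected:", exc)
controller.release(40)
controller.add(10)      # capacity is available again
```

With BLOCK, the same add() call would wait on the condition variable until another thread calls release(), which mirrors the idea of publish() blocking until capacity is available.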

API Reference