Publishing Messages
Publishing messages is handled through the Client class (aliased as google.cloud.pubsub.PublisherClient). This class provides methods to create topics and, most importantly, a publish() method that publishes messages to Pub/Sub.
Instantiating a publishing client is straightforward:
from google.cloud import pubsub
publish_client = pubsub.PublisherClient()
Publish a Message
To publish a message, use the publish() method. This method accepts two positional arguments: the topic to publish to, and the body of the message. It also accepts arbitrary keyword arguments, which are passed along as attributes of the message.
The topic is passed as a string; all topics have the canonical form projects/{project_name}/topics/{topic_name}.
Therefore, a very basic publishing call looks like:
topic = 'projects/{project}/topics/{topic}'
future = publish_client.publish(topic, b'This is my message.')
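Because the topic must be a fully qualified name, it can help to build and check it in one place. The sketch below uses two hypothetical helpers, make_topic_path() and parse_topic_path(), that only implement the canonical projects/{project_name}/topics/{topic_name} form described above; the client itself also offers a topic_path() helper (for example publish_client.topic_path(project, topic)). The project and topic IDs are placeholders.

```python
import re
from typing import Tuple

# Hypothetical helpers, not part of google-cloud-pubsub.
_TOPIC_RE = re.compile(r"^projects/(?P<project>[^/]+)/topics/(?P<topic>[^/]+)$")


def make_topic_path(project: str, topic: str) -> str:
    """Build the canonical topic name expected by publish()."""
    return "projects/{}/topics/{}".format(project, topic)


def parse_topic_path(path: str) -> Tuple[str, str]:
    """Split a canonical topic name back into (project, topic)."""
    match = _TOPIC_RE.match(path)
    if match is None:
        raise ValueError("not a canonical topic name: %r" % (path,))
    return match.group("project"), match.group("topic")


topic = make_topic_path("my-project", "my-topic")  # placeholder IDs
print(topic)  # projects/my-project/topics/my-topic
print(parse_topic_path(topic))  # ('my-project', 'my-topic')
```

Rejecting malformed names early gives a clearer error than a failed request to the service.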
Note
The message data in Pub/Sub is an opaque blob of bytes, and as such, you must send a bytes object in Python 3 (a str object in Python 2). If you send a text string (str in Python 3, unicode in Python 2), the method will raise TypeError.
This is because there is no reasonable guarantee that the subscriber uses the same language or environment as the publisher, so it is the publisher's responsibility to encode the payload properly.
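Since the publisher owns the encoding, it is worth making that choice explicit in one place. A minimal sketch, assuming UTF-8 for text and JSON for structured values; both are choices the subscriber has to share and neither is mandated by Pub/Sub. The to_payload() helper is hypothetical.

```python
import json


def to_payload(value) -> bytes:
    """Hypothetical helper: turn common Python values into a bytes payload."""
    if isinstance(value, bytes):
        return value  # already opaque bytes; send as-is
    if isinstance(value, str):
        return value.encode("utf-8")  # text must be encoded explicitly
    # Anything else is serialized to JSON (assumed format), then UTF-8 encoded.
    return json.dumps(value, sort_keys=True).encode("utf-8")


print(to_payload("héllo"))  # b'h\xc3\xa9llo'
print(to_payload({"b": 1, "a": 2}))  # b'{"a": 2, "b": 1}'
```

The result can then be passed straight to publish_client.publish(topic, to_payload(...)).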
If you want to include attributes, simply add keyword arguments:
topic = 'projects/{project}/topics/{topic}'
future = publish_client.publish(topic, b'This is my message.', foo='bar')
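To show how the publish(topic, data, **attrs) convention bundles a payload with its attributes, here is a small offline model. build_message() is hypothetical and is not the client's implementation; rejecting non-str attribute values is an assumption of this sketch, so check the client reference for the exact rules.

```python
from typing import Dict


def build_message(data: bytes, **attrs: str) -> Dict[str, object]:
    """Hypothetical sketch: bundle a payload with keyword-argument attributes."""
    if not isinstance(data, bytes):
        # Same rule as described above: the payload must already be bytes.
        raise TypeError("data must be bytes, not %s" % type(data).__name__)
    for key, value in attrs.items():
        # Assumption of this sketch: attribute values are text strings.
        if not isinstance(value, str):
            raise TypeError("attribute %r must be a str" % (key,))
    return {"data": data, "attributes": dict(attrs)}


msg = build_message(b"This is my message.", foo="bar", origin="docs")
print(msg["attributes"])  # {'foo': 'bar', 'origin': 'docs'}
```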
Batching
Whenever you publish a message, the publisher automatically batches messages over a small time window to avoid making too many separate requests to the service, which helps increase throughput.
Note
By default, this uses threading, and you will need to be in an environment with threading enabled. It is possible to provide an alternative batch class that uses another concurrency strategy.
On the first message that you send, a new batch is created automatically. Each subsequent message is added to the existing batch if that batch is still valid and accepting messages; otherwise, a new batch is created. When a batch is created, it starts a countdown and publishes the batch once sufficient time has elapsed (by default, 0.05 seconds).
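The mechanism just described can be modeled with the standard library alone. This is a minimal sketch, assuming threading.Timer for the countdown; Batch, Batcher, and the sent list are hypothetical names, and the real client additionally enforces size and count limits, retries, and returns futures.

```python
import threading
import time
from typing import Callable, List, Optional


class Batch:
    """A single batch: accepts messages until its countdown fires, then commits."""

    def __init__(self, first: bytes, max_latency: float,
                 commit: Callable[[List[bytes]], None]) -> None:
        self.messages: List[bytes] = [first]
        self.accepting = True
        self._lock = threading.Lock()
        self._commit = commit
        # The countdown starts when the batch is created, not per message.
        self._timer = threading.Timer(max_latency, self._fire)
        self._timer.daemon = True
        self._timer.start()

    def try_add(self, data: bytes) -> bool:
        """Append data if the batch is still open; report whether it was taken."""
        with self._lock:
            if not self.accepting:
                return False
            self.messages.append(data)
            return True

    def _fire(self) -> None:
        with self._lock:
            self.accepting = False  # close the batch to new messages
            messages = list(self.messages)
        self._commit(messages)  # one "request" carrying every queued message


class Batcher:
    """Routes each message to the open batch, creating one when needed."""

    def __init__(self, max_latency: float = 0.05) -> None:
        self.max_latency = max_latency
        self.sent: List[List[bytes]] = []  # stands in for requests to the service
        self._lock = threading.Lock()
        self._current: Optional[Batch] = None

    def publish(self, data: bytes) -> None:
        with self._lock:
            if self._current is None or not self._current.try_add(data):
                # First message ever, or the previous batch already closed.
                self._current = Batch(data, self.max_latency, self.sent.append)


batcher = Batcher(max_latency=0.05)
for i in range(3):
    batcher.publish(b"message %d" % i)
time.sleep(0.2)  # give the countdown time to expire
print(batcher.sent)  # [[b'message 0', b'message 1', b'message 2']]
```

Passing the first message into the Batch constructor avoids a window in which a very short countdown could close the batch before that message is added.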
If you need different batching settings, provide a BatchSettings object when you instantiate the Client:
from google.cloud import pubsub
from google.cloud.pubsub import types
client = pubsub.PublisherClient(
batch_settings=types.BatchSettings(max_messages=500),
)
Pub/Sub accepts a maximum of 1,000 messages in a batch, and the size of a batch cannot exceed 10 megabytes.
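The two limits interact: a batch closes as soon as either one would be exceeded. The client already groups messages itself, so the following greedy split is purely illustrative. It is a sketch that counts only payload bytes (not attributes or request overhead) and assumes 10 megabytes means 10,000,000 bytes; split_into_batches() is a hypothetical helper.

```python
from typing import Iterable, Iterator, List

MAX_MESSAGES = 1000           # per-batch message cap stated above
MAX_BYTES = 10 * 1000 * 1000  # "10 megabytes"; decimal MB is an assumption


def split_into_batches(
    messages: Iterable[bytes],
    max_messages: int = MAX_MESSAGES,
    max_bytes: int = MAX_BYTES,
) -> Iterator[List[bytes]]:
    """Greedily group payloads so no batch exceeds either limit."""
    batch: List[bytes] = []
    size = 0
    for data in messages:
        if len(data) > max_bytes:
            raise ValueError("a single message exceeds max_bytes")
        # Close the current batch if adding this message would break a limit.
        if batch and (len(batch) >= max_messages or size + len(data) > max_bytes):
            yield batch
            batch, size = [], 0
        batch.append(data)
        size += len(data)
    if batch:
        yield batch


batches = list(split_into_batches([b"x"] * 2500))
print([len(b) for b in batches])  # [1000, 1000, 500]
```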
Futures
Every call to publish() returns an instance of Future.
Note
The returned future conforms for the most part to the interface of the standard library's Future, but might not be usable in all cases that expect that exact implementation.
You can use this to ensure that the publish succeeded:
# The .result() method will block until the future is complete.
# If there is an error, it will raise an exception.
future = client.publish(topic, b'My awesome message.')
message_id = future.result()
You can also attach a callback to the future:
# Callbacks receive the future as their only argument, as defined in
# the Future interface.
def callback(future):
    message_id = future.result()
    do_something_with(message_id)  # placeholder for your own handling
# The callback is added once you get the future. If you add a callback
# and the future is already done, it will simply be executed immediately.
future = client.publish(topic, b'My awesome message.')
future.add_done_callback(callback)
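These callback semantics can be tried offline with the standard library's concurrent.futures.Future, whose interface the note above says the returned future mostly follows. In this sketch the future is resolved by hand to stand in for the client, and "12345" is a made-up message ID.

```python
from concurrent.futures import Future

received = []


def callback(future: Future) -> None:
    # Callbacks get the finished future; result() returns immediately here.
    received.append(future.result())


# Stand-in for the future returned by publish(); "12345" is a made-up ID.
future: Future = Future()
future.add_done_callback(callback)  # registered before completion
future.set_result("12345")          # completion triggers the callback

# Adding a callback to an already-finished future runs it right away.
future.add_done_callback(callback)
print(received)  # ['12345', '12345']

# Errors surface through result(), which re-raises the stored exception.
failed: Future = Future()
failed.set_exception(RuntimeError("publish failed"))
try:
    failed.result()
except RuntimeError as exc:
    print("publish error:", exc)  # publish error: publish failed
```

Calling result() inside the callback is safe because the future is already done, so it never blocks.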
Publish Flow Control
If you publish large numbers of messages, or very large messages, in quick succession, some of the publish requests might time out, especially if the available bandwidth is limited. To mitigate this, the client can be configured with custom PublishFlowControl settings.
You can configure the maximum desired number of messages and their maximum total size, as well as the action to take when a threshold is reached.
from google.cloud import pubsub_v1
client = pubsub_v1.PublisherClient(
publisher_options=pubsub_v1.types.PublisherOptions(
flow_control=pubsub_v1.types.PublishFlowControl(
message_limit=500,
byte_limit=2 * 1024 * 1024,
limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
),
),
)
The action to be taken on overflow can be one of the following:
IGNORE (default): Ignore the overflow and continue publishing the messages as normal.
ERROR: Raise FlowControlLimitError and reject the message.
BLOCK: Temporarily block in the publish() method until there is enough capacity available.
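The three behaviors can be illustrated with a small in-process model. This is a sketch, not the client's implementation: FlowController and its add()/release() methods are hypothetical, and LimitExceededBehavior and FlowControlLimitError here are local stand-ins that only borrow the names above. It assumes capacity is reserved before a message is sent and released once the publish finishes.

```python
import enum
import threading


class LimitExceededBehavior(enum.Enum):
    """Local stand-in mirroring the three options above (not the library enum)."""
    IGNORE = "ignore"
    ERROR = "error"
    BLOCK = "block"


class FlowControlLimitError(Exception):
    """Local stand-in for the error raised under ERROR behavior."""


class FlowController:
    """Tracks in-flight messages and bytes against configured limits."""

    def __init__(self, message_limit: int, byte_limit: int,
                 behavior: LimitExceededBehavior) -> None:
        self.message_limit = message_limit
        self.byte_limit = byte_limit
        self.behavior = behavior
        self.messages = 0
        self.bytes = 0
        self._cond = threading.Condition()

    def _would_exceed(self, size: int) -> bool:
        return (self.messages + 1 > self.message_limit
                or self.bytes + size > self.byte_limit)

    def add(self, data: bytes) -> None:
        """Reserve capacity for a message before it is sent."""
        size = len(data)
        with self._cond:
            if self.behavior is LimitExceededBehavior.ERROR:
                if self._would_exceed(size):
                    raise FlowControlLimitError("flow control limits exceeded")
            elif self.behavior is LimitExceededBehavior.BLOCK:
                if size > self.byte_limit:
                    # Guard in this sketch: waiting could never succeed.
                    raise FlowControlLimitError("message larger than byte_limit")
                # Sleep until release() frees enough capacity.
                self._cond.wait_for(lambda: not self._would_exceed(size))
            # IGNORE falls straight through and only records the message.
            self.messages += 1
            self.bytes += size

    def release(self, data: bytes) -> None:
        """Return capacity once a message has been sent (or has failed)."""
        with self._cond:
            self.messages -= 1
            self.bytes -= len(data)
            self._cond.notify_all()


controller = FlowController(2, 1024, LimitExceededBehavior.ERROR)
controller.add(b"a")
controller.add(b"b")
try:
    controller.add(b"c")  # a third in-flight message exceeds message_limit=2
except FlowControlLimitError as exc:
    print("rejected:", exc)  # rejected: flow control limits exceeded
controller.release(b"a")
controller.add(b"c")  # capacity is available again

blocking = FlowController(1, 1024, LimitExceededBehavior.BLOCK)
blocking.add(b"first")
threading.Timer(0.1, blocking.release, args=(b"first",)).start()
blocking.add(b"second")  # waits about 0.1 s for the timer to release "first"
print(blocking.messages)  # 1
```

Under IGNORE the counters keep growing past the limits, so the limits only act as bookkeeping. That matches the "continue publishing as normal" description.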