Subscriber Client
Subscribe operations are handled through the SubscriberClient class (aliased as google.cloud.pubsublite.cloudpubsub.SubscriberClient).
You should instantiate a subscriber client using a context manager:
from google.cloud.pubsublite.cloudpubsub import SubscriberClient

with SubscriberClient() as subscriber_client:
    # Use subscriber_client
    ...
When not using a context manager, you need to call __enter__() yourself before using the client.
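If a with block does not fit, for example because the client has to outlive a single function, the enter and exit calls can be made by hand. The following is a minimal sketch under stated assumptions: StandInClient is a hypothetical placeholder so the snippet runs without credentials, and use_client is an illustrative helper, not part of the library. With the real library you would pass a SubscriberClient() instead.

```python
class StandInClient:
    """Hypothetical placeholder for SubscriberClient; records lifecycle calls."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append("enter")
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.events.append("exit")
        return False  # do not swallow exceptions


def use_client(client, work):
    """Run work(client) between manual __enter__() and __exit__() calls."""
    client.__enter__()
    try:
        return work(client)
    finally:
        # Always release the client, even if work() raised.
        client.__exit__(None, None, None)


client = StandInClient()
result = use_client(client, lambda c: c.events.append("work") or "done")
print(client.events)
print(result)
```

The try/finally pairing is the point of the sketch: a manual __enter__() without a matching __exit__() would leave the client open.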
Receive messages
To receive messages, use the subscribe() method. This method requires three arguments: a SubscriptionPath object, a callback function, and a FlowControlSettings object.
Receiving messages looks like:
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    SubscriptionPath,
    DISABLED_FLOW_CONTROL,
)

project_number = 1122334455
cloud_region = "us-central1"
zone_id = "a"
subscription_id = "your-subscription-id"

location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)

# `callback` is the function described under "Subscriber Callbacks".
with SubscriberClient() as subscriber_client:
    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=DISABLED_FLOW_CONTROL,
    )
    # Block the main thread while messages are delivered to the callback.
    streaming_pull_future.result()
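Calling result() with no arguments blocks indefinitely. A common variation is to wait for a bounded time and then cancel. The sketch below assumes the object returned by subscribe() behaves like a standard concurrent.futures.Future, that is, it accepts result(timeout=...) and cancel(). The helper name wait_then_cancel is illustrative, and a plain Future stands in for the real one so the snippet runs without credentials.

```python
import concurrent.futures


def wait_then_cancel(streaming_pull_future, timeout_seconds):
    """Wait up to timeout_seconds, then cancel. Returns True if it timed out."""
    try:
        streaming_pull_future.result(timeout=timeout_seconds)
        return False
    except concurrent.futures.TimeoutError:
        # Nothing ended the stream in time, so stop it ourselves.
        streaming_pull_future.cancel()
        return True


# Stand-in for the future returned by subscribe(): a Future that never completes.
pending = concurrent.futures.Future()
timed_out = wait_then_cancel(pending, timeout_seconds=0.05)
print(timed_out, pending.cancelled())
```

In real use the helper would be called inside the with block, in place of the bare streaming_pull_future.result() call.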
Subscriber Callbacks
Received messages are processed in a callback function. This callback function takes a single argument of type google.cloud.pubsub_v1.subscriber.message.Message. After the message has been processed, the function should call either ack() to acknowledge the message or nack() to send a negative acknowledgement.
def callback(message):
    message_data = message.data.decode("utf-8")
    print(f"Received {message_data}.")
    message.ack()
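The callback above always acknowledges. A callback that chooses between ack() and nack() might look like the following sketch. It assumes JSON payloads, which the original does not specify; make_callback and StandInMessage are hypothetical names, and the stand-in exposes only data, ack() and nack() so the snippet runs without the library.

```python
import json


def make_callback(handle_payload):
    """Build a callback that acks on success and nacks on failure."""

    def callback(message):
        try:
            payload = json.loads(message.data.decode("utf-8"))
            handle_payload(payload)
        except Exception:
            # Decoding or processing failed: send a negative acknowledgement.
            message.nack()
        else:
            message.ack()

    return callback


class StandInMessage:
    """Hypothetical placeholder for the real Message type."""

    def __init__(self, data):
        self.data = data
        self.outcome = None

    def ack(self):
        self.outcome = "ack"

    def nack(self):
        self.outcome = "nack"


received = []
callback = make_callback(received.append)

good = StandInMessage(b'{"id": 1}')
bad = StandInMessage(b"not json")
callback(good)
callback(bad)
print(good.outcome, bad.outcome, received)
```

What happens after nack() may depend on how the client is configured, so it is worth checking the client's nack handling before relying on redelivery.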
Flow Control Settings
Flow control settings are applied per partition. They determine when the message stream from a partition is paused, so that the server temporarily stops sending more messages from that partition. Messages that have been sent to the client but not yet acknowledged are known as outstanding messages.
You can configure flow control by setting the maximum number and the maximum total size of outstanding messages. The message stream is paused when either limit is reached.
from google.cloud.pubsublite.types import FlowControlSettings

flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)
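To make the "either limit" rule concrete, here is a toy model of the bookkeeping. It is an illustration only, not the library's implementation: the class name OutstandingTracker, the use of >= at the boundary, and the assumption that an acknowledgement frees capacity are all choices made for this sketch.

```python
from dataclasses import dataclass


@dataclass
class OutstandingTracker:
    """Toy model of per-partition flow control; not the library's code."""

    max_messages: int
    max_bytes: int
    messages: int = 0
    size_bytes: int = 0

    def on_delivered(self, size):
        self.messages += 1
        self.size_bytes += size

    def on_acked(self, size):
        self.messages -= 1
        self.size_bytes -= size

    def is_paused(self):
        # Paused as soon as either limit is reached (assumed boundary rule).
        return (
            self.messages >= self.max_messages
            or self.size_bytes >= self.max_bytes
        )


tracker = OutstandingTracker(max_messages=3, max_bytes=100)
states = []

tracker.on_delivered(10)   # 1 message, 10 bytes
states.append(tracker.is_paused())
tracker.on_delivered(95)   # 2 messages, 105 bytes: byte limit reached
states.append(tracker.is_paused())
tracker.on_acked(95)       # 1 message, 10 bytes
states.append(tracker.is_paused())
tracker.on_delivered(10)
tracker.on_delivered(10)   # 3 messages, 30 bytes: message limit reached
states.append(tracker.is_paused())

print(states)
```

The second and fourth checks show the two independent ways the stream can pause: once on bytes, once on message count.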
You can also turn off flow control by passing google.cloud.pubsublite.types.DISABLED_FLOW_CONTROL as the flow control settings.