
Source code for google.cloud.pubsub_v1.publisher.client

# Copyright 2019, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import copy
import logging
import os
import pkg_resources
import threading
import time

from google.api_core import gapic_v1
from google.auth.credentials import AnonymousCredentials
from google.oauth2 import service_account

from google.cloud.pubsub_v1 import _gapic
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher._batch import thread
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
from google.cloud.pubsub_v1.publisher.flow_controller import FlowController
from google.pubsub_v1 import types as gapic_types
from google.pubsub_v1.services.publisher import client as publisher_client

try:
    __version__ = pkg_resources.get_distribution("google-cloud-pubsub").version
except pkg_resources.DistributionNotFound:
    # Distribution might not be available if we are not running from within a
    # PIP package.
    __version__ = "0.0"

_LOGGER = logging.getLogger(__name__)

_BLACKLISTED_METHODS = (
    "publish",
    "from_service_account_file",
    "from_service_account_json",
)

_raw_proto_pubsub_message = gapic_types.PubsubMessage.pb()


@_gapic.add_methods(publisher_client.PublisherClient, blacklist=_BLACKLISTED_METHODS)
class Client(object):
    """A publisher client for Google Cloud Pub/Sub.

    This creates an object that is capable of publishing messages.
    Generally, you can instantiate this client with no arguments, and you
    get sensible defaults.

    Args:
        batch_settings (~google.cloud.pubsub_v1.types.BatchSettings): The
            settings for batch publishing.
        publisher_options (~google.cloud.pubsub_v1.types.PublisherOptions): The
            options for the publisher client. Note that enabling message ordering
            will override the publish retry timeout to be infinite.
        kwargs (dict): Any additional arguments provided are sent as keyword
            arguments to the underlying
            :class:`~google.cloud.pubsub_v1.gapic.publisher_client.PublisherClient`.
            Generally you should not need to set additional keyword
            arguments. Regional endpoints can be set via ``client_options``,
            which takes a single key-value pair that defines the endpoint.

    Example:

    .. code-block:: python

        from google.cloud import pubsub_v1

        publisher_client = pubsub_v1.PublisherClient(
            # Optional
            batch_settings=pubsub_v1.types.BatchSettings(
                max_bytes=1024,  # One kilobyte
                max_latency=1,  # One second
            ),

            # Optional
            publisher_options=pubsub_v1.types.PublisherOptions(
                enable_message_ordering=False,
                flow_control=pubsub_v1.types.PublishFlowControl(
                    message_limit=2000,
                    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
                ),
            ),

            # Optional
            client_options={
                "api_endpoint": REGIONAL_ENDPOINT
            },
        )
    """

    def __init__(self, batch_settings=(), publisher_options=(), **kwargs):
        assert (
            type(batch_settings) is types.BatchSettings or len(batch_settings) == 0
        ), "batch_settings must be of type BatchSettings or an empty tuple."
        assert (
            type(publisher_options) is types.PublisherOptions
            or len(publisher_options) == 0
        ), "publisher_options must be of type PublisherOptions or an empty tuple."

        # Sanity check: Is our goal to use the emulator?
        # If so, create a grpc insecure channel with the emulator host
        # as the target.
        if os.environ.get("PUBSUB_EMULATOR_HOST"):
            kwargs["client_options"] = {
                "api_endpoint": os.environ.get("PUBSUB_EMULATOR_HOST")
            }
            kwargs["credentials"] = AnonymousCredentials()

        # For a transient failure, retry publishing the message infinitely.
        self.publisher_options = types.PublisherOptions(*publisher_options)
        self._enable_message_ordering = self.publisher_options[0]

        # Add the metrics headers, and instantiate the underlying GAPIC
        # client.
        self.api = publisher_client.PublisherClient(**kwargs)
        self._target = self.api._transport._host
        self._batch_class = thread.Batch
        self.batch_settings = types.BatchSettings(*batch_settings)

        # The batches on the publisher client are responsible for holding
        # messages. One batch exists for each topic.
        self._batch_lock = self._batch_class.make_lock()
        # (topic, ordering_key) => sequencer object
        self._sequencers = {}
        self._is_stopped = False
        # Thread created to commit all sequencers after a timeout.
        self._commit_thread = None

        # The object controlling the message publishing flow.
        self._flow_controller = FlowController(self.publisher_options.flow_control)
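The emulator branch in ``__init__`` above means no code changes are needed to target a local emulator. A minimal sketch, assuming an emulator is listening at a placeholder address::

    import os

    from google.cloud import pubsub_v1

    # Placeholder address; assumes the Pub/Sub emulator is running locally.
    os.environ["PUBSUB_EMULATOR_HOST"] = "localhost:8085"

    # The constructor detects the variable and swaps in anonymous
    # credentials plus the emulator endpoint, as shown above.
    client = pubsub_v1.PublisherClient()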
    @classmethod
    def from_service_account_file(cls, filename, batch_settings=(), **kwargs):
        """Creates an instance of this client using the provided credentials
        file.

        Args:
            filename (str): The path to the service account private key json
                file.
            batch_settings (~google.cloud.pubsub_v1.types.BatchSettings): The
                settings for batch publishing.
            kwargs: Additional arguments to pass to the constructor.

        Returns:
            A Publisher :class:`~google.cloud.pubsub_v1.publisher.client.Client`
            instance that is the constructed client.
        """
        credentials = service_account.Credentials.from_service_account_file(filename)
        kwargs["credentials"] = credentials
        return cls(batch_settings, **kwargs)
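A minimal usage sketch for the factory above; the key file path and batch settings are illustrative assumptions::

    from google.cloud import pubsub_v1

    # Hypothetical path to a downloaded service account key file.
    publisher = pubsub_v1.PublisherClient.from_service_account_file(
        "service-account.json",
        batch_settings=pubsub_v1.types.BatchSettings(max_messages=500),
    )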
    from_service_account_json = from_service_account_file

    @property
    def target(self):
        """Return the target (where the API is).

        Returns:
            str: The location of the API.
        """
        return self._target

    def _get_or_create_sequencer(self, topic, ordering_key):
        """Get an existing sequencer or create a new one given the
        (topic, ordering_key) pair.
        """
        sequencer_key = (topic, ordering_key)
        sequencer = self._sequencers.get(sequencer_key)
        if sequencer is None:
            if ordering_key == "":
                sequencer = unordered_sequencer.UnorderedSequencer(self, topic)
            else:
                sequencer = ordered_sequencer.OrderedSequencer(
                    self, topic, ordering_key
                )
            self._sequencers[sequencer_key] = sequencer

        return sequencer
    def resume_publish(self, topic, ordering_key):
        """Resume publish on an ordering key that has had unrecoverable errors.

        If the topic/ordering key combination has not been seen before by this
        client, a debug message is logged and the call is a no-op.

        Args:
            topic (str): The topic to publish messages to.
            ordering_key: A string that identifies related messages for which
                publish order should be respected.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
            ValueError:
                If message ordering is not enabled on this client.
        """
        with self._batch_lock:
            if self._is_stopped:
                raise RuntimeError("Cannot resume publish on a stopped publisher.")

            if not self._enable_message_ordering:
                raise ValueError(
                    "Cannot resume publish on a topic/ordering key if ordering "
                    "is not enabled."
                )

            sequencer_key = (topic, ordering_key)
            sequencer = self._sequencers.get(sequencer_key)
            if sequencer is None:
                _LOGGER.debug(
                    "Error: The topic/ordering key combination has not "
                    "been seen before."
                )
            else:
                sequencer.unpause()
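A sketch of the intended recovery flow, assuming hypothetical project and topic names: once a publish fails for an ordering key, the sequencer pauses that key until ``resume_publish()`` is called::

    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient(
        publisher_options=pubsub_v1.types.PublisherOptions(
            enable_message_ordering=True
        )
    )
    topic = publisher.topic_path("my-project", "my-topic")  # hypothetical names

    future = publisher.publish(topic, b"payload", ordering_key="user-123")
    try:
        future.result()
    except Exception:
        # Publishing for "user-123" is now paused; unpause it explicitly
        # before publishing further messages with that key.
        publisher.resume_publish(topic, "user-123")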
    def publish(
        self,
        topic,
        data,
        ordering_key="",
        retry=gapic_v1.method.DEFAULT,
        timeout: gapic_types.TimeoutType = gapic_v1.method.DEFAULT,
        **attrs
    ):
        """Publish a single message.

        .. note::
            Messages in Pub/Sub are blobs of bytes. They are *binary* data,
            not text. You must send data as a bytestring
            (``bytes`` in Python 3; ``str`` in Python 2), and this library
            will raise an exception if you send a text string.

            The reason that this is so important (and why we do not try to
            coerce for you) is because Pub/Sub is also platform independent
            and there is no way to know how to decode messages properly on
            the other side; therefore, encoding and decoding is a required
            exercise for the developer.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed. This method may block if
        ``LimitExceededBehavior.BLOCK`` is used in the flow control settings.

        Example:
            >>> from google.cloud import pubsub_v1
            >>> client = pubsub_v1.PublisherClient()
            >>> topic = client.topic_path('[PROJECT]', '[TOPIC]')
            >>> data = b'The rain in Wales falls mainly on the snails.'
            >>> response = client.publish(topic, data, username='guido')

        Args:
            topic (str): The topic to publish messages to.
            data (bytes): A bytestring representing the message body. This
                must be a bytestring.
            ordering_key: A string that identifies related messages for which
                publish order should be respected. Message ordering must be
                enabled for this client to use this feature.
            retry (Optional[google.api_core.retry.Retry]): Designation of what
                errors, if any, should be retried. If `ordering_key` is
                specified, the total retry deadline will be changed to
                "infinity". If given, it overrides any retry passed into the
                client through the ``publisher_options`` argument.
            timeout (:class:`~.pubsub_v1.types.TimeoutType`):
                The timeout for the RPC request. Can be used to override any
                timeout passed in through ``publisher_options`` when
                instantiating the client.
            attrs (Mapping[str, str]): A dictionary of attributes to be
                sent as metadata. (These may be text strings or byte strings.)

        Returns:
            A :class:`~google.cloud.pubsub_v1.publisher.futures.Future`
            instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class).

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """
        # Sanity check: Is the data being sent as a bytestring?
        # If it is literally anything else, complain loudly about it.
        if not isinstance(data, bytes):
            raise TypeError(
                "Data being published to Pub/Sub must be sent as a bytestring."
            )

        if not self._enable_message_ordering and ordering_key != "":
            raise ValueError(
                "Cannot publish a message with an ordering key when message "
                "ordering is not enabled."
            )

        # Coerce all attributes to text strings.
        for k, v in copy.copy(attrs).items():
            if isinstance(v, str):
                continue
            if isinstance(v, bytes):
                attrs[k] = v.decode("utf-8")
                continue
            raise TypeError(
                "All attributes being published to Pub/Sub must "
                "be sent as text strings."
            )

        # Create the Pub/Sub message object. For performance reasons, the message
        # should be constructed by directly using the raw protobuf class, and only
        # then wrapping it into the higher-level PubsubMessage class.
        vanilla_pb = _raw_proto_pubsub_message(
            data=data, ordering_key=ordering_key, attributes=attrs
        )
        message = gapic_types.PubsubMessage.wrap(vanilla_pb)

        # Messages should go through flow control to prevent excessive
        # queuing on the client side (depending on the settings).
        try:
            self._flow_controller.add(message)
        except exceptions.FlowControlLimitError as exc:
            future = futures.Future()
            future.set_exception(exc)
            return future

        def on_publish_done(future):
            self._flow_controller.release(message)

        if retry is gapic_v1.method.DEFAULT:  # if custom retry not passed in
            retry = self.publisher_options.retry

        if timeout is gapic_v1.method.DEFAULT:  # if custom timeout not passed in
            timeout = self.publisher_options.timeout

        with self._batch_lock:
            if self._is_stopped:
                raise RuntimeError("Cannot publish on a stopped publisher.")

            # Set retry timeout to "infinite" when message ordering is enabled.
            # Note that this then also impacts messages added with an empty
            # ordering key.
            if self._enable_message_ordering:
                if retry is gapic_v1.method.DEFAULT:
                    # use the default retry for the publish GRPC method as a base
                    transport = self.api._transport
                    retry = transport._wrapped_methods[transport.publish]._retry
                retry = retry.with_deadline(2.0 ** 32)

            # Delegate the publishing to the sequencer.
            sequencer = self._get_or_create_sequencer(topic, ordering_key)
            future = sequencer.publish(message, retry=retry, timeout=timeout)
            future.add_done_callback(on_publish_done)

            # Create a timer thread if necessary to enforce the batching
            # timeout.
            self._ensure_commit_timer_runs_no_lock()

            return future
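Since ``publish()`` returns immediately, callers usually attach a callback or call ``result()`` on the returned future. A sketch with placeholder project, topic, and attribute names::

    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic = publisher.topic_path("my-project", "my-topic")  # hypothetical names

    future = publisher.publish(topic, b"hello", origin="docs-example")

    def callback(fut):
        # result() returns the server-assigned message ID, or re-raises
        # the publish error (including a FlowControlLimitError set on the
        # pre-failed future returned above).
        print(fut.result())

    future.add_done_callback(callback)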
    def ensure_cleanup_and_commit_timer_runs(self):
        """Ensure a cleanup/commit timer thread is running.

        If a cleanup/commit timer thread is already running, this does nothing.
        """
        with self._batch_lock:
            self._ensure_commit_timer_runs_no_lock()
    def _ensure_commit_timer_runs_no_lock(self):
        """Ensure a commit timer thread is running, without taking
        _batch_lock.

        _batch_lock must be held before calling this method.
        """
        if not self._commit_thread and self.batch_settings.max_latency < float("inf"):
            self._start_commit_thread()

    def _start_commit_thread(self):
        """Start a new thread to actually wait and commit the sequencers."""
        # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        self._commit_thread = threading.Thread(
            name="Thread-PubSubBatchCommitter",
            target=self._wait_and_commit_sequencers,
            daemon=True,
        )
        self._commit_thread.start()

    def _wait_and_commit_sequencers(self):
        """Wait up to the batching timeout, and commit all sequencers."""
        # Sleep for however long we should be waiting.
        time.sleep(self.batch_settings.max_latency)
        _LOGGER.debug("Commit thread is waking up")

        with self._batch_lock:
            if self._is_stopped:
                return
            self._commit_sequencers()
            self._commit_thread = None

    def _commit_sequencers(self):
        """Clean up finished sequencers and commit the rest."""
        finished_sequencer_keys = [
            key
            for key, sequencer in self._sequencers.items()
            if sequencer.is_finished()
        ]
        for sequencer_key in finished_sequencer_keys:
            del self._sequencers[sequencer_key]

        for sequencer in self._sequencers.values():
            sequencer.commit()
    def stop(self):
        """Immediately publish all outstanding messages.

        Asynchronously sends all outstanding messages and
        prevents future calls to `publish()`. Method should
        be invoked prior to deleting this `Client()` object
        in order to ensure that no pending messages are lost.

        .. note::

            This method is non-blocking. Use `Future()` objects
            returned by `publish()` to make sure all publish
            requests completed, either in success or error.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
        """
        with self._batch_lock:
            if self._is_stopped:
                raise RuntimeError("Cannot stop a publisher already stopped.")

            self._is_stopped = True

            for sequencer in self._sequencers.values():
                sequencer.stop()
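A shutdown sketch following the note above: keep the futures returned by ``publish()`` and wait on them after ``stop()``, since ``stop()`` itself does not block (project and topic names are hypothetical)::

    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    topic = publisher.topic_path("my-project", "my-topic")  # hypothetical names

    publish_futures = [
        publisher.publish(topic, b"message %d" % i) for i in range(10)
    ]

    publisher.stop()  # flush outstanding batches; further publish() calls raise

    # stop() is non-blocking, so block on each future individually.
    for future in publish_futures:
        try:
            future.result()
        except Exception as exc:
            print("publish failed:", exc)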
    # Used only for testing.
    def _set_batch(self, topic, batch, ordering_key=""):
        sequencer = self._get_or_create_sequencer(topic, ordering_key)
        sequencer._set_batch(batch)

    # Used only for testing.
    def _set_batch_class(self, batch_class):
        self._batch_class = batch_class

    # Used only for testing.
    def _set_sequencer(self, topic, sequencer, ordering_key=""):
        sequencer_key = (topic, ordering_key)
        self._sequencers[sequencer_key] = sequencer