As of January 1, 2020, this library no longer supports Python 2 on the latest released version. Library versions released prior to that date will continue to be available. For more information, please see the "Python 2 support on Google Cloud" page.

Source code for google.cloud.pubsub_v1.publisher.client

# Copyright 2019, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import copy
import logging
import os
import threading
import time
import typing
from typing import Any, Dict, Optional, Sequence, Tuple, Type, Union
import warnings
import sys

from google.api_core import gapic_v1
from google.auth.credentials import AnonymousCredentials  # type: ignore
from google.oauth2 import service_account  # type: ignore

from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher._batch import thread
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
from google.cloud.pubsub_v1.publisher.flow_controller import FlowController
from google.pubsub_v1 import gapic_version as package_version
from google.pubsub_v1 import types as gapic_types
from google.pubsub_v1.services.publisher import client as publisher_client
from google.cloud.pubsub_v1.open_telemetry.publish_message_wrapper import (
    PublishMessageWrapper,
)

# Expose the generated package's version string as this module's version.
__version__ = package_version.__version__

# These names are used only inside (string) type annotations in this module,
# so they are imported for static type checkers and skipped at runtime.
if typing.TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud import pubsub_v1
    from google.cloud.pubsub_v1.publisher import _batch
    from google.pubsub_v1.services.publisher.client import OptionalRetry
    from google.pubsub_v1.types import pubsub as pubsub_types


# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)


# The raw protobuf class behind ``gapic_types.PubsubMessage``, looked up once
# at import time. ``Client.publish()`` constructs messages with it directly
# (for performance) and only then wraps them in the higher-level class.
_raw_proto_pubbsub_message = gapic_types.PubsubMessage.pb()

# Either kind of sequencer the publisher client keeps per
# (topic, ordering_key) pair.
SequencerType = Union[
    ordered_sequencer.OrderedSequencer, unordered_sequencer.UnorderedSequencer
]


class Client(publisher_client.PublisherClient):
    """A publisher client for Google Cloud Pub/Sub.

    This creates an object that is capable of publishing messages.
    Generally, you can instantiate this client with no arguments, and you
    get sensible defaults.

    Args:
        batch_settings:
            The settings for batch publishing.
        publisher_options:
            The options for the publisher client. Note that enabling message
            ordering will override the publish retry timeout to be infinite.
        kwargs:
            Any additional arguments provided are sent as keyword arguments
            to the underlying
            :class:`~google.pubsub_v1.services.publisher.client.PublisherClient`.
            Generally you should not need to set additional keyword
            arguments. Regional endpoints can be set via ``client_options``
            that takes a single key-value pair that defines the endpoint.

    Example:

    .. code-block:: python

        from google.cloud import pubsub_v1

        publisher_client = pubsub_v1.PublisherClient(
            # Optional
            batch_settings = pubsub_v1.types.BatchSettings(
                max_bytes=1024,  # One kilobyte
                max_latency=1,   # One second
            ),

            # Optional
            publisher_options = pubsub_v1.types.PublisherOptions(
                enable_message_ordering=False,
                flow_control=pubsub_v1.types.PublishFlowControl(
                    message_limit=2000,
                    limit_exceeded_behavior=pubsub_v1.types.LimitExceededBehavior.BLOCK,
                ),
            ),

            # Optional
            client_options = {
                "api_endpoint": REGIONAL_ENDPOINT
            }
        )
    """

    def __init__(
        self,
        batch_settings: Union[types.BatchSettings, Sequence] = (),
        publisher_options: Union[types.PublisherOptions, Sequence] = (),
        **kwargs: Any,
    ):
        # NOTE: These checks intentionally remain ``assert`` statements with
        # exact-type comparisons so that existing callers keep observing
        # ``AssertionError`` for invalid arguments.
        assert (
            type(batch_settings) is types.BatchSettings or len(batch_settings) == 0
        ), "batch_settings must be of type BatchSettings or an empty sequence."
        assert (
            type(publisher_options) is types.PublisherOptions
            or len(publisher_options) == 0
        ), "publisher_options must be of type PublisherOptions or an empty sequence."

        # If the emulator host is configured through the environment, point
        # the client at it and use anonymous credentials. This replaces any
        # ``client_options`` / ``credentials`` passed in by the caller.
        emulator_host = os.environ.get("PUBSUB_EMULATOR_HOST")
        if emulator_host:
            kwargs["client_options"] = {"api_endpoint": emulator_host}
            kwargs["credentials"] = AnonymousCredentials()

        self.publisher_options = types.PublisherOptions(*publisher_options)
        # Read the flag by name, like every other publisher option below,
        # rather than by its position in the named tuple.
        self._enable_message_ordering = self.publisher_options.enable_message_ordering

        # Add the metrics headers, and instantiate the underlying GAPIC
        # client.
        super().__init__(**kwargs)
        self._target = self._transport._host
        self._batch_class = thread.Batch
        self.batch_settings = types.BatchSettings(*batch_settings)

        # The batches on the publisher client are responsible for holding
        # messages. One batch exists for each topic.
        self._batch_lock = self._batch_class.make_lock()
        # (topic, ordering_key) => sequencers object
        self._sequencers: Dict[Tuple[str, str], SequencerType] = {}
        self._is_stopped = False
        # Thread created to commit all sequencers after a timeout.
        self._commit_thread: Optional[threading.Thread] = None

        # The object controlling the message publishing flow
        self._flow_controller = FlowController(self.publisher_options.flow_control)

        self._open_telemetry_enabled = (
            self.publisher_options.enable_open_telemetry_tracing
        )
        # OpenTelemetry features used by the library are not supported in Python versions <= 3.7.
        # Refer https://github.com/open-telemetry/opentelemetry-python/issues/3993#issuecomment-2211976389
        if (
            self.publisher_options.enable_open_telemetry_tracing
            and sys.version_info.major == 3
            and sys.version_info.minor < 8
        ):
            warnings.warn(
                message="Open Telemetry for Python version 3.7 or lower is not supported. Disabling Open Telemetry tracing.",
                category=RuntimeWarning,
            )
            self._open_telemetry_enabled = False

    @classmethod
    def from_service_account_file(  # type: ignore[override]
        cls,
        filename: str,
        batch_settings: Union[types.BatchSettings, Sequence] = (),
        **kwargs: Any,
    ) -> "Client":
        """Creates an instance of this client using the provided credentials
        file.

        Args:
            filename:
                The path to the service account private key JSON file.
            batch_settings:
                The settings for batch publishing.
            kwargs:
                Additional arguments to pass to the constructor (for example
                ``publisher_options``).

        Returns:
            A Publisher instance that is the constructed client.
        """
        credentials = service_account.Credentials.from_service_account_file(filename)
        kwargs["credentials"] = credentials
        return cls(batch_settings, **kwargs)

    from_service_account_json = from_service_account_file  # type: ignore[assignment]

    @property
    def target(self) -> str:
        """Return the target (where the API is).

        Returns:
            The location of the API.
        """
        return self._target

    @property
    def api(self):
        """The underlying gapic API client.

        .. versionchanged:: 2.10.0
            Instead of a GAPIC ``PublisherClient`` client instance, this property is a
            proxy object to it with the same interface.

        .. deprecated:: 2.10.0
            Use the GAPIC methods and properties on the client instance directly
            instead of through the :attr:`api` attribute.
        """
        msg = (
            'The "api" property only exists for backward compatibility, access its '
            'attributes directly through the client instance (e.g. "client.foo" '
            'instead of "client.api.foo").'
        )
        # stacklevel=2 attributes the warning to the code that accessed
        # ``client.api`` rather than to this property.
        warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
        return super()

    @property
    def open_telemetry_enabled(self) -> bool:
        """Whether Open Telemetry tracing is active for this client.

        This is the ``enable_open_telemetry_tracing`` publisher option, forced
        to ``False`` by the constructor on Python versions older than 3.8.
        """
        return self._open_telemetry_enabled

    def _get_or_create_sequencer(self, topic: str, ordering_key: str) -> SequencerType:
        """Get an existing sequencer or create a new one given the (topic,
        ordering_key) pair.

        An empty ``ordering_key`` yields an unordered sequencer; any other key
        yields an ordered sequencer. Newly created sequencers are stored in
        ``self._sequencers``.
        """
        sequencer_key = (topic, ordering_key)
        sequencer = self._sequencers.get(sequencer_key)
        if sequencer is None:
            if ordering_key == "":
                sequencer = unordered_sequencer.UnorderedSequencer(self, topic)
            else:
                sequencer = ordered_sequencer.OrderedSequencer(
                    self, topic, ordering_key
                )
            self._sequencers[sequencer_key] = sequencer

        return sequencer

    def resume_publish(self, topic: str, ordering_key: str) -> None:
        """Resume publish on an ordering key that has had unrecoverable errors.

        If the topic/ordering key combination has not been seen before by
        this client, nothing is resumed and a debug message is logged.

        Args:
            topic: The topic to publish messages to.
            ordering_key: A string that identifies related messages for which
                publish order should be respected.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
            ValueError:
                If message ordering is not enabled for this client.
        """
        with self._batch_lock:
            if self._is_stopped:
                raise RuntimeError("Cannot resume publish on a stopped publisher.")

            if not self._enable_message_ordering:
                raise ValueError(
                    "Cannot resume publish on a topic/ordering key if ordering "
                    "is not enabled."
                )

            sequencer_key = (topic, ordering_key)
            sequencer = self._sequencers.get(sequencer_key)
            if sequencer is None:
                _LOGGER.debug(
                    "Error: The topic/ordering key combination has not "
                    "been seen before."
                )
            else:
                sequencer.unpause()

    def _gapic_publish(self, *args, **kwargs) -> "pubsub_types.PublishResponse":
        """Call the GAPIC public API directly."""
        return super().publish(*args, **kwargs)

    def publish(  # type: ignore[override]
        self,
        topic: str,
        data: bytes,
        ordering_key: str = "",
        retry: "OptionalRetry" = gapic_v1.method.DEFAULT,
        timeout: "types.OptionalTimeout" = gapic_v1.method.DEFAULT,
        **attrs: Union[bytes, str],
    ) -> "pubsub_v1.publisher.futures.Future":
        """Publish a single message.

        .. note::
            Messages in Pub/Sub are blobs of bytes. They are *binary* data,
            not text. You must send data as a bytestring (``bytes``), and
            this library will raise an exception if you send a text string.

            The reason that this is so important (and why we do not try to
            coerce for you) is because Pub/Sub is also platform independent
            and there is no way to know how to decode messages properly on
            the other side; therefore, encoding and decoding is a required
            exercise for the developer.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed.
        This method may block if LimitExceededBehavior.BLOCK is used in the
        flow control settings.

        Example:
            >>> from google.cloud import pubsub_v1
            >>> client = pubsub_v1.PublisherClient()
            >>> topic = client.topic_path('[PROJECT]', '[TOPIC]')
            >>> data = b'The rain in Wales falls mainly on the snails.'
            >>> response = client.publish(topic, data, username='guido')

        Args:
            topic: The topic to publish messages to.
            data: A bytestring representing the message body. This
                must be a bytestring.
            ordering_key: A string that identifies related messages for which
                publish order should be respected. Message ordering must be
                enabled for this client to use this feature.
            retry:
                Designation of what errors, if any, should be retried. If
                message ordering is enabled for this client, the total retry
                deadline will be changed to "infinity" (this also applies to
                messages published with an empty ``ordering_key``).
                If given, it overrides any retry passed into the client through
                the ``publisher_options`` argument.
            timeout:
                The timeout for the RPC request. Can be used to override any
                timeout passed in through ``publisher_options`` when
                instantiating the client.
            attrs: A dictionary of attributes to be
                sent as metadata. (These may be text strings or byte strings.)

        Returns:
            A :class:`~google.cloud.pubsub_v1.publisher.futures.Future`
            instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class). If flow control rejects the message, the
            returned future already has the corresponding exception set.

        Raises:
            TypeError:
                If ``data`` is not ``bytes``, or if an attribute value is
                neither ``str`` nor ``bytes``.
            ValueError:
                If ``ordering_key`` is non-empty while message ordering is
                not enabled for this client.
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.

            pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
                the ``message`` would exceed the max size limit on the backend.
        """
        # Sanity check: Is the data being sent as a bytestring?
        # If it is literally anything else, complain loudly about it.
        if not isinstance(data, bytes):
            raise TypeError(
                "Data being published to Pub/Sub must be sent as a bytestring."
            )

        if not self._enable_message_ordering and ordering_key != "":
            raise ValueError(
                "Cannot publish a message with an ordering key when message "
                "ordering is not enabled."
            )

        # Coerce all attributes to text strings.
        for k, v in copy.copy(attrs).items():
            if isinstance(v, str):
                continue
            if isinstance(v, bytes):
                attrs[k] = v.decode("utf-8")
                continue
            raise TypeError(
                "All attributes being published to Pub/Sub must "
                "be sent as text strings."
            )

        # Create the Pub/Sub message object. For performance reasons, the message
        # should be constructed by directly using the raw protobuf class, and only
        # then wrapping it into the higher-level PubsubMessage class.
        vanilla_pb = _raw_proto_pubbsub_message(
            data=data, ordering_key=ordering_key, attributes=attrs
        )
        message = gapic_types.PubsubMessage.wrap(vanilla_pb)

        wrapper: PublishMessageWrapper = PublishMessageWrapper(message)
        if self._open_telemetry_enabled:
            wrapper.start_create_span(topic=topic, ordering_key=ordering_key)

        # Messages should go through flow control to prevent excessive
        # queuing on the client side (depending on the settings).
        try:
            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.start_publisher_flow_control_span()
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PubSubMessageWrapper is None. Not starting publisher flow control span.",
                        category=RuntimeWarning,
                    )
            self._flow_controller.add(message)
            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.end_publisher_flow_control_span()
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PubSubMessageWrapper is None. Not ending publisher flow control span.",
                        category=RuntimeWarning,
                    )
        except exceptions.FlowControlLimitError as exc:
            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.end_publisher_flow_control_span(exc)
                    wrapper.end_create_span(exc)
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PubSubMessageWrapper is None. Not ending publisher create and flow control spans on FlowControlLimitError.",
                        category=RuntimeWarning,
                    )
            # Report the flow control failure through the returned future
            # instead of raising it to the caller.
            future = futures.Future()
            future.set_exception(exc)
            return future

        def on_publish_done(future):
            # Give the message's share of the flow control budget back once
            # its publish future has completed.
            self._flow_controller.release(message)

        if retry is gapic_v1.method.DEFAULT:  # if custom retry not passed in
            retry = self.publisher_options.retry

        if timeout is gapic_v1.method.DEFAULT:  # if custom timeout not passed in
            timeout = self.publisher_options.timeout

        if self._open_telemetry_enabled:
            if wrapper:
                wrapper.start_publisher_batching_span()
            else:  # pragma: NO COVER
                warnings.warn(
                    message="PublishMessageWrapper is None. Hence, not starting publisher batching span",
                    category=RuntimeWarning,
                )

        # Value used as the "infinite" retry deadline / timeout when message
        # ordering is enabled.
        infinite_deadline = 2.0**32

        with self._batch_lock:
            try:
                if self._is_stopped:
                    raise RuntimeError("Cannot publish on a stopped publisher.")

                # Set retry timeout to "infinite" when message ordering is enabled.
                # Note that this then also impacts messages added with an empty
                # ordering key.
                if self._enable_message_ordering:
                    if retry is gapic_v1.method.DEFAULT:
                        # use the default retry for the publish GRPC method as a base
                        transport = self._transport
                        base_retry = transport._wrapped_methods[
                            transport.publish
                        ]._retry
                        retry = base_retry.with_deadline(infinite_deadline)
                        # timeout needs to be overridden and set to infinite in
                        # addition to the retry deadline since both determine
                        # the duration for which retries are attempted.
                        timeout = infinite_deadline
                    elif retry is not None:
                        retry = retry.with_deadline(infinite_deadline)
                        timeout = infinite_deadline

                # Delegate the publishing to the sequencer.
                sequencer = self._get_or_create_sequencer(topic, ordering_key)
                future = sequencer.publish(
                    wrapper=wrapper, retry=retry, timeout=timeout
                )
                future.add_done_callback(on_publish_done)
            except BaseException as be:
                # Exceptions can be thrown when attempting to add messages to
                # the batch. If they're thrown, record them in publisher
                # batching and create span, end the spans and bubble the
                # exception up.
                if self._open_telemetry_enabled:
                    if wrapper:
                        wrapper.end_publisher_batching_span(be)
                        wrapper.end_create_span(be)
                    else:  # pragma: NO COVER
                        warnings.warn(
                            message="PublishMessageWrapper is None. Hence, not recording exception and ending publisher batching span and create span",
                            category=RuntimeWarning,
                        )
                # Bare ``raise`` re-raises the active exception unchanged.
                raise

            if self._open_telemetry_enabled:
                if wrapper:
                    wrapper.end_publisher_batching_span()
                else:  # pragma: NO COVER
                    warnings.warn(
                        message="PublishMessageWrapper is None. Hence, not ending publisher batching span",
                        category=RuntimeWarning,
                    )

            # Create a timer thread if necessary to enforce the batching
            # timeout.
            self._ensure_commit_timer_runs_no_lock()

            return future

    def ensure_cleanup_and_commit_timer_runs(self) -> None:
        """Ensure a cleanup/commit timer thread is running.

        If a cleanup/commit timer thread is already running, this does nothing.
        """
        with self._batch_lock:
            self._ensure_commit_timer_runs_no_lock()

    def _ensure_commit_timer_runs_no_lock(self) -> None:
        """Ensure a commit timer thread is running, without taking
        _batch_lock.

        _batch_lock must be held before calling this method.

        No thread is started when one is already recorded in
        ``self._commit_thread`` or when ``batch_settings.max_latency`` is
        infinite.
        """
        if not self._commit_thread and self.batch_settings.max_latency < float("inf"):
            self._start_commit_thread()

    def _start_commit_thread(self) -> None:
        """Start a new thread to actually wait and commit the sequencers."""
        # NOTE: If the thread is *not* a daemon, a memory leak exists due to a CPython issue.
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-829910303
        # https://github.com/googleapis/python-pubsub/issues/395#issuecomment-830092418
        self._commit_thread = threading.Thread(
            name="Thread-PubSubBatchCommitter",
            target=self._wait_and_commit_sequencers,
            daemon=True,
        )
        self._commit_thread.start()

    def _wait_and_commit_sequencers(self) -> None:
        """Wait up to the batching timeout, and commit all sequencers."""
        # Sleep for however long we should be waiting.
        time.sleep(self.batch_settings.max_latency)
        _LOGGER.debug("Commit thread is waking up")

        with self._batch_lock:
            if self._is_stopped:
                return
            self._commit_sequencers()
            # Clear the handle so that a later publish can start a new
            # commit thread.
            self._commit_thread = None

    def _commit_sequencers(self) -> None:
        """Clean up finished sequencers and commit the rest."""
        # Collect the keys first; the dict cannot be mutated while it is
        # being iterated.
        finished_sequencer_keys = [
            key
            for key, sequencer in self._sequencers.items()
            if sequencer.is_finished()
        ]
        for sequencer_key in finished_sequencer_keys:
            del self._sequencers[sequencer_key]

        for sequencer in self._sequencers.values():
            sequencer.commit()

    def stop(self) -> None:
        """Immediately publish all outstanding messages.

        Asynchronously sends all outstanding messages and
        prevents future calls to `publish()`. Method should
        be invoked prior to deleting this `Client()` object
        in order to ensure that no pending messages are lost.

        .. note::

            This method is non-blocking. Use `Future()` objects
            returned by `publish()` to make sure all publish
            requests completed, either in success or error.

        Raises:
            RuntimeError:
                If called after publisher has been stopped by a `stop()` method
                call.
        """
        with self._batch_lock:
            if self._is_stopped:
                raise RuntimeError("Cannot stop a publisher already stopped.")

            self._is_stopped = True

            for sequencer in self._sequencers.values():
                sequencer.stop()

    # Used only for testing.
    def _set_batch(
        self, topic: str, batch: "_batch.thread.Batch", ordering_key: str = ""
    ) -> None:
        """Install ``batch`` on the sequencer for (topic, ordering_key),
        creating that sequencer if needed."""
        sequencer = self._get_or_create_sequencer(topic, ordering_key)
        sequencer._set_batch(batch)

    # Used only for testing.
    def _set_batch_class(self, batch_class: Type) -> None:
        """Replace the batch class stored on this client."""
        self._batch_class = batch_class

    # Used only for testing.
    def _set_sequencer(
        self, topic: str, sequencer: SequencerType, ordering_key: str = ""
    ) -> None:
        """Register ``sequencer`` for the (topic, ordering_key) pair,
        replacing any existing one."""
        sequencer_key = (topic, ordering_key)
        self._sequencers[sequencer_key] = sequencer