As of January 1, 2020 this library no longer supports Python 2 on the latest released version. Library versions released prior to that date will continue to be available. For more information please visit Python 2 support on Google Cloud.

Source code for google.api_core.future.polling

# Copyright 2017, Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Abstract and helper bases for Future implementations."""

import abc
import concurrent.futures

from google.api_core import exceptions
from google.api_core import retry
from google.api_core.future import _helpers
from google.api_core.future import base


class _OperationNotComplete(Exception):
    """Private exception used for polling via retry."""

    pass


RETRY_PREDICATE = retry.if_exception_type(
    _OperationNotComplete,
    exceptions.TooManyRequests,
    exceptions.InternalServerError,
    exceptions.BadGateway,
)
DEFAULT_RETRY = retry.Retry(predicate=RETRY_PREDICATE)


[docs]class PollingFuture(base.Future): """A Future that needs to poll some service to check its status. The :meth:`done` method should be implemented by subclasses. The polling behavior will repeatedly call ``done`` until it returns True. .. note: Privacy here is intended to prevent the final class from overexposing, not to prevent subclasses from accessing methods. Args: retry (google.api_core.retry.Retry): The retry configuration used when polling. This can be used to control how often :meth:`done` is polled. Regardless of the retry's ``deadline``, it will be overridden by the ``timeout`` argument to :meth:`result`. """ def __init__(self, retry=DEFAULT_RETRY): super(PollingFuture, self).__init__() self._retry = retry self._result = None self._exception = None self._result_set = False """bool: Set to True when the result has been set via set_result or set_exception.""" self._polling_thread = None self._done_callbacks = []
[docs] @abc.abstractmethod def done(self, retry=DEFAULT_RETRY): """Checks to see if the operation is complete. Args: retry (google.api_core.retry.Retry): (Optional) How to retry the RPC. Returns: bool: True if the operation is complete, False otherwise. """ # pylint: disable=redundant-returns-doc, missing-raises-doc raise NotImplementedError()
def _done_or_raise(self, retry=DEFAULT_RETRY): """Check if the future is done and raise if it's not.""" kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry} if not self.done(**kwargs): raise _OperationNotComplete()
[docs] def running(self): """True if the operation is currently running.""" return not self.done()
def _blocking_poll(self, timeout=None, retry=DEFAULT_RETRY): """Poll and wait for the Future to be resolved. Args: timeout (int): How long (in seconds) to wait for the operation to complete. If None, wait indefinitely. """ if self._result_set: return retry_ = self._retry.with_deadline(timeout) try: kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry} retry_(self._done_or_raise)(**kwargs) except exceptions.RetryError: raise concurrent.futures.TimeoutError( "Operation did not complete within the designated " "timeout." )
[docs] def result(self, timeout=None, retry=DEFAULT_RETRY): """Get the result of the operation, blocking if necessary. Args: timeout (int): How long (in seconds) to wait for the operation to complete. If None, wait indefinitely. Returns: google.protobuf.Message: The Operation's result. Raises: google.api_core.GoogleAPICallError: If the operation errors or if the timeout is reached before the operation completes. """ kwargs = {} if retry is DEFAULT_RETRY else {"retry": retry} self._blocking_poll(timeout=timeout, **kwargs) if self._exception is not None: # pylint: disable=raising-bad-type # Pylint doesn't recognize that this is valid in this case. raise self._exception return self._result
[docs] def exception(self, timeout=None): """Get the exception from the operation, blocking if necessary. Args: timeout (int): How long to wait for the operation to complete. If None, wait indefinitely. Returns: Optional[google.api_core.GoogleAPICallError]: The operation's error. """ self._blocking_poll(timeout=timeout) return self._exception
[docs] def add_done_callback(self, fn): """Add a callback to be executed when the operation is complete. If the operation is not already complete, this will start a helper thread to poll for the status of the operation in the background. Args: fn (Callable[Future]): The callback to execute when the operation is complete. """ if self._result_set: _helpers.safe_invoke_callback(fn, self) return self._done_callbacks.append(fn) if self._polling_thread is None: # The polling thread will exit on its own as soon as the operation # is done. self._polling_thread = _helpers.start_daemon_thread( target=self._blocking_poll )
def _invoke_callbacks(self, *args, **kwargs): """Invoke all done callbacks.""" for callback in self._done_callbacks: _helpers.safe_invoke_callback(callback, *args, **kwargs)
[docs] def set_result(self, result): """Set the Future's result.""" self._result = result self._result_set = True self._invoke_callbacks(self)
[docs] def set_exception(self, exception): """Set the Future's exception.""" self._exception = exception self._result_set = True self._invoke_callbacks(self)