On January 1, 2020 this library will no longer support Python 2 on the latest released version.
Previously released library versions will continue to be available. For more information please
visit Python 2 support on Google Cloud.
Source code for google.api_core.retry_async
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helpers for retrying coroutine functions with exponential back-off.
The :class:`AsyncRetry` decorator shares most functionality and behavior with
:class:`Retry`, but supports coroutine functions. Please refer to description
of :class:`Retry` for more details.
By default, this decorator will retry transient
API errors (see :func:`if_transient_error`). For example:
.. code-block:: python
@retry_async.AsyncRetry()
async def call_flaky_rpc():
return await client.flaky_rpc()
# Will retry flaky_rpc() if it raises transient API errors.
result = await call_flaky_rpc()
You can pass a custom predicate to retry on different exceptions, such as
waiting for an eventually consistent item to be available:
.. code-block:: python
@retry_async.AsyncRetry(predicate=retry_async.if_exception_type(exceptions.NotFound))
async def check_if_exists():
return await client.does_thing_exist()
is_available = await check_if_exists()
Some client library methods apply retry automatically. These methods can accept
a ``retry`` parameter that allows you to configure the behavior:
.. code-block:: python
my_retry = retry_async.AsyncRetry(deadline=60)
result = await client.some_method(retry=my_retry)
"""
import asyncio
import datetime
import functools
import logging
from google.api_core import datetime_helpers, exceptions
from google.api_core.retry import (exponential_sleep_generator, # noqa: F401
if_exception_type, if_transient_error)
_LOGGER = logging.getLogger(__name__)
_DEFAULT_INITIAL_DELAY = 1.0 # seconds
_DEFAULT_MAXIMUM_DELAY = 60.0 # seconds
_DEFAULT_DELAY_MULTIPLIER = 2.0
_DEFAULT_DEADLINE = 60.0 * 2.0 # seconds
[docs]async def retry_target(target, predicate, sleep_generator, deadline, on_error=None):
"""Call a function and retry if it fails.
This is the lowest-level retry helper. Generally, you'll use the
higher-level retry helper :class:`Retry`.
Args:
target(Callable): The function to call and retry. This must be a
nullary function - apply arguments with `functools.partial`.
predicate (Callable[Exception]): A callable used to determine if an
exception raised by the target should be considered retryable.
It should return True to retry or False otherwise.
sleep_generator (Iterable[float]): An infinite iterator that determines
how long to sleep between retries.
deadline (float): How long to keep retrying the target. The last sleep
period is shortened as necessary, so that the last retry runs at
``deadline`` (and not considerably beyond it).
on_error (Callable[Exception]): A function to call while processing a
retryable exception. Any error raised by this function will *not*
be caught.
Returns:
Any: the return value of the target function.
Raises:
google.api_core.RetryError: If the deadline is exceeded while retrying.
ValueError: If the sleep generator stops yielding values.
Exception: If the target raises a method that isn't retryable.
"""
deadline_dt = (datetime_helpers.utcnow() + datetime.timedelta(seconds=deadline)) if deadline else None
last_exc = None
for sleep in sleep_generator:
try:
if not deadline_dt:
return await target()
else:
return await asyncio.wait_for(
target(),
timeout=(deadline_dt - datetime_helpers.utcnow()).total_seconds()
)
# pylint: disable=broad-except
# This function explicitly must deal with broad exceptions.
except Exception as exc:
if not predicate(exc) and not isinstance(exc, asyncio.TimeoutError):
raise
last_exc = exc
if on_error is not None:
on_error(exc)
now = datetime_helpers.utcnow()
if deadline_dt:
if deadline_dt <= now:
# Chains the raising RetryError with the root cause error,
# which helps observability and debugability.
raise exceptions.RetryError(
"Deadline of {:.1f}s exceeded while calling {}".format(
deadline, target
),
last_exc,
) from last_exc
else:
time_to_deadline = (deadline_dt - now).total_seconds()
sleep = min(time_to_deadline, sleep)
_LOGGER.debug(
"Retrying due to {}, sleeping {:.1f}s ...".format(last_exc, sleep)
)
await asyncio.sleep(sleep)
raise ValueError("Sleep generator stopped yielding sleep values.")
[docs]class AsyncRetry:
"""Exponential retry decorator for async functions.
This class is a decorator used to add exponential back-off retry behavior
to an RPC call.
Although the default behavior is to retry transient API errors, a
different predicate can be provided to retry other exceptions.
Args:
predicate (Callable[Exception]): A callable that should return ``True``
if the given exception is retryable.
initial (float): The minimum a,out of time to delay in seconds. This
must be greater than 0.
maximum (float): The maximum amout of time to delay in seconds.
multiplier (float): The multiplier applied to the delay.
deadline (float): How long to keep retrying in seconds. The last sleep
period is shortened as necessary, so that the last retry runs at
``deadline`` (and not considerably beyond it).
on_error (Callable[Exception]): A function to call while processing
a retryable exception. Any error raised by this function will
*not* be caught.
"""
def __init__(
self,
predicate=if_transient_error,
initial=_DEFAULT_INITIAL_DELAY,
maximum=_DEFAULT_MAXIMUM_DELAY,
multiplier=_DEFAULT_DELAY_MULTIPLIER,
deadline=_DEFAULT_DEADLINE,
on_error=None,
):
self._predicate = predicate
self._initial = initial
self._multiplier = multiplier
self._maximum = maximum
self._deadline = deadline
self._on_error = on_error
[docs] def __call__(self, func, on_error=None):
"""Wrap a callable with retry behavior.
Args:
func (Callable): The callable to add retry behavior to.
on_error (Callable[Exception]): A function to call while processing
a retryable exception. Any error raised by this function will
*not* be caught.
Returns:
Callable: A callable that will invoke ``func`` with retry
behavior.
"""
if self._on_error is not None:
on_error = self._on_error
@functools.wraps(func)
async def retry_wrapped_func(*args, **kwargs):
"""A wrapper that calls target function with retry."""
target = functools.partial(func, *args, **kwargs)
sleep_generator = exponential_sleep_generator(
self._initial, self._maximum, multiplier=self._multiplier
)
return await retry_target(
target,
self._predicate,
sleep_generator,
self._deadline,
on_error=on_error,
)
return retry_wrapped_func
def _replace(self,
predicate=None,
initial=None,
maximum=None,
multiplier=None,
deadline=None,
on_error=None):
return AsyncRetry(
predicate=predicate or self._predicate,
initial=initial or self._initial,
maximum=maximum or self._maximum,
multiplier=multiplier or self._multiplier,
deadline=deadline or self._deadline,
on_error=on_error or self._on_error,
)
[docs] def with_deadline(self, deadline):
"""Return a copy of this retry with the given deadline.
Args:
deadline (float): How long to keep retrying.
Returns:
AsyncRetry: A new retry instance with the given deadline.
"""
return self._replace(deadline=deadline)
[docs] def with_predicate(self, predicate):
"""Return a copy of this retry with the given predicate.
Args:
predicate (Callable[Exception]): A callable that should return
``True`` if the given exception is retryable.
Returns:
AsyncRetry: A new retry instance with the given predicate.
"""
return self._replace(predicate=predicate)
[docs] def with_delay(self, initial=None, maximum=None, multiplier=None):
"""Return a copy of this retry with the given delay options.
Args:
initial (float): The minimum amout of time to delay. This must
be greater than 0.
maximum (float): The maximum amout of time to delay.
multiplier (float): The multiplier applied to the delay.
Returns:
AsyncRetry: A new retry instance with the given predicate.
"""
return self._replace(initial=initial, maximum=maximum, multiplier=multiplier)
def __str__(self):
return (
"<AsyncRetry predicate={}, initial={:.1f}, maximum={:.1f}, "
"multiplier={:.1f}, deadline={:.1f}, on_error={}>".format(
self._predicate,
self._initial,
self._maximum,
self._multiplier,
self._deadline,
self._on_error,
)
)