As of January 1, 2020 this library no longer supports Python 2 on the latest released version. Library versions released prior to that date will continue to be available. For more information please visit Python 2 support on Google Cloud.

Source code for google.cloud.firestore_v1.base_transaction

# Copyright 2017 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Helpers for applying Google Cloud Firestore changes in a transaction."""
from __future__ import annotations

from typing import (
    TYPE_CHECKING,
    Any,
    AsyncGenerator,
    Coroutine,
    Generator,
    NoReturn,
    Optional,
    Union,
)

from google.api_core import retry as retries

from google.cloud.firestore_v1 import types

# Types needed only for Type Hints
if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
    from google.cloud.firestore_v1.document import DocumentSnapshot
    from google.cloud.firestore_v1.query_profile import ExplainOptions
    from google.cloud.firestore_v1.stream_generator import StreamGenerator


_CANT_BEGIN: str
_CANT_COMMIT: str
_CANT_RETRY_READ_ONLY: str
_CANT_ROLLBACK: str
_EXCEED_ATTEMPTS_TEMPLATE: str
_INITIAL_SLEEP: float
_MAX_SLEEP: float
_MISSING_ID_TEMPLATE: str
_MULTIPLIER: float
_WRITE_READ_ONLY: str


MAX_ATTEMPTS = 5
"""int: Default number of transaction attempts (with retries)."""
_CANT_BEGIN: str = "The transaction has already begun. Current transaction ID: {!r}."
_MISSING_ID_TEMPLATE: str = "The transaction has no transaction ID, so it cannot be {}."
_CANT_ROLLBACK: str = _MISSING_ID_TEMPLATE.format("rolled back")
_CANT_COMMIT: str = _MISSING_ID_TEMPLATE.format("committed")
_WRITE_READ_ONLY: str = "Cannot perform write operation in read-only transaction."
_EXCEED_ATTEMPTS_TEMPLATE: str = "Failed to commit transaction in {:d} attempts."
_CANT_RETRY_READ_ONLY: str = "Only read-write transactions can be retried."


[docs]class BaseTransaction(object): """Accumulate read-and-write operations to be sent in a transaction. Args: max_attempts (Optional[int]): The maximum number of attempts for the transaction (i.e. allowing retries). Defaults to :attr:`~google.cloud.firestore_v1.transaction.MAX_ATTEMPTS`. read_only (Optional[bool]): Flag indicating if the transaction should be read-only or should allow writes. Defaults to :data:`False`. """ def __init__(self, max_attempts=MAX_ATTEMPTS, read_only=False) -> None: self._max_attempts = max_attempts self._read_only = read_only self._id = None def _add_write_pbs(self, write_pbs) -> NoReturn: raise NotImplementedError def _options_protobuf( self, retry_id: Union[bytes, None] ) -> Optional[types.common.TransactionOptions]: """Convert the current object to protobuf. The ``retry_id`` value is used when retrying a transaction that failed (e.g. due to contention). It is intended to be the "first" transaction that failed (i.e. if multiple retries are needed). Args: retry_id (Union[bytes, NoneType]): Transaction ID of a transaction to be retried. Returns: Optional[google.cloud.firestore_v1.types.TransactionOptions]: The protobuf ``TransactionOptions`` if ``read_only==True`` or if there is a transaction ID to be retried, else :data:`None`. Raises: ValueError: If ``retry_id`` is not :data:`None` but the transaction is read-only. """ if retry_id is not None: if self._read_only: raise ValueError(_CANT_RETRY_READ_ONLY) return types.TransactionOptions( read_write=types.TransactionOptions.ReadWrite( retry_transaction=retry_id ) ) elif self._read_only: return types.TransactionOptions( read_only=types.TransactionOptions.ReadOnly() ) else: return None @property def in_progress(self): """Determine if this transaction has already begun. Returns: bool: Indicates if the transaction has started. """ return self._id is not None @property def id(self): """Get the current transaction ID. Returns: Optional[bytes]: The transaction ID (or :data:`None` if the current transaction is not in progress). """ return self._id def _clean_up(self) -> None: """Clean up the instance after :meth:`_rollback`` or :meth:`_commit``. This intended to occur on success or failure of the associated RPCs. """ self._write_pbs = [] self._id = None def _begin(self, retry_id=None) -> NoReturn: raise NotImplementedError def _rollback(self) -> NoReturn: raise NotImplementedError def _commit(self) -> Union[list, Coroutine[Any, Any, list]]: raise NotImplementedError def get_all( self, references: list, retry: retries.Retry = None, timeout: float = None, ) -> ( Generator[DocumentSnapshot, Any, None] | Coroutine[Any, Any, AsyncGenerator[DocumentSnapshot, Any]] ): raise NotImplementedError def get( self, ref_or_query, retry: retries.Retry = None, timeout: float = None, *, explain_options: Optional[ExplainOptions] = None, ) -> ( StreamGenerator[DocumentSnapshot] | Generator[DocumentSnapshot, Any, None] | Coroutine[Any, Any, AsyncGenerator[DocumentSnapshot, Any]] | Coroutine[Any, Any, AsyncStreamGenerator[DocumentSnapshot]] ): raise NotImplementedError
class _BaseTransactional(object): """Provide a callable object to use as a transactional decorater. This is surfaced via :func:`~google.cloud.firestore_v1.transaction.transactional`. Args: to_wrap (Callable[[:class:`~google.cloud.firestore_v1.transaction.Transaction`, ...], Any]): A callable that should be run (and retried) in a transaction. """ def __init__(self, to_wrap) -> None: self.to_wrap = to_wrap self.current_id = None """Optional[bytes]: The current transaction ID.""" self.retry_id = None """Optional[bytes]: The ID of the first attempted transaction.""" def _reset(self) -> None: """Unset the transaction IDs.""" self.current_id = None self.retry_id = None def _pre_commit(self, transaction, *args, **kwargs) -> NoReturn: raise NotImplementedError def __call__(self, transaction, *args, **kwargs): raise NotImplementedError