Source code for google.cloud.spanner_v1.transaction
# Copyright 2016 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Spanner read-write transaction support."""
import functools
import threading
from google.protobuf.struct_pb2 import Struct
from google.cloud.spanner_v1._helpers import (
_make_value_pb,
_merge_query_options,
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
_retry,
_check_rst_stream_error,
)
from google.cloud.spanner_v1 import CommitRequest
from google.cloud.spanner_v1 import ExecuteBatchDmlRequest
from google.cloud.spanner_v1 import ExecuteSqlRequest
from google.cloud.spanner_v1 import TransactionSelector
from google.cloud.spanner_v1 import TransactionOptions
from google.cloud.spanner_v1.snapshot import _SnapshotBase
from google.cloud.spanner_v1.batch import _BatchBase
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1 import RequestOptions
from google.api_core import gapic_v1
from google.api_core.exceptions import InternalServerError
from dataclasses import dataclass
from typing import Any
class Transaction(_SnapshotBase, _BatchBase):
"""Implement read-write transaction semantics for a session.
:type session: :class:`~google.cloud.spanner_v1.session.Session`
:param session: the session used to perform the commit
:raises ValueError: if session has an existing transaction
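Example (illustrative sketch; ``database`` is assumed to be an existing
:class:`~google.cloud.spanner_v1.database.Database`, and the table and
column names are hypothetical)::

    def transfer(transaction):
        transaction.execute_update(
            "UPDATE accounts SET balance = balance - 10 WHERE account_id = 1"
        )
        transaction.execute_update(
            "UPDATE accounts SET balance = balance + 10 WHERE account_id = 2"
        )

    # ``Database.run_in_transaction`` supplies the transaction, commits on
    # success, and retries if the transaction is aborted.
    database.run_in_transaction(transfer)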
"""
committed = None
"""Timestamp at which the transaction was successfully committed."""
rolled_back = False
commit_stats = None
_multi_use = True
_execute_sql_count = 0
_lock = threading.Lock()
_read_only = False
exclude_txn_from_change_streams = False
def __init__(self, session):
if session._transaction is not None:
raise ValueError("Session has existing transaction.")
super(Transaction, self).__init__(session)
def _check_state(self):
"""Helper for :meth:`commit` et al.
:raises: :exc:`ValueError` if the object's state is invalid for making
API requests.
"""
if self.committed is not None:
raise ValueError("Transaction is already committed")
if self.rolled_back:
raise ValueError("Transaction is already rolled back")
def _make_txn_selector(self):
"""Helper for :meth:`read`.
:rtype:
:class:`~.transaction_pb2.TransactionSelector`
:returns: a selector configured for read-write transaction semantics.
"""
self._check_state()
if self._transaction_id is None:
return TransactionSelector(
begin=TransactionOptions(
read_write=TransactionOptions.ReadWrite(),
exclude_txn_from_change_streams=self.exclude_txn_from_change_streams,
)
)
else:
return TransactionSelector(id=self._transaction_id)
def _execute_request(
self, method, request, trace_name=None, session=None, attributes=None
):
"""Helper method to execute request after fetching transaction selector.
:type method: callable
:param method: function returning iterator
:type request: proto
:param request: request proto to call the method with
:type trace_name: str
:param trace_name: (Optional) name of the trace span wrapping the request
:type session: :class:`~google.cloud.spanner_v1.session.Session`
:param session: (Optional) session used to annotate the trace span
:type attributes: dict
:param attributes: (Optional) additional attributes to attach to the trace span
"""
transaction = self._make_txn_selector()
request.transaction = transaction
with trace_call(trace_name, session, attributes):
method = functools.partial(method, request=request)
response = _retry(
method,
allowed_exceptions={InternalServerError: _check_rst_stream_error},
)
return response
def begin(self):
"""Begin a transaction on the database.
:rtype: bytes
:returns: the ID for the newly-begun transaction.
:raises ValueError:
if the transaction is already begun, committed, or rolled back.
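Example (illustrative sketch; assumes ``session`` is an existing
:class:`~google.cloud.spanner_v1.session.Session`)::

    transaction = session.transaction()
    txn_id = transaction.begin()  # bytes identifying the new transaction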
"""
if self._transaction_id is not None:
raise ValueError("Transaction already begun")
if self.committed is not None:
raise ValueError("Transaction already committed")
if self.rolled_back:
raise ValueError("Transaction is already rolled back")
database = self._session._database
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
if database._route_to_leader_enabled:
metadata.append(
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
)
txn_options = TransactionOptions(
read_write=TransactionOptions.ReadWrite(),
exclude_txn_from_change_streams=self.exclude_txn_from_change_streams,
)
with trace_call("CloudSpanner.BeginTransaction", self._session):
method = functools.partial(
api.begin_transaction,
session=self._session.name,
options=txn_options,
metadata=metadata,
)
response = _retry(
method,
allowed_exceptions={InternalServerError: _check_rst_stream_error},
)
self._transaction_id = response.id
return self._transaction_id
def rollback(self):
"""Roll back a transaction on the database."""
self._check_state()
if self._transaction_id is not None:
database = self._session._database
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
if database._route_to_leader_enabled:
metadata.append(
_metadata_with_leader_aware_routing(
database._route_to_leader_enabled
)
)
with trace_call("CloudSpanner.Rollback", self._session):
method = functools.partial(
api.rollback,
session=self._session.name,
transaction_id=self._transaction_id,
metadata=metadata,
)
_retry(
method,
allowed_exceptions={InternalServerError: _check_rst_stream_error},
)
self.rolled_back = True
del self._session._transaction
def commit(
self, return_commit_stats=False, request_options=None, max_commit_delay=None
):
"""Commit mutations to the database.
:type return_commit_stats: bool
:param return_commit_stats:
If True, the response will return commit stats, which can be accessed through ``commit_stats``.
:type request_options:
:class:`google.cloud.spanner_v1.types.RequestOptions`
:param request_options:
(Optional) Common options for this request.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
:type max_commit_delay: :class:`datetime.timedelta`
:param max_commit_delay:
(Optional) The amount of latency this request is willing to incur
in order to improve throughput. See
:class:`~google.cloud.spanner_v1.types.MaxCommitDelay`.
:rtype: datetime
:returns: timestamp of the committed changes.
:raises ValueError: if there are no mutations to commit.
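Example (illustrative sketch; assumes ``session`` is an existing
:class:`~google.cloud.spanner_v1.session.Session`; the table and column
names are hypothetical)::

    transaction = session.transaction()
    transaction.insert(
        "citizens",
        columns=["email", "first_name"],
        values=[["phred@example.com", "Phred"]],
    )
    commit_ts = transaction.commit(return_commit_stats=True)
    mutation_count = transaction.commit_stats.mutation_count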
"""
self._check_state()
if self._transaction_id is None and len(self._mutations) > 0:
self.begin()
elif self._transaction_id is None and len(self._mutations) == 0:
raise ValueError("Transaction is not begun")
database = self._session._database
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
if database._route_to_leader_enabled:
metadata.append(
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
)
trace_attributes = {"num_mutations": len(self._mutations)}
if request_options is None:
request_options = RequestOptions()
elif type(request_options) is dict:
request_options = RequestOptions(request_options)
if self.transaction_tag is not None:
request_options.transaction_tag = self.transaction_tag
# Request tags are not supported for commit requests.
request_options.request_tag = None
request = CommitRequest(
session=self._session.name,
mutations=self._mutations,
transaction_id=self._transaction_id,
return_commit_stats=return_commit_stats,
max_commit_delay=max_commit_delay,
request_options=request_options,
)
with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
method = functools.partial(
api.commit,
request=request,
metadata=metadata,
)
response = _retry(
method,
allowed_exceptions={InternalServerError: _check_rst_stream_error},
)
self.committed = response.commit_timestamp
if return_commit_stats:
self.commit_stats = response.commit_stats
del self._session._transaction
return self.committed
@staticmethod
def _make_params_pb(params, param_types):
"""Helper for :meth:`execute_update`.
:type params: dict, {str -> column value}
:param params: values for parameter replacement. Keys must match
the names used in ``dml``.
:type param_types: dict[str -> Union[dict, .types.Type]]
:param param_types:
(Optional) maps explicit types for one or more param values;
required if parameters are passed.
:rtype: Union[dict, :class:`Struct`]
:returns: a struct message for the passed params, or an empty dict if
``params`` is None.
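Example (illustrative sketch; ``param_types`` here refers to
:mod:`google.cloud.spanner_v1.param_types` and is not used by this helper
itself)::

    _make_params_pb({"p1": "phred"}, {"p1": param_types.STRING})
    # -> Struct(fields={"p1": Value(string_value="phred")})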
"""
if params is not None:
return Struct(
fields={key: _make_value_pb(value) for key, value in params.items()}
)
return {}
def execute_update(
self,
dml,
params=None,
param_types=None,
query_mode=None,
query_options=None,
request_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform an ``ExecuteSql`` API request with DML.
:type dml: str
:param dml: SQL DML statement
:type params: dict, {str -> column value}
:param params: values for parameter replacement. Keys must match
the names used in ``dml``.
:type param_types: dict[str -> Union[dict, .types.Type]]
:param param_types:
(Optional) maps explicit types for one or more param values;
required if parameters are passed.
:type query_mode:
:class:`~google.cloud.spanner_v1.types.ExecuteSqlRequest.QueryMode`
:param query_mode: Mode governing return of results / query plan.
See:
`QueryMode <https://cloud.google.com/spanner/reference/rpc/google.spanner.v1#google.spanner.v1.ExecuteSqlRequest.QueryMode>`_.
:type query_options:
:class:`~google.cloud.spanner_v1.types.ExecuteSqlRequest.QueryOptions`
or :class:`dict`
:param query_options: (Optional) Options that are provided for query plan stability.
:type request_options:
:class:`google.cloud.spanner_v1.types.RequestOptions`
:param request_options:
(Optional) Common options for this request.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:rtype: int
:returns: Count of rows affected by the DML statement.
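Example (illustrative sketch; the statement, table, and parameter values
are hypothetical)::

    from google.cloud.spanner_v1 import param_types

    row_count = transaction.execute_update(
        "UPDATE contacts SET email = @email WHERE contact_id = @contact_id",
        params={"email": "phred@example.com", "contact_id": 1},
        param_types={
            "email": param_types.STRING,
            "contact_id": param_types.INT64,
        },
    )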
"""
params_pb = self._make_params_pb(params, param_types)
database = self._session._database
metadata = _metadata_with_prefix(database.name)
if database._route_to_leader_enabled:
metadata.append(
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
)
api = database.spanner_api
seqno, self._execute_sql_count = (
self._execute_sql_count,
self._execute_sql_count + 1,
)
# Query-level options have higher precedence than client-level and
# environment-level options
default_query_options = database._instance._client._query_options
query_options = _merge_query_options(default_query_options, query_options)
if request_options is None:
request_options = RequestOptions()
elif type(request_options) is dict:
request_options = RequestOptions(request_options)
request_options.transaction_tag = self.transaction_tag
trace_attributes = {"db.statement": dml}
request = ExecuteSqlRequest(
session=self._session.name,
sql=dml,
params=params_pb,
param_types=param_types,
query_mode=query_mode,
query_options=query_options,
seqno=seqno,
request_options=request_options,
)
method = functools.partial(
api.execute_sql,
request=request,
metadata=metadata,
retry=retry,
timeout=timeout,
)
if self._transaction_id is None:
# A lock is taken to serialize the inline transaction begin for the first RPC.
with self._lock:
response = self._execute_request(
method,
request,
"CloudSpanner.ReadWriteTransaction",
self._session,
trace_attributes,
)
# Set the transaction ID, since the transaction begin was inlined into the first RPC.
if (
self._transaction_id is None
and response is not None
and response.metadata is not None
and response.metadata.transaction is not None
):
self._transaction_id = response.metadata.transaction.id
else:
response = self._execute_request(
method,
request,
"CloudSpanner.ReadWriteTransaction",
self._session,
trace_attributes,
)
return response.stats.row_count_exact
def batch_update(
self,
statements,
request_options=None,
*,
retry=gapic_v1.method.DEFAULT,
timeout=gapic_v1.method.DEFAULT,
):
"""Perform a batch of DML statements via an ``ExecuteBatchDml`` request.
:type statements:
Sequence[Union[str, Tuple[str, Dict[str, Any], Dict[str, Union[dict, .types.Type]]]]]
:param statements:
List of DML statements, with optional params / param types.
If passed, 'params' is a dict mapping names to the values
for parameter replacement. Keys must match the names used in the
corresponding DML statement. If 'params' is passed, 'param_types'
must also be passed, as a dict mapping names to the type of
value passed in 'params'.
:type request_options:
:class:`google.cloud.spanner_v1.types.RequestOptions`
:param request_options:
(Optional) Common options for this request.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
:type retry: :class:`~google.api_core.retry.Retry`
:param retry: (Optional) The retry settings for this request.
:type timeout: float
:param timeout: (Optional) The timeout for this request.
:rtype:
Tuple(status, Sequence[int])
:returns:
Status code, plus counts of rows affected by each completed DML
statement. Note that if the status code is not ``OK``, the
statement triggering the error will not have an entry in the
list, nor will any statements following that one.
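Example (illustrative sketch; the statements and parameter values are
hypothetical)::

    from google.cloud.spanner_v1 import param_types

    status, row_counts = transaction.batch_update(
        [
            "DELETE FROM pets WHERE adopted = TRUE",
            (
                "UPDATE contacts SET email = @email WHERE contact_id = @contact_id",
                {"email": "phred@example.com", "contact_id": 1},
                {"email": param_types.STRING, "contact_id": param_types.INT64},
            ),
        ]
    )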
"""
parsed = []
for statement in statements:
if isinstance(statement, str):
parsed.append(ExecuteBatchDmlRequest.Statement(sql=statement))
else:
dml, params, param_types = statement
params_pb = self._make_params_pb(params, param_types)
parsed.append(
ExecuteBatchDmlRequest.Statement(
sql=dml, params=params_pb, param_types=param_types
)
)
database = self._session._database
metadata = _metadata_with_prefix(database.name)
if database._route_to_leader_enabled:
metadata.append(
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
)
api = database.spanner_api
seqno, self._execute_sql_count = (
self._execute_sql_count,
self._execute_sql_count + 1,
)
if request_options is None:
request_options = RequestOptions()
elif type(request_options) is dict:
request_options = RequestOptions(request_options)
request_options.transaction_tag = self.transaction_tag
trace_attributes = {
# Get just the queries from the DML statement batch
"db.statement": ";".join([statement.sql for statement in parsed])
}
request = ExecuteBatchDmlRequest(
session=self._session.name,
statements=parsed,
seqno=seqno,
request_options=request_options,
)
method = functools.partial(
api.execute_batch_dml,
request=request,
metadata=metadata,
retry=retry,
timeout=timeout,
)
if self._transaction_id is None:
# A lock is taken to serialize the inline transaction begin for the first RPC.
with self._lock:
response = self._execute_request(
method,
request,
"CloudSpanner.DMLTransaction",
self._session,
trace_attributes,
)
# Set the transaction ID, since the transaction begin was inlined into the first RPC.
for result_set in response.result_sets:
if (
self._transaction_id is None
and result_set.metadata is not None
and result_set.metadata.transaction is not None
):
self._transaction_id = result_set.metadata.transaction.id
break
else:
response = self._execute_request(
method,
request,
"CloudSpanner.DMLTransaction",
self._session,
trace_attributes,
)
row_counts = [
result_set.stats.row_count_exact for result_set in response.result_sets
]
return response.status, row_counts
def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if exc_type is None:
self.commit()
else:
self.rollback()