As of January 1, 2020 this library no longer supports Python 2 on the latest released version.
Library versions released prior to that date will continue to be available. For more information please
visit Python 2 support on Google Cloud.
Source code for google.cloud.spanner_v1.batch
# Copyright 2016 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Context manager for Cloud Spanner batched writes."""
import functools
from google.cloud.spanner_v1 import CommitRequest
from google.cloud.spanner_v1 import Mutation
from google.cloud.spanner_v1 import TransactionOptions
from google.cloud.spanner_v1 import BatchWriteRequest
from google.cloud.spanner_v1._helpers import _SessionWrapper
from google.cloud.spanner_v1._helpers import _make_list_value_pbs
from google.cloud.spanner_v1._helpers import (
_metadata_with_prefix,
_metadata_with_leader_aware_routing,
)
from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1 import RequestOptions
from google.cloud.spanner_v1._helpers import _retry
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
from google.api_core.exceptions import InternalServerError
class _BatchBase(_SessionWrapper):
"""Accumulate mutations for transmission during :meth:`commit`.
:type session: :class:`~google.cloud.spanner_v1.session.Session`
:param session: the session used to perform the commit
"""
transaction_tag = None
_read_only = False
def __init__(self, session):
super(_BatchBase, self).__init__(session)
self._mutations = []
def _check_state(self):
"""Helper for :meth:`commit` et al.
Subclasses must override
:raises: :exc:`ValueError` if the object's state is invalid for making
API requests.
"""
raise NotImplementedError
def insert(self, table, columns, values):
"""Insert one or more new table rows.
:type table: str
:param table: Name of the table to be modified.
:type columns: list of str
:param columns: Name of the table columns to be modified.
:type values: list of lists
:param values: Values to be modified.
"""
self._mutations.append(Mutation(insert=_make_write_pb(table, columns, values)))
def update(self, table, columns, values):
"""Update one or more existing table rows.
:type table: str
:param table: Name of the table to be modified.
:type columns: list of str
:param columns: Name of the table columns to be modified.
:type values: list of lists
:param values: Values to be modified.
"""
self._mutations.append(Mutation(update=_make_write_pb(table, columns, values)))
def insert_or_update(self, table, columns, values):
"""Insert/update one or more table rows.
:type table: str
:param table: Name of the table to be modified.
:type columns: list of str
:param columns: Name of the table columns to be modified.
:type values: list of lists
:param values: Values to be modified.
"""
self._mutations.append(
Mutation(insert_or_update=_make_write_pb(table, columns, values))
)
def replace(self, table, columns, values):
"""Replace one or more table rows.
:type table: str
:param table: Name of the table to be modified.
:type columns: list of str
:param columns: Name of the table columns to be modified.
:type values: list of lists
:param values: Values to be modified.
"""
self._mutations.append(Mutation(replace=_make_write_pb(table, columns, values)))
def delete(self, table, keyset):
"""Delete one or more table rows.
:type table: str
:param table: Name of the table to be modified.
:type keyset: :class:`~google.cloud.spanner_v1.keyset.Keyset`
:param keyset: Keys/ranges identifying rows to delete.
"""
delete = Mutation.Delete(table=table, key_set=keyset._to_pb())
self._mutations.append(Mutation(delete=delete))
[docs]class Batch(_BatchBase):
"""Accumulate mutations for transmission during :meth:`commit`."""
committed = None
commit_stats = None
"""Timestamp at which the batch was successfully committed."""
def _check_state(self):
"""Helper for :meth:`commit` et al.
Subclasses must override
:raises: :exc:`ValueError` if the object's state is invalid for making
API requests.
"""
if self.committed is not None:
raise ValueError("Batch already committed")
[docs] def commit(
self,
return_commit_stats=False,
request_options=None,
max_commit_delay=None,
exclude_txn_from_change_streams=False,
):
"""Commit mutations to the database.
:type return_commit_stats: bool
:param return_commit_stats:
If true, the response will return commit stats which can be accessed though commit_stats.
:type request_options:
:class:`google.cloud.spanner_v1.types.RequestOptions`
:param request_options:
(Optional) Common options for this request.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
:type max_commit_delay: :class:`datetime.timedelta`
:param max_commit_delay:
(Optional) The amount of latency this request is willing to incur
in order to improve throughput.
:rtype: datetime
:returns: timestamp of the committed changes.
"""
self._check_state()
database = self._session._database
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
if database._route_to_leader_enabled:
metadata.append(
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
)
txn_options = TransactionOptions(
read_write=TransactionOptions.ReadWrite(),
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
)
trace_attributes = {"num_mutations": len(self._mutations)}
if request_options is None:
request_options = RequestOptions()
elif type(request_options) is dict:
request_options = RequestOptions(request_options)
request_options.transaction_tag = self.transaction_tag
# Request tags are not supported for commit requests.
request_options.request_tag = None
request = CommitRequest(
session=self._session.name,
mutations=self._mutations,
single_use_transaction=txn_options,
return_commit_stats=return_commit_stats,
max_commit_delay=max_commit_delay,
request_options=request_options,
)
with trace_call("CloudSpanner.Commit", self._session, trace_attributes):
method = functools.partial(
api.commit,
request=request,
metadata=metadata,
)
response = _retry(
method,
allowed_exceptions={InternalServerError: _check_rst_stream_error},
)
self.committed = response.commit_timestamp
self.commit_stats = response.commit_stats
return self.committed
[docs] def __exit__(self, exc_type, exc_val, exc_tb):
"""End ``with`` block."""
if exc_type is None:
self.commit()
[docs]class MutationGroup(_BatchBase):
"""A container for mutations.
Clients should use :class:`~google.cloud.spanner_v1.MutationGroups` to
obtain instances instead of directly creating instances.
:type session: :class:`~google.cloud.spanner_v1.session.Session`
:param session: The session used to perform the commit.
:type mutations: list
:param mutations: The list into which mutations are to be accumulated.
"""
def __init__(self, session, mutations=[]):
super(MutationGroup, self).__init__(session)
self._mutations = mutations
[docs]class MutationGroups(_SessionWrapper):
"""Accumulate mutation groups for transmission during :meth:`batch_write`.
:type session: :class:`~google.cloud.spanner_v1.session.Session`
:param session: the session used to perform the commit
"""
committed = None
def __init__(self, session):
super(MutationGroups, self).__init__(session)
self._mutation_groups = []
def _check_state(self):
"""Checks if the object's state is valid for making API requests.
:raises: :exc:`ValueError` if the object's state is invalid for making
API requests.
"""
if self.committed is not None:
raise ValueError("MutationGroups already committed")
[docs] def group(self):
"""Returns a new `MutationGroup` to which mutations can be added."""
mutation_group = BatchWriteRequest.MutationGroup()
self._mutation_groups.append(mutation_group)
return MutationGroup(self._session, mutation_group.mutations)
[docs] def batch_write(self, request_options=None, exclude_txn_from_change_streams=False):
"""Executes batch_write.
:type request_options:
:class:`google.cloud.spanner_v1.types.RequestOptions`
:param request_options:
(Optional) Common options for this request.
If a dict is provided, it must be of the same form as the protobuf
message :class:`~google.cloud.spanner_v1.types.RequestOptions`.
:type exclude_txn_from_change_streams: bool
:param exclude_txn_from_change_streams:
(Optional) If true, instructs the transaction to be excluded from being recorded in change streams
with the DDL option `allow_txn_exclusion=true`. This does not exclude the transaction from
being recorded in the change streams with the DDL option `allow_txn_exclusion` being false or
unset.
:rtype: :class:`Iterable[google.cloud.spanner_v1.types.BatchWriteResponse]`
:returns: a sequence of responses for each batch.
"""
self._check_state()
database = self._session._database
api = database.spanner_api
metadata = _metadata_with_prefix(database.name)
if database._route_to_leader_enabled:
metadata.append(
_metadata_with_leader_aware_routing(database._route_to_leader_enabled)
)
trace_attributes = {"num_mutation_groups": len(self._mutation_groups)}
if request_options is None:
request_options = RequestOptions()
elif type(request_options) is dict:
request_options = RequestOptions(request_options)
request = BatchWriteRequest(
session=self._session.name,
mutation_groups=self._mutation_groups,
request_options=request_options,
exclude_txn_from_change_streams=exclude_txn_from_change_streams,
)
with trace_call("CloudSpanner.BatchWrite", self._session, trace_attributes):
method = functools.partial(
api.batch_write,
request=request,
metadata=metadata,
)
response = _retry(
method,
allowed_exceptions={InternalServerError: _check_rst_stream_error},
)
self.committed = True
return response
def _make_write_pb(table, columns, values):
"""Helper for :meth:`Batch.insert` et al.
:type table: str
:param table: Name of the table to be modified.
:type columns: list of str
:param columns: Name of the table columns to be modified.
:type values: list of lists
:param values: Values to be modified.
:rtype: :class:`google.cloud.spanner_v1.types.Mutation.Write`
:returns: Write protobuf
"""
return Mutation.Write(
table=table, columns=columns, values=_make_list_value_pbs(values)
)