Source code for google.cloud.spanner_v1.streamed
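Before the source itself, a brief orientation: StreamedResultSet is the object returned by the query APIs, and iterating it drives the partial-result-set merging implemented below. A hedged usage sketch (the instance ID, database ID, and table are placeholders, not defined in this module):

    from google.cloud import spanner

    client = spanner.Client()
    instance = client.instance("my-instance")
    database = instance.database("my-database")

    with database.snapshot() as snapshot:
        # execute_sql returns a StreamedResultSet; each iteration step
        # consumes partial result sets until a full row is available.
        results = snapshot.execute_sql("SELECT SingerId, FirstName FROM Singers")
        for row in results:
            print(row)  # e.g. [1, "Marc"]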
# Copyright 2016 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Wrapper for streaming results."""
from google.cloud import exceptions
from google.protobuf.struct_pb2 import ListValue
from google.protobuf.struct_pb2 import Value
from google.cloud.spanner_v1 import PartialResultSet
from google.cloud.spanner_v1 import ResultSetMetadata
from google.cloud.spanner_v1 import TypeCode
from google.cloud.spanner_v1._helpers import _parse_value_pb
class StreamedResultSet(object):
    """Process a sequence of partial result sets into a single set of row data.

    :type response_iterator: iterator of
        :class:`~google.cloud.spanner_v1.types.PartialResultSet`
    :param response_iterator:
        Iterator yielding
        :class:`~google.cloud.spanner_v1.types.PartialResultSet`
        instances.

    :type source: :class:`~google.cloud.spanner_v1.snapshot.Snapshot`
    :param source: Snapshot from which the result set was fetched.

    :type column_info: dict
    :param column_info: (Optional) Mapping of column names to additional
        information used when decoding values, e.g. custom proto message or
        enum classes for PROTO / ENUM columns.
    """
def __init__(self, response_iterator, source=None, column_info=None):
self._response_iterator = response_iterator
self._rows = [] # Fully-processed rows
self._metadata = None # Until set from first PRS
self._stats = None # Until set from last PRS
self._current_row = [] # Accumulated values for incomplete row
self._pending_chunk = None # Incomplete value
self._source = source # Source snapshot
self._column_info = column_info # Column information
@property
def fields(self):
"""Field descriptors for result set columns.
:rtype: list of :class:`~google.cloud.spanner_v1.types.StructType.Field`
:returns: list of fields describing column names / types.
"""
return self._metadata.row_type.fields
@property
def metadata(self):
"""Result set metadata
:rtype: :class:`~google.cloud.spanner_v1.types.ResultSetMetadata`
:returns: structure describing the results
"""
if self._metadata:
return ResultSetMetadata.wrap(self._metadata)
return None
@property
def stats(self):
"""Result set statistics
:rtype:
:class:`~google.cloud.spanner_v1.types.ResultSetStats`
:returns: structure describing status about the response
"""
return self._stats
def _merge_chunk(self, value):
"""Merge pending chunk with next value.
:type value: :class:`~google.protobuf.struct_pb2.Value`
:param value: continuation of chunked value from previous
partial result set.
:rtype: :class:`~google.protobuf.struct_pb2.Value`
:returns: the merged value
"""
current_column = len(self._current_row)
field = self.fields[current_column]
merged = _merge_by_type(self._pending_chunk, value, field.type_)
self._pending_chunk = None
return merged
def _merge_values(self, values):
"""Merge values into rows.
:type values: list of :class:`~google.protobuf.struct_pb2.Value`
:param values: non-chunked values from partial result set.
"""
field_types = [field.type_ for field in self.fields]
field_names = [field.name for field in self.fields]
width = len(field_types)
index = len(self._current_row)
for value in values:
self._current_row.append(
_parse_value_pb(
value, field_types[index], field_names[index], self._column_info
)
)
index += 1
if index == width:
self._rows.append(self._current_row)
self._current_row = []
index = 0
def _consume_next(self):
"""Consume the next partial result set from the stream.
Parse the result set into new/existing rows in :attr:`_rows`
"""
response = next(self._response_iterator)
response_pb = PartialResultSet.pb(response)
if self._metadata is None: # first response
metadata = self._metadata = response_pb.metadata
source = self._source
if source is not None and source._transaction_id is None:
source._transaction_id = metadata.transaction.id
if response_pb.HasField("stats"): # last response
self._stats = response.stats
values = list(response_pb.values)
if self._pending_chunk is not None:
values[0] = self._merge_chunk(values[0])
if response_pb.chunked_value:
self._pending_chunk = values.pop()
self._merge_values(values)
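    # Illustrative trace (not part of the source): a single-column STRING row
    # ["Alice"] split across two partial result sets is reassembled as:
    #   response 1: values=["Ali"], chunked_value=True -> _pending_chunk="Ali"
    #   response 2: values=["ce"] -> _merge_chunk() yields "Alice" -> row done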
def __iter__(self):
while True:
iter_rows, self._rows[:] = self._rows[:], ()
while iter_rows:
yield iter_rows.pop(0)
try:
self._consume_next()
except StopIteration:
return
    def one(self):
"""Return exactly one result, or raise an exception.
:raises: :exc:`NotFound`: If there are no results.
:raises: :exc:`ValueError`: If there are multiple results.
:raises: :exc:`RuntimeError`: If consumption has already occurred,
in whole or in part.
"""
answer = self.one_or_none()
if answer is None:
raise exceptions.NotFound("No rows matched the given query.")
return answer
    def one_or_none(self):
"""Return exactly one result, or None if there are no results.
:raises: :exc:`ValueError`: If there are multiple results.
:raises: :exc:`RuntimeError`: If consumption has already occurred,
in whole or in part.
"""
# Sanity check: Has consumption of this query already started?
# If it has, then this is an exception.
if self._metadata is not None:
raise RuntimeError(
"Can not call `.one` or `.one_or_none` after "
"stream consumption has already started."
)
# Consume the first result of the stream.
# If there is no first result, then return None.
iterator = iter(self)
try:
answer = next(iterator)
except StopIteration:
return None
# Attempt to consume more. This should no-op; if we get additional
# rows, then this is an error case.
try:
next(iterator)
raise ValueError("Expected one result; got more.")
except StopIteration:
return answer
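    # Illustrative (`database` as in the sketch at the top of this page;
    # table and column are hypothetical): one() suits queries expected to
    # match exactly one row; it raises NotFound for zero rows and ValueError
    # for more than one:
    #
    #   with database.snapshot() as snapshot:
    #       row = snapshot.execute_sql(
    #           "SELECT FirstName FROM Singers WHERE SingerId = 1"
    #       ).one()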
    def to_dict_list(self):
        """Return the result of a query as a list of dictionaries.

        In each dictionary the key is the column name and the value is the
        value of that column in a given row.

        :rtype:
            :class:`list of dict`
        :returns: result rows as a list of dictionaries
        """
rows = []
for row in self:
rows.append(
{
column: value
for column, value in zip(
[column.name for column in self._metadata.row_type.fields], row
)
}
)
return rows
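# Illustrative (column names are hypothetical): for a query returning columns
# (SingerId INT64, FirstName STRING), to_dict_list() produces
#     [{"SingerId": 1, "FirstName": "Marc"},
#      {"SingerId": 2, "FirstName": "Cat"}]
# where plain iteration over the same stream would yield [1, "Marc"], ...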
class Unmergeable(ValueError):
"""Unable to merge two values.
:type lhs: :class:`~google.protobuf.struct_pb2.Value`
:param lhs: pending value to be merged
:type rhs: :class:`~google.protobuf.struct_pb2.Value`
:param rhs: remaining value to be merged
:type type_: :class:`~google.cloud.spanner_v1.types.Type`
:param type_: field type of values being merged
"""
def __init__(self, lhs, rhs, type_):
message = "Cannot merge %s values: %s %s" % (
TypeCode(type_.code),
lhs,
rhs,
)
super(Unmergeable, self).__init__(message)
def _unmergeable(lhs, rhs, type_):
"""Helper for '_merge_by_type'."""
raise Unmergeable(lhs, rhs, type_)
def _merge_float64(lhs, rhs, type_):
"""Helper for '_merge_by_type'."""
lhs_kind = lhs.WhichOneof("kind")
if lhs_kind == "string_value":
return Value(string_value=lhs.string_value + rhs.string_value)
rhs_kind = rhs.WhichOneof("kind")
array_continuation = (
lhs_kind == "number_value"
and rhs_kind == "string_value"
and rhs.string_value == ""
)
if array_continuation:
return lhs
raise Unmergeable(lhs, rhs, type_)
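# Illustrative: a FLOAT64 that reaches the client as a chunked JSON string is
# merged by concatenating the string halves:
#
#   _merge_float64(Value(string_value="3.14"), Value(string_value="159"), t)
#   # -> Value(string_value="3.14159"), where t is a FLOAT64 Type
#
# Within an ARRAY<FLOAT64>, a completed number_value followed by an empty
# string chunk is the "array continuation" case, and the left value is kept.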
def _merge_string(lhs, rhs, type_):
"""Helper for '_merge_by_type'."""
return Value(string_value=lhs.string_value + rhs.string_value)
_UNMERGEABLE_TYPES = (TypeCode.BOOL,)
def _merge_array(lhs, rhs, type_):
"""Helper for '_merge_by_type'."""
element_type = type_.array_element_type
if element_type.code in _UNMERGEABLE_TYPES:
# Individual values cannot be merged, just concatenate
lhs.list_value.values.extend(rhs.list_value.values)
return lhs
lhs, rhs = list(lhs.list_value.values), list(rhs.list_value.values)
# Sanity check: If either list is empty, short-circuit.
# This is effectively a no-op.
if not len(lhs) or not len(rhs):
return Value(list_value=ListValue(values=(lhs + rhs)))
first = rhs.pop(0)
if first.HasField("null_value"): # can't merge
lhs.append(first)
else:
last = lhs.pop()
if last.HasField("null_value"):
lhs.append(last)
lhs.append(first)
else:
try:
merged = _merge_by_type(last, first, element_type)
except Unmergeable:
lhs.append(last)
lhs.append(first)
else:
lhs.append(merged)
return Value(list_value=ListValue(values=(lhs + rhs)))
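# Illustrative: merging two halves of an ARRAY<STRING> joins the elements at
# the seam unless either side of the seam is NULL:
#
#   ["a", "b"] merged with ["c", "d"]  -> ["a", "bc", "d"]
#   ["a", NULL] merged with ["c", "d"] -> ["a", NULL, "c", "d"]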
def _merge_struct(lhs, rhs, type_):
"""Helper for '_merge_by_type'."""
fields = type_.struct_type.fields
lhs, rhs = list(lhs.list_value.values), list(rhs.list_value.values)
# Sanity check: If either list is empty, short-circuit.
# This is effectively a no-op.
if not len(lhs) or not len(rhs):
return Value(list_value=ListValue(values=(lhs + rhs)))
candidate_type = fields[len(lhs) - 1].type_
first = rhs.pop(0)
if first.HasField("null_value") or candidate_type.code in _UNMERGEABLE_TYPES:
lhs.append(first)
else:
last = lhs.pop()
if last.HasField("null_value"):
lhs.append(last)
lhs.append(first)
else:
try:
merged = _merge_by_type(last, first, candidate_type)
except Unmergeable:
lhs.append(last)
lhs.append(first)
else:
lhs.append(merged)
return Value(list_value=ListValue(values=lhs + rhs))
_MERGE_BY_TYPE = {
TypeCode.ARRAY: _merge_array,
TypeCode.BOOL: _unmergeable,
TypeCode.BYTES: _merge_string,
TypeCode.DATE: _merge_string,
TypeCode.FLOAT64: _merge_float64,
TypeCode.FLOAT32: _merge_float64,
TypeCode.INT64: _merge_string,
TypeCode.STRING: _merge_string,
TypeCode.STRUCT: _merge_struct,
TypeCode.TIMESTAMP: _merge_string,
TypeCode.NUMERIC: _merge_string,
TypeCode.JSON: _merge_string,
TypeCode.PROTO: _merge_string,
TypeCode.ENUM: _merge_string,
}
def _merge_by_type(lhs, rhs, type_):
"""Helper for '_merge_chunk'."""
merger = _MERGE_BY_TYPE[type_.code]
return merger(lhs, rhs, type_)
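Finally, a minimal runnable sketch of the merge machinery in isolation. It uses only names defined or imported above plus the public Type wrapper, and is not part of the module itself:

    from google.protobuf.struct_pb2 import Value
    from google.cloud.spanner_v1 import Type, TypeCode

    # A STRING value chunked across two partial result sets is rejoined by
    # concatenating the two string_value halves.
    left = Value(string_value="Hello, Wo")
    right = Value(string_value="rld")
    merged = _merge_by_type(left, right, Type(code=TypeCode.STRING))
    assert merged.string_value == "Hello, World"

    # BOOL is unmergeable: booleans are never chunked, so merging raises.
    try:
        _merge_by_type(
            Value(bool_value=True),
            Value(bool_value=False),
            Type(code=TypeCode.BOOL),
        )
    except Unmergeable as exc:
        print(exc)  # Cannot merge TypeCode.BOOL values: ...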