Source code for google.cloud.bigquery_storage_v1.reader
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import collections
import io
import json
import time
try:
import fastavro
except ImportError: # pragma: NO COVER
fastavro = None
import google.api_core.exceptions
import google.rpc.error_details_pb2
try:
import pandas
except ImportError: # pragma: NO COVER
pandas = None
try:
import pyarrow
except ImportError: # pragma: NO COVER
pyarrow = None
_STREAM_RESUMPTION_EXCEPTIONS = (
google.api_core.exceptions.ServiceUnavailable,
# Caused by transport-level error. No status code was received.
# https://github.com/googleapis/python-bigquery-storage/issues/262
google.api_core.exceptions.Unknown,
)
# The Google API endpoint can unexpectedly close long-running HTTP/2 streams.
# Unfortunately, this condition is surfaced to the caller as an internal error
# by gRPC. We don't want to resume on all internal errors, so instead we look
# for error messages that we know are caused by problems from which it is
# safe to reconnect.
_STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES = (
# See: https://github.com/googleapis/google-cloud-python/pull/9994
"RST_STREAM",
)
_FASTAVRO_REQUIRED = (
"fastavro is required to parse ReadRowResponse messages with Avro bytes."
)
_PANDAS_REQUIRED = "pandas is required to create a DataFrame"
_PYARROW_REQUIRED = (
"pyarrow is required to parse ReadRowResponse messages with Arrow bytes."
)
class ReadRowsStream(object):
"""A stream of results from a read rows request.
This stream is an iterable of
:class:`~google.cloud.bigquery_storage_v1.types.ReadRowsResponse`.
Iterate over it to fetch all row messages.
If the fastavro library is installed, use the
:func:`~google.cloud.bigquery_storage_v1.reader.ReadRowsStream.rows()`
method to parse all messages into a stream of row dictionaries.
If the pandas and fastavro libraries are installed, use the
:func:`~google.cloud.bigquery_storage_v1.reader.ReadRowsStream.to_dataframe()`
method to parse all messages into a :class:`pandas.DataFrame`.
This object should not be created directly, but is returned by
other methods in this library.
"""
def __init__(
self, client, name, offset, read_rows_kwargs, retry_delay_callback=None
):
"""Construct a ReadRowsStream.
Args:
client ( \
~google.cloud.bigquery_storage_v1.services. \
big_query_read.BigQueryReadClient \
):
A GAPIC client used to reconnect to a ReadRows stream. This
must be the GAPIC client to avoid a circular dependency on
this class.
name (str):
Required. Stream ID from which rows are being read.
offset (int):
Required. Position in the stream to start
reading from. The offset requested must be less than the last
row read from ReadRows. Requesting a larger offset is
undefined.
read_rows_kwargs (dict):
Keyword arguments to use when reconnecting to a ReadRows
stream.
retry_delay_callback (Optional[Callable[[float], None]]):
If the client receives a retryable error that asks the client to
delay its next attempt and retry_delay_callback is not None,
ReadRowsStream will call retry_delay_callback with the delay
duration (in seconds) before it starts sleeping until the next
attempt.
Returns:
Iterable[ \
~google.cloud.bigquery_storage.types.ReadRowsResponse \
]:
A sequence of row messages.
"""
# Store the stream name and offset separately so that the offset can be
# advanced without mutating the caller's arguments.
self._client = client
self._name = name
self._offset = offset
self._read_rows_kwargs = read_rows_kwargs
self._retry_delay_callback = retry_delay_callback
self._wrapped = None
def __iter__(self):
"""An iterable of messages.
Returns:
Iterable[ \
~google.cloud.bigquery_storage_v1.types.ReadRowsResponse \
]:
A sequence of row messages.
"""
# Infinite loop to reconnect on reconnectable errors while processing
# the row stream.
if self._wrapped is None:
self._reconnect()
while True:
try:
for message in self._wrapped:
rowcount = message.row_count
self._offset += rowcount
yield message
return # Made it through the whole stream.
except google.api_core.exceptions.InternalServerError as exc:
resumable_error = any(
resumable_message in exc.message
for resumable_message in _STREAM_RESUMPTION_INTERNAL_ERROR_MESSAGES
)
if not resumable_error:
raise
except _STREAM_RESUMPTION_EXCEPTIONS:
# Transient error, so reconnect to the stream.
pass
except Exception as exc:
if not self._resource_exhausted_exception_is_retryable(exc):
raise
self._reconnect()
def _reconnect(self):
"""Reconnect to the ReadRows stream using the most recent offset."""
while True:
try:
self._wrapped = self._client.read_rows(
read_stream=self._name,
offset=self._offset,
**self._read_rows_kwargs
)
break
except Exception as exc:
if not self._resource_exhausted_exception_is_retryable(exc):
raise
def _resource_exhausted_exception_is_retryable(self, exc):
if isinstance(exc, google.api_core.exceptions.ResourceExhausted):
# ResourceExhausted errors are only retried if a valid
# RetryInfo is provided with the error.
#
# TODO: Remove hasattr logic when we require google-api-core >= 2.2.0.
# ResourceExhausted added details/_details in google-api-core 2.2.0.
details = None
if hasattr(exc, "details"):
details = exc.details
elif hasattr(exc, "_details"):
details = exc._details
if details is not None:
for detail in details:
if isinstance(detail, google.rpc.error_details_pb2.RetryInfo):
retry_delay = detail.retry_delay
if retry_delay is not None:
delay = max(
0,
float(retry_delay.seconds)
+ (float(retry_delay.nanos) / 1e9),
)
if self._retry_delay_callback:
self._retry_delay_callback(delay)
time.sleep(delay)
return True
return False
def rows(self, read_session=None):
"""Iterate over all rows in the stream.
This method requires the fastavro library to parse row messages in
Avro format. For Arrow format messages, the pyarrow library is
required.
.. warning::
DATETIME columns are not supported. They are currently parsed as
strings in the fastavro library.
Args:
read_session ( \
Optional[~google.cloud.bigquery_storage_v1.types.ReadSession] \
):
This argument was used to specify the schema of the rows in the
stream, but now the first message in a read stream contains
this information. When row_restriction is applied, some streams
may be empty without read_session info. Provide this argument
to avoid an error. For more information, see https://github.com/googleapis/python-bigquery-storage/issues/733
Returns:
Iterable[Mapping]:
A sequence of rows, represented as dictionaries.
"""
return ReadRowsIterable(self, read_session=read_session)
def to_arrow(self, read_session=None):
"""Create a :class:`pyarrow.Table` of all rows in the stream.
This method requires the pyarrow library and a stream using the Arrow
format.
Args:
read_session ( \
~google.cloud.bigquery_storage_v1.types.ReadSession \
):
This argument was used to specify the schema of the rows in the
stream, but now the first message in a read stream contains
this information. When row_restriction is applied, some streams
may be empty without read_session info. Provide this argument
to avoid an error. For more information, see https://github.com/googleapis/python-bigquery-storage/issues/733
Returns:
pyarrow.Table:
A table of all rows in the stream.
"""
return self.rows(read_session=read_session).to_arrow()
def to_dataframe(self, read_session=None, dtypes=None):
"""Create a :class:`pandas.DataFrame` of all rows in the stream.
This method requires the pandas library to create a data frame and the
fastavro library to parse row messages.
.. warning::
DATETIME columns are not supported. They are currently parsed as
strings.
Args:
read_session ( \
~google.cloud.bigquery_storage_v1.types.ReadSession \
):
This argument was used to specify the schema of the rows in the
stream, but now the first message in a read stream contains
this information. When row_restriction is applied, some streams
may be empty without read_session info. Provide this argument
to avoid an error. For more information, see https://github.com/googleapis/python-bigquery-storage/issues/733
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Optional. A dictionary of column names to pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
Returns:
pandas.DataFrame:
A data frame of all rows in the stream.
"""
if pandas is None:
raise ImportError(_PANDAS_REQUIRED)
return self.rows(read_session=read_session).to_dataframe(dtypes=dtypes)
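# --- Illustrative usage sketch (not part of the original module). ---
# The ReadRowsStream docstring above describes iterating the stream and using
# rows()/to_dataframe(). A minimal end-to-end sketch might look like the
# helper below; the project, dataset, and table identifiers are placeholders,
# and the helper name is purely illustrative.
def _example_read_rows_usage():
    from google.cloud.bigquery_storage_v1 import BigQueryReadClient, types

    client = BigQueryReadClient()
    requested_session = types.ReadSession(
        table="projects/PROJECT_ID/datasets/DATASET_ID/tables/TABLE_ID",
        data_format=types.DataFormat.ARROW,
    )
    session = client.create_read_session(
        parent="projects/PROJECT_ID",
        read_session=requested_session,
        max_stream_count=1,
    )

    # read_rows returns a ReadRowsStream. Iterating it yields raw
    # ReadRowsResponse messages; rows() decodes them into dictionaries.
    # This assumes the session has at least one stream.
    reader = client.read_rows(session.streams[0].name)
    for row in reader.rows(session):
        print(row)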
class ReadRowsIterable(object):
"""An iterable of rows from a read session.
Args:
reader (google.cloud.bigquery_storage_v1.reader.ReadRowsStream):
A read rows stream.
read_session ( \
Optional[~google.cloud.bigquery_storage_v1.types.ReadSession] \
):
This argument was used to specify the schema of the rows in the
stream, but now the first message in a read stream contains
this information. When row_restriction is applied, some streams
may be empty without read_session info. Provide this argument
to avoid an error. For more information, see https://github.com/googleapis/python-bigquery-storage/issues/733
"""
# This class is modelled after the google.cloud.bigquery.table.RowIterator
# and aims to be API compatible where possible.
def __init__(self, reader, read_session=None):
self._reader = reader
if read_session is not None:
self._stream_parser = _StreamParser.from_read_session(read_session)
else:
self._stream_parser = None
@property
def pages(self):
"""A generator of all pages in the stream.
Returns:
types.GeneratorType[google.cloud.bigquery_storage_v1.ReadRowsPage]:
A generator of pages.
"""
# Each page is an iterator of rows. But also has num_items, remaining,
# and to_dataframe.
for message in self._reader:
# Only the first message contains the schema, which is needed to
# decode the messages.
if not self._stream_parser:
self._stream_parser = _StreamParser.from_read_rows_response(message)
yield ReadRowsPage(self._stream_parser, message)
def __iter__(self):
"""Iterator for each row in all pages."""
for page in self.pages:
for row in page:
yield row
def to_arrow(self):
"""Create a :class:`pyarrow.Table` of all rows in the stream.
This method requires the pyarrow library and a stream using the Arrow
format.
Returns:
pyarrow.Table:
A table of all rows in the stream.
"""
record_batches = []
for page in self.pages:
record_batches.append(page.to_arrow())
if record_batches:
return pyarrow.Table.from_batches(record_batches)
# No data, return an empty Table.
self._stream_parser._parse_arrow_schema()
return pyarrow.Table.from_batches([], schema=self._stream_parser._schema)
def to_dataframe(self, dtypes=None):
"""Create a :class:`pandas.DataFrame` of all rows in the stream.
This method requires the pandas library to create a data frame and the
fastavro library to parse row messages.
.. warning::
DATETIME columns are not supported. They are currently parsed as
strings in the fastavro library.
Args:
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Optional. A dictionary of column names to pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
Returns:
pandas.DataFrame:
A data frame of all rows in the stream.
"""
if pandas is None:
raise ImportError(_PANDAS_REQUIRED)
if dtypes is None:
dtypes = {}
# If it's an Arrow stream, calling to_arrow, then converting to a
# pandas dataframe is about 2x faster. This is because pandas.concat is
# rarely no-copy, whereas pyarrow.Table.from_batches + to_pandas is
# usually no-copy.
try:
record_batch = self.to_arrow()
except NotImplementedError:
pass
else:
df = record_batch.to_pandas()
for column in dtypes:
df[column] = pandas.Series(df[column], dtype=dtypes[column])
return df
frames = [page.to_dataframe(dtypes=dtypes) for page in self.pages]
if frames:
return pandas.concat(frames)
# No data, construct an empty dataframe with columns matching the schema.
# The result should be consistent with what an empty ARROW stream would produce.
self._stream_parser._parse_avro_schema()
schema = self._stream_parser._avro_schema_json
column_dtypes = self._dtypes_from_avro(schema["fields"])
column_dtypes.update(dtypes)
df = pandas.DataFrame(columns=column_dtypes.keys())
for column in df:
df[column] = pandas.Series([], dtype=column_dtypes[column])
return df
def _dtypes_from_avro(self, avro_fields):
"""Determine Pandas dtypes for columns in Avro schema.
Args:
avro_fields (Iterable[Mapping[str, Any]]):
Avro fields' metadata.
Returns:
collections.OrderedDict[str, str]:
Column names with their corresponding Pandas dtypes.
"""
result = collections.OrderedDict()
type_map = {"long": "int64", "double": "float64", "boolean": "bool"}
for field_info in avro_fields:
# If a type is a union of multiple types, pick the first type
# that is not "null".
if isinstance(field_info["type"], list):
type_info = next(item for item in field_info["type"] if item != "null")
if isinstance(type_info, str):
field_dtype = type_map.get(type_info, "object")
else:
logical_type = type_info.get("logicalType")
if logical_type == "timestamp-micros":
field_dtype = "datetime64[ns, UTC]"
else:
field_dtype = "object"
result[field_info["name"]] = field_dtype
return result
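# --- Illustrative usage sketch (not part of the original module). ---
# ReadRowsIterable exposes page-level access through the ``pages`` property,
# mirroring google.cloud.bigquery.table.RowIterator. A hypothetical helper
# consuming pages could look like this; ``reader`` is assumed to be a
# ReadRowsStream and ``session`` a ReadSession, as in the earlier sketch.
def _example_iterate_pages(reader, session):
    for page in reader.rows(session).pages:
        # Each ReadRowsPage wraps a single ReadRowsResponse message.
        print("rows in page:", page.num_items)
        for row in page:
            pass  # row is a mapping of column name to value
        # remaining counts down as rows are consumed from the page.
        assert page.remaining == 0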
class ReadRowsPage(object):
"""An iterator of rows from a read session message.
Args:
stream_parser (google.cloud.bigquery_storage_v1.reader._StreamParser):
A helper for parsing messages into rows.
message (google.cloud.bigquery_storage_v1.types.ReadRowsResponse):
A message of data from a read rows stream.
"""
# This class is modeled after google.api_core.page_iterator.Page and aims
# to provide API compatibility where possible.
def __init__(self, stream_parser, message):
self._stream_parser = stream_parser
self._message = message
self._iter_rows = None
self._num_items = self._message.row_count
self._remaining = self._message.row_count
def _parse_rows(self):
"""Parse rows from the message only once."""
if self._iter_rows is not None:
return
rows = self._stream_parser.to_rows(self._message)
self._iter_rows = iter(rows)
@property
def num_items(self):
"""int: Total items in the page."""
return self._num_items
@property
def remaining(self):
"""int: Remaining items in the page."""
return self._remaining
def next(self):
"""Get the next row in the page."""
self._parse_rows()
if self._remaining > 0:
self._remaining -= 1
return next(self._iter_rows)
# Alias needed for Python 2/3 support.
__next__ = next
def to_arrow(self):
"""Create an :class:`pyarrow.RecordBatch` of rows in the page.
Returns:
pyarrow.RecordBatch:
Rows from the message, as an Arrow record batch.
"""
return self._stream_parser.to_arrow(self._message)
def to_dataframe(self, dtypes=None):
"""Create a :class:`pandas.DataFrame` of rows in the page.
This method requires the pandas library to create a data frame and the
fastavro library to parse row messages.
.. warning::
DATETIME columns are not supported. They are currently parsed as
strings in the fastavro library.
Args:
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Optional. A dictionary of column names to pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
Returns:
pandas.DataFrame:
A data frame of all rows in the stream.
"""
if pandas is None:
raise ImportError(_PANDAS_REQUIRED)
return self._stream_parser.to_dataframe(self._message, dtypes=dtypes)
class _StreamParser(object):
def to_arrow(self, message):
raise NotImplementedError("Not implemented.")
def to_dataframe(self, message, dtypes=None):
raise NotImplementedError("Not implemented.")
def to_rows(self, message):
raise NotImplementedError("Not implemented.")
def _parse_avro_schema(self):
raise NotImplementedError("Not implemented.")
def _parse_arrow_schema(self):
raise NotImplementedError("Not implemented.")
@staticmethod
def from_read_session(read_session):
schema_type = read_session._pb.WhichOneof("schema")
if schema_type == "avro_schema":
return _AvroStreamParser(read_session)
elif schema_type == "arrow_schema":
return _ArrowStreamParser(read_session)
else:
raise TypeError(
"Unsupported schema type in read_session: {0}".format(schema_type)
)
@staticmethod
def from_read_rows_response(message):
schema_type = message._pb.WhichOneof("schema")
if schema_type == "avro_schema":
return _AvroStreamParser(message)
elif schema_type == "arrow_schema":
return _ArrowStreamParser(message)
else:
raise TypeError(
"Unsupported schema type in message: {0}".format(schema_type)
)
class _AvroStreamParser(_StreamParser):
"""Helper to parse Avro messages into useful representations."""
def __init__(self, message):
"""Construct an _AvroStreamParser.
Args:
message (Union[
google.cloud.bigquery_storage_v1.types.ReadSession, \
google.cloud.bigquery_storage_v1.types.ReadRowsResponse, \
]):
Either the first message of data from a read rows stream or a
read session. Both types contain a oneof "schema" field, which
can be used to determine how to deserialize rows.
"""
if fastavro is None:
raise ImportError(_FASTAVRO_REQUIRED)
self._first_message = message
self._avro_schema_json = None
self._fastavro_schema = None
self._column_names = None
def to_arrow(self, message):
"""Create an :class:`pyarrow.RecordBatch` of rows in the page.
Args:
message (google.cloud.bigquery_storage_v1.types.ReadRowsResponse):
Protocol buffer from the read rows stream, to convert into an
Arrow record batch.
Returns:
pyarrow.RecordBatch:
Rows from the message, as an Arrow record batch.
"""
raise NotImplementedError("to_arrow not implemented for Avro streams.")
def to_dataframe(self, message, dtypes=None):
"""Create a :class:`pandas.DataFrame` of rows in the page.
This method requires the pandas library to create a data frame and the
fastavro library to parse row messages.
.. warning::
DATETIME columns are not supported. They are currently parsed as
strings in the fastavro library.
Args:
message ( \
~google.cloud.bigquery_storage_v1.types.ReadRowsResponse \
):
A message containing Avro bytes to parse into a pandas DataFrame.
dtypes ( \
Map[str, Union[str, pandas.Series.dtype]] \
):
Optional. A dictionary of column names to pandas ``dtype``s. The
provided ``dtype`` is used when constructing the series for
the column specified. Otherwise, the default pandas behavior
is used.
Returns:
pandas.DataFrame:
A data frame of all rows in the stream.
"""
self._parse_avro_schema()
if dtypes is None:
dtypes = {}
columns = collections.defaultdict(list)
for row in self.to_rows(message):
for column in row:
columns[column].append(row[column])
for column in dtypes:
columns[column] = pandas.Series(columns[column], dtype=dtypes[column])
return pandas.DataFrame(columns, columns=self._column_names)
def _parse_avro_schema(self):
"""Extract and parse Avro schema from a read session."""
if self._avro_schema_json:
return
self._avro_schema_json = json.loads(self._first_message.avro_schema.schema)
self._column_names = tuple(
(field["name"] for field in self._avro_schema_json["fields"])
)
self._first_message = None
def _parse_fastavro(self):
"""Convert parsed Avro schema to fastavro format."""
self._parse_avro_schema()
self._fastavro_schema = fastavro.parse_schema(self._avro_schema_json)
def to_rows(self, message):
"""Parse all rows in a stream message.
Args:
message ( \
~google.cloud.bigquery_storage_v1.types.ReadRowsResponse \
):
A message containing Avro bytes to parse into rows.
Returns:
Iterable[Mapping]:
A sequence of rows, represented as dictionaries.
"""
self._parse_fastavro()
messageio = io.BytesIO(message.avro_rows.serialized_binary_rows)
while True:
# Use a while loop because schemaless_reader can only read
# a single record per call.
try:
# TODO: Parse DATETIME into datetime.datetime (no timezone),
# instead of as a string.
yield fastavro.schemaless_reader(messageio, self._fastavro_schema)
except (StopIteration, EOFError):
break # Finished with message
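# --- Illustrative sketch (not part of the original module). ---
# The read loop in _AvroStreamParser.to_rows above decodes concatenated,
# schemaless Avro records. The self-contained helper below demonstrates the
# same fastavro calls against locally written records instead of an actual
# avro_rows.serialized_binary_rows payload; all names are illustrative.
def _example_schemaless_avro_roundtrip():
    if fastavro is None:
        raise ImportError(_FASTAVRO_REQUIRED)

    schema = fastavro.parse_schema(
        {
            "type": "record",
            "name": "Example",
            "fields": [{"name": "name", "type": "string"}],
        }
    )

    # Records are written back to back with no container header, mirroring
    # the wire layout consumed by to_rows.
    buffer = io.BytesIO()
    fastavro.schemaless_writer(buffer, schema, {"name": "a"})
    fastavro.schemaless_writer(buffer, schema, {"name": "b"})
    buffer.seek(0)

    rows = []
    while True:
        try:
            rows.append(fastavro.schemaless_reader(buffer, schema))
        except (StopIteration, EOFError):
            break  # same end-of-buffer handling as to_rows above
    return rows  # [{"name": "a"}, {"name": "b"}]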
class _ArrowStreamParser(_StreamParser):
def __init__(self, message):
"""Construct an _ArrowStreamParser.
Args:
message (Union[
google.cloud.bigquery_storage_v1.types.ReadSession, \
google.cloud.bigquery_storage_v1.types.ReadRowsResponse, \
]):
Either the first message of data from a read rows stream or a
read session. Both types contain a oneof "schema" field, which
can be used to determine how to deserialize rows.
"""
if pyarrow is None:
raise ImportError(_PYARROW_REQUIRED)
self._first_message = message
self._schema = None
def to_arrow(self, message):
return self._parse_arrow_message(message)
def to_rows(self, message):
record_batch = self._parse_arrow_message(message)
# Iterate through each column simultaneously, and make a dict from the
# row values
for row in zip(*record_batch.columns):
yield dict(zip(self._column_names, row))
def to_dataframe(self, message, dtypes=None):
record_batch = self._parse_arrow_message(message)
if dtypes is None:
dtypes = {}
df = record_batch.to_pandas()
for column in dtypes:
df[column] = pandas.Series(df[column], dtype=dtypes[column])
return df
def _parse_arrow_message(self, message):
self._parse_arrow_schema()
return pyarrow.ipc.read_record_batch(
pyarrow.py_buffer(message.arrow_record_batch.serialized_record_batch),
self._schema,
)
def _parse_arrow_schema(self):
if self._schema:
return
self._schema = pyarrow.ipc.read_schema(
pyarrow.py_buffer(self._first_message.arrow_schema.serialized_schema)
)
self._column_names = [field.name for field in self._schema]
self._first_message = None
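# --- Illustrative sketch (not part of the original module). ---
# _ArrowStreamParser reads an IPC-serialized schema and record batches from
# ReadRowsResponse messages. The self-contained helper below shows the same
# pyarrow calls by serializing a schema and a batch locally; all names are
# illustrative.
def _example_arrow_ipc_roundtrip():
    if pyarrow is None:
        raise ImportError(_PYARROW_REQUIRED)

    batch = pyarrow.record_batch([pyarrow.array([1, 2, 3])], names=["col"])

    # serialized_schema / serialized_record_batch on the API are IPC messages
    # comparable to the buffers produced here.
    serialized_schema = bytes(batch.schema.serialize())
    serialized_batch = bytes(batch.serialize())

    schema = pyarrow.ipc.read_schema(pyarrow.py_buffer(serialized_schema))
    restored = pyarrow.ipc.read_record_batch(
        pyarrow.py_buffer(serialized_batch), schema
    )
    return restored.to_pydict()  # {"col": [1, 2, 3]}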