Source code for google.cloud.bigquery.dbapi.cursor
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Cursor for the Google BigQuery DB-API."""
import collections
import copy
import warnings
try:
from collections import abc as collections_abc
except ImportError: # Python 2.7
import collections as collections_abc
import logging
import six
from google.cloud.bigquery import job
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
import google.cloud.exceptions
_LOGGER = logging.getLogger(__name__)
# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
# provided.
Column = collections.namedtuple(
"Column",
[
"name",
"type_code",
"display_size",
"internal_size",
"precision",
"scale",
"null_ok",
],
)
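# Example (illustrative): for an INTEGER column named "age" that allows NULLs,
# the description entry built by _set_description below would be
#     Column("age", "INTEGER", None, None, None, None, True)
# i.e. only name, type_code, and null_ok carry meaningful values here.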
@_helpers.raise_on_closed("Operating on a closed cursor.")
class Cursor(object):
"""DB-API Cursor to Google BigQuery.
Args:
connection (google.cloud.bigquery.dbapi.Connection):
A DB-API connection to Google BigQuery.
"""
def __init__(self, connection):
self.connection = connection
self.description = None
# Per PEP 249: The attribute is -1 in case no .execute*() has been
# performed on the cursor or the rowcount of the last operation
# cannot be determined by the interface.
self.rowcount = -1
# Per PEP 249: The arraysize attribute defaults to 1, meaning to fetch
# a single row at a time. However, we deviate from that, and set the
# default to None, allowing the backend to automatically determine the
# most appropriate size.
self.arraysize = None
self._query_data = None
self._query_job = None
self._closed = False
def close(self):
"""Mark the cursor as closed, preventing its further use."""
self._closed = True
def _set_description(self, schema):
"""Set description from schema.
Args:
schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
A description of fields in the schema.
"""
if schema is None:
self.description = None
return
self.description = tuple(
Column(
name=field.name,
type_code=field.field_type,
display_size=None,
internal_size=None,
precision=None,
scale=None,
null_ok=field.is_nullable,
)
for field in schema
)
def _set_rowcount(self, query_results):
"""Set the rowcount from query results.
Normally, this sets rowcount to the number of rows returned by the
query, but if it was a DML statement, it sets rowcount to the number
of modified rows.
Args:
query_results (google.cloud.bigquery.query._QueryResults):
Results of a query.
"""
total_rows = 0
num_dml_affected_rows = query_results.num_dml_affected_rows
if query_results.total_rows is not None and query_results.total_rows > 0:
total_rows = query_results.total_rows
if num_dml_affected_rows is not None and num_dml_affected_rows > 0:
total_rows = num_dml_affected_rows
self.rowcount = total_rows
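# Example (illustrative): a SELECT returning 10 rows sets rowcount to 10,
# while an UPDATE that modifies 3 rows reports num_dml_affected_rows=3 and
# therefore sets rowcount to 3.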
def execute(self, operation, parameters=None, job_id=None, job_config=None):
"""Prepare and execute a database operation.
.. note::
When setting query parameters, values which are "text"
(``unicode`` in Python2, ``str`` in Python3) will use
the 'STRING' BigQuery type. Values which are "bytes" (``str`` in
Python2, ``bytes`` in Python3) will use the 'BYTES' type.
A `~datetime.datetime` parameter without timezone information uses
the 'DATETIME' BigQuery type (example: Global Pi Day Celebration
March 14, 2017 at 1:59pm). A `~datetime.datetime` parameter with
timezone information uses the 'TIMESTAMP' BigQuery type (example:
a wedding on April 29, 2011 at 11am, British Summer Time).
For more information about BigQuery data types, see:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
``STRUCT``/``RECORD`` and ``REPEATED`` query parameters are not
yet supported. See:
https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3524
Args:
operation (str): A Google BigQuery query string.
parameters (Union[Mapping[str, Any], Sequence[Any]]):
(Optional) dictionary or sequence of parameter values.
job_id (str):
(Optional) The job_id to use. If not set, a job ID
is generated at random.
job_config (google.cloud.bigquery.job.QueryJobConfig):
(Optional) Extra configuration options for the query job.
"""
self._query_data = None
self._query_job = None
client = self.connection._client
# The DB-API uses the pyformat paramstyle, since BigQuery's native query
# parameter syntax is not one of the standard options. Convert both
# the query and the parameters to the format expected by the client
# libraries.
formatted_operation = _format_operation(operation, parameters=parameters)
query_parameters = _helpers.to_query_parameters(parameters)
if client._default_query_job_config:
if job_config:
config = job_config._fill_from_default(client._default_query_job_config)
else:
config = copy.deepcopy(client._default_query_job_config)
else:
config = job_config or job.QueryJobConfig(use_legacy_sql=False)
config.query_parameters = query_parameters
self._query_job = client.query(
formatted_operation, job_config=config, job_id=job_id
)
if self._query_job.dry_run:
self._set_description(schema=None)
self.rowcount = 0
return
# Wait for the query to finish.
try:
self._query_job.result()
except google.cloud.exceptions.GoogleCloudError as exc:
raise exceptions.DatabaseError(exc)
query_results = self._query_job._query_results
self._set_rowcount(query_results)
self._set_description(query_results.schema)
def executemany(self, operation, seq_of_parameters):
"""Prepare and execute a database operation multiple times.
Args:
operation (str): A Google BigQuery query string.
seq_of_parameters (Sequence[Union[Mapping[str, Any], Sequence[Any]]]):
Sequence of many sets of parameter values.
"""
for parameters in seq_of_parameters:
self.execute(operation, parameters)
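        # Example (illustrative; table and values are placeholders): each
        # parameter set below runs as its own query job.
        #     cursor.executemany(
        #         "INSERT INTO dataset.table (name) VALUES (%s)",
        #         [("Alice",), ("Bob",)],
        #     )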
def _try_fetch(self, size=None):
"""Try to start fetching data, if not yet started.
Mutates self to indicate that iteration has started.
"""
if self._query_job is None:
raise exceptions.InterfaceError(
"No query results: execute() must be called before fetch."
)
if self._query_job.dry_run:
self._query_data = iter([])
return
is_dml = (
self._query_job.statement_type
and self._query_job.statement_type.upper() != "SELECT"
)
if is_dml:
self._query_data = iter([])
return
if self._query_data is None:
client = self.connection._client
bqstorage_client = self.connection._bqstorage_client
if bqstorage_client is not None:
rows_iterable = self._bqstorage_fetch(bqstorage_client)
self._query_data = _helpers.to_bq_table_rows(rows_iterable)
return
rows_iter = client.list_rows(
self._query_job.destination,
selected_fields=self._query_job._query_results.schema,
page_size=self.arraysize,
)
self._query_data = iter(rows_iter)
def _bqstorage_fetch(self, bqstorage_client):
"""Start fetching data with the BigQuery Storage API.
The method assumes that the data about the relevant query job already
exists internally.
Args:
bqstorage_client(\
google.cloud.bigquery_storage_v1.BigQueryReadClient \
):
A client that knows how to talk to the BigQuery Storage API.
Returns:
Iterable[Mapping]:
A sequence of rows, represented as dictionaries.
"""
# Hitting this code path with a BQ Storage client instance implies that
# bigquery_storage_v1* can indeed be imported here without errors.
from google.cloud import bigquery_storage_v1
from google.cloud import bigquery_storage_v1beta1
table_reference = self._query_job.destination
is_v1beta1_client = isinstance(
bqstorage_client, bigquery_storage_v1beta1.BigQueryStorageClient
)
# We want to preserve compatibility with the v1beta1 BQ Storage clients,
# thus adjust the session creation if needed.
if is_v1beta1_client:
warnings.warn(
"Support for BigQuery Storage v1beta1 clients is deprecated, please "
"consider upgrading the client to BigQuery Storage v1 stable version.",
category=DeprecationWarning,
)
read_session = bqstorage_client.create_read_session(
table_reference.to_bqstorage(v1beta1=True),
"projects/{}".format(table_reference.project),
# a single stream only, as DB API is not well-suited for multithreading
requested_streams=1,
format_=bigquery_storage_v1beta1.enums.DataFormat.ARROW,
)
else:
requested_session = bigquery_storage_v1.types.ReadSession(
table=table_reference.to_bqstorage(),
data_format=bigquery_storage_v1.enums.DataFormat.ARROW,
)
read_session = bqstorage_client.create_read_session(
parent="projects/{}".format(table_reference.project),
read_session=requested_session,
# a single stream only, as DB API is not well-suited for multithreading
max_stream_count=1,
)
if not read_session.streams:
return iter([]) # empty table, nothing to read
if is_v1beta1_client:
read_position = bigquery_storage_v1beta1.types.StreamPosition(
stream=read_session.streams[0],
)
read_rows_stream = bqstorage_client.read_rows(read_position)
else:
stream_name = read_session.streams[0].name
read_rows_stream = bqstorage_client.read_rows(stream_name)
rows_iterable = read_rows_stream.rows(read_session)
return rows_iterable
def fetchone(self):
"""Fetch a single row from the results of the last ``execute*()`` call.
.. note::
If a dry run query was executed, no rows are returned.
Returns:
Tuple:
A tuple representing a row or ``None`` if no more data is
available.
Raises:
google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
"""
self._try_fetch()
try:
return six.next(self._query_data)
except StopIteration:
return None
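    # Example (illustrative) of the standard DB-API consumption idiom:
    #     row = cursor.fetchone()
    #     while row is not None:
    #         print(row)
    #         row = cursor.fetchone()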
def fetchmany(self, size=None):
"""Fetch multiple results from the last ``execute*()`` call.
.. note::
If a dry run query was executed, no rows are returned.
.. note::
The size parameter is not used for the request/response size.
Set the ``arraysize`` attribute before calling ``execute()`` to
set the batch size.
Args:
size (int):
(Optional) Maximum number of rows to return. Defaults to the
``arraysize`` property value. If ``arraysize`` is not set, it
defaults to ``1``.
Returns:
List[Tuple]: A list of rows.
Raises:
google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
"""
if size is None:
# Since self.arraysize can be None (a deviation from PEP 249),
# use the actual PEP 249 default of 1 in such a case (*some* number
# is needed here).
size = self.arraysize if self.arraysize else 1
self._try_fetch(size=size)
rows = []
for row in self._query_data:
rows.append(row)
if len(rows) >= size:
break
return rows
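    # Example (illustrative): page through results in chunks of 100 rows.
    #     while True:
    #         rows = cursor.fetchmany(100)
    #         if not rows:
    #             break
    #         for row in rows:
    #             print(row)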
def fetchall(self):
"""Fetch all remaining results from the last ``execute*()`` call.
.. note::
If a dry run query was executed, no rows are returned.
Returns:
List[Tuple]: A list of all the rows in the results.
Raises:
google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
"""
self._try_fetch()
return list(self._query_data)
def setinputsizes(self, sizes):
"""No-op, but for consistency raise an error if cursor is closed."""
def setoutputsize(self, size, column=None):
"""No-op, but for consistency raise an error if cursor is closed."""
def _format_operation_list(operation, parameters):
"""Formats parameters in operation in the way BigQuery expects.
The input operation will be a query like ``SELECT %s`` and the output
will be a query like ``SELECT ?``.
Args:
operation (str): A Google BigQuery query string.
parameters (Sequence[Any]): Sequence of parameter values.
Returns:
str: A formatted query string.
Raises:
google.cloud.bigquery.dbapi.ProgrammingError:
if a parameter used in the operation is not found in the
``parameters`` argument.
"""
formatted_params = ["?" for _ in parameters]
try:
return operation % tuple(formatted_params)
except TypeError as exc:
raise exceptions.ProgrammingError(exc)
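# Example (illustrative):
#     _format_operation_list("SELECT %s, %s", ["a", "b"])  # -> "SELECT ?, ?"
# A mismatch such as _format_operation_list("SELECT %s", []) raises
# ProgrammingError because the %-substitution fails with a TypeError.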
def _format_operation_dict(operation, parameters):
"""Formats parameters in operation in the way BigQuery expects.
The input operation will be a query like ``SELECT %(namedparam)s`` and
the output will be a query like ``SELECT @namedparam``.
Args:
operation (str): A Google BigQuery query string.
parameters (Mapping[str, Any]): Dictionary of parameter values.
Returns:
str: A formatted query string.
Raises:
google.cloud.bigquery.dbapi.ProgrammingError:
if a parameter used in the operation is not found in the
``parameters`` argument.
"""
formatted_params = {}
for name in parameters:
escaped_name = name.replace("`", r"\`")
formatted_params[name] = "@`{}`".format(escaped_name)
try:
return operation % formatted_params
except KeyError as exc:
raise exceptions.ProgrammingError(exc)
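# Example (illustrative):
#     _format_operation_dict("SELECT %(age)s", {"age": 30})  # -> "SELECT @`age`"
# A missing key, e.g. _format_operation_dict("SELECT %(age)s", {}), raises
# ProgrammingError because the %-substitution fails with a KeyError.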
def _format_operation(operation, parameters=None):
"""Formats parameters in operation in way BigQuery expects.
Args:
operation (str): A Google BigQuery query string.
parameters (Union[Mapping[str, Any], Sequence[Any]]):
Optional parameter values.
Returns:
str: A formatted query string.
Raises:
google.cloud.bigquery.dbapi.ProgrammingError:
if a parameter used in the operation is not found in the
``parameters`` argument.
"""
if parameters is None:
return operation
if isinstance(parameters, collections_abc.Mapping):
return _format_operation_dict(operation, parameters)
return _format_operation_list(operation, parameters)
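if __name__ == "__main__":  # pragma: no cover
    # Minimal sketch exercising only the pure, network-free formatting
    # helpers above; expected output is noted in the trailing comments and
    # assumes nothing beyond this module.
    print(_format_operation("SELECT %s, %s", parameters=["a", "b"]))
    # -> SELECT ?, ?
    print(_format_operation("SELECT %(age)s", parameters={"age": 30}))
    # -> SELECT @`age`
    print(_format_operation("SELECT 1"))
    # -> SELECT 1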