Source code for google.cloud.bigquery.job.query
# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes for query jobs."""
import concurrent.futures
import copy
import re
import time
import typing
from typing import Any, Dict, Iterable, List, Optional, Union
from google.api_core import exceptions
from google.api_core import retry as retries
import requests
from google.cloud.bigquery.dataset import Dataset
from google.cloud.bigquery.dataset import DatasetListItem
from google.cloud.bigquery.dataset import DatasetReference
from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
from google.cloud.bigquery.enums import KeyResultStatementKind, DefaultPandasDTypes
from google.cloud.bigquery.external_config import ExternalConfig
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.query import (
_query_param_from_api_repr,
ArrayQueryParameter,
ConnectionProperty,
ScalarQueryParameter,
StructQueryParameter,
UDFResource,
)
from google.cloud.bigquery.retry import (
DEFAULT_RETRY,
DEFAULT_JOB_RETRY,
POLLING_DEFAULT_VALUE,
)
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import _table_arg_to_table_ref
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import TimePartitioning
from google.cloud.bigquery._tqdm_helpers import wait_for_query
from google.cloud.bigquery.job.base import _AsyncJob
from google.cloud.bigquery.job.base import _JobConfig
from google.cloud.bigquery.job.base import _JobReference
try:
import pandas # type: ignore
except ImportError:
pandas = None
if typing.TYPE_CHECKING: # pragma: NO COVER
# Assumption: type checks are only used by library developers and CI environments
# that have all optional dependencies installed, thus no conditional imports.
import pandas # type: ignore
import geopandas # type: ignore
import pyarrow # type: ignore
from google.cloud import bigquery_storage
from google.cloud.bigquery.client import Client
from google.cloud.bigquery.table import RowIterator
_CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE)
_EXCEPTION_FOOTER_TEMPLATE = "{message}\n\nLocation: {location}\nJob ID: {job_id}\n"
_TIMEOUT_BUFFER_SECS = 0.1
def _contains_order_by(query):
"""Do we need to preserve the order of the query results?
This function has known false positives, such as with ordered window
functions:
.. code-block:: sql
SELECT SUM(x) OVER (
window_name
PARTITION BY...
ORDER BY...
window_frame_clause)
FROM ...
This false positive case means the behavior will still be correct, but
downloading results with the BigQuery Storage API may be slower than it
otherwise would be. This is preferable to the false negative case, where
results are expected to be in order but are not (due to parallel reads).
"""
return query and _CONTAINS_ORDER_BY.search(query)
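# A minimal usage sketch (illustrative, not part of the library) of the
# heuristic above, including the documented false positive for an ORDER BY
# inside a window specification.
def _example_contains_order_by():
    assert _contains_order_by("SELECT x FROM t ORDER BY x")
    assert not _contains_order_by("SELECT x FROM t")
    # False positive: the ORDER BY belongs to the OVER clause, but the
    # regex still matches, so ordering is (harmlessly) preserved.
    assert _contains_order_by("SELECT SUM(x) OVER (ORDER BY y) FROM t")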
def _from_api_repr_query_parameters(resource):
return [_query_param_from_api_repr(mapping) for mapping in resource]
def _to_api_repr_query_parameters(value):
return [query_parameter.to_api_repr() for query_parameter in value]
def _from_api_repr_udf_resources(resource):
udf_resources = []
for udf_mapping in resource:
for udf_type, udf_value in udf_mapping.items():
udf_resources.append(UDFResource(udf_type, udf_value))
return udf_resources
def _to_api_repr_udf_resources(value):
return [{udf_resource.udf_type: udf_resource.value} for udf_resource in value]
def _from_api_repr_table_defs(resource):
return {k: ExternalConfig.from_api_repr(v) for k, v in resource.items()}
def _to_api_repr_table_defs(value):
return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()}
class BiEngineReason(typing.NamedTuple):
"""Reason for BI Engine acceleration failure
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginereason
"""
code: str = "CODE_UNSPECIFIED"
reason: str = ""
@classmethod
def from_api_repr(cls, reason: Dict[str, str]) -> "BiEngineReason":
return cls(reason.get("code", "CODE_UNSPECIFIED"), reason.get("message", ""))
class BiEngineStats(typing.NamedTuple):
"""Statistics for a BI Engine query
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#bienginestatistics
"""
mode: str = "ACCELERATION_MODE_UNSPECIFIED"
""" Specifies which mode of BI Engine acceleration was performed (if any)
"""
reasons: List[BiEngineReason] = []
""" Contains explanatory messages in case of DISABLED / PARTIAL acceleration
"""
@classmethod
def from_api_repr(cls, stats: Dict[str, Any]) -> "BiEngineStats":
mode = stats.get("biEngineMode", "ACCELERATION_MODE_UNSPECIFIED")
reasons = [
BiEngineReason.from_api_repr(r) for r in stats.get("biEngineReasons", [])
]
return cls(mode, reasons)
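# A parsing sketch (illustrative, not part of the library); the payload below
# is a hypothetical fragment of a REST API response.
def _example_bi_engine_stats():
    stats = BiEngineStats.from_api_repr(
        {
            "biEngineMode": "PARTIAL",
            "biEngineReasons": [{"code": "OTHER_REASON", "message": "example"}],
        }
    )
    assert stats.mode == "PARTIAL"
    assert stats.reasons[0].code == "OTHER_REASON"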
class DmlStats(typing.NamedTuple):
"""Detailed statistics for DML statements.
https://cloud.google.com/bigquery/docs/reference/rest/v2/DmlStats
"""
inserted_row_count: int = 0
"""Number of inserted rows. Populated by DML INSERT and MERGE statements."""
deleted_row_count: int = 0
Number of deleted rows. Populated by DML DELETE, MERGE and TRUNCATE statements.
"""
updated_row_count: int = 0
"""Number of updated rows. Populated by DML UPDATE and MERGE statements."""
@classmethod
def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats":
# NOTE: The field order here must match the order of fields set at the
# class level.
api_fields = ("insertedRowCount", "deletedRowCount", "updatedRowCount")
args = (
int(stats.get(api_field, default_val))
for api_field, default_val in zip(api_fields, cls.__new__.__defaults__) # type: ignore
)
return cls(*args)
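# A sketch (illustrative, not part of the library) of how from_api_repr pairs
# each API field with its class-level default via ``cls.__new__.__defaults__``,
# so fields absent from the response fall back to zero.
def _example_dml_stats():
    stats = DmlStats.from_api_repr({"insertedRowCount": "12"})
    assert stats == DmlStats(
        inserted_row_count=12, deleted_row_count=0, updated_row_count=0
    )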
class IndexUnusedReason(typing.NamedTuple):
"""Reason about why no search index was used in the search query (or sub-query).
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#indexunusedreason
"""
code: Optional[str] = None
"""Specifies the high-level reason for the scenario when no search index was used.
"""
message: Optional[str] = None
"""Free form human-readable reason for the scenario when no search index was used.
"""
baseTable: Optional[TableReference] = None
"""Specifies the base table involved in the reason that no search index was used.
"""
indexName: Optional[str] = None
"""Specifies the name of the unused search index, if available."""
@classmethod
def from_api_repr(cls, reason):
code = reason.get("code")
message = reason.get("message")
baseTable = reason.get("baseTable")
indexName = reason.get("indexName")
return cls(code, message, baseTable, indexName)
class SearchStats(typing.NamedTuple):
"""Statistics related to Search Queries. Populated as part of JobStatistics2.
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#searchstatistics
"""
mode: Optional[str] = None
"""Indicates the type of search index usage in the entire search query."""
reason: List[IndexUnusedReason] = []
"""Reason about why no search index was used in the search query (or sub-query)"""
@classmethod
def from_api_repr(cls, stats: Dict[str, Any]):
mode = stats.get("indexUsageMode", None)
reason = [
IndexUnusedReason.from_api_repr(r)
for r in stats.get("indexUnusedReasons", [])
]
return cls(mode, reason)
class ScriptOptions:
"""Options controlling the execution of scripts.
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ScriptOptions
"""
def __init__(
self,
statement_timeout_ms: Optional[int] = None,
statement_byte_budget: Optional[int] = None,
key_result_statement: Optional[KeyResultStatementKind] = None,
):
self._properties: Dict[str, Any] = {}
self.statement_timeout_ms = statement_timeout_ms
self.statement_byte_budget = statement_byte_budget
self.key_result_statement = key_result_statement
@classmethod
def from_api_repr(cls, resource: Dict[str, Any]) -> "ScriptOptions":
"""Factory: construct instance from the JSON repr.
Args:
resource (Dict[str, Any]):
ScriptOptions representation returned from API.
Returns:
google.cloud.bigquery.ScriptOptions:
ScriptOptions parsed from ``resource``.
"""
entry = cls()
entry._properties = copy.deepcopy(resource)
return entry
def to_api_repr(self) -> Dict[str, Any]:
"""Construct the API resource representation."""
return copy.deepcopy(self._properties)
@property
def statement_timeout_ms(self) -> Union[int, None]:
"""Timeout period for each statement in a script."""
return _helpers._int_or_none(self._properties.get("statementTimeoutMs"))
@statement_timeout_ms.setter
def statement_timeout_ms(self, value: Union[int, None]):
new_value = None if value is None else str(value)
self._properties["statementTimeoutMs"] = new_value
@property
def statement_byte_budget(self) -> Union[int, None]:
"""Limit on the number of bytes billed per statement.
Exceeding this budget results in an error.
"""
return _helpers._int_or_none(self._properties.get("statementByteBudget"))
@statement_byte_budget.setter
def statement_byte_budget(self, value: Union[int, None]):
new_value = None if value is None else str(value)
self._properties["statementByteBudget"] = new_value
@property
def key_result_statement(self) -> Union[KeyResultStatementKind, None]:
"""Determines which statement in the script represents the "key result".
This is used to populate the schema and query results of the script job.
Default is ``KeyResultStatementKind.LAST``.
"""
return self._properties.get("keyResultStatement")
@key_result_statement.setter
def key_result_statement(self, value: Union[KeyResultStatementKind, None]):
self._properties["keyResultStatement"] = value
class QueryJobConfig(_JobConfig):
"""Configuration options for query jobs.
All properties in this class are optional. Properties set to :data:`None`
use the server defaults. Set properties on the constructed configuration by
using the property name as the name of a keyword argument.
"""
def __init__(self, **kwargs) -> None:
super(QueryJobConfig, self).__init__("query", **kwargs)
@property
def destination_encryption_configuration(self):
"""google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
encryption configuration for the destination table.
Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
if using default encryption.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_encryption_configuration
"""
prop = self._get_sub_prop("destinationEncryptionConfiguration")
if prop is not None:
prop = EncryptionConfiguration.from_api_repr(prop)
return prop
@destination_encryption_configuration.setter
def destination_encryption_configuration(self, value):
api_repr = value
if value is not None:
api_repr = value.to_api_repr()
self._set_sub_prop("destinationEncryptionConfiguration", api_repr)
@property
def allow_large_results(self):
"""bool: Allow large query results tables (legacy SQL, only)
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.allow_large_results
"""
return self._get_sub_prop("allowLargeResults")
@allow_large_results.setter
def allow_large_results(self, value):
self._set_sub_prop("allowLargeResults", value)
@property
def connection_properties(self) -> List[ConnectionProperty]:
"""Connection properties.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.connection_properties
.. versionadded:: 2.29.0
"""
resource = self._get_sub_prop("connectionProperties", [])
return [ConnectionProperty.from_api_repr(prop) for prop in resource]
@connection_properties.setter
def connection_properties(self, value: Iterable[ConnectionProperty]):
self._set_sub_prop(
"connectionProperties",
[prop.to_api_repr() for prop in value],
)
@property
def create_disposition(self):
"""google.cloud.bigquery.job.CreateDisposition: Specifies behavior
for creating tables.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_disposition
"""
return self._get_sub_prop("createDisposition")
@create_disposition.setter
def create_disposition(self, value):
self._set_sub_prop("createDisposition", value)
@property
def create_session(self) -> Optional[bool]:
"""[Preview] If :data:`True`, creates a new session, where
:attr:`~google.cloud.bigquery.job.QueryJob.session_info` will contain a
random server generated session id.
If :data:`False`, runs query with an existing ``session_id`` passed in
:attr:`~google.cloud.bigquery.job.QueryJobConfig.connection_properties`,
otherwise runs query in non-session mode.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.create_session
.. versionadded:: 2.29.0
"""
return self._get_sub_prop("createSession")
@create_session.setter
def create_session(self, value: Optional[bool]):
self._set_sub_prop("createSession", value)
@property
def default_dataset(self):
"""google.cloud.bigquery.dataset.DatasetReference: the default dataset
to use for unqualified table names in the query or :data:`None` if not
set.
The ``default_dataset`` setter accepts:
- a :class:`~google.cloud.bigquery.dataset.Dataset`, or
- a :class:`~google.cloud.bigquery.dataset.DatasetReference`, or
- a :class:`str` of the fully-qualified dataset ID in standard SQL
format. The value must include a project ID and dataset ID
separated by ``.``. For example: ``your-project.your_dataset``.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.default_dataset
"""
prop = self._get_sub_prop("defaultDataset")
if prop is not None:
prop = DatasetReference.from_api_repr(prop)
return prop
@default_dataset.setter
def default_dataset(self, value):
if value is None:
self._set_sub_prop("defaultDataset", None)
return
if isinstance(value, str):
value = DatasetReference.from_string(value)
if isinstance(value, (Dataset, DatasetListItem)):
value = value.reference
resource = value.to_api_repr()
self._set_sub_prop("defaultDataset", resource)
@property
def destination(self):
"""google.cloud.bigquery.table.TableReference: table where results are
written or :data:`None` if not set.
The ``destination`` setter accepts:
- a :class:`~google.cloud.bigquery.table.Table`, or
- a :class:`~google.cloud.bigquery.table.TableReference`, or
- a :class:`str` of the fully-qualified table ID in standard SQL
format. The value must include a project ID, dataset ID, and table
ID, each separated by ``.``. For example:
``your-project.your_dataset.your_table``.
.. note::
Only table ID is passed to the backend, so any configuration
in :class:`~google.cloud.bigquery.table.Table` is discarded.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.destination_table
"""
prop = self._get_sub_prop("destinationTable")
if prop is not None:
prop = TableReference.from_api_repr(prop)
return prop
@destination.setter
def destination(self, value):
if value is None:
self._set_sub_prop("destinationTable", None)
return
value = _table_arg_to_table_ref(value)
resource = value.to_api_repr()
self._set_sub_prop("destinationTable", resource)
@property
def dry_run(self):
"""bool: :data:`True` if this query should be a dry run to estimate
costs.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfiguration.FIELDS.dry_run
"""
return self._properties.get("dryRun")
@dry_run.setter
def dry_run(self, value):
self._properties["dryRun"] = value
@property
def flatten_results(self):
"""bool: Flatten nested/repeated fields in results. (Legacy SQL only)
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.flatten_results
"""
return self._get_sub_prop("flattenResults")
@flatten_results.setter
def flatten_results(self, value):
self._set_sub_prop("flattenResults", value)
@property
def maximum_billing_tier(self):
"""int: Deprecated. Changes the billing tier to allow high-compute
queries.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_billing_tier
"""
return self._get_sub_prop("maximumBillingTier")
@maximum_billing_tier.setter
def maximum_billing_tier(self, value):
self._set_sub_prop("maximumBillingTier", value)
@property
def maximum_bytes_billed(self):
"""int: Maximum bytes to be billed for this job or :data:`None` if not set.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.maximum_bytes_billed
"""
return _helpers._int_or_none(self._get_sub_prop("maximumBytesBilled"))
@maximum_bytes_billed.setter
def maximum_bytes_billed(self, value):
self._set_sub_prop("maximumBytesBilled", str(value))
@property
def priority(self):
"""google.cloud.bigquery.job.QueryPriority: Priority of the query.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.priority
"""
return self._get_sub_prop("priority")
@priority.setter
def priority(self, value):
self._set_sub_prop("priority", value)
@property
def query_parameters(self):
"""List[Union[google.cloud.bigquery.query.ArrayQueryParameter, \
google.cloud.bigquery.query.ScalarQueryParameter, \
google.cloud.bigquery.query.StructQueryParameter]]: list of parameters
for parameterized query (empty by default)
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query_parameters
"""
prop = self._get_sub_prop("queryParameters", default=[])
return _from_api_repr_query_parameters(prop)
@query_parameters.setter
def query_parameters(self, values):
self._set_sub_prop("queryParameters", _to_api_repr_query_parameters(values))
@property
def range_partitioning(self):
"""Optional[google.cloud.bigquery.table.RangePartitioning]:
Configures range-based partitioning for destination table.
.. note::
**Beta**. The integer range partitioning feature is in a
pre-release state and might change or have limited support.
Only specify at most one of
:attr:`~google.cloud.bigquery.job.QueryJobConfig.time_partitioning` or
:attr:`~google.cloud.bigquery.job.QueryJobConfig.range_partitioning`.
Raises:
ValueError:
If the value is not
:class:`~google.cloud.bigquery.table.RangePartitioning` or
:data:`None`.
"""
resource = self._get_sub_prop("rangePartitioning")
if resource is not None:
return RangePartitioning(_properties=resource)
@range_partitioning.setter
def range_partitioning(self, value):
resource = value
if isinstance(value, RangePartitioning):
resource = value._properties
elif value is not None:
raise ValueError(
"Expected value to be RangePartitioning or None, got {}.".format(value)
)
self._set_sub_prop("rangePartitioning", resource)
@property
def udf_resources(self):
"""List[google.cloud.bigquery.query.UDFResource]: user
defined function resources (empty by default)
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.user_defined_function_resources
"""
prop = self._get_sub_prop("userDefinedFunctionResources", default=[])
return _from_api_repr_udf_resources(prop)
@udf_resources.setter
def udf_resources(self, values):
self._set_sub_prop(
"userDefinedFunctionResources", _to_api_repr_udf_resources(values)
)
@property
def use_legacy_sql(self):
"""bool: Use legacy SQL syntax.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_legacy_sql
"""
return self._get_sub_prop("useLegacySql")
@use_legacy_sql.setter
def use_legacy_sql(self, value):
self._set_sub_prop("useLegacySql", value)
@property
def use_query_cache(self):
"""bool: Look for the query result in the cache.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.use_query_cache
"""
return self._get_sub_prop("useQueryCache")
@use_query_cache.setter
def use_query_cache(self, value):
self._set_sub_prop("useQueryCache", value)
@property
def write_disposition(self):
"""google.cloud.bigquery.job.WriteDisposition: Action that occurs if
the destination table already exists.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.write_disposition
"""
return self._get_sub_prop("writeDisposition")
@write_disposition.setter
def write_disposition(self, value):
self._set_sub_prop("writeDisposition", value)
@property
def write_incremental_results(self) -> Optional[bool]:
"""This is only supported for a SELECT query using a temporary table.
If set, the query is allowed to write results incrementally to the temporary result
table. This may incur a performance penalty. This option cannot be used with Legacy SQL.
This feature is not generally available.
"""
return self._get_sub_prop("writeIncrementalResults")
@write_incremental_results.setter
def write_incremental_results(self, value):
self._set_sub_prop("writeIncrementalResults", value)
@property
def table_definitions(self):
"""Dict[str, google.cloud.bigquery.external_config.ExternalConfig]:
Definitions for external tables or :data:`None` if not set.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.external_table_definitions
"""
prop = self._get_sub_prop("tableDefinitions")
if prop is not None:
prop = _from_api_repr_table_defs(prop)
return prop
@table_definitions.setter
def table_definitions(self, values):
self._set_sub_prop("tableDefinitions", _to_api_repr_table_defs(values))
@property
def time_partitioning(self):
"""Optional[google.cloud.bigquery.table.TimePartitioning]: Specifies
time-based partitioning for the destination table.
Only specify at most one of
:attr:`~google.cloud.bigquery.job.QueryJobConfig.time_partitioning` or
:attr:`~google.cloud.bigquery.job.QueryJobConfig.range_partitioning`.
Raises:
ValueError:
If the value is not
:class:`~google.cloud.bigquery.table.TimePartitioning` or
:data:`None`.
"""
prop = self._get_sub_prop("timePartitioning")
if prop is not None:
prop = TimePartitioning.from_api_repr(prop)
return prop
@time_partitioning.setter
def time_partitioning(self, value):
api_repr = value
if value is not None:
api_repr = value.to_api_repr()
self._set_sub_prop("timePartitioning", api_repr)
@property
def clustering_fields(self):
"""Optional[List[str]]: Fields defining clustering for the table
(Defaults to :data:`None`).
Clustering fields are immutable after table creation.
.. note::
BigQuery supports clustering for both partitioned and
non-partitioned tables.
"""
prop = self._get_sub_prop("clustering")
if prop is not None:
return list(prop.get("fields", ()))
@clustering_fields.setter
def clustering_fields(self, value):
"""Optional[List[str]]: Fields defining clustering for the table
(Defaults to :data:`None`).
"""
if value is not None:
self._set_sub_prop("clustering", {"fields": value})
else:
self._del_sub_prop("clustering")
@property
def schema_update_options(self):
"""List[google.cloud.bigquery.job.SchemaUpdateOption]: Specifies
updates to the destination table schema to allow as a side effect of
the query job.
"""
return self._get_sub_prop("schemaUpdateOptions")
@schema_update_options.setter
def schema_update_options(self, values):
self._set_sub_prop("schemaUpdateOptions", values)
@property
def script_options(self) -> ScriptOptions:
"""Options controlling the execution of scripts.
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#scriptoptions
"""
prop = self._get_sub_prop("scriptOptions")
if prop is not None:
prop = ScriptOptions.from_api_repr(prop)
return prop
@script_options.setter
def script_options(self, value: Union[ScriptOptions, None]):
new_value = None if value is None else value.to_api_repr()
self._set_sub_prop("scriptOptions", new_value)
def to_api_repr(self) -> dict:
"""Build an API representation of the query job config.
Returns:
Dict: A dictionary in the format used by the BigQuery API.
"""
resource = copy.deepcopy(self._properties)
# Query parameters have an additional property associated with them
# to indicate if the query is using named or positional parameters.
query_parameters = resource.get("query", {}).get("queryParameters")
if query_parameters:
if query_parameters[0].get("name") is None:
resource["query"]["parameterMode"] = "POSITIONAL"
else:
resource["query"]["parameterMode"] = "NAMED"
return resource
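# A sketch (illustrative, not part of the library) of the ``parameterMode``
# inference above: a missing first parameter name implies positional mode.
def _example_parameter_mode():
    named = QueryJobConfig(
        query_parameters=[ScalarQueryParameter("n", "INT64", 1)]
    )
    positional = QueryJobConfig(
        query_parameters=[ScalarQueryParameter(None, "INT64", 1)]
    )
    assert named.to_api_repr()["query"]["parameterMode"] == "NAMED"
    assert positional.to_api_repr()["query"]["parameterMode"] == "POSITIONAL"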
class QueryJob(_AsyncJob):
"""Asynchronous job: query tables.
Args:
job_id (str): the job's ID, within the project belonging to ``client``.
query (str): SQL query string.
client (google.cloud.bigquery.client.Client):
A client which holds credentials and project configuration
for the dataset (which requires a project).
job_config (Optional[google.cloud.bigquery.job.QueryJobConfig]):
Extra configuration options for the query job.
"""
_JOB_TYPE = "query"
_UDF_KEY = "userDefinedFunctionResources"
_CONFIG_CLASS = QueryJobConfig
def __init__(self, job_id, query, client, job_config=None):
super(QueryJob, self).__init__(job_id, client)
if job_config is not None:
self._properties["configuration"] = job_config._properties
if self.configuration.use_legacy_sql is None:
self.configuration.use_legacy_sql = False
if query:
_helpers._set_sub_prop(
self._properties, ["configuration", "query", "query"], query
)
self._query_results = None
self._done_timeout = None
self._transport_timeout = None
@property
def allow_large_results(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.allow_large_results`.
"""
return self.configuration.allow_large_results
@property
def configuration(self) -> QueryJobConfig:
"""The configuration for this query job."""
return typing.cast(QueryJobConfig, super().configuration)
@property
def connection_properties(self) -> List[ConnectionProperty]:
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.connection_properties`.
.. versionadded:: 2.29.0
"""
return self.configuration.connection_properties
@property
def create_disposition(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.create_disposition`.
"""
return self.configuration.create_disposition
@property
def create_session(self) -> Optional[bool]:
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.create_session`.
.. versionadded:: 2.29.0
"""
return self.configuration.create_session
@property
def default_dataset(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.default_dataset`.
"""
return self.configuration.default_dataset
@property
def destination(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.destination`.
"""
return self.configuration.destination
@property
def destination_encryption_configuration(self):
"""google.cloud.bigquery.encryption_configuration.EncryptionConfiguration: Custom
encryption configuration for the destination table.
Custom encryption configuration (e.g., Cloud KMS keys) or :data:`None`
if using default encryption.
See
:attr:`google.cloud.bigquery.job.QueryJobConfig.destination_encryption_configuration`.
"""
return self.configuration.destination_encryption_configuration
@property
def dry_run(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.dry_run`.
"""
return self.configuration.dry_run
@property
def flatten_results(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.flatten_results`.
"""
return self.configuration.flatten_results
@property
def priority(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.priority`.
"""
return self.configuration.priority
@property
def search_stats(self) -> Optional[SearchStats]:
"""Returns a SearchStats object."""
stats = self._job_statistics().get("searchStatistics")
if stats is not None:
return SearchStats.from_api_repr(stats)
return None
@property
def query(self):
"""str: The query text used in this query job.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationQuery.FIELDS.query
"""
return _helpers._get_sub_prop(
self._properties, ["configuration", "query", "query"]
)
@property
def query_id(self) -> Optional[str]:
"""[Preview] ID of a completed query.
This ID is auto-generated and not guaranteed to be populated.
"""
query_results = self._query_results
return query_results.query_id if query_results is not None else None
@property
def query_parameters(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.query_parameters`.
"""
return self.configuration.query_parameters
@property
def udf_resources(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.udf_resources`.
"""
return self.configuration.udf_resources
@property
def use_legacy_sql(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.use_legacy_sql`.
"""
return self.configuration.use_legacy_sql
@property
def use_query_cache(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.use_query_cache`.
"""
return self.configuration.use_query_cache
@property
def write_disposition(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.write_disposition`.
"""
return self.configuration.write_disposition
@property
def maximum_billing_tier(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_billing_tier`.
"""
return self.configuration.maximum_billing_tier
@property
def maximum_bytes_billed(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.maximum_bytes_billed`.
"""
return self.configuration.maximum_bytes_billed
@property
def range_partitioning(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.range_partitioning`.
"""
return self.configuration.range_partitioning
@property
def table_definitions(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.table_definitions`.
"""
return self.configuration.table_definitions
@property
def time_partitioning(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.time_partitioning`.
"""
return self.configuration.time_partitioning
@property
def clustering_fields(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.clustering_fields`.
"""
return self.configuration.clustering_fields
@property
def schema_update_options(self):
"""See
:attr:`google.cloud.bigquery.job.QueryJobConfig.schema_update_options`.
"""
return self.configuration.schema_update_options
def to_api_repr(self):
"""Generate a resource for :meth:`_begin`."""
# Use to_api_repr to allow for some configuration properties to be set
# automatically.
configuration = self.configuration.to_api_repr()
return {
"jobReference": self._properties["jobReference"],
"configuration": configuration,
}
@classmethod
def from_api_repr(cls, resource: dict, client: "Client") -> "QueryJob":
"""Factory: construct a job given its API representation
Args:
resource (Dict): job representation returned from the API
client (google.cloud.bigquery.client.Client):
Client which holds credentials and project
configuration for the dataset.
Returns:
google.cloud.bigquery.job.QueryJob: Job parsed from ``resource``.
"""
job_ref_properties = resource.setdefault(
"jobReference", {"projectId": client.project, "jobId": None}
)
job_ref = _JobReference._from_api_repr(job_ref_properties)
job = cls(job_ref, None, client=client)
job._set_properties(resource)
return job
@property
def query_plan(self):
"""Return query plan from job statistics, if present.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.query_plan
Returns:
List[google.cloud.bigquery.job.QueryPlanEntry]:
mappings describing the query plan, or an empty list
if the query has not yet completed.
"""
plan_entries = self._job_statistics().get("queryPlan", ())
return [QueryPlanEntry.from_api_repr(entry) for entry in plan_entries]
@property
def schema(self) -> Optional[List[SchemaField]]:
"""The schema of the results.
Present only for a successful dry run of non-legacy SQL queries.
"""
resource = self._job_statistics().get("schema")
if resource is None:
return None
fields = resource.get("fields", [])
return [SchemaField.from_api_repr(field) for field in fields]
@property
def timeline(self):
"""List(TimelineEntry): Return the query execution timeline
from job statistics.
"""
raw = self._job_statistics().get("timeline", ())
return [TimelineEntry.from_api_repr(entry) for entry in raw]
@property
def total_bytes_processed(self):
"""Return total bytes processed from job statistics, if present.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_processed
Returns:
Optional[int]:
Total bytes processed by the job, or None if job is not
yet complete.
"""
result = self._job_statistics().get("totalBytesProcessed")
if result is not None:
result = int(result)
return result
@property
def total_bytes_billed(self):
"""Return total bytes billed from job statistics, if present.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.total_bytes_billed
Returns:
Optional[int]:
Total bytes billed for the job, or None if job is not
yet complete.
"""
result = self._job_statistics().get("totalBytesBilled")
if result is not None:
result = int(result)
return result
@property
def billing_tier(self):
"""Return billing tier from job statistics, if present.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.billing_tier
Returns:
Optional[int]:
Billing tier used by the job, or None if job is not
yet complete.
"""
return self._job_statistics().get("billingTier")
@property
def cache_hit(self):
"""Return whether or not query results were served from cache.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.cache_hit
Returns:
Optional[bool]:
whether the query results were returned from cache, or None
if job is not yet complete.
"""
return self._job_statistics().get("cacheHit")
@property
def ddl_operation_performed(self):
"""Optional[str]: Return the DDL operation performed.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_operation_performed
"""
return self._job_statistics().get("ddlOperationPerformed")
@property
def ddl_target_routine(self):
"""Optional[google.cloud.bigquery.routine.RoutineReference]: Return the DDL target routine, present
for CREATE/DROP FUNCTION/PROCEDURE queries.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_routine
"""
prop = self._job_statistics().get("ddlTargetRoutine")
if prop is not None:
prop = RoutineReference.from_api_repr(prop)
return prop
@property
def ddl_target_table(self):
"""Optional[google.cloud.bigquery.table.TableReference]: Return the DDL target table, present
for CREATE/DROP TABLE/VIEW queries.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.ddl_target_table
"""
prop = self._job_statistics().get("ddlTargetTable")
if prop is not None:
prop = TableReference.from_api_repr(prop)
return prop
@property
def num_dml_affected_rows(self) -> Optional[int]:
"""Return the number of DML rows affected by the job.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.num_dml_affected_rows
Returns:
Optional[int]:
number of DML rows affected by the job, or None if job is not
yet complete.
"""
result = self._job_statistics().get("numDmlAffectedRows")
if result is not None:
result = int(result)
return result
@property
def slot_millis(self):
"""Union[int, None]: Slot-milliseconds used by this query job."""
return _helpers._int_or_none(self._job_statistics().get("totalSlotMs"))
@property
def statement_type(self):
"""Return statement type from job statistics, if present.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.statement_type
Returns:
Optional[str]:
type of statement used by the job, or None if job is not
yet complete.
"""
return self._job_statistics().get("statementType")
@property
def referenced_tables(self):
"""Return referenced tables from job statistics, if present.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.referenced_tables
Returns:
List[google.cloud.bigquery.table.TableReference]:
References to tables used by the query, or an empty list
if the query has not yet completed.
"""
tables = []
datasets_by_project_name = {}
for table in self._job_statistics().get("referencedTables", ()):
t_project = table["projectId"]
ds_id = table["datasetId"]
t_dataset = datasets_by_project_name.get((t_project, ds_id))
if t_dataset is None:
t_dataset = DatasetReference(t_project, ds_id)
datasets_by_project_name[(t_project, ds_id)] = t_dataset
t_name = table["tableId"]
tables.append(t_dataset.table(t_name))
return tables
@property
def undeclared_query_parameters(self):
"""Return undeclared query parameters from job statistics, if present.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.undeclared_query_parameters
Returns:
List[Union[ \
google.cloud.bigquery.query.ArrayQueryParameter, \
google.cloud.bigquery.query.ScalarQueryParameter, \
google.cloud.bigquery.query.StructQueryParameter \
]]:
Undeclared parameters, or an empty list if the query has
not yet completed.
"""
parameters = []
undeclared = self._job_statistics().get("undeclaredQueryParameters", ())
for parameter in undeclared:
p_type = parameter["parameterType"]
if "arrayType" in p_type:
klass = ArrayQueryParameter
elif "structTypes" in p_type:
klass = StructQueryParameter
else:
klass = ScalarQueryParameter
parameters.append(klass.from_api_repr(parameter))
return parameters
@property
def estimated_bytes_processed(self):
"""Return the estimated number of bytes processed by the query.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatistics2.FIELDS.estimated_bytes_processed
Returns:
Optional[int]:
Estimated number of bytes processed by the query, or None if the
job is not yet complete.
"""
result = self._job_statistics().get("estimatedBytesProcessed")
if result is not None:
result = int(result)
return result
@property
def dml_stats(self) -> Optional[DmlStats]:
stats = self._job_statistics().get("dmlStats")
if stats is None:
return None
else:
return DmlStats.from_api_repr(stats)
@property
def bi_engine_stats(self) -> Optional[BiEngineStats]:
stats = self._job_statistics().get("biEngineStatistics")
if stats is None:
return None
else:
return BiEngineStats.from_api_repr(stats)
def _blocking_poll(self, timeout=None, **kwargs):
self._done_timeout = timeout
self._transport_timeout = timeout
super(QueryJob, self)._blocking_poll(timeout=timeout, **kwargs)
@staticmethod
def _format_for_exception(message: str, query: str):
"""Format a query for the output in exception message.
Args:
message (str): The original exception message.
query (str): The SQL query to format.
Returns:
str: A formatted query text.
"""
template = "{message}\n\n{header}\n\n{ruler}\n{body}\n{ruler}"
lines = query.splitlines() if query is not None else [""]
max_line_len = max(len(line) for line in lines)
header = "-----Query Job SQL Follows-----"
header = "{:^{total_width}}".format(header, total_width=max_line_len + 5)
# Print out a "ruler" above and below the SQL so we can judge columns.
# Left pad for the line numbers (4 digits plus ":").
ruler = " |" + " . |" * (max_line_len // 10)
# Put line numbers next to the SQL.
body = "\n".join(
"{:4}:{}".format(n, line) for n, line in enumerate(lines, start=1)
)
return template.format(message=message, header=header, ruler=ruler, body=body)
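# A formatting sketch in comment form (illustrative, not part of the library):
#
#     text = QueryJob._format_for_exception("Syntax error", "SELECT 1\nFROM t")
#     # text contains the original message, the centered header, a dotted
#     # column ruler, and the SQL with 4-digit line numbers, e.g.
#     # "   1:SELECT 1".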
def _begin(self, client=None, retry=DEFAULT_RETRY, timeout=None):
"""API call: begin the job via a POST request
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert
Args:
client (Optional[google.cloud.bigquery.client.Client]):
The client to use. If not passed, falls back to the ``client``
associated with the job object, or ``None``.
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
Raises:
ValueError: If the job has already begun.
"""
try:
super(QueryJob, self)._begin(client=client, retry=retry, timeout=timeout)
except exceptions.GoogleAPICallError as exc:
exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
message=exc.message, location=self.location, job_id=self.job_id
)
exc.debug_message = self._format_for_exception(exc.message, self.query)
exc.query_job = self
raise
def _reload_query_results(
self,
retry: "retries.Retry" = DEFAULT_RETRY,
timeout: Optional[float] = None,
page_size: int = 0,
start_index: Optional[int] = None,
):
"""Refresh the cached query results unless already cached and complete.
Args:
retry (Optional[google.api_core.retry.Retry]):
How to retry the call that retrieves query results.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
page_size (int):
Maximum number of rows in a single response. See maxResults in
the jobs.getQueryResults REST API.
start_index (Optional[int]):
Zero-based index of the starting row. See startIndex in the
jobs.getQueryResults REST API.
"""
# Optimization: avoid a call to jobs.getQueryResults if it's already
# been fetched, e.g. from jobs.query first page of results.
if self._query_results and self._query_results.complete:
return
# Since the API to getQueryResults can hang up to the timeout value
# (default of 10 seconds), set the timeout parameter to ensure that
# the timeout from the futures API is respected. See:
# https://github.com/GoogleCloudPlatform/google-cloud-python/issues/4135
timeout_ms = None
# google-api-core, as part of a major rewrite of its deadline, timeout,
# and retry process, sets the timeout value to a bare Python object().
# Our system does not natively handle that and instead expects
# either None or a numeric value, so if passed such an object, convert
# it to None.
if type(self._done_timeout) is object: # pragma: NO COVER
self._done_timeout = None
if self._done_timeout is not None: # pragma: NO COVER
# Subtract a buffer for context switching, network latency, etc.
api_timeout = self._done_timeout - _TIMEOUT_BUFFER_SECS
api_timeout = max(min(api_timeout, 10), 0)
self._done_timeout -= api_timeout
self._done_timeout = max(0, self._done_timeout)
timeout_ms = int(api_timeout * 1000)
# If an explicit timeout is not given, fall back to the transport timeout
# stored in _blocking_poll() in the process of polling for job completion.
if timeout is not None:
transport_timeout = timeout
else:
transport_timeout = self._transport_timeout
# Handle PollingJob._DEFAULT_VALUE.
if not isinstance(transport_timeout, (float, int)):
transport_timeout = None
self._query_results = self._client._get_query_results(
self.job_id,
retry,
project=self.project,
timeout_ms=timeout_ms,
location=self.location,
timeout=transport_timeout,
page_size=page_size,
start_index=start_index,
)
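# Worked example of the timeout buffering above, in comment form
# (illustrative, not part of the library): with ``self._done_timeout = 2.0``,
# ``api_timeout = 2.0 - 0.1 = 1.9`` (already within the [0, 10] clamp), so
# ``timeout_ms = 1900`` and ``self._done_timeout`` is reduced to ``0.1``
# for subsequent polls.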
def result( # type: ignore # (incompatible with supertype)
self,
page_size: Optional[int] = None,
max_results: Optional[int] = None,
retry: Optional[retries.Retry] = DEFAULT_RETRY,
timeout: Optional[Union[float, object]] = POLLING_DEFAULT_VALUE,
start_index: Optional[int] = None,
job_retry: Optional[retries.Retry] = DEFAULT_JOB_RETRY,
) -> Union["RowIterator", _EmptyRowIterator]:
"""Start the job and wait for it to complete and get the result.
Args:
page_size (Optional[int]):
The maximum number of rows in each page of results from this
request. Non-positive values are ignored.
max_results (Optional[int]):
The maximum total number of rows from this request.
retry (Optional[google.api_core.retry.Retry]):
How to retry the call that retrieves rows. This only
applies to making RPC calls. It isn't used to retry
failed jobs. This has a reasonable default that
should only be overridden with care. If the job state
is ``DONE``, retrying is aborted early even if the
results are not available, as this will not change
anymore.
timeout (Optional[Union[float, \
google.api_core.future.polling.PollingFuture._DEFAULT_VALUE, \
]]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``. If ``None``, wait indefinitely
unless an error is returned. If unset, only the
underlying API calls have their default timeouts, but we still
wait indefinitely for the job to finish.
start_index (Optional[int]):
The zero-based index of the starting row to read.
job_retry (Optional[google.api_core.retry.Retry]):
How to retry failed jobs. The default retries
rate-limit-exceeded errors. Passing ``None`` disables
job retry.
Not all jobs can be retried. If ``job_id`` was
provided to the query that created this job, then the
job returned by the query will not be retryable, and
an exception will be raised if non-``None``
non-default ``job_retry`` is also provided.
Returns:
google.cloud.bigquery.table.RowIterator:
Iterator of row data
:class:`~google.cloud.bigquery.table.Row`-s. During each
page, the iterator will have the ``total_rows`` attribute
set, which counts the total number of rows **in the result
set** (this is distinct from the total number of rows in the
current page: ``iterator.page.num_items``).
If the query is a special query that produces no results, e.g.
a DDL query, an ``_EmptyRowIterator`` instance is returned.
Raises:
google.api_core.exceptions.GoogleAPICallError:
If the job failed and retries aren't successful.
concurrent.futures.TimeoutError:
If the job did not complete in the given timeout.
TypeError:
If a non-``None`` and non-default ``job_retry`` is
provided and the job is not retryable.
"""
# Note: Since waiting for a query job to finish is more complex than
# refreshing the job state in a loop, we avoid calling the superclass
# in this method.
if self.dry_run:
return _EmptyRowIterator(
project=self.project,
location=self.location,
schema=self.schema,
total_bytes_processed=self.total_bytes_processed,
# Intentionally omit job_id and query_id since this doesn't
# actually correspond to a finished query job.
)
# Setting max_results should be equivalent to setting page_size with
# regard to allowing the user to tune how many results to download
# while we wait for the query to finish. See internal issue:
# 344008814. But if start_index is set, user is trying to access a
# specific page, so we don't need to set page_size. See issue #1950.
if page_size is None and max_results is not None and start_index is None:
page_size = max_results
# When timeout has default sentinel value ``object()``, do not pass
# anything to invoke default timeouts in subsequent calls.
done_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
reload_query_results_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
list_rows_kwargs: Dict[str, Union[_helpers.TimeoutType, object]] = {}
if type(timeout) is not object:
done_kwargs["timeout"] = timeout
list_rows_kwargs["timeout"] = timeout
reload_query_results_kwargs["timeout"] = timeout
if page_size is not None:
reload_query_results_kwargs["page_size"] = page_size
if start_index is not None:
reload_query_results_kwargs["start_index"] = start_index
try:
retry_do_query = getattr(self, "_retry_do_query", None)
if retry_do_query is not None:
if job_retry is DEFAULT_JOB_RETRY:
job_retry = self._job_retry # type: ignore
else:
if job_retry is not None and job_retry is not DEFAULT_JOB_RETRY:
raise TypeError(
"`job_retry` was provided, but this job is"
" not retryable, because a custom `job_id` was"
" provided to the query that created this job."
)
restart_query_job = False
def is_job_done():
nonlocal restart_query_job
if restart_query_job:
restart_query_job = False
# The original job has failed. Create a new one.
#
# Note that we won't get here if retry_do_query is
# None, because we won't use a retry.
job = retry_do_query()
# Become the new job:
self.__dict__.clear()
self.__dict__.update(job.__dict__)
# It's possible the job fails again and we'll have to
# retry that too.
self._retry_do_query = retry_do_query
self._job_retry = job_retry
# If the job hasn't been created, create it now. Related:
# https://github.com/googleapis/python-bigquery/issues/1940
if self.state is None:
self._begin(retry=retry, **done_kwargs)
# Refresh the job status with jobs.get because some of the
# exceptions thrown by jobs.getQueryResults like timeout and
# rateLimitExceeded errors are ambiguous. We want to know if
# the query job failed and not just the call to
# jobs.getQueryResults.
if self.done(retry=retry, **done_kwargs):
# If it's already failed, we might as well stop.
job_failed_exception = self.exception()
if job_failed_exception is not None:
# Only try to restart the query job if the job failed for
# a retriable reason. For example, don't restart the query
# if the call to reload the job metadata within self.done()
# timed out.
#
# The `restart_query_job` must only be called after a
# successful call to the `jobs.get` REST API and we
# determine that the job has failed.
#
# The `jobs.get` REST API
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/get)
# is called via `self.done()` which calls
# `self.reload()`.
#
# To determine if the job failed, the `self.exception()`
# is set from `self.reload()` via
# `self._set_properties()`, which translates the
# `Job.status.errorResult` field
# (https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobStatus.FIELDS.error_result)
# into an exception that can be processed by the
# `job_retry` predicate.
restart_query_job = True
raise job_failed_exception
else:
# Make sure that the _query_results are cached so we
# can return a complete RowIterator.
#
# Note: As an optimization, _reload_query_results
# doesn't make any API calls if the query results are
# already cached and have jobComplete=True in the
# response from the REST API. This ensures we aren't
# making any extra API calls if the previous loop
# iteration fetched the finished job.
self._reload_query_results(
retry=retry, **reload_query_results_kwargs
)
return True
# Call jobs.getQueryResults with max results set to 0 just to
# wait for the query to finish. Unlike most methods,
# jobs.getQueryResults hangs as long as it can to ensure we
# know when the query has finished as soon as possible.
self._reload_query_results(retry=retry, **reload_query_results_kwargs)
# Even if the query is finished now according to
# jobs.getQueryResults, we'll want to reload the job status if
# it's not already DONE.
return False
if retry_do_query is not None and job_retry is not None:
is_job_done = job_retry(is_job_done)
# timeout can be a number of seconds, `None`, or a
# `google.api_core.future.polling.PollingFuture._DEFAULT_VALUE`
# sentinel object indicating a default timeout if we choose to add
# one some day. This value can come from our PollingFuture
# superclass and was introduced in
# https://github.com/googleapis/python-api-core/pull/462.
if isinstance(timeout, (float, int)):
remaining_timeout = timeout
else:
# Note: we may need to handle _DEFAULT_VALUE as a separate
# case someday, but even then the best we can do for queries
# is 72+ hours for hyperparameter tuning jobs:
# https://cloud.google.com/bigquery/quotas#query_jobs
#
# The timeout for a multi-statement query is 24+ hours. See:
# https://cloud.google.com/bigquery/quotas#multi_statement_query_limits
remaining_timeout = None
if remaining_timeout is None:
# Since is_job_done() calls jobs.getQueryResults, which is a
# long-running API, don't delay the next request at all.
while not is_job_done():
pass
else:
# Use a monotonic clock since we don't actually care about
# daylight savings or similar, just the elapsed time.
previous_time = time.monotonic()
while not is_job_done():
current_time = time.monotonic()
elapsed_time = current_time - previous_time
remaining_timeout = remaining_timeout - elapsed_time
previous_time = current_time
if remaining_timeout < 0:
raise concurrent.futures.TimeoutError()
except exceptions.GoogleAPICallError as exc:
exc.message = _EXCEPTION_FOOTER_TEMPLATE.format(
message=exc.message, location=self.location, job_id=self.job_id
)
exc.debug_message = self._format_for_exception(exc.message, self.query) # type: ignore
exc.query_job = self # type: ignore
raise
except requests.exceptions.Timeout as exc:
raise concurrent.futures.TimeoutError from exc
# If the query job is complete but there are no query results, this was a
# special job, such as a DDL query. Return an empty result set to
# indicate success and avoid calling tabledata.list on a table which
# can't be read (such as a view table).
if self._query_results.total_rows is None:
return _EmptyRowIterator(
location=self.location,
project=self.project,
job_id=self.job_id,
query_id=self.query_id,
schema=self.schema,
num_dml_affected_rows=self._query_results.num_dml_affected_rows,
query=self.query,
total_bytes_processed=self.total_bytes_processed,
slot_millis=self.slot_millis,
)
# We know that there's at least 1 row, so only treat the response from
# jobs.getQueryResults / jobs.query as the first page of the
# RowIterator response if there are any rows in it. This prevents us
# from stopping the iteration early in the cases where we set
# maxResults=0. In that case, we're missing rows and there's no next
# page token.
first_page_response = self._query_results._properties
if "rows" not in first_page_response:
first_page_response = None
rows = self._client._list_rows_from_query_results(
self.job_id,
self.location,
self.project,
self._query_results.schema,
total_rows=self._query_results.total_rows,
destination=self.destination,
page_size=page_size,
max_results=max_results,
start_index=start_index,
retry=retry,
query_id=self.query_id,
first_page_response=first_page_response,
num_dml_affected_rows=self._query_results.num_dml_affected_rows,
query=self.query,
total_bytes_processed=self.total_bytes_processed,
slot_millis=self.slot_millis,
created=self.created,
started=self.started,
ended=self.ended,
**list_rows_kwargs,
)
rows._preserve_order = _contains_order_by(self.query)
return rows
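# An end-to-end sketch in comment form (illustrative, not part of the
# library); ``client`` and the table ID are assumptions:
#
#     job = client.query("SELECT name FROM `your-project.your_dataset.people`")
#     # Block up to 60 seconds for completion, fetching 500 rows per page.
#     for row in job.result(page_size=500, timeout=60.0):
#         print(row["name"])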
# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_arrow(), except for the max_results parameter
# that should only exist here in the QueryJob method.
def to_arrow(
self,
progress_bar_type: Optional[str] = None,
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
create_bqstorage_client: bool = True,
max_results: Optional[int] = None,
) -> "pyarrow.Table":
"""[Beta] Create a class:`pyarrow.Table` by loading all pages of a
table or query.
Args:
progress_bar_type (Optional[str]):
If set, use the `tqdm <https://tqdm.github.io/>`_ library to
display a progress bar while the data downloads. Install the
``tqdm`` package to use this feature.
Possible values of ``progress_bar_type`` include:
``None``
No progress bar.
``'tqdm'``
Use the :func:`tqdm.tqdm` function to print a progress bar
to :data:`sys.stdout`.
``'tqdm_notebook'``
Use the :func:`tqdm.notebook.tqdm` function to display a
progress bar as a Jupyter notebook widget.
``'tqdm_gui'``
Use the :func:`tqdm.tqdm_gui` function to display a
progress bar as a graphical dialog box.
bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
A BigQuery Storage API client. If supplied, use the faster
BigQuery Storage API to fetch rows from BigQuery. This API
is a billable API.
This method requires the ``google-cloud-bigquery-storage`` library.
Reading from a specific partition or snapshot is not
currently supported by this method.
create_bqstorage_client (Optional[bool]):
If ``True`` (default), create a BigQuery Storage API client
using the default API settings. The BigQuery Storage API
is a faster way to fetch rows from BigQuery. See the
``bqstorage_client`` parameter for more information.
This argument does nothing if ``bqstorage_client`` is supplied.
.. versionadded:: 1.24.0
max_results (Optional[int]):
Maximum number of rows to include in the result. No limit by default.
.. versionadded:: 2.21.0
Returns:
pyarrow.Table
A :class:`pyarrow.Table` populated with row data and column
headers from the query results. The column headers are derived
from the destination table's schema.
Raises:
ValueError:
If the :mod:`pyarrow` library cannot be imported.
.. versionadded:: 1.17.0
"""
query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
return query_result.to_arrow(
progress_bar_type=progress_bar_type,
bqstorage_client=bqstorage_client,
create_bqstorage_client=create_bqstorage_client,
)
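# An Arrow sketch in comment form (illustrative, not part of the library);
# requires the optional ``pyarrow`` dependency and an assumed ``client``:
#
#     job = client.query("SELECT 1 AS x")
#     table = job.to_arrow(create_bqstorage_client=False)
#     df = table.to_pandas()  # convert to pandas if desired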
# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_dataframe(), except for the max_results parameter
# that should only exist here in the QueryJob method.
def to_dataframe(
self,
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
dtypes: Optional[Dict[str, Any]] = None,
progress_bar_type: Optional[str] = None,
create_bqstorage_client: bool = True,
max_results: Optional[int] = None,
geography_as_object: bool = False,
bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
float_dtype: Union[Any, None] = None,
string_dtype: Union[Any, None] = None,
date_dtype: Union[Any, None] = DefaultPandasDTypes.DATE_DTYPE,
datetime_dtype: Union[Any, None] = None,
time_dtype: Union[Any, None] = DefaultPandasDTypes.TIME_DTYPE,
timestamp_dtype: Union[Any, None] = None,
range_date_dtype: Union[Any, None] = DefaultPandasDTypes.RANGE_DATE_DTYPE,
range_datetime_dtype: Union[
Any, None
] = DefaultPandasDTypes.RANGE_DATETIME_DTYPE,
range_timestamp_dtype: Union[
Any, None
] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
) -> "pandas.DataFrame":
"""Return a pandas DataFrame from a QueryJob
Args:
bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
A BigQuery Storage API client. If supplied, use the faster
BigQuery Storage API to fetch rows from BigQuery. This
API is a billable API.
This method requires the ``fastavro`` and
``google-cloud-bigquery-storage`` libraries.
Reading from a specific partition or snapshot is not
currently supported by this method.
dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
A dictionary of column names to pandas ``dtype``s. The provided
``dtype`` is used when constructing the series for the column
specified. Otherwise, the default pandas behavior is used.
progress_bar_type (Optional[str]):
If set, use the `tqdm <https://tqdm.github.io/>`_ library to
display a progress bar while the data downloads. Install the
``tqdm`` package to use this feature.
See
:func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
for details.
.. versionadded:: 1.11.0
create_bqstorage_client (Optional[bool]):
If ``True`` (default), create a BigQuery Storage API client
using the default API settings. The BigQuery Storage API
is a faster way to fetch rows from BigQuery. See the
``bqstorage_client`` parameter for more information.
This argument does nothing if ``bqstorage_client`` is supplied.
.. versionadded:: 1.24.0
max_results (Optional[int]):
Maximum number of rows to include in the result. No limit by default.
.. versionadded:: 2.21.0
geography_as_object (Optional[bool]):
If ``True``, convert GEOGRAPHY data to :mod:`shapely`
geometry objects. If ``False`` (default), don't cast
geography data to :mod:`shapely` geometry objects.
.. versionadded:: 2.24.0
bool_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
to convert BigQuery Boolean type, instead of relying on the default
``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
type can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
.. versionadded:: 3.8.0
int_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
to convert BigQuery Integer types, instead of relying on the default
``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
Integer types can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
.. versionadded:: 3.8.0
float_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
to convert BigQuery Float type, instead of relying on the default
``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
then the data type will be ``numpy.dtype("float64")``. BigQuery Float
type can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
.. versionadded:: 3.8.0
string_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
convert BigQuery String type, instead of relying on the default
``numpy.dtype("object")``. If you explicitly set the value to ``None``,
then the data type will be ``numpy.dtype("object")``. BigQuery String
type can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
.. versionadded:: 3.8.0
date_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g.
``pandas.ArrowDtype(pyarrow.date32())``) to convert BigQuery Date
type, instead of relying on the default ``db_dtypes.DateDtype()``.
If you explicitly set the value to ``None``, then the data type will be
``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bound. BigQuery
Date type can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#date_type
.. versionadded:: 3.10.0
datetime_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g.
``pandas.ArrowDtype(pyarrow.timestamp("us"))``) to convert BigQuery Datetime
type, instead of relying on the default ``numpy.dtype("datetime64[ns]")``.
If you explicitly set the value to ``None``, then the data type will be
``numpy.dtype("datetime64[ns]")`` or ``object`` if out of bounds. BigQuery
Datetime type can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#datetime_type
.. versionadded:: 3.10.0
time_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g.
``pandas.ArrowDtype(pyarrow.time64("us"))``) to convert BigQuery Time
type, instead of relying on the default ``db_dtypes.TimeDtype()``.
If you explicitly set the value to ``None``, then the data type will be
``numpy.dtype("object")``. BigQuery Time type can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#time_type
.. versionadded:: 3.10.0
timestamp_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g.
``pandas.ArrowDtype(pyarrow.timestamp("us", tz="UTC"))``) to convert BigQuery Timestamp
type, instead of relying on the default ``numpy.dtype("datetime64[ns, UTC]")``.
If you explicitly set the value to ``None``, then the data type will be
``numpy.dtype("datetime64[ns, UTC]")`` or ``object`` if out of bound. BigQuery
Datetime type can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#timestamp_type
.. versionadded:: 3.10.0
range_date_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype, such as:
.. code-block:: python
pandas.ArrowDtype(pyarrow.struct(
[("start", pyarrow.date32()), ("end", pyarrow.date32())]
))
to convert BigQuery RANGE<DATE> type, instead of relying on
the default ``object``. If you explicitly set the value to
``None``, the data type will be ``object``. BigQuery Range type
can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
.. versionadded:: 3.21.0
range_datetime_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype, such as:
.. code-block:: python
pandas.ArrowDtype(pyarrow.struct(
[
("start", pyarrow.timestamp("us")),
("end", pyarrow.timestamp("us")),
]
))
to convert BigQuery RANGE<DATETIME> type, instead of relying on
the default ``object``. If you explicitly set the value to
``None``, the data type will be ``object``. BigQuery Range type
can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
.. versionadded:: 3.21.0
range_timestamp_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype, such as:
.. code-block:: python
pandas.ArrowDtype(pyarrow.struct(
[
("start", pyarrow.timestamp("us", tz="UTC")),
("end", pyarrow.timestamp("us", tz="UTC")),
]
))
to convert BigQuery RANGE<TIMESTAMP> type, instead of relying
on the default ``object``. If you explicitly set the value to
``None``, the data type will be ``object``. BigQuery Range type
can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#range_type
.. versionadded:: 3.21.0
Returns:
pandas.DataFrame:
A :class:`~pandas.DataFrame` populated with row data
and column headers from the query results. The column
headers are derived from the destination table's
schema.
Raises:
ValueError:
If the :mod:`pandas` library cannot be imported, or
the :mod:`google.cloud.bigquery_storage_v1` module is
required but cannot be imported. Also raised if
``geography_as_object`` is ``True`` but the
:mod:`shapely` library cannot be imported.
"""
query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
return query_result.to_dataframe(
bqstorage_client=bqstorage_client,
dtypes=dtypes,
progress_bar_type=progress_bar_type,
create_bqstorage_client=create_bqstorage_client,
geography_as_object=geography_as_object,
bool_dtype=bool_dtype,
int_dtype=int_dtype,
float_dtype=float_dtype,
string_dtype=string_dtype,
date_dtype=date_dtype,
datetime_dtype=datetime_dtype,
time_dtype=time_dtype,
timestamp_dtype=timestamp_dtype,
range_date_dtype=range_date_dtype,
range_datetime_dtype=range_datetime_dtype,
range_timestamp_dtype=range_timestamp_dtype,
)
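# Usage sketch (illustrative comment): overriding the dtype mapping
# documented above. Passing ``None`` explicitly falls back to the plain
# NumPy dtypes rather than the nullable pandas extension dtypes:
#
#     df = job.to_dataframe(
#         bool_dtype=None,              # -> numpy.dtype("bool")
#         int_dtype=None,               # -> numpy.dtype("int64")
#         max_results=1000,             # cap the rows downloaded
#         create_bqstorage_client=False,
#     )
#     print(df.dtypes)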
# If changing the signature of this method, make sure to apply the same
# changes to table.RowIterator.to_geodataframe(), except for the max_results parameter
# that should only exist here in the QueryJob method.
def to_geodataframe(
self,
bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
dtypes: Optional[Dict[str, Any]] = None,
progress_bar_type: Optional[str] = None,
create_bqstorage_client: bool = True,
max_results: Optional[int] = None,
geography_column: Optional[str] = None,
bool_dtype: Union[Any, None] = DefaultPandasDTypes.BOOL_DTYPE,
int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
float_dtype: Union[Any, None] = None,
string_dtype: Union[Any, None] = None,
) -> "geopandas.GeoDataFrame":
"""Return a GeoPandas GeoDataFrame from a QueryJob
Args:
bqstorage_client (Optional[google.cloud.bigquery_storage_v1.BigQueryReadClient]):
A BigQuery Storage API client. If supplied, use the faster
BigQuery Storage API to fetch rows from BigQuery. This
API is a billable API.
This method requires the ``fastavro`` and
``google-cloud-bigquery-storage`` libraries.
Reading from a specific partition or snapshot is not
currently supported by this method.
dtypes (Optional[Map[str, Union[str, pandas.Series.dtype]]]):
A dictionary of column names to pandas ``dtype``s. The provided
``dtype`` is used when constructing the series for the column
specified. Otherwise, the default pandas behavior is used.
progress_bar_type (Optional[str]):
If set, use the `tqdm <https://tqdm.github.io/>`_ library to
display a progress bar while the data downloads. Install the
``tqdm`` package to use this feature.
See
:func:`~google.cloud.bigquery.table.RowIterator.to_dataframe`
for details.
.. versionadded:: 1.11.0
create_bqstorage_client (Optional[bool]):
If ``True`` (default), create a BigQuery Storage API client
using the default API settings. The BigQuery Storage API
is a faster way to fetch rows from BigQuery. See the
``bqstorage_client`` parameter for more information.
This argument does nothing if ``bqstorage_client`` is supplied.
.. versionadded:: 1.24.0
max_results (Optional[int]):
Maximum number of rows to include in the result. No limit by default.
.. versionadded:: 2.21.0
geography_column (Optional[str]):
If the query results contain more than one GEOGRAPHY
column, identifies which one to use when constructing
the GeoPandas GeoDataFrame. This option can be omitted
if there is only one GEOGRAPHY column.
bool_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g. ``pandas.BooleanDtype()``)
to convert BigQuery Boolean type, instead of relying on the default
``pandas.BooleanDtype()``. If you explicitly set the value to ``None``,
then the data type will be ``numpy.dtype("bool")``. BigQuery Boolean
type can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#boolean_type
int_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Int64Dtype()``)
to convert BigQuery Integer types, instead of relying on the default
``pandas.Int64Dtype()``. If you explicitly set the value to ``None``,
then the data type will be ``numpy.dtype("int64")``. A list of BigQuery
Integer types can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#integer_types
float_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g. ``pandas.Float32Dtype()``)
to convert BigQuery Float type, instead of relying on the default
``numpy.dtype("float64")``. If you explicitly set the value to ``None``,
then the data type will be ``numpy.dtype("float64")``. BigQuery Float
type can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#floating_point_types
string_dtype (Optional[pandas.Series.dtype, None]):
If set, indicate a pandas ExtensionDtype (e.g. ``pandas.StringDtype()``) to
convert BigQuery String type, instead of relying on the default
``numpy.dtype("object")``. If you explicitly set the value to ``None``,
then the data type will be ``numpy.dtype("object")``. BigQuery String
type can be found at:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
Returns:
geopandas.GeoDataFrame:
A :class:`geopandas.GeoDataFrame` populated with row
data and column headers from the query results. The
column headers are derived from the destination
table's schema.
Raises:
ValueError:
If the :mod:`geopandas` library cannot be imported, or the
:mod:`google.cloud.bigquery_storage_v1` module is
required but cannot be imported.
.. versionadded:: 2.24.0
"""
query_result = wait_for_query(self, progress_bar_type, max_results=max_results)
return query_result.to_geodataframe(
bqstorage_client=bqstorage_client,
dtypes=dtypes,
progress_bar_type=progress_bar_type,
create_bqstorage_client=create_bqstorage_client,
geography_column=geography_column,
bool_dtype=bool_dtype,
int_dtype=int_dtype,
float_dtype=float_dtype,
string_dtype=string_dtype,
)
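# Usage sketch (illustrative comment): with a single GEOGRAPHY column the
# ``geography_column`` argument can be omitted; with several, name the one
# to use. The column name below is hypothetical:
#
#     gdf = job.to_geodataframe(geography_column="boundary")
#     print(gdf.crs)  # GeoDataFrame with shapely geometries in "boundary"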
def __iter__(self):
return iter(self.result())
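# Iterating the job directly is shorthand for iterating ``job.result()``:
#
#     for row in job:  # equivalent to: for row in job.result()
#         print(dict(row))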
class QueryPlanEntryStep(object):
"""Map a single step in a query plan entry.
Args:
kind (str): step type.
substeps (List): names of substeps.
"""
def __init__(self, kind, substeps):
self.kind = kind
self.substeps = list(substeps)
@classmethod
def from_api_repr(cls, resource: dict) -> "QueryPlanEntryStep":
"""Factory: construct instance from the JSON repr.
Args:
resource (Dict): JSON representation of the entry.
Returns:
google.cloud.bigquery.job.QueryPlanEntryStep:
New instance built from the resource.
"""
return cls(kind=resource.get("kind"), substeps=resource.get("substeps", ()))
def __eq__(self, other):
if not isinstance(other, self.__class__):
return NotImplemented
return self.kind == other.kind and self.substeps == other.substeps
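# Sketch of the factory above, fed a hand-written resource dict whose field
# names follow the ExplainQueryStep API representation (values illustrative):
#
#     step = QueryPlanEntryStep.from_api_repr(
#         {"kind": "READ", "substeps": ["$1:word", "FROM corpus"]}
#     )
#     step.kind      # 'READ'
#     step.substeps  # ['$1:word', 'FROM corpus']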
class QueryPlanEntry(object):
"""QueryPlanEntry represents a single stage of a query execution plan.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#ExplainQueryStage
for the underlying API representation within query statistics.
"""
def __init__(self):
self._properties = {}
@classmethod
def from_api_repr(cls, resource: dict) -> "QueryPlanEntry":
"""Factory: construct instance from the JSON repr.
Args:
resource(Dict[str: object]):
ExplainQueryStage representation returned from API.
Returns:
google.cloud.bigquery.job.QueryPlanEntry:
Query plan entry parsed from ``resource``.
"""
entry = cls()
entry._properties = resource
return entry
@property
def name(self):
"""Optional[str]: Human-readable name of the stage."""
return self._properties.get("name")
@property
def entry_id(self):
"""Optional[str]: Unique ID for the stage within the plan."""
return self._properties.get("id")
@property
def start(self):
"""Optional[Datetime]: Datetime when the stage started."""
if self._properties.get("startMs") is None:
return None
return _helpers._datetime_from_microseconds(
int(self._properties.get("startMs")) * 1000.0
)
@property
def end(self):
"""Optional[Datetime]: Datetime when the stage ended."""
if self._properties.get("endMs") is None:
return None
return _helpers._datetime_from_microseconds(
int(self._properties.get("endMs")) * 1000.0
)
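# Note: ``startMs``/``endMs`` arrive as millisecond strings, so both
# properties above multiply by 1000 to produce the microseconds expected by
# _helpers._datetime_from_microseconds. For example, "1600000000000" maps to
# datetime(2020, 9, 13, 12, 26, 40, tzinfo=UTC).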
@property
def input_stages(self):
"""List(int): Entry IDs for stages that were inputs for this stage."""
if self._properties.get("inputStages") is None:
return []
return [
_helpers._int_or_none(entry)
for entry in self._properties.get("inputStages")
]
@property
def parallel_inputs(self):
"""Optional[int]: Number of parallel input segments within
the stage.
"""
return _helpers._int_or_none(self._properties.get("parallelInputs"))
@property
def completed_parallel_inputs(self):
"""Optional[int]: Number of parallel input segments completed."""
return _helpers._int_or_none(self._properties.get("completedParallelInputs"))
@property
def wait_ms_avg(self):
"""Optional[int]: Milliseconds the average worker spent waiting to
be scheduled.
"""
return _helpers._int_or_none(self._properties.get("waitMsAvg"))
@property
def wait_ms_max(self):
"""Optional[int]: Milliseconds the slowest worker spent waiting to
be scheduled.
"""
return _helpers._int_or_none(self._properties.get("waitMsMax"))
@property
def wait_ratio_avg(self):
"""Optional[float]: Ratio of time the average worker spent waiting
to be scheduled, relative to the longest time spent by any worker in
any stage of the overall plan.
"""
return self._properties.get("waitRatioAvg")
@property
def wait_ratio_max(self):
"""Optional[float]: Ratio of time the slowest worker spent waiting
to be scheduled, relative to the longest time spent by any worker in
any stage of the overall plan.
"""
return self._properties.get("waitRatioMax")
@property
def read_ms_avg(self):
"""Optional[int]: Milliseconds the average worker spent reading
input.
"""
return _helpers._int_or_none(self._properties.get("readMsAvg"))
@property
def read_ms_max(self):
"""Optional[int]: Milliseconds the slowest worker spent reading
input.
"""
return _helpers._int_or_none(self._properties.get("readMsMax"))
@property
def read_ratio_avg(self):
"""Optional[float]: Ratio of time the average worker spent reading
input, relative to the longest time spent by any worker in any stage
of the overall plan.
"""
return self._properties.get("readRatioAvg")
@property
def read_ratio_max(self):
"""Optional[float]: Ratio of time the slowest worker spent reading
to be scheduled, relative to the longest time spent by any worker in
any stage of the overall plan.
"""
return self._properties.get("readRatioMax")
@property
def compute_ms_avg(self):
"""Optional[int]: Milliseconds the average worker spent on CPU-bound
processing.
"""
return _helpers._int_or_none(self._properties.get("computeMsAvg"))
@property
def compute_ms_max(self):
"""Optional[int]: Milliseconds the slowest worker spent on CPU-bound
processing.
"""
return _helpers._int_or_none(self._properties.get("computeMsMax"))
@property
def compute_ratio_avg(self):
"""Optional[float]: Ratio of time the average worker spent on
CPU-bound processing, relative to the longest time spent by any
worker in any stage of the overall plan.
"""
return self._properties.get("computeRatioAvg")
@property
def compute_ratio_max(self):
"""Optional[float]: Ratio of time the slowest worker spent on
CPU-bound processing, relative to the longest time spent by any
worker in any stage of the overall plan.
"""
return self._properties.get("computeRatioMax")
@property
def write_ms_avg(self):
"""Optional[int]: Milliseconds the average worker spent writing
output data.
"""
return _helpers._int_or_none(self._properties.get("writeMsAvg"))
@property
def write_ms_max(self):
"""Optional[int]: Milliseconds the slowest worker spent writing
output data.
"""
return _helpers._int_or_none(self._properties.get("writeMsMax"))
@property
def write_ratio_avg(self):
"""Optional[float]: Ratio of time the average worker spent writing
output data, relative to the longest time spent by any worker in any
stage of the overall plan.
"""
return self._properties.get("writeRatioAvg")
@property
def write_ratio_max(self):
"""Optional[float]: Ratio of time the slowest worker spent writing
output data, relative to the longest time spent by any worker in any
stage of the overall plan.
"""
return self._properties.get("writeRatioMax")
@property
def records_read(self):
"""Optional[int]: Number of records read by this stage."""
return _helpers._int_or_none(self._properties.get("recordsRead"))
@property
def records_written(self):
"""Optional[int]: Number of records written by this stage."""
return _helpers._int_or_none(self._properties.get("recordsWritten"))
@property
def status(self):
"""Optional[str]: status of this stage."""
return self._properties.get("status")
@property
def shuffle_output_bytes(self):
"""Optional[int]: Number of bytes written by this stage to
intermediate shuffle.
"""
return _helpers._int_or_none(self._properties.get("shuffleOutputBytes"))
@property
def shuffle_output_bytes_spilled(self):
"""Optional[int]: Number of bytes written by this stage to
intermediate shuffle and spilled to disk.
"""
return _helpers._int_or_none(self._properties.get("shuffleOutputBytesSpilled"))
@property
def steps(self):
"""List(QueryPlanEntryStep): List of step operations performed by
each worker in the stage.
"""
return [
QueryPlanEntryStep.from_api_repr(step)
for step in self._properties.get("steps", [])
]
@property
def slot_ms(self):
"""Optional[int]: Slot-milliseconds used by the stage."""
return _helpers._int_or_none(self._properties.get("slotMs"))
class TimelineEntry(object):
"""TimelineEntry represents progress of a query job at a particular
point in time.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#querytimelinesample
for the underlying API representation within query statistics.
"""
def __init__(self):
self._properties = {}
@classmethod
def from_api_repr(cls, resource):
"""Factory: construct instance from the JSON repr.
Args:
resource(Dict[str: object]):
QueryTimelineSample representation returned from API.
Returns:
google.cloud.bigquery.TimelineEntry:
Timeline sample parsed from ``resource``.
"""
entry = cls()
entry._properties = resource
return entry
@property
def elapsed_ms(self):
"""Optional[int]: Milliseconds elapsed since start of query
execution."""
return _helpers._int_or_none(self._properties.get("elapsedMs"))
@property
def active_units(self):
"""Optional[int]: Current number of input units being processed
by workers, reported as largest value since the last sample."""
return _helpers._int_or_none(self._properties.get("activeUnits"))
@property
def pending_units(self):
"""Optional[int]: Current number of input units remaining for
query stages active at this sample time."""
return _helpers._int_or_none(self._properties.get("pendingUnits"))
@property
def completed_units(self):
"""Optional[int]: Current number of input units completed by
this query."""
return _helpers._int_or_none(self._properties.get("completedUnits"))
@property
def slot_millis(self):
"""Optional[int]: Cumulative slot-milliseconds consumed by
this query."""
return _helpers._int_or_none(self._properties.get("totalSlotMs"))
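# Usage sketch (illustrative comment): ``QueryJob.timeline`` exposes one of
# these entries per QueryTimelineSample reported in the job statistics:
#
#     for sample in job.timeline:
#         print(sample.elapsed_ms, sample.active_units,
#               sample.pending_units, sample.completed_units,
#               sample.slot_millis)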