As of January 1, 2020 this library no longer supports Python 2 on the latest released version.
Library versions released prior to that date will continue to be available. For more information please
visit Python 2 support on Google Cloud.
Source code for google.cloud.firestore_v1.aggregation
# Copyright 2023 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes for representing aggregation queries for the Google Cloud Firestore API.
A :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from
a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
a more common way to create an aggregation query than direct usage of the constructor.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Generator, List, Optional, Union
from google.api_core import exceptions, gapic_v1
from google.api_core import retry as retries
from google.cloud.firestore_v1.base_aggregation import (
AggregationResult,
BaseAggregationQuery,
_query_response_to_result,
)
from google.cloud.firestore_v1.query_results import QueryResultsList
from google.cloud.firestore_v1.stream_generator import StreamGenerator
# Types needed only for Type Hints
if TYPE_CHECKING: # pragma: NO COVER
from google.cloud.firestore_v1 import transaction
from google.cloud.firestore_v1.query_profile import ExplainMetrics
from google.cloud.firestore_v1.query_profile import ExplainOptions
[docs]class AggregationQuery(BaseAggregationQuery):
"""Represents an aggregation query to the Firestore API."""
def __init__(
self,
nested_query,
) -> None:
super(AggregationQuery, self).__init__(nested_query)
[docs] def get(
self,
transaction=None,
retry: Union[
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: float | None = None,
*,
explain_options: Optional[ExplainOptions] = None,
) -> QueryResultsList[AggregationResult]:
"""Runs the aggregation query.
This sends a ``RunAggregationQuery`` RPC and returns a list of
aggregation results in the stream of ``RunAggregationQueryResponse``
messages.
Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
If a ``transaction`` is used and it already has write operations
added, this method cannot be used (i.e. read-after-write is not
allowed).
retry (google.api_core.retry.Retry): Designation of what errors, if any,
should be retried. Defaults to a system-specified policy.
timeout (float): The timeout for this request. Defaults to a
system-specified value.
explain_options
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
Returns:
QueryResultsList[AggregationResult]: The aggregation query results.
"""
explain_metrics: ExplainMetrics | None = None
result = self.stream(
transaction=transaction,
retry=retry,
timeout=timeout,
explain_options=explain_options,
)
result_list = list(result)
if explain_options is None:
explain_metrics = None
else:
explain_metrics = result.get_explain_metrics()
return QueryResultsList(result_list, explain_options, explain_metrics)
def _get_stream_iterator(self, transaction, retry, timeout, explain_options=None):
"""Helper method for :meth:`stream`."""
request, kwargs = self._prep_stream(
transaction,
retry,
timeout,
explain_options,
)
return self._client._firestore_api.run_aggregation_query(
request=request,
metadata=self._client._rpc_metadata,
**kwargs,
)
def _retry_query_after_exception(self, exc, retry, transaction):
"""Helper method for :meth:`stream`."""
if transaction is None: # no snapshot-based retry inside transaction
if retry is gapic_v1.method.DEFAULT:
transport = self._client._firestore_api._transport
gapic_callable = transport.run_aggregation_query
retry = gapic_callable._retry
return retry._predicate(exc)
return False
def _make_stream(
self,
transaction: Optional[transaction.Transaction] = None,
retry: Union[
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: Optional[float] = None,
explain_options: Optional[ExplainOptions] = None,
) -> Generator[List[AggregationResult], Any, Optional[ExplainMetrics]]:
"""Internal method for stream(). Runs the aggregation query.
This sends a ``RunAggregationQuery`` RPC and then returns a generator
which consumes each document returned in the stream of
``RunAggregationQueryResponse`` messages.
If a ``transaction`` is used and it already has write operations added,
this method cannot be used (i.e. read-after-write is not allowed).
Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
retry (Optional[google.api_core.retry.Retry]): Designation of what
errors, if any, should be retried. Defaults to a
system-specified policy.
timeout (Optional[float]): The timeout for this request. Defaults
to a system-specified value.
explain_options
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
Yields:
List[AggregationResult]:
The result of aggregations of this query.
Returns:
(Optional[google.cloud.firestore_v1.types.query_profile.ExplainMetrtics]):
The results of query profiling, if received from the service.
"""
metrics: ExplainMetrics | None = None
response_iterator = self._get_stream_iterator(
transaction,
retry,
timeout,
explain_options,
)
while True:
try:
response = next(response_iterator, None)
except exceptions.GoogleAPICallError as exc:
if self._retry_query_after_exception(exc, retry, transaction):
response_iterator = self._get_stream_iterator(
transaction,
retry,
timeout,
)
continue
else:
raise
if response is None: # EOI
break
if metrics is None and response.explain_metrics:
metrics = response.explain_metrics
result = _query_response_to_result(response)
if result:
yield result
return metrics
[docs] def stream(
self,
transaction: Optional["transaction.Transaction"] = None,
retry: Union[
retries.Retry, None, gapic_v1.method._MethodDefault
] = gapic_v1.method.DEFAULT,
timeout: Optional[float] = None,
*,
explain_options: Optional[ExplainOptions] = None,
) -> StreamGenerator[List[AggregationResult]]:
"""Runs the aggregation query.
This sends a ``RunAggregationQuery`` RPC and then returns a generator
which consumes each document returned in the stream of
``RunAggregationQueryResponse`` messages.
If a ``transaction`` is used and it already has write operations added,
this method cannot be used (i.e. read-after-write is not allowed).
Args:
transaction
(Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]):
An existing transaction that this query will run in.
retry (Optional[google.api_core.retry.Retry]): Designation of what
errors, if any, should be retried. Defaults to a
system-specified policy.
timeout (Optinal[float]): The timeout for this request. Defaults
to a system-specified value.
explain_options
(Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]):
Options to enable query profiling for this query. When set,
explain_metrics will be available on the returned generator.
Returns:
`StreamGenerator[List[AggregationResult]]`:
A generator of the query results.
"""
inner_generator = self._make_stream(
transaction=transaction,
retry=retry,
timeout=timeout,
explain_options=explain_options,
)
return StreamGenerator(inner_generator, explain_options)