As of January 1, 2020 this library no longer supports Python 2 on the latest released version. Library versions released prior to that date will continue to be available. For more information please visit Python 2 support on Google Cloud.

Source code for google.cloud.firestore_v1.base_aggregation

# Copyright 2023 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Classes for representing aggregation queries for the Google Cloud Firestore API.

A :class:`~google.cloud.firestore_v1.aggregation.AggregationQuery` can be created directly from
a :class:`~google.cloud.firestore_v1.collection.Collection` and that can be
a more common way to create an aggregation query than direct usage of the constructor.
"""


from __future__ import annotations

import abc
from abc import ABC
from typing import TYPE_CHECKING, Any, Coroutine, List, Optional, Tuple, Union

from google.api_core import gapic_v1
from google.api_core import retry as retries

from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.field_path import FieldPath
from google.cloud.firestore_v1.types import (
    RunAggregationQueryResponse,
    StructuredAggregationQuery,
)

# Types needed only for Type Hints
if TYPE_CHECKING:  # pragma: NO COVER
    from google.cloud.firestore_v1 import transaction
    from google.cloud.firestore_v1.async_stream_generator import AsyncStreamGenerator
    from google.cloud.firestore_v1.query_profile import ExplainOptions
    from google.cloud.firestore_v1.query_results import QueryResultsList
    from google.cloud.firestore_v1.stream_generator import (
        StreamGenerator,
    )


[docs]class AggregationResult(object): """ A class representing result from Aggregation Query :type alias: str :param alias: The alias for the aggregation. :type value: int :param value: The resulting value from the aggregation. :type read_time: :param value: The resulting read_time """ def __init__(self, alias: str, value: float, read_time=None): self.alias = alias self.value = value self.read_time = read_time def __repr__(self): return f"<Aggregation alias={self.alias}, value={self.value}, readtime={self.read_time}>"
[docs]class BaseAggregation(ABC): def __init__(self, alias: str | None = None): self.alias = alias @abc.abstractmethod def _to_protobuf(self): """Convert this instance to the protobuf representation"""
[docs]class CountAggregation(BaseAggregation): def __init__(self, alias: str | None = None): super(CountAggregation, self).__init__(alias=alias) def _to_protobuf(self): """Convert this instance to the protobuf representation""" aggregation_pb = StructuredAggregationQuery.Aggregation() aggregation_pb.alias = self.alias aggregation_pb.count = StructuredAggregationQuery.Aggregation.Count() return aggregation_pb
[docs]class SumAggregation(BaseAggregation): def __init__(self, field_ref: str | FieldPath, alias: str | None = None): if isinstance(field_ref, FieldPath): # convert field path to string field_ref = field_ref.to_api_repr() self.field_ref = field_ref super(SumAggregation, self).__init__(alias=alias) def _to_protobuf(self): """Convert this instance to the protobuf representation""" aggregation_pb = StructuredAggregationQuery.Aggregation() aggregation_pb.alias = self.alias aggregation_pb.sum = StructuredAggregationQuery.Aggregation.Sum() aggregation_pb.sum.field.field_path = self.field_ref return aggregation_pb
[docs]class AvgAggregation(BaseAggregation): def __init__(self, field_ref: str | FieldPath, alias: str | None = None): if isinstance(field_ref, FieldPath): # convert field path to string field_ref = field_ref.to_api_repr() self.field_ref = field_ref super(AvgAggregation, self).__init__(alias=alias) def _to_protobuf(self): """Convert this instance to the protobuf representation""" aggregation_pb = StructuredAggregationQuery.Aggregation() aggregation_pb.alias = self.alias aggregation_pb.avg = StructuredAggregationQuery.Aggregation.Avg() aggregation_pb.avg.field.field_path = self.field_ref return aggregation_pb
def _query_response_to_result( response_pb: RunAggregationQueryResponse, ) -> List[AggregationResult]: results = [ AggregationResult( alias=key, value=response_pb.result.aggregate_fields[key].integer_value or response_pb.result.aggregate_fields[key].double_value, read_time=response_pb.read_time, ) for key in response_pb.result.aggregate_fields.pb.keys() ] return results
[docs]class BaseAggregationQuery(ABC): """Represents an aggregation query to the Firestore API.""" def __init__(self, nested_query, alias: str | None = None) -> None: self._nested_query = nested_query self._alias = alias self._collection_ref = nested_query._parent self._aggregations: List[BaseAggregation] = [] @property def _client(self): return self._collection_ref._client
[docs] def count(self, alias: str | None = None): """ Adds a count over the nested query """ count_aggregation = CountAggregation(alias=alias) self._aggregations.append(count_aggregation) return self
[docs] def sum(self, field_ref: str | FieldPath, alias: str | None = None): """ Adds a sum over the nested query """ sum_aggregation = SumAggregation(field_ref, alias=alias) self._aggregations.append(sum_aggregation) return self
[docs] def avg(self, field_ref: str | FieldPath, alias: str | None = None): """ Adds an avg over the nested query """ avg_aggregation = AvgAggregation(field_ref, alias=alias) self._aggregations.append(avg_aggregation) return self
[docs] def add_aggregation(self, aggregation: BaseAggregation) -> None: """ Adds an aggregation operation to the nested query :type aggregation: :class:`google.cloud.firestore_v1.aggregation.BaseAggregation` :param aggregation: An aggregation operation, e.g. a CountAggregation """ self._aggregations.append(aggregation)
[docs] def add_aggregations(self, aggregations: List[BaseAggregation]) -> None: """ Adds a list of aggregations to the nested query :type aggregations: list :param aggregations: a list of aggregation operations """ self._aggregations.extend(aggregations)
def _to_protobuf(self) -> StructuredAggregationQuery: pb = StructuredAggregationQuery() pb.structured_query = self._nested_query._to_protobuf() for aggregation in self._aggregations: aggregation_pb = aggregation._to_protobuf() pb.aggregations.append(aggregation_pb) return pb def _prep_stream( self, transaction=None, retry: Union[retries.Retry, None, gapic_v1.method._MethodDefault] = None, timeout: float | None = None, explain_options: Optional[ExplainOptions] = None, ) -> Tuple[dict, dict]: parent_path, expected_prefix = self._collection_ref._parent_info() request = { "parent": parent_path, "structured_aggregation_query": self._to_protobuf(), "transaction": _helpers.get_transaction_id(transaction), } if explain_options: request["explain_options"] = explain_options._to_dict() kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout) return request, kwargs
[docs] @abc.abstractmethod def get( self, transaction=None, retry: Union[ retries.Retry, None, gapic_v1.method._MethodDefault ] = gapic_v1.method.DEFAULT, timeout: float | None = None, *, explain_options: Optional[ExplainOptions] = None, ) -> ( QueryResultsList[AggregationResult] | Coroutine[Any, Any, List[List[AggregationResult]]] ): """Runs the aggregation query. This sends a ``RunAggregationQuery`` RPC and returns a list of aggregation results in the stream of ``RunAggregationQueryResponse`` messages. Args: transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): An existing transaction that this query will run in. If a ``transaction`` is used and it already has write operations added, this method cannot be used (i.e. read-after-write is not allowed). retry (google.api_core.retry.Retry): Designation of what errors, if any, should be retried. Defaults to a system-specified policy. timeout (float): The timeout for this request. Defaults to a system-specified value. explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): Options to enable query profiling for this query. When set, explain_metrics will be available on the returned generator. Returns: (QueryResultsList[List[AggregationResult]] | Coroutine[Any, Any, List[List[AggregationResult]]]): The aggregation query results. """
[docs] @abc.abstractmethod def stream( self, transaction: Optional[transaction.Transaction] = None, retry: Union[ retries.Retry, None, gapic_v1.method._MethodDefault ] = gapic_v1.method.DEFAULT, timeout: Optional[float] = None, *, explain_options: Optional[ExplainOptions] = None, ) -> ( StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator[List[AggregationResult]] ): """Runs the aggregation query. This sends a``RunAggregationQuery`` RPC and returns a generator in the stream of ``RunAggregationQueryResponse`` messages. Args: transaction (Optional[:class:`~google.cloud.firestore_v1.transaction.Transaction`]): An existing transaction that this query will run in. retry (Optional[google.api_core.retry.Retry]): Designation of what errors, if any, should be retried. Defaults to a system-specified policy. timeout (Optinal[float]): The timeout for this request. Defaults to a system-specified value. explain_options (Optional[:class:`~google.cloud.firestore_v1.query_profile.ExplainOptions`]): Options to enable query profiling for this query. When set, explain_metrics will be available on the returned generator. Returns: StreamGenerator[List[AggregationResult]] | AsyncStreamGenerator[List[AggregationResult]]: A generator of the query results. """