As of January 1, 2020 this library no longer supports Python 2 on the latest released version.
Library versions released prior to that date will continue to be available. For more information please
visit Python 2 support on Google Cloud.
Source code for google.cloud.firestore_v1.base_collection
# Copyright 2017 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Classes for representing collections for the Google Cloud Firestore API."""
from __future__ import annotations
import random
from typing import (
TYPE_CHECKING,
Any,
AsyncGenerator,
AsyncIterator,
Coroutine,
Generator,
Generic,
Iterable,
NoReturn,
Optional,
Tuple,
Union,
)
from google.api_core import retry as retries
from google.cloud.firestore_v1 import _helpers
from google.cloud.firestore_v1.base_query import QueryType
if TYPE_CHECKING: # pragma: NO COVER
# Types needed only for Type Hints
from google.cloud.firestore_v1.base_aggregation import BaseAggregationQuery
from google.cloud.firestore_v1.base_document import DocumentSnapshot
from google.cloud.firestore_v1.base_vector_query import (
BaseVectorQuery,
DistanceMeasure,
)
from google.cloud.firestore_v1.document import DocumentReference
from google.cloud.firestore_v1.field_path import FieldPath
from google.cloud.firestore_v1.query_profile import ExplainOptions
from google.cloud.firestore_v1.query_results import QueryResultsList
from google.cloud.firestore_v1.stream_generator import StreamGenerator
from google.cloud.firestore_v1.transaction import Transaction
from google.cloud.firestore_v1.vector import Vector
from google.cloud.firestore_v1.vector_query import VectorQuery
_AUTO_ID_CHARS = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
class BaseCollectionReference(Generic[QueryType]):
    """A reference to a collection in a Firestore database.

    The collection may already exist or this class can facilitate creation
    of documents within the collection.

    Args:
        path (Tuple[str, ...]): The components in the collection path.
            This is a series of strings representing each collection and
            sub-collection ID, as well as the document IDs for any documents
            that contain a sub-collection.
        kwargs (dict): The keyword arguments for the constructor. The only
            supported keyword is ``client`` and it must be a
            :class:`~google.cloud.firestore_v1.client.Client` if provided. It
            represents the client that created this collection reference.

    Raises:
        ValueError: if

            * the ``path`` is empty
            * there are an even number of elements
            * a collection ID in ``path`` is not a string
            * a document ID in ``path`` is not a string
        TypeError: If a keyword other than ``client`` is used.
    """

    def __init__(self, *path, **kwargs) -> None:
        # Path validation is delegated to the shared helper.
        _helpers.verify_path(path, is_collection=True)
        self._path = path
        self._client = kwargs.pop("client", None)
        # Anything left over after popping ``client`` is unsupported.
        if kwargs:
            raise TypeError(
                "Received unexpected arguments", kwargs, "Only `client` is supported"
            )

    def __eq__(self, other):
        """Compare two collection references by path and client.

        Returns:
            Union[bool, NotImplemented]: ``NotImplemented`` when ``other`` is
            not an instance of this object's class, otherwise whether both
            the path and the client are equal.
        """
        # NOTE: no ``__hash__`` is defined alongside ``__eq__``, so Python
        # leaves instances of this class unhashable.
        if not isinstance(other, self.__class__):
            return NotImplemented
        return self._path == other._path and self._client == other._client

    @property
    def id(self):
        """The collection identifier.

        Returns:
            str: The last component of the path.
        """
        return self._path[-1]

    @property
    def parent(self):
        """Document that owns the current collection.

        Returns:
            Optional[:class:`~google.cloud.firestore_v1.document.DocumentReference`]:
            The parent document, if the current collection is not a
            top-level collection.
        """
        # A single path component means a top-level collection: no parent.
        if len(self._path) == 1:
            return None

        parent_path = self._path[:-1]
        return self._client.document(*parent_path)

    def _query(self) -> QueryType:
        """Build the base query for this collection.

        Not implemented in this base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def _aggregation_query(self) -> BaseAggregationQuery:
        """Build the base aggregation query for this collection.

        Not implemented in this base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def _vector_query(self) -> BaseVectorQuery:
        """Build the base vector query for this collection.

        Not implemented in this base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def document(self, document_id: Optional[str] = None) -> DocumentReference:
        """Create a sub-document underneath the current collection.

        Args:
            document_id (Optional[str]): The document identifier
                within the current collection. If not provided, will default
                to a random 20 character string composed of digits,
                uppercase and lowercase letters.

        Returns:
            :class:`~google.cloud.firestore_v1.document.DocumentReference`:
            The child document.
        """
        if document_id is None:
            document_id = _auto_id()

        # Append `self._path` and the passed document's ID as long as the first
        # element in the path is not an empty string, which comes from setting the
        # parent to "" for recursive queries.
        if self._path[0]:
            child_path = self._path + (document_id,)
        else:
            child_path = (document_id,)
        return self._client.document(*child_path)

    def _parent_info(self) -> Tuple[Any, str]:
        """Get fully-qualified parent path and prefix for this collection.

        Returns:
            Tuple[str, str]: Pair of

            * the fully-qualified (with database and project) path to the
              parent of this collection (will either be the database path
              or a document path).
            * the prefix to a document in this collection.
        """
        parent_doc = self.parent
        if parent_doc is None:
            # Top-level collection: the parent is "<database string>/documents".
            parent_path = _helpers.DOCUMENT_PATH_DELIMITER.join(
                (self._client._database_string, "documents")
            )
        else:
            parent_path = parent_doc._document_path

        expected_prefix = _helpers.DOCUMENT_PATH_DELIMITER.join((parent_path, self.id))
        return parent_path, expected_prefix

    def _prep_add(
        self,
        document_data: dict,
        document_id: Optional[str] = None,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
    ) -> Tuple[DocumentReference, dict]:
        """Shared setup for async / sync :meth:`add`.

        Args:
            document_data (dict): Accepted for signature parity with
                :meth:`add`; not read by this helper.
            document_id (Optional[str]): ID for the new document. A random
                ID is generated when omitted.
            retry (Optional[google.api_core.retry.Retry]): Forwarded to
                ``_helpers.make_retry_timeout_kwargs``.
            timeout (Optional[float]): Forwarded to
                ``_helpers.make_retry_timeout_kwargs``.

        Returns:
            Tuple[DocumentReference, dict]: The reference for the document
            to create and the retry / timeout keyword arguments.
        """
        if document_id is None:
            document_id = _auto_id()

        document_ref = self.document(document_id)
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return document_ref, kwargs

    def add(
        self,
        document_data: dict,
        document_id: Optional[str] = None,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
    ) -> Union[Tuple[Any, Any], Coroutine[Any, Any, Tuple[Any, Any]]]:
        """Add a document to this collection.

        Not implemented in this base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def _prep_list_documents(
        self,
        page_size: Optional[int] = None,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
    ) -> Tuple[dict, dict]:
        """Shared setup for async / sync :meth:`list_documents`.

        Args:
            page_size (Optional[int]): Placed unchanged in the request under
                ``"page_size"``.
            retry (Optional[google.api_core.retry.Retry]): Forwarded to
                ``_helpers.make_retry_timeout_kwargs``.
            timeout (Optional[float]): Forwarded to
                ``_helpers.make_retry_timeout_kwargs``.

        Returns:
            Tuple[dict, dict]: The request dictionary and the retry / timeout
            keyword arguments.
        """
        parent, _ = self._parent_info()
        request = {
            "parent": parent,
            "collection_id": self.id,
            "page_size": page_size,
            "show_missing": True,
            # list_documents returns an iterator of document references, which do not
            # include any fields. To save on data transfer, we can set a field_path mask
            # to include no fields
            "mask": {"field_paths": None},
        }
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return request, kwargs

    def list_documents(
        self,
        page_size: Optional[int] = None,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
    ) -> Union[
        Generator[DocumentReference, Any, Any], AsyncGenerator[DocumentReference, Any]
    ]:
        """List the documents in this collection.

        Not implemented in this base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def recursive(self) -> QueryType:
        """Build the base query for this collection and call ``recursive()`` on it.

        Returns:
            QueryType: Whatever the underlying query's ``recursive()`` returns.
        """
        return self._query().recursive()

    def select(self, field_paths: Iterable[str]) -> QueryType:
        """Create a "select" query with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.select` for
        more information on this method.

        Args:
            field_paths (Iterable[str, ...]): An iterable of field paths
                (``.``-delimited list of field names) to use as a projection
                of document fields in the query results.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A "projected" query.
        """
        query = self._query()
        return query.select(field_paths)

    def where(
        self,
        field_path: Optional[str] = None,
        op_string: Optional[str] = None,
        value=None,
        *,
        filter=None,
    ) -> QueryType:
        """Create a "where" query with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.where` for
        more information on this method.

        Args:
            field_path (str): A field path (``.``-delimited list of
                field names) for the field to filter on. Optional.
            op_string (str): A comparison operation in the form of a string.
                Acceptable values are ``<``, ``<=``, ``==``, ``>=``, ``>``,
                and ``in``. Optional.
            value (Any): The value to compare the field against in the filter.
                If ``value`` is :data:`None` or a NaN, then ``==`` is the only
                allowed operation.  If ``op_string`` is ``in``, ``value``
                must be a sequence of values. Optional.
            filter (:class:`~google.cloud.firestore_v1.base_query.BaseFilter`):
                an instance of a Filter. Either a FieldFilter or a
                CompositeFilter.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A filtered query.

        Raises:
            ValueError: If both the positional arguments (field_path,
                op_string, value) and the filter keyword argument are passed
                at the same time.
        """
        query = self._query()

        # The positional form is used only when BOTH field_path and op_string
        # are truthy; every other combination falls through to ``filter``.
        if field_path and op_string:
            if filter is not None:
                raise ValueError(
                    "Can't pass in both the positional arguments and 'filter' at the same time"
                )
            if field_path == "__name__" and op_string == "in":
                # Plain string IDs are converted into document references of
                # this collection; non-string entries are passed through.
                value = [
                    self.document(name) if isinstance(name, str) else name
                    for name in value
                ]
            return query.where(field_path, op_string, value)

        return query.where(filter=filter)

    def order_by(self, field_path: str, **kwargs) -> QueryType:
        """Create an "order by" query with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.order_by` for
        more information on this method.

        Args:
            field_path (str): A field path (``.``-delimited list of
                field names) on which to order the query results.
            kwargs (Dict[str, Any]): The keyword arguments to pass along
                to the query. The only supported keyword is ``direction``,
                see :meth:`~google.cloud.firestore_v1.query.Query.order_by`
                for more information.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            An "order by" query.
        """
        query = self._query()
        return query.order_by(field_path, **kwargs)

    def limit(self, count: int) -> QueryType:
        """Create a limited query with this collection as parent.

        .. note::
            `limit` and `limit_to_last` are mutually exclusive.
            Setting `limit` will drop previously set `limit_to_last`.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.limit` for
        more information on this method.

        Args:
            count (int): Maximum number of documents to return that match
                the query.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A limited query.
        """
        query = self._query()
        return query.limit(count)

    def limit_to_last(self, count: int) -> QueryType:
        """Create a limited to last query with this collection as parent.

        .. note::
            `limit` and `limit_to_last` are mutually exclusive.
            Setting `limit_to_last` will drop previously set `limit`.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.limit_to_last`
        for more information on this method.

        Args:
            count (int): Maximum number of documents to return that
                match the query.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A limited to last query.
        """
        query = self._query()
        return query.limit_to_last(count)

    def offset(self, num_to_skip: int) -> QueryType:
        """Skip to an offset in a query with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.offset` for
        more information on this method.

        Args:
            num_to_skip (int): The number of results to skip at the beginning
                of query results. (Must be non-negative.)

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            An offset query.
        """
        query = self._query()
        return query.offset(num_to_skip)

    def start_at(
        self, document_fields: Union[DocumentSnapshot, dict, list, tuple]
    ) -> QueryType:
        """Start query at a cursor with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.start_at` for
        more information on this method.

        Args:
            document_fields (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
                A document snapshot or a dictionary/list/tuple of fields
                representing a query results cursor. A cursor is a collection
                of values that represent a position in a query result set.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A query with cursor.
        """
        query = self._query()
        return query.start_at(document_fields)

    def start_after(
        self, document_fields: Union[DocumentSnapshot, dict, list, tuple]
    ) -> QueryType:
        """Start query after a cursor with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.start_after` for
        more information on this method.

        Args:
            document_fields (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
                A document snapshot or a dictionary/list/tuple of fields
                representing a query results cursor. A cursor is a collection
                of values that represent a position in a query result set.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A query with cursor.
        """
        query = self._query()
        return query.start_after(document_fields)

    def end_before(
        self, document_fields: Union[DocumentSnapshot, dict, list, tuple]
    ) -> QueryType:
        """End query before a cursor with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.end_before` for
        more information on this method.

        Args:
            document_fields (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
                A document snapshot or a dictionary/list/tuple of fields
                representing a query results cursor. A cursor is a collection
                of values that represent a position in a query result set.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A query with cursor.
        """
        query = self._query()
        return query.end_before(document_fields)

    def end_at(
        self, document_fields: Union[DocumentSnapshot, dict, list, tuple]
    ) -> QueryType:
        """End query at a cursor with this collection as parent.

        See
        :meth:`~google.cloud.firestore_v1.query.Query.end_at` for
        more information on this method.

        Args:
            document_fields (Union[:class:`~google.cloud.firestore_v1.document.DocumentSnapshot`, dict, list, tuple]):
                A document snapshot or a dictionary/list/tuple of fields
                representing a query results cursor. A cursor is a collection
                of values that represent a position in a query result set.

        Returns:
            :class:`~google.cloud.firestore_v1.query.Query`:
            A query with cursor.
        """
        query = self._query()
        return query.end_at(document_fields)

    def _prep_get_or_stream(
        self,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
    ) -> Tuple[Any, dict]:
        """Shared setup for async / sync :meth:`get` / :meth:`stream`.

        Args:
            retry (Optional[google.api_core.retry.Retry]): Forwarded to
                ``_helpers.make_retry_timeout_kwargs``.
            timeout (Optional[float]): Forwarded to
                ``_helpers.make_retry_timeout_kwargs``.

        Returns:
            Tuple[Any, dict]: The base query for this collection and the
            retry / timeout keyword arguments.
        """
        query = self._query()
        kwargs = _helpers.make_retry_timeout_kwargs(retry, timeout)

        return query, kwargs

    def get(
        self,
        transaction: Optional[Transaction] = None,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
    ) -> (
        QueryResultsList[DocumentSnapshot]
        | Coroutine[Any, Any, QueryResultsList[DocumentSnapshot]]
    ):
        """Read the documents in this collection.

        Not implemented in this base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def stream(
        self,
        transaction: Optional[Transaction] = None,
        retry: Optional[retries.Retry] = None,
        timeout: Optional[float] = None,
        *,
        explain_options: Optional[ExplainOptions] = None,
    ) -> StreamGenerator[DocumentSnapshot] | AsyncIterator[DocumentSnapshot]:
        """Stream the documents in this collection.

        Not implemented in this base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def on_snapshot(self, callback) -> NoReturn:
        """Register ``callback`` for snapshots of this collection.

        Not implemented in this base class.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    def count(self, alias: Optional[str] = None):
        """
        Adds a count over the nested query.

        :type alias: str
        :param alias: (Optional) The alias for the count
        """
        return self._aggregation_query().count(alias=alias)

    def sum(self, field_ref: str | FieldPath, alias: Optional[str] = None):
        """
        Adds a sum over the nested query.

        :type field_ref: Union[str, google.cloud.firestore_v1.field_path.FieldPath]
        :param field_ref: The field to aggregate across.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the aggregation into.
            If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
        """
        return self._aggregation_query().sum(field_ref, alias=alias)

    def avg(self, field_ref: str | FieldPath, alias: Optional[str] = None):
        """
        Adds an avg over the nested query.

        :type field_ref: Union[str, google.cloud.firestore_v1.field_path.FieldPath]
        :param field_ref: The field to aggregate across.

        :type alias: Optional[str]
        :param alias: Optional name of the field to store the result of the aggregation into.
            If not provided, Firestore will pick a default name following the format field_<incremental_id++>.
        """
        return self._aggregation_query().avg(field_ref, alias=alias)

    def find_nearest(
        self,
        vector_field: str,
        query_vector: Vector,
        limit: int,
        distance_measure: DistanceMeasure,
        *,
        distance_result_field: Optional[str] = None,
        distance_threshold: Optional[float] = None,
    ) -> VectorQuery:
        """
        Finds the closest vector embeddings to the given query vector.

        Args:
            vector_field (str): An indexed vector field to search upon. Only documents which contain
                vectors whose dimensionality match the query_vector can be returned.
            query_vector (Vector): The query vector that we are searching on. Must be a vector of no more
                than 2048 dimensions.
            limit (int): The number of nearest neighbors to return. Must be a positive integer of no more than 1000.
            distance_measure (:class:`DistanceMeasure`): The Distance Measure to use.
            distance_result_field (Optional[str]):
                Name of the field to output the result of the vector distance calculation
            distance_threshold (Optional[float]):
                A threshold for which no less similar documents will be returned.

        Returns:
            :class:`~google.cloud.firestore_v1.vector_query.VectorQuery`: the vector query.
        """
        return self._vector_query().find_nearest(
            vector_field,
            query_vector,
            limit,
            distance_measure,
            distance_result_field=distance_result_field,
            distance_threshold=distance_threshold,
        )
def _auto_id() -> str:
    """Produce a pseudo-random identifier for a new document.

    Twenty characters are drawn one at a time from ``_AUTO_ID_CHARS`` with
    :func:`random.choice` and concatenated.

    Returns:
        str: A 20 character string composed of digits, uppercase and
        lowercase letters.
    """
    picked = [random.choice(_AUTO_ID_CHARS) for _ in range(20)]
    return "".join(picked)
def _item_to_document_ref(collection_reference, item) -> DocumentReference:
    """Convert a Document resource to a document reference.

    Args:
        collection_reference: Object whose ``document(document_id)`` method
            is called to build the reference.
            NOTE(review): the previous docstring described this argument as
            a ``google.api_core.page_iterator.GRPCIterator``; confirm
            against the callers.
        item: Document resource; only its ``name`` attribute is read.

    Returns:
        The value returned by ``collection_reference.document`` for the last
        delimiter-separated segment of ``item.name``.
    """
    # The document ID is the final segment of the resource name.
    *_, document_id = item.name.split(_helpers.DOCUMENT_PATH_DELIMITER)
    return collection_reference.document(document_id)