As of January 1, 2020 this library no longer supports Python 2 on the latest released version. Library versions released prior to that date will continue to be available. For more information please visit Python 2 support on Google Cloud.

Source code for google.api_core.page_iterator

# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Iterators for paging through paged API methods.

These iterators simplify the process of paging through API responses
where the request takes a page token and the response is a list of results with
a token for the next page. See `list pagination`_ in the Google API Style Guide
for more details.

.. _list pagination:
    https://cloud.google.com/apis/design/design_patterns#list_pagination

API clients that have methods that follow the list pagination pattern can
return an :class:`.Iterator`. You can use this iterator to get **all** of
the results across all pages::

    >>> results_iterator = client.list_resources()
    >>> list(results_iterator)  # Convert to a list (consumes all values).

Or you can walk your way through items and call off the search early if
you find what you're looking for (resulting in possibly fewer requests)::

    >>> for resource in results_iterator:
    ...     print(resource.name)
    ...     if not resource.is_valid:
    ...         break

At any point, you may check the number of items consumed by referencing the
``num_results`` property of the iterator::

    >>> for my_item in results_iterator:
    ...     if results_iterator.num_results >= 10:
    ...         break

When iterating, not every new item will send a request to the server.
To iterate based on each page of items (where a page corresponds to
a request)::

    >>> for page in results_iterator.pages:
    ...     print('=' * 20)
    ...     print('    Page number: {:d}'.format(results_iterator.page_number))
    ...     print('  Items in page: {:d}'.format(page.num_items))
    ...     print('     First item: {!r}'.format(next(page)))
    ...     print('Items remaining: {:d}'.format(page.remaining))
    ...     print('Next page token: {}'.format(results_iterator.next_page_token))
    ====================
        Page number: 1
      Items in page: 1
         First item: <MyItemClass at 0x7f1d3cccf690>
    Items remaining: 0
    Next page token: eav1OzQB0OM8rLdGXOEsyQWSG
    ====================
        Page number: 2
      Items in page: 19
         First item: <MyItemClass at 0x7f1d3cccffd0>
    Items remaining: 18
    Next page token: None

Then, for each page you can get all the resources on that page by iterating
through it or using :func:`list`::

    >>> list(page)
    [
        <MyItemClass at 0x7fd64a098ad0>,
        <MyItemClass at 0x7fd64a098ed0>,
        <MyItemClass at 0x7fd64a098e90>,
    ]
"""

import abc

import six


class Page(object):
    """Single page of results in an iterator.

    A page is itself an iterator: every call to :func:`next` takes one raw
    item from ``items``, converts it with ``item_to_value`` and returns the
    converted value.

    Args:
        parent (google.api_core.page_iterator.Iterator): The iterator that owns
            the current page.
        items (Sequence[Any]): An iterable (that also defines __len__) of items
            from a raw API response.
        item_to_value (Callable[google.api_core.page_iterator.Iterator, Any]):
            Callable to convert an item from the type in the raw API response
            into the native object. Will be called with the iterator and a
            single item.
        raw_page (Optional[google.protobuf.message.Message]):
            The raw page response.
    """

    def __init__(self, parent, items, item_to_value, raw_page=None):
        self._parent = parent
        # The size is captured once up front; ``remaining`` counts down from
        # it as values are handed out.
        self._num_items = len(items)
        self._remaining = self._num_items
        self._item_iter = iter(items)
        self._item_to_value = item_to_value
        self._raw_page = raw_page

    @property
    def raw_page(self):
        """Optional[Any]: The raw page response given at construction."""
        return self._raw_page

    @property
    def num_items(self):
        """int: Total items in the page."""
        return self._num_items

    @property
    def remaining(self):
        """int: Remaining items in the page."""
        return self._remaining

    def __iter__(self):
        """The :class:`Page` is an iterator of items."""
        return self

    def next(self):
        """Get the next value in the page.

        Returns:
            Any: The next raw item after conversion by ``item_to_value``.

        Raises:
            StopIteration: If every item in the page has been consumed.
        """
        # ``next`` here resolves to the builtin (method bodies do not see
        # class-level names), which exists on every supported interpreter,
        # so no compatibility shim is needed.
        item = next(self._item_iter)
        result = self._item_to_value(self._parent, item)
        # Since we've successfully got the next value from the
        # iterator, we update the number of remaining.
        self._remaining -= 1
        return result

    # Alias needed for Python 2/3 support.
    __next__ = next


def _item_to_value_identity(iterator, item):
    """An item to value transformer that returns the item un-changed."""
    # pylint: disable=unused-argument
    # We are conforming to the interface defined by Iterator.
    return item


@six.add_metaclass(abc.ABCMeta)
class Iterator(object):
    """A generic class for iterating through API list responses.

    Instances can be consumed in one of three mutually exclusive ways: by
    iterating over them (``for item in iterator``), by calling
    :func:`next` on them directly, or page by page through :attr:`pages`.

    Args:
        client(google.cloud.client.Client): The API client.
        item_to_value (Callable[google.api_core.page_iterator.Iterator, Any]):
            Callable to convert an item from the type in the raw API response
            into the native object. Will be called with the iterator and a
            single item.
        page_token (str): A token identifying a page in a result set to start
            fetching results from.
        max_results (int): The maximum number of results to fetch.
    """

    def __init__(
        self,
        client,
        item_to_value=_item_to_value_identity,
        page_token=None,
        max_results=None,
    ):
        self._started = False
        # Item generator created lazily by ``__next__``; stays ``None`` until
        # ``next()`` is called on this object for the first time.
        self.__active_iterator = None
        self.client = client
        """Optional[Any]: The client that created this iterator."""
        self.item_to_value = item_to_value
        """Callable[Iterator, Any]: Callable to convert an item from the type
            in the raw API response into the native object. Will be called with
            the iterator and a
            single item.
        """
        self.max_results = max_results
        """int: The maximum number of results to fetch."""

        # The attributes below will change over the life of the iterator.
        self.page_number = 0
        """int: The current page of results."""
        self.next_page_token = page_token
        """str: The token for the next page of results. If this is set before
            the iterator starts, it effectively offsets the iterator to a
            specific starting point."""
        self.num_results = 0
        """int: The total number of results fetched so far."""

    @property
    def pages(self):
        """Iterator of pages in the response.

        Returns:
            types.GeneratorType[google.api_core.page_iterator.Page]: A
                generator of page instances.

        Raises:
            ValueError: If the iterator has already been started.
        """
        if self._started:
            raise ValueError("Iterator has already started", self)
        self._started = True
        return self._page_iter(increment=True)

    def _items_iter(self):
        """Iterator for each item returned."""
        # ``increment=False``: results are counted one by one below rather
        # than a whole page at a time.
        for page in self._page_iter(increment=False):
            for item in page:
                self.num_results += 1
                yield item

    def __iter__(self):
        """Iterator for each item returned.

        Returns:
            types.GeneratorType[Any]: A generator of items from the API.

        Raises:
            ValueError: If the iterator has already been started.
        """
        if self._started:
            raise ValueError("Iterator has already started", self)
        self._started = True
        return self._items_iter()

    def __next__(self):
        """Get the next item, starting item iteration on first use.

        This lets callers write ``next(iterator)`` without first calling
        :func:`iter`. The underlying item generator is created once, through
        ``__iter__``, and then reused for every later call.

        Returns:
            Any: The next item from the API.

        Raises:
            StopIteration: If there are no more items.
            ValueError: If the iterator was already started some other way
                (through :attr:`pages` or an explicit :func:`iter` call)
                before the first call to this method.
        """
        if self.__active_iterator is None:
            self.__active_iterator = iter(self)
        return next(self.__active_iterator)

    def _page_iter(self, increment):
        """Generator of pages of API responses.

        Args:
            increment (bool): Flag indicating if the total number of results
                should be incremented on each page. This is useful since a page
                iterator will want to increment by results per page while an
                items iterator will want to increment per item.

        Yields:
            Page: each page of items from the API.
        """
        page = self._next_page()
        # ``_next_page`` signals exhaustion by returning ``None``.
        while page is not None:
            self.page_number += 1
            if increment:
                self.num_results += page.num_items
            yield page
            page = self._next_page()

    @abc.abstractmethod
    def _next_page(self):
        """Get the next page in the iterator.

        This does nothing and is intended to be over-ridden by subclasses
        to return the next :class:`Page`.

        Raises:
            NotImplementedError: Always, this method is abstract.
        """
        raise NotImplementedError


def _do_nothing_page_start(iterator, page, response):
    """Helper to provide custom behavior after a :class:`Page` is started.

    This is a do-nothing stand-in as the default value.

    Args:
        iterator (Iterator): An iterator that holds some request info.
        page (Page): The page that was just created.
        response (Any): The API response for a page.
    """
    # pylint: disable=unused-argument
    pass


class HTTPIterator(Iterator):
    """Page iterator for list methods served over HTTP/JSON.

    Each page is obtained by calling ``api_request`` with ``path`` and the
    current paging parameters. The parsed response is expected to be a
    mapping: the items of the page are read from ``items_key`` and the token
    for the following page from ``next_token``. Raw items are turned into
    the objects of your choice by ``item_to_value``.

    Args:
        client (google.cloud.client.Client): The API client.
        api_request (Callable): The function to use to make API requests.
            Generally, this will be
            :meth:`google.cloud._http.JSONConnection.api_request`.
        path (str): The method path to query for the list of items.
        item_to_value (Callable[google.api_core.page_iterator.Iterator, Any]):
            Callable to convert an item from the type in the JSON response into
            a native object. Will be called with the iterator and a single
            item.
        items_key (str): The key in the API response where the list of items
            can be found.
        page_token (str): A token identifying a page in a result set to start
            fetching results from.
        max_results (int): The maximum number of results to fetch.
        extra_params (dict): Extra query string parameters for the
            API call.
        page_start (Callable[
            google.api_core.page_iterator.Iterator,
            google.api_core.page_iterator.Page, dict]): Callable to provide
            any special behavior after a new page has been created. Assumed
            signature takes the :class:`.Iterator` that started the page,
            the :class:`.Page` that was started and the dictionary containing
            the page response.
        next_token (str): The name of the field used in the response for page
            tokens.

    .. autoattribute:: pages
    """

    _DEFAULT_ITEMS_KEY = "items"
    _PAGE_TOKEN = "pageToken"
    _MAX_RESULTS = "maxResults"
    _NEXT_TOKEN = "nextPageToken"
    _RESERVED_PARAMS = frozenset([_PAGE_TOKEN])
    _HTTP_METHOD = "GET"

    def __init__(
        self,
        client,
        api_request,
        path,
        item_to_value,
        items_key=_DEFAULT_ITEMS_KEY,
        page_token=None,
        max_results=None,
        extra_params=None,
        page_start=_do_nothing_page_start,
        next_token=_NEXT_TOKEN,
    ):
        super(HTTPIterator, self).__init__(
            client, item_to_value, page_token=page_token, max_results=max_results
        )
        self.api_request = api_request
        self.path = path
        self._items_key = items_key
        # A missing ``extra_params`` becomes a fresh, empty dict.
        self.extra_params = {} if extra_params is None else extra_params
        self._page_start = page_start
        self._next_token = next_token
        # Validate only once every attribute is in place.
        self._verify_params()

    def _verify_params(self):
        """Reject ``extra_params`` that collide with reserved parameters.

        Raises:
            ValueError: If a reserved parameter is used.
        """
        clashes = self._RESERVED_PARAMS.intersection(self.extra_params)
        if clashes:
            raise ValueError("Using a reserved parameter", clashes)

    def _next_page(self):
        """Fetch the next page and record the token of the one after it.

        Returns:
            Optional[Page]: The next page in the iterator or :data:`None` if
                there are no pages left.
        """
        if not self._has_next_page():
            return None

        payload = self._get_next_page_response()
        # A response without the items key yields an empty page.
        raw_items = payload.get(self._items_key, ())
        new_page = Page(self, raw_items, self.item_to_value, raw_page=payload)
        self._page_start(self, new_page, payload)
        self.next_page_token = payload.get(self._next_token)
        return new_page

    def _has_next_page(self):
        """Tell whether another page should be requested.

        Returns:
            bool: Whether the iterator has more pages.
        """
        # The first request is always made.
        if self.page_number == 0:
            return True

        limit = self.max_results
        if limit is not None and self.num_results >= limit:
            return False

        return self.next_page_token is not None

    def _get_query_params(self):
        """Build the parameters to send with the next request.

        Returns:
            dict: A dictionary of query parameters.
        """
        params = {}
        token = self.next_page_token
        if token is not None:
            params[self._PAGE_TOKEN] = token
        limit = self.max_results
        if limit is not None:
            # Ask only for as many results as are still wanted.
            params[self._MAX_RESULTS] = limit - self.num_results
        params.update(self.extra_params)
        return params

    def _get_next_page_response(self):
        """Call ``api_request`` for the next page.

        The parameters go out as ``query_params`` for ``GET`` and as ``data``
        for ``POST``.

        Returns:
            dict: The parsed JSON response of the next page's contents.

        Raises:
            ValueError: If the HTTP method is not ``GET`` or ``POST``.
        """
        params = self._get_query_params()
        verb = self._HTTP_METHOD
        if verb == "GET":
            return self.api_request(method=verb, path=self.path, query_params=params)
        if verb == "POST":
            return self.api_request(method=verb, path=self.path, data=params)
        raise ValueError("Unexpected HTTP method", verb)


class _GAXIterator(Iterator):
    """A generic class for iterating through Cloud gRPC APIs list responses.

    Args:
        client (google.cloud.client.Client): The API client.
        page_iter (google.gax.PageIterator): A GAX page iterator to be wrapped
            to conform to the :class:`Iterator` interface.
        item_to_value (Callable[Iterator, Any]): Callable to convert an item
            from the protobuf response into a native object. Will
            be called with the iterator and a single item.
        max_results (int): The maximum number of results to fetch.

    .. autoattribute:: pages
    """

    def __init__(self, client, page_iter, item_to_value, max_results=None):
        super(_GAXIterator, self).__init__(
            client,
            item_to_value,
            page_token=page_iter.page_token,
            max_results=max_results,
        )
        self._gax_page_iter = page_iter

    def _next_page(self):
        """Get the next page in the iterator.

        Wraps the response from the :class:`~google.gax.PageIterator` in a
        :class:`Page` instance and captures some state at each page.

        Returns:
            Optional[Page]: The next page in the iterator or :data:`None` if
                  there are no pages left.
        """
        # Only advancing the wrapped iterator is expected to signal
        # exhaustion, so the ``try`` covers just that call.
        try:
            items = next(self._gax_page_iter)
        except StopIteration:
            return None

        page = Page(self, items, self.item_to_value)
        # A falsy token from the wrapped iterator is normalised to ``None``.
        self.next_page_token = self._gax_page_iter.page_token or None
        return page


class GRPCIterator(Iterator):
    """Page iterator for list methods exposed as gRPC calls.

    Every page is fetched by invoking ``method`` with ``request``. Before
    each follow-up call, the token taken from the previous response is
    written onto the request message.

    .. note:: The class does not take a ``page_token`` argument because it can
        just be specified in the ``request``.

    Args:
        client (google.cloud.client.Client): The API client. This is unused by
            this class, but kept to satisfy the :class:`Iterator` interface.
        method (Callable[protobuf.Message]): A bound gRPC method that should
            take a single message for the request.
        request (protobuf.Message): The request message.
        items_field (str): The field in the response message that has the
            items for the page.
        item_to_value (Callable[GRPCIterator, Any]): Callable to convert an
            item from the response message into a native object. Will
            be called with the iterator and a single item.
        request_token_field (str): The field in the request message used to
            specify the page token.
        response_token_field (str): The field in the response message that has
            the token for the next page.
        max_results (int): The maximum number of results to fetch.

    .. autoattribute:: pages
    """

    _DEFAULT_REQUEST_TOKEN_FIELD = "page_token"
    _DEFAULT_RESPONSE_TOKEN_FIELD = "next_page_token"

    def __init__(
        self,
        client,
        method,
        request,
        items_field,
        item_to_value=_item_to_value_identity,
        request_token_field=_DEFAULT_REQUEST_TOKEN_FIELD,
        response_token_field=_DEFAULT_RESPONSE_TOKEN_FIELD,
        max_results=None,
    ):
        super(GRPCIterator, self).__init__(
            client, item_to_value, max_results=max_results
        )
        self._method = method
        self._request = request
        self._items_field = items_field
        self._request_token_field = request_token_field
        self._response_token_field = response_token_field

    def _next_page(self):
        """Call the gRPC method once and wrap its response in a page.

        Returns:
            Optional[Page]: The next page in the iterator or :data:`None` if
                there are no pages left.
        """
        if not self._has_next_page():
            return None

        token = self.next_page_token
        if token is not None:
            # Carry the previous response's token into the next request.
            setattr(self._request, self._request_token_field, token)

        reply = self._method(self._request)

        self.next_page_token = getattr(reply, self._response_token_field)
        raw_items = getattr(reply, self._items_field)
        return Page(self, raw_items, self.item_to_value, raw_page=reply)

    def _has_next_page(self):
        """Tell whether another page should be requested.

        Returns:
            bool: Whether the iterator has more pages.
        """
        # The first request is always made.
        if self.page_number == 0:
            return True

        limit = self.max_results
        if limit is not None and self.num_results >= limit:
            return False

        # Note: intentionally a falsy check instead of a None check. The RPC
        # can return an empty string indicating no more pages.
        return bool(self.next_page_token)