As of January 1, 2020 this library no longer supports Python 2 on the latest released version. Library versions released prior to that date will continue to be available. For more information please visit Python 2 support on Google Cloud.

Source code for google.cloud.spanner_v1.pool

# Copyright 2016 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Pools managing shared Session objects."""

import datetime
import queue

from google.cloud.exceptions import NotFound
from google.cloud.spanner_v1 import BatchCreateSessionsRequest
from google.cloud.spanner_v1 import Session
from google.cloud.spanner_v1._helpers import (
    _metadata_with_prefix,
    _metadata_with_leader_aware_routing,
)
from warnings import warn

_NOW = datetime.datetime.utcnow  # unit tests may replace


[docs]class AbstractSessionPool(object): """Specifies required API for concrete session pool implementations. :type labels: dict (str -> str) or None :param labels: (Optional) user-assigned labels for sessions created by the pool. :type database_role: str :param database_role: (Optional) user-assigned database_role for the session. """ _database = None def __init__(self, labels=None, database_role=None): if labels is None: labels = {} self._labels = labels self._database_role = database_role @property def labels(self): """User-assigned labels for sessions created by the pool. :rtype: dict (str -> str) :returns: labels assigned by the user """ return self._labels @property def database_role(self): """User-assigned database_role for sessions created by the pool. :rtype: str :returns: database_role assigned by the user """ return self._database_role
[docs] def bind(self, database): """Associate the pool with a database. :type database: :class:`~google.cloud.spanner_v1.database.Database` :param database: database used by the pool to create sessions when needed. Concrete implementations of this method may pre-fill the pool using the database. :raises NotImplementedError: abstract method """ raise NotImplementedError()
[docs] def get(self): """Check a session out from the pool. Concrete implementations of this method are allowed to raise an error to signal that the pool is exhausted, or to block until a session is available. :raises NotImplementedError: abstract method """ raise NotImplementedError()
[docs] def put(self, session): """Return a session to the pool. :type session: :class:`~google.cloud.spanner_v1.session.Session` :param session: the session being returned. Concrete implementations of this method are allowed to raise an error to signal that the pool is full, or to block until it is not full. :raises NotImplementedError: abstract method """ raise NotImplementedError()
[docs] def clear(self): """Delete all sessions in the pool. Concrete implementations of this method are allowed to raise an error to signal that the pool is full, or to block until it is not full. :raises NotImplementedError: abstract method """ raise NotImplementedError()
def _new_session(self): """Helper for concrete methods creating session instances. :rtype: :class:`~google.cloud.spanner_v1.session.Session` :returns: new session instance. """ return self._database.session( labels=self.labels, database_role=self.database_role )
[docs] def session(self, **kwargs): """Check out a session from the pool. :param kwargs: (optional) keyword arguments, passed through to the returned checkout. :rtype: :class:`~google.cloud.spanner_v1.session.SessionCheckout` :returns: a checkout instance, to be used as a context manager for accessing the session and returning it to the pool. """ return SessionCheckout(self, **kwargs)
[docs]class FixedSizePool(AbstractSessionPool): """Concrete session pool implementation: - Pre-allocates / creates a fixed number of sessions. - "Pings" existing sessions via :meth:`session.exists` before returning them, and replaces expired sessions. - Blocks, with a timeout, when :meth:`get` is called on an empty pool. Raises after timing out. - Raises when :meth:`put` is called on a full pool. That error is never expected in normal practice, as users should be calling :meth:`get` followed by :meth:`put` whenever in need of a session. :type size: int :param size: fixed pool size :type default_timeout: int :param default_timeout: default timeout, in seconds, to wait for a returned session. :type labels: dict (str -> str) or None :param labels: (Optional) user-assigned labels for sessions created by the pool. :type database_role: str :param database_role: (Optional) user-assigned database_role for the session. """ DEFAULT_SIZE = 10 DEFAULT_TIMEOUT = 10 def __init__( self, size=DEFAULT_SIZE, default_timeout=DEFAULT_TIMEOUT, labels=None, database_role=None, ): super(FixedSizePool, self).__init__(labels=labels, database_role=database_role) self.size = size self.default_timeout = default_timeout self._sessions = queue.LifoQueue(size)
[docs] def bind(self, database): """Associate the pool with a database. :type database: :class:`~google.cloud.spanner_v1.database.Database` :param database: database used by the pool to used to create sessions when needed. """ self._database = database api = database.spanner_api metadata = _metadata_with_prefix(database.name) if database._route_to_leader_enabled: metadata.append( _metadata_with_leader_aware_routing(database._route_to_leader_enabled) ) self._database_role = self._database_role or self._database.database_role request = BatchCreateSessionsRequest( database=database.name, session_count=self.size - self._sessions.qsize(), session_template=Session(creator_role=self.database_role), ) while not self._sessions.full(): resp = api.batch_create_sessions( request=request, metadata=metadata, ) for session_pb in resp.session: session = self._new_session() session._session_id = session_pb.name.split("/")[-1] self._sessions.put(session)
[docs] def get(self, timeout=None): """Check a session out from the pool. :type timeout: int :param timeout: seconds to block waiting for an available session :rtype: :class:`~google.cloud.spanner_v1.session.Session` :returns: an existing session from the pool, or a newly-created session. :raises: :exc:`queue.Empty` if the queue is empty. """ if timeout is None: timeout = self.default_timeout session = self._sessions.get(block=True, timeout=timeout) if not session.exists(): session = self._database.session() session.create() return session
[docs] def put(self, session): """Return a session to the pool. Never blocks: if the pool is full, raises. :type session: :class:`~google.cloud.spanner_v1.session.Session` :param session: the session being returned. :raises: :exc:`queue.Full` if the queue is full. """ self._sessions.put_nowait(session)
[docs] def clear(self): """Delete all sessions in the pool.""" while True: try: session = self._sessions.get(block=False) except queue.Empty: break else: session.delete()
[docs]class BurstyPool(AbstractSessionPool): """Concrete session pool implementation: - "Pings" existing sessions via :meth:`session.exists` before returning them. - Creates a new session, rather than blocking, when :meth:`get` is called on an empty pool. - Discards the returned session, rather than blocking, when :meth:`put` is called on a full pool. :type target_size: int :param target_size: max pool size :type labels: dict (str -> str) or None :param labels: (Optional) user-assigned labels for sessions created by the pool. :type database_role: str :param database_role: (Optional) user-assigned database_role for the session. """ def __init__(self, target_size=10, labels=None, database_role=None): super(BurstyPool, self).__init__(labels=labels, database_role=database_role) self.target_size = target_size self._database = None self._sessions = queue.LifoQueue(target_size)
[docs] def bind(self, database): """Associate the pool with a database. :type database: :class:`~google.cloud.spanner_v1.database.Database` :param database: database used by the pool to create sessions when needed. """ self._database = database self._database_role = self._database_role or self._database.database_role
[docs] def get(self): """Check a session out from the pool. :rtype: :class:`~google.cloud.spanner_v1.session.Session` :returns: an existing session from the pool, or a newly-created session. """ try: session = self._sessions.get_nowait() except queue.Empty: session = self._new_session() session.create() else: if not session.exists(): session = self._new_session() session.create() return session
[docs] def put(self, session): """Return a session to the pool. Never blocks: if the pool is full, the returned session is discarded. :type session: :class:`~google.cloud.spanner_v1.session.Session` :param session: the session being returned. """ try: self._sessions.put_nowait(session) except queue.Full: try: session.delete() except NotFound: pass
[docs] def clear(self): """Delete all sessions in the pool.""" while True: try: session = self._sessions.get(block=False) except queue.Empty: break else: session.delete()
[docs]class PingingPool(AbstractSessionPool): """Concrete session pool implementation: - Pre-allocates / creates a fixed number of sessions. - Sessions are used in "round-robin" order (LRU first). - "Pings" existing sessions in the background after a specified interval via an API call (``session.ping()``). - Blocks, with a timeout, when :meth:`get` is called on an empty pool. Raises after timing out. - Raises when :meth:`put` is called on a full pool. That error is never expected in normal practice, as users should be calling :meth:`get` followed by :meth:`put` whenever in need of a session. The application is responsible for calling :meth:`ping` at appropriate times, e.g. from a background thread. :type size: int :param size: fixed pool size :type default_timeout: int :param default_timeout: default timeout, in seconds, to wait for a returned session. :type ping_interval: int :param ping_interval: interval at which to ping sessions. :type labels: dict (str -> str) or None :param labels: (Optional) user-assigned labels for sessions created by the pool. :type database_role: str :param database_role: (Optional) user-assigned database_role for the session. """ def __init__( self, size=10, default_timeout=10, ping_interval=3000, labels=None, database_role=None, ): super(PingingPool, self).__init__(labels=labels, database_role=database_role) self.size = size self.default_timeout = default_timeout self._delta = datetime.timedelta(seconds=ping_interval) self._sessions = queue.PriorityQueue(size)
[docs] def bind(self, database): """Associate the pool with a database. :type database: :class:`~google.cloud.spanner_v1.database.Database` :param database: database used by the pool to create sessions when needed. """ self._database = database api = database.spanner_api metadata = _metadata_with_prefix(database.name) if database._route_to_leader_enabled: metadata.append( _metadata_with_leader_aware_routing(database._route_to_leader_enabled) ) created_session_count = 0 self._database_role = self._database_role or self._database.database_role request = BatchCreateSessionsRequest( database=database.name, session_count=self.size - created_session_count, session_template=Session(creator_role=self.database_role), ) while created_session_count < self.size: resp = api.batch_create_sessions( request=request, metadata=metadata, ) for session_pb in resp.session: session = self._new_session() session._session_id = session_pb.name.split("/")[-1] self.put(session) created_session_count += len(resp.session)
[docs] def get(self, timeout=None): """Check a session out from the pool. :type timeout: int :param timeout: seconds to block waiting for an available session :rtype: :class:`~google.cloud.spanner_v1.session.Session` :returns: an existing session from the pool, or a newly-created session. :raises: :exc:`queue.Empty` if the queue is empty. """ if timeout is None: timeout = self.default_timeout ping_after, session = self._sessions.get(block=True, timeout=timeout) if _NOW() > ping_after: # Using session.exists() guarantees the returned session exists. # session.ping() uses a cached result in the backend which could # result in a recently deleted session being returned. if not session.exists(): session = self._new_session() session.create() return session
[docs] def put(self, session): """Return a session to the pool. Never blocks: if the pool is full, raises. :type session: :class:`~google.cloud.spanner_v1.session.Session` :param session: the session being returned. :raises: :exc:`queue.Full` if the queue is full. """ self._sessions.put_nowait((_NOW() + self._delta, session))
[docs] def clear(self): """Delete all sessions in the pool.""" while True: try: _, session = self._sessions.get(block=False) except queue.Empty: break else: session.delete()
[docs] def ping(self): """Refresh maybe-expired sessions in the pool. This method is designed to be called from a background thread, or during the "idle" phase of an event loop. """ while True: try: ping_after, session = self._sessions.get(block=False) except queue.Empty: # all sessions in use break if ping_after > _NOW(): # oldest session is fresh # Re-add to queue with existing expiration self._sessions.put((ping_after, session)) break try: session.ping() except NotFound: session = self._new_session() session.create() # Re-add to queue with new expiration self.put(session)
[docs]class TransactionPingingPool(PingingPool): """Concrete session pool implementation: Deprecated: TransactionPingingPool no longer begins a transaction for each of its sessions at startup. Hence the TransactionPingingPool is same as :class:`PingingPool` and maybe removed in the future. In addition to the features of :class:`PingingPool`, this class creates and begins a transaction for each of its sessions at startup. When a session is returned to the pool, if its transaction has been committed or rolled back, the pool creates a new transaction for the session and pushes the transaction onto a separate queue of "transactions to begin." The application is responsible for flushing this queue as appropriate via the pool's :meth:`begin_pending_transactions` method. :type size: int :param size: fixed pool size :type default_timeout: int :param default_timeout: default timeout, in seconds, to wait for a returned session. :type ping_interval: int :param ping_interval: interval at which to ping sessions. :type labels: dict (str -> str) or None :param labels: (Optional) user-assigned labels for sessions created by the pool. :type database_role: str :param database_role: (Optional) user-assigned database_role for the session. """ def __init__( self, size=10, default_timeout=10, ping_interval=3000, labels=None, database_role=None, ): """This throws a deprecation warning on initialization.""" warn( f"{self.__class__.__name__} is deprecated.", DeprecationWarning, stacklevel=2, ) self._pending_sessions = queue.Queue() super(TransactionPingingPool, self).__init__( size, default_timeout, ping_interval, labels=labels, database_role=database_role, ) self.begin_pending_transactions()
[docs] def bind(self, database): """Associate the pool with a database. :type database: :class:`~google.cloud.spanner_v1.database.Database` :param database: database used by the pool to create sessions when needed. """ super(TransactionPingingPool, self).bind(database) self._database_role = self._database_role or self._database.database_role self.begin_pending_transactions()
[docs] def put(self, session): """Return a session to the pool. Never blocks: if the pool is full, raises. :type session: :class:`~google.cloud.spanner_v1.session.Session` :param session: the session being returned. :raises: :exc:`queue.Full` if the queue is full. """ if self._sessions.full(): raise queue.Full txn = session._transaction if txn is None or txn.committed or txn.rolled_back: session.transaction() self._pending_sessions.put(session) else: super(TransactionPingingPool, self).put(session)
[docs] def begin_pending_transactions(self): """Begin all transactions for sessions added to the pool.""" while not self._pending_sessions.empty(): session = self._pending_sessions.get() super(TransactionPingingPool, self).put(session)
[docs]class SessionCheckout(object): """Context manager: hold session checked out from a pool. :type pool: concrete subclass of :class:`~google.cloud.spanner_v1.pool.AbstractSessionPool` :param pool: Pool from which to check out a session. :param kwargs: extra keyword arguments to be passed to :meth:`pool.get`. """ _session = None # Not checked out until '__enter__'. def __init__(self, pool, **kwargs): self._pool = pool self._kwargs = kwargs.copy() def __enter__(self): self._session = self._pool.get(**self._kwargs) return self._session def __exit__(self, *ignored): self._pool.put(self._session)