Source code for google.cloud.ndb.global_cache

# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""GlobalCache interface and its implementations."""

import abc
import collections
import os
import time
import uuid

import redis as redis_module


[docs]class GlobalCache(object): """Abstract base class for a global entity cache. A global entity cache is shared across contexts, sessions, and possibly even servers. A concrete implementation is available which uses Redis. Essentially, this class models a simple key/value store where keys and values are arbitrary ``bytes`` instances. "Compare and swap", aka "optimistic transactions" should also be supported. Concrete implementations can either by synchronous or asynchronous. Asynchronous implementations should return :class:`~google.cloud.ndb.tasklets.Future` instances whose eventual results match the return value described for each method. Because coordinating with the single threaded event model used by ``NDB`` can be tricky with remote services, it's not recommended that casual users write asynchronous implementations, as some specialized knowledge is required. """ __metaclass__ = abc.ABCMeta
[docs] @abc.abstractmethod def get(self, keys): """Retrieve entities from the cache. Arguments: keys (List[bytes]): The keys to get. Returns: List[Union[bytes, None]]]: Serialized entities, or :data:`None`, for each key. """ raise NotImplementedError
[docs] @abc.abstractmethod def set(self, items, expires=None): """Store entities in the cache. Arguments: items (Dict[bytes, Union[bytes, None]]): Mapping of keys to serialized entities. expires (Optional[float]): Number of seconds until value expires. """ raise NotImplementedError
[docs] @abc.abstractmethod def delete(self, keys): """Remove entities from the cache. Arguments: keys (List[bytes]): The keys to remove. """ raise NotImplementedError
[docs] @abc.abstractmethod def watch(self, keys): """Begin an optimistic transaction for the given keys. A future call to :meth:`compare_and_swap` will only set values for keys whose values haven't changed since the call to this method. Arguments: keys (List[bytes]): The keys to watch. """ raise NotImplementedError
[docs] @abc.abstractmethod def compare_and_swap(self, items, expires=None): """Like :meth:`set` but using an optimistic transaction. Only keys whose values haven't changed since a preceding call to :meth:`watch` will be changed. Arguments: items (Dict[bytes, Union[bytes, None]]): Mapping of keys to serialized entities. expires (Optional[float]): Number of seconds until value expires. """ raise NotImplementedError
class _InProcessGlobalCache(GlobalCache): """Reference implementation of :class:`GlobalCache`. Not intended for production use. Uses a single process wide dictionary to keep an in memory cache. For use in testing and to have an easily grokkable reference implementation. Thread safety is potentially a little sketchy. """ cache = {} """Dict: The cache. Relies on atomicity of ``__setitem__`` for thread safety. See: http://effbot.org/pyfaq/what-kinds-of-global-value-mutation-are-thread-safe.htm """ def __init__(self): self._watch_keys = {} def get(self, keys): """Implements :meth:`GlobalCache.get`.""" now = time.time() results = [self.cache.get(key) for key in keys] entity_pbs = [] for result in results: if result is not None: entity_pb, expires = result if expires and expires < now: entity_pb = None else: entity_pb = None entity_pbs.append(entity_pb) return entity_pbs def set(self, items, expires=None): """Implements :meth:`GlobalCache.set`.""" if expires: expires = time.time() + expires for key, value in items.items(): self.cache[key] = (value, expires) # Supposedly threadsafe def delete(self, keys): """Implements :meth:`GlobalCache.delete`.""" for key in keys: self.cache.pop(key, None) # Threadsafe? def watch(self, keys): """Implements :meth:`GlobalCache.watch`.""" for key in keys: self._watch_keys[key] = self.cache.get(key) def compare_and_swap(self, items, expires=None): """Implements :meth:`GlobalCache.compare_and_swap`.""" if expires: expires = time.time() + expires for key, new_value in items.items(): watch_value = self._watch_keys.get(key) current_value = self.cache.get(key) if watch_value == current_value: self.cache[key] = (new_value, expires) _Pipeline = collections.namedtuple("_Pipeline", ("pipe", "id"))
[docs]class RedisCache(GlobalCache): """Redis implementation of the :class:`GlobalCache`. This is a synchronous implementation. The idea is that calls to Redis should be fast enough not to warrant the added complexity of an asynchronous implementation. Args: redis (redis.Redis): Instance of Redis client to use. """
[docs] @classmethod def from_environment(cls): """Generate a class:`RedisCache` from an environment variable. This class method looks for the ``REDIS_CACHE_URL`` environment variable and, if it is set, passes its value to ``Redis.from_url`` to construct a ``Redis`` instance which is then used to instantiate a ``RedisCache`` instance. Returns: Optional[RedisCache]: A :class:`RedisCache` instance or :data:`None`, if ``REDIS_CACHE_URL`` is not set in the environment. """ url = os.environ.get("REDIS_CACHE_URL") if url: return cls(redis_module.Redis.from_url(url))
def __init__(self, redis): self.redis = redis self.pipes = {}
[docs] def get(self, keys): """Implements :meth:`GlobalCache.get`.""" res = self.redis.mget(keys) return res
[docs] def set(self, items, expires=None): """Implements :meth:`GlobalCache.set`.""" self.redis.mset(items) if expires: for key in items.keys(): self.redis.expire(key, expires)
[docs] def delete(self, keys): """Implements :meth:`GlobalCache.delete`.""" self.redis.delete(*keys)
[docs] def watch(self, keys): """Implements :meth:`GlobalCache.watch`.""" pipe = self.redis.pipeline() pipe.watch(*keys) holder = _Pipeline(pipe, str(uuid.uuid4())) for key in keys: self.pipes[key] = holder
[docs] def compare_and_swap(self, items, expires=None): """Implements :meth:`GlobalCache.compare_and_swap`.""" pipes = {} mappings = {} results = {} remove_keys = [] # get associated pipes for key, value in items.items(): remove_keys.append(key) if key not in self.pipes: continue pipe = self.pipes[key] pipes[pipe.id] = pipe mapping = mappings.setdefault(pipe.id, {}) mapping[key] = value # execute transaction for each pipes for pipe_id, mapping in mappings.items(): pipe = pipes[pipe_id].pipe try: pipe.multi() pipe.mset(mapping) if expires: for key in mapping.keys(): pipe.expire(key, expires) pipe.execute() except redis_module.exceptions.WatchError: pass finally: pipe.reset() # get keys associated to pipes but not updated for key, pipe in self.pipes.items(): if pipe.id in pipes: remove_keys.append(key) # remote keys for key in remove_keys: self.pipes.pop(key, None) return results