Source code for google.api_core.iam

# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Non-API-specific IAM policy definitions

For allowed roles / permissions, see:
https://cloud.google.com/iam/docs/understanding-roles

Example usage:

.. code-block:: python

   # ``get_iam_policy`` returns a :class:'~google.api_core.iam.Policy`.
   policy = resource.get_iam_policy()

   phred = policy.user("phred@example.com")
   admin_group = policy.group("admins@groups.example.com")
   account = policy.service_account("account-1234@accounts.example.com")
   policy["roles/owner"] = [phred, admin_group, account]
   policy["roles/editor"] = policy.authenticated_users()
   policy["roles/viewer"] = policy.all_users()

   resource.set_iam_policy(policy)
"""

import collections
try:
    from collections import abc as collections_abc
except ImportError:  # Python 2.7
    import collections as collections_abc
import warnings

# Generic IAM roles

OWNER_ROLE = "roles/owner"
"""Generic role implying all rights to an object."""

EDITOR_ROLE = "roles/editor"
"""Generic role implying rights to modify an object."""

VIEWER_ROLE = "roles/viewer"
"""Generic role implying rights to access an object."""

_ASSIGNMENT_DEPRECATED_MSG = """\
Assigning to '{}' is deprecated.  Replace with 'policy[{}] = members."""


[docs]class Policy(collections_abc.MutableMapping): """IAM Policy See https://cloud.google.com/iam/reference/rest/v1/Policy Args: etag (Optional[str]): ETag used to identify a unique of the policy version (Optional[int]): unique version of the policy """ _OWNER_ROLES = (OWNER_ROLE,) """Roles mapped onto our ``owners`` attribute.""" _EDITOR_ROLES = (EDITOR_ROLE,) """Roles mapped onto our ``editors`` attribute.""" _VIEWER_ROLES = (VIEWER_ROLE,) """Roles mapped onto our ``viewers`` attribute.""" def __init__(self, etag=None, version=None): self.etag = etag self.version = version self._bindings = collections.defaultdict(set) def __iter__(self): return iter(self._bindings) def __len__(self): return len(self._bindings) def __getitem__(self, key): return self._bindings[key] def __setitem__(self, key, value): self._bindings[key] = set(value) def __delitem__(self, key): del self._bindings[key] @property def owners(self): """Legacy access to owner role. DEPRECATED: use ``policy["roles/owners"]`` instead.""" result = set() for role in self._OWNER_ROLES: for member in self._bindings.get(role, ()): result.add(member) return frozenset(result) @owners.setter def owners(self, value): """Update owners. DEPRECATED: use ``policy["roles/owners"] = value`` instead.""" warnings.warn( _ASSIGNMENT_DEPRECATED_MSG.format("owners", OWNER_ROLE), DeprecationWarning ) self[OWNER_ROLE] = value @property def editors(self): """Legacy access to editor role. DEPRECATED: use ``policy["roles/editors"]`` instead.""" result = set() for role in self._EDITOR_ROLES: for member in self._bindings.get(role, ()): result.add(member) return frozenset(result) @editors.setter def editors(self, value): """Update editors. DEPRECATED: use ``policy["roles/editors"] = value`` instead.""" warnings.warn( _ASSIGNMENT_DEPRECATED_MSG.format("editors", EDITOR_ROLE), DeprecationWarning, ) self[EDITOR_ROLE] = value @property def viewers(self): """Legacy access to viewer role. DEPRECATED: use ``policy["roles/viewers"]`` instead """ result = set() for role in self._VIEWER_ROLES: for member in self._bindings.get(role, ()): result.add(member) return frozenset(result) @viewers.setter def viewers(self, value): """Update viewers. DEPRECATED: use ``policy["roles/viewers"] = value`` instead. """ warnings.warn( _ASSIGNMENT_DEPRECATED_MSG.format("viewers", VIEWER_ROLE), DeprecationWarning, ) self[VIEWER_ROLE] = value
[docs] @staticmethod def user(email): """Factory method for a user member. Args: email (str): E-mail for this particular user. Returns: str: A member string corresponding to the given user. """ return "user:%s" % (email,)
[docs] @staticmethod def service_account(email): """Factory method for a service account member. Args: email (str): E-mail for this particular service account. Returns: str: A member string corresponding to the given service account. """ return "serviceAccount:%s" % (email,)
[docs] @staticmethod def group(email): """Factory method for a group member. Args: email (str): An id or e-mail for this particular group. Returns: str: A member string corresponding to the given group. """ return "group:%s" % (email,)
[docs] @staticmethod def domain(domain): """Factory method for a domain member. Args: domain (str): The domain for this member. Returns: str: A member string corresponding to the given domain. """ return "domain:%s" % (domain,)
[docs] @staticmethod def all_users(): """Factory method for a member representing all users. Returns: str: A member string representing all users. """ return "allUsers"
[docs] @staticmethod def authenticated_users(): """Factory method for a member representing all authenticated users. Returns: str: A member string representing all authenticated users. """ return "allAuthenticatedUsers"
[docs] @classmethod def from_api_repr(cls, resource): """Factory: create a policy from a JSON resource. Args: resource (dict): policy resource returned by ``getIamPolicy`` API. Returns: :class:`Policy`: the parsed policy """ version = resource.get("version") etag = resource.get("etag") policy = cls(etag, version) for binding in resource.get("bindings", ()): role = binding["role"] members = sorted(binding["members"]) policy[role] = members return policy
[docs] def to_api_repr(self): """Render a JSON policy resource. Returns: dict: a resource to be passed to the ``setIamPolicy`` API. """ resource = {} if self.etag is not None: resource["etag"] = self.etag if self.version is not None: resource["version"] = self.version if self._bindings: bindings = resource["bindings"] = [] for role, members in sorted(self._bindings.items()): if members: bindings.append({"role": role, "members": sorted(set(members))}) if not bindings: del resource["bindings"] return resource