On January 1, 2020 this library will no longer support Python 2 on the latest released version.
Previously released library versions will continue to be available. For more information please
visit Python 2 support on Google Cloud.
Source code for google.api_core.iam
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Non-API-specific IAM policy definitions
For allowed roles / permissions, see:
https://cloud.google.com/iam/docs/understanding-roles
Example usage:
.. code-block:: python
# ``get_iam_policy`` returns a :class:'~google.api_core.iam.Policy`.
policy = resource.get_iam_policy(requested_policy_version=3)
phred = "user:phred@example.com"
admin_group = "group:admins@groups.example.com"
account = "serviceAccount:account-1234@accounts.example.com"
policy.version = 3
policy.bindings = [
{
"role": "roles/owner",
"members": {phred, admin_group, account}
},
{
"role": "roles/editor",
"members": {"allAuthenticatedUsers"}
},
{
"role": "roles/viewer",
"members": {"allUsers"}
"condition": {
"title": "request_time",
"description": "Requests made before 2021-01-01T00:00:00Z",
"expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")"
}
}
]
resource.set_iam_policy(policy)
"""
import collections
import operator
import warnings
try:
from collections import abc as collections_abc
except ImportError: # Python 2.7
import collections as collections_abc
# Generic IAM roles
OWNER_ROLE = "roles/owner"
"""Generic role implying all rights to an object."""
EDITOR_ROLE = "roles/editor"
"""Generic role implying rights to modify an object."""
VIEWER_ROLE = "roles/viewer"
"""Generic role implying rights to access an object."""
_ASSIGNMENT_DEPRECATED_MSG = """\
Assigning to '{}' is deprecated. Use the `policy.bindings` property to modify bindings instead."""
_FACTORY_DEPRECATED_MSG = """\
Factory method {0} is deprecated. Replace with '{0}'."""
_DICT_ACCESS_MSG = """\
Dict access is not supported on policies with version > 1 or with conditional bindings."""
[docs]class InvalidOperationException(Exception):
"""Raised when trying to use Policy class as a dict."""
pass
[docs]class Policy(collections_abc.MutableMapping):
"""IAM Policy
Args:
etag (Optional[str]): ETag used to identify a unique of the policy
version (Optional[int]): The syntax schema version of the policy.
Note:
Using conditions in bindings requires the policy's version to be set
to `3` or greater, depending on the versions that are currently supported.
Accessing the policy using dict operations will raise InvalidOperationException
when the policy's version is set to 3.
Use the policy.bindings getter/setter to retrieve and modify the policy's bindings.
See:
IAM Policy https://cloud.google.com/iam/reference/rest/v1/Policy
Policy versions https://cloud.google.com/iam/docs/policies#versions
Conditions overview https://cloud.google.com/iam/docs/conditions-overview.
"""
_OWNER_ROLES = (OWNER_ROLE,)
"""Roles mapped onto our ``owners`` attribute."""
_EDITOR_ROLES = (EDITOR_ROLE,)
"""Roles mapped onto our ``editors`` attribute."""
_VIEWER_ROLES = (VIEWER_ROLE,)
"""Roles mapped onto our ``viewers`` attribute."""
def __init__(self, etag=None, version=None):
self.etag = etag
self.version = version
self._bindings = []
def __iter__(self):
self.__check_version__()
return (binding["role"] for binding in self._bindings)
def __len__(self):
self.__check_version__()
return len(self._bindings)
def __getitem__(self, key):
self.__check_version__()
for b in self._bindings:
if b["role"] == key:
return b["members"]
return set()
def __setitem__(self, key, value):
self.__check_version__()
value = set(value)
for binding in self._bindings:
if binding["role"] == key:
binding["members"] = value
return
self._bindings.append({"role": key, "members": value})
def __delitem__(self, key):
self.__check_version__()
for b in self._bindings:
if b["role"] == key:
self._bindings.remove(b)
return
raise KeyError(key)
[docs] def __check_version__(self):
"""Raise InvalidOperationException if version is greater than 1 or policy contains conditions."""
raise_version = self.version is not None and self.version > 1
if raise_version or self._contains_conditions():
raise InvalidOperationException(_DICT_ACCESS_MSG)
def _contains_conditions(self):
for b in self._bindings:
if b.get("condition") is not None:
return True
return False
@property
def bindings(self):
"""The policy's list of bindings.
A binding is specified by a dictionary with keys:
* role (str): Role that is assigned to `members`.
* members (:obj:`set` of str): Specifies the identities associated to this binding.
* condition (:obj:`dict` of str:str): Specifies a condition under which this binding will apply.
* title (str): Title for the condition.
* description (:obj:str, optional): Description of the condition.
* expression: A CEL expression.
Type:
:obj:`list` of :obj:`dict`
See:
Policy versions https://cloud.google.com/iam/docs/policies#versions
Conditions overview https://cloud.google.com/iam/docs/conditions-overview.
Example:
.. code-block:: python
USER = "user:phred@example.com"
ADMIN_GROUP = "group:admins@groups.example.com"
SERVICE_ACCOUNT = "serviceAccount:account-1234@accounts.example.com"
CONDITION = {
"title": "request_time",
"description": "Requests made before 2021-01-01T00:00:00Z", # Optional
"expression": "request.time < timestamp(\"2021-01-01T00:00:00Z\")"
}
# Set policy's version to 3 before setting bindings containing conditions.
policy.version = 3
policy.bindings = [
{
"role": "roles/viewer",
"members": {USER, ADMIN_GROUP, SERVICE_ACCOUNT},
"condition": CONDITION
},
...
]
"""
return self._bindings
@bindings.setter
def bindings(self, bindings):
self._bindings = bindings
@property
def owners(self):
"""Legacy access to owner role.
Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
DEPRECATED: use `policy.bindings` to access bindings instead.
"""
result = set()
for role in self._OWNER_ROLES:
for member in self.get(role, ()):
result.add(member)
return frozenset(result)
@owners.setter
def owners(self, value):
"""Update owners.
Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
DEPRECATED: use `policy.bindings` to access bindings instead.
"""
warnings.warn(
_ASSIGNMENT_DEPRECATED_MSG.format("owners", OWNER_ROLE), DeprecationWarning
)
self[OWNER_ROLE] = value
@property
def editors(self):
"""Legacy access to editor role.
Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
DEPRECATED: use `policy.bindings` to access bindings instead.
"""
result = set()
for role in self._EDITOR_ROLES:
for member in self.get(role, ()):
result.add(member)
return frozenset(result)
@editors.setter
def editors(self, value):
"""Update editors.
Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
DEPRECATED: use `policy.bindings` to modify bindings instead.
"""
warnings.warn(
_ASSIGNMENT_DEPRECATED_MSG.format("editors", EDITOR_ROLE),
DeprecationWarning,
)
self[EDITOR_ROLE] = value
@property
def viewers(self):
"""Legacy access to viewer role.
Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
DEPRECATED: use `policy.bindings` to modify bindings instead.
"""
result = set()
for role in self._VIEWER_ROLES:
for member in self.get(role, ()):
result.add(member)
return frozenset(result)
@viewers.setter
def viewers(self, value):
"""Update viewers.
Raise InvalidOperationException if version is greater than 1 or policy contains conditions.
DEPRECATED: use `policy.bindings` to modify bindings instead.
"""
warnings.warn(
_ASSIGNMENT_DEPRECATED_MSG.format("viewers", VIEWER_ROLE),
DeprecationWarning,
)
self[VIEWER_ROLE] = value
[docs] @staticmethod
def user(email):
"""Factory method for a user member.
Args:
email (str): E-mail for this particular user.
Returns:
str: A member string corresponding to the given user.
DEPRECATED: set the role `user:{email}` in the binding instead.
"""
warnings.warn(
_FACTORY_DEPRECATED_MSG.format("user:{email}"), DeprecationWarning,
)
return "user:%s" % (email,)
[docs] @staticmethod
def service_account(email):
"""Factory method for a service account member.
Args:
email (str): E-mail for this particular service account.
Returns:
str: A member string corresponding to the given service account.
DEPRECATED: set the role `serviceAccount:{email}` in the binding instead.
"""
warnings.warn(
_FACTORY_DEPRECATED_MSG.format("serviceAccount:{email}"),
DeprecationWarning,
)
return "serviceAccount:%s" % (email,)
[docs] @staticmethod
def group(email):
"""Factory method for a group member.
Args:
email (str): An id or e-mail for this particular group.
Returns:
str: A member string corresponding to the given group.
DEPRECATED: set the role `group:{email}` in the binding instead.
"""
warnings.warn(
_FACTORY_DEPRECATED_MSG.format("group:{email}"), DeprecationWarning,
)
return "group:%s" % (email,)
[docs] @staticmethod
def domain(domain):
"""Factory method for a domain member.
Args:
domain (str): The domain for this member.
Returns:
str: A member string corresponding to the given domain.
DEPRECATED: set the role `domain:{email}` in the binding instead.
"""
warnings.warn(
_FACTORY_DEPRECATED_MSG.format("domain:{email}"), DeprecationWarning,
)
return "domain:%s" % (domain,)
[docs] @staticmethod
def all_users():
"""Factory method for a member representing all users.
Returns:
str: A member string representing all users.
DEPRECATED: set the role `allUsers` in the binding instead.
"""
warnings.warn(
_FACTORY_DEPRECATED_MSG.format("allUsers"), DeprecationWarning,
)
return "allUsers"
[docs] @staticmethod
def authenticated_users():
"""Factory method for a member representing all authenticated users.
Returns:
str: A member string representing all authenticated users.
DEPRECATED: set the role `allAuthenticatedUsers` in the binding instead.
"""
warnings.warn(
_FACTORY_DEPRECATED_MSG.format("allAuthenticatedUsers"), DeprecationWarning,
)
return "allAuthenticatedUsers"
[docs] @classmethod
def from_api_repr(cls, resource):
"""Factory: create a policy from a JSON resource.
Args:
resource (dict): policy resource returned by ``getIamPolicy`` API.
Returns:
:class:`Policy`: the parsed policy
"""
version = resource.get("version")
etag = resource.get("etag")
policy = cls(etag, version)
policy.bindings = resource.get("bindings", [])
for binding in policy.bindings:
binding["members"] = set(binding.get("members", ()))
return policy
[docs] def to_api_repr(self):
"""Render a JSON policy resource.
Returns:
dict: a resource to be passed to the ``setIamPolicy`` API.
"""
resource = {}
if self.etag is not None:
resource["etag"] = self.etag
if self.version is not None:
resource["version"] = self.version
if self._bindings and len(self._bindings) > 0:
bindings = []
for binding in self._bindings:
members = binding.get("members")
if members:
new_binding = {
"role": binding["role"],
"members": sorted(members)
}
condition = binding.get("condition")
if condition:
new_binding["condition"] = condition
bindings.append(new_binding)
if bindings:
# Sort bindings by role
key = operator.itemgetter("role")
resource["bindings"] = sorted(bindings, key=key)
return resource