# Source code for django_spanner.introspection

# Copyright 2020 Google LLC
#
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file or at
# https://developers.google.com/open-source/licenses/bsd

from django.db.backends.base.introspection import (
    BaseDatabaseIntrospection,
    FieldInfo,
    TableInfo,
)
from django.db.models import Index
from google.cloud.spanner_v1 import TypeCode
from django_spanner import USE_EMULATOR


class DatabaseIntrospection(BaseDatabaseIntrospection):
    """A Spanner-specific version of Django introspection utilities."""

    # Maps Spanner type codes to the names of the Django field classes used
    # to represent columns of that type.
    data_types_reverse = {
        TypeCode.BOOL: "BooleanField",
        TypeCode.BYTES: "BinaryField",
        TypeCode.DATE: "DateField",
        TypeCode.FLOAT64: "FloatField",
        TypeCode.INT64: "IntegerField",
        TypeCode.STRING: "CharField",
        TypeCode.TIMESTAMP: "DateTimeField",
        TypeCode.NUMERIC: "DecimalField",
        TypeCode.JSON: "JSONField",
    }

    # Lists the name and type of every table/view in the given schema.
    LIST_TABLE_SQL = """
        SELECT
            t.table_name, t.table_type
        FROM
            information_schema.tables AS t
        WHERE
            t.table_catalog = '' and t.table_schema = @schema_name
    """

    @staticmethod
    def _new_constraint():
        """Return a fresh constraint record with every attribute unset.

        A new dict (with new lists) is built on every call so that records
        never share their mutable ``columns`` / ``orders`` lists.

        :rtype: dict
        :returns: A constraint description in the format produced by
                  :meth:`get_constraints`.
        """
        return {
            "check": False,
            "columns": [],
            "foreign_key": None,
            "index": False,
            "orders": [],
            "primary_key": False,
            "type": None,
            "unique": False,
        }

    def get_field_type(self, data_type, description):
        """A hook for a Spanner database to use the cursor description to
        match a Django field type to the database column.

        :type data_type: int
        :param data_type: One of Spanner's standard data types.

        :type description: :class:`~google.cloud.spanner_dbapi._helpers.ColumnInfo`
        :param description: A description of Spanner column data type.

        :rtype: str
        :returns: The corresponding type of Django field.
        """
        # STRING(MAX) columns have no fixed length, so they are reported as
        # TextField rather than the default CharField.
        if data_type == TypeCode.STRING and description.internal_size == "MAX":
            return "TextField"
        return super().get_field_type(data_type, description)

    def get_table_list(self, cursor):
        """Return a list of table and view names in the current database.

        :type cursor: :class:`~google.cloud.spanner_dbapi.cursor.Cursor`
        :param cursor: A reference to a Spanner Database cursor.

        :rtype: list
        :returns: A list of table and view names in the current database.
        """
        schema_name = self._get_schema_name(cursor)
        results = cursor.run_sql_in_snapshot(
            self.LIST_TABLE_SQL, params={"schema_name": schema_name}
        )
        # The second TableInfo field is 't' for table or 'v' for view.
        return [
            TableInfo(row[0], "v" if row[1] == "VIEW" else "t")
            for row in results
        ]

    def get_table_description(self, cursor, table_name):
        """Return a description of the table with the DB-API
        cursor.description interface.

        :type cursor: :class:`~google.cloud.spanner_dbapi.cursor.Cursor`
        :param cursor: A reference to a Spanner Database cursor.

        :type table_name: str
        :param table_name: The name of the table.

        :rtype: list
        :returns: A description of the table with the DB-API
                  cursor.description interface.
        """
        # Run a one-row query so that cursor.description gets populated with
        # the columns of the table.
        cursor.execute(
            "SELECT * FROM %s LIMIT 1"
            % self.connection.ops.quote_name(table_name)
        )
        column_details = cursor.get_table_column_schema(table_name)

        descriptions = []
        for line in cursor.description:
            column_name, type_code = line[0], line[1]
            details = column_details[column_name]

            if details.spanner_type.startswith("STRING"):
                # Extract the size of the string from, e.g. STRING(#).
                # The size stays the string "MAX" for STRING(MAX) columns.
                internal_size = details.spanner_type[7:-1]
                if internal_size != "MAX":
                    internal_size = int(internal_size)
            else:
                internal_size = None

            descriptions.append(
                FieldInfo(
                    column_name,
                    type_code,
                    None,  # display_size
                    internal_size,
                    None,  # precision
                    None,  # scale
                    details.null_ok,
                    None,  # default
                    None,  # collation
                )
            )

        return descriptions

    def get_relations(self, cursor, table_name):
        """Return a dictionary of {field_name: (field_name_other_table,
        other_table)} representing all the relationships in the table.

        :type cursor: :class:`~google.cloud.spanner_dbapi.cursor.Cursor`
        :param cursor: A reference to a Spanner Database cursor.

        :type table_name: str
        :param table_name: The name of the table whose relationships are
                           looked up.

        :rtype: dict
        :returns: A dictionary representing column relationships to other
                  tables.
        """
        schema_name = self._get_schema_name(cursor)
        results = cursor.run_sql_in_snapshot(
            """
            SELECT
                tc.COLUMN_NAME as col,
                ccu.COLUMN_NAME as ref_col,
                ccu.TABLE_NAME as ref_table
            FROM
                INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS tc
            JOIN
                INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS as rc
            ON
                tc.CONSTRAINT_NAME = rc.CONSTRAINT_NAME
            JOIN
                INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE as ccu
            ON
                rc.UNIQUE_CONSTRAINT_NAME = ccu.CONSTRAINT_NAME
            WHERE
                tc.TABLE_SCHEMA=@schema_name
                AND tc.TABLE_NAME=@view_name
            """,
            params={"schema_name": schema_name, "view_name": table_name},
        )
        return {
            column: (referred_column, referred_table)
            for (column, referred_column, referred_table) in results
        }

    def get_primary_key_column(self, cursor, table_name):
        """Return Primary Key column.

        :type cursor: :class:`~google.cloud.spanner_dbapi.cursor.Cursor`
        :param cursor: A reference to a Spanner Database cursor.

        :type table_name: str
        :param table_name: The name of the table.

        :rtype: str
        :returns: The name of the PK column, or ``None`` when the query
                  yields no rows.
        """
        schema_name = self._get_schema_name(cursor)
        results = cursor.run_sql_in_snapshot(
            """
            SELECT
                ccu.COLUMN_NAME
            FROM
                INFORMATION_SCHEMA.TABLE_CONSTRAINTS as tc
            RIGHT JOIN
                INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS ccu
            ON
                tc.CONSTRAINT_NAME = ccu.CONSTRAINT_NAME
            WHERE
                tc.TABLE_NAME=@table_name
                AND tc.CONSTRAINT_TYPE='PRIMARY KEY'
                AND tc.TABLE_SCHEMA=@schema_name
            """,
            params={"schema_name": schema_name, "table_name": table_name},
        )
        # Only the first column of the first returned row is reported.
        return results[0][0] if results else None

    def get_constraints(self, cursor, table_name):
        """Retrieve the Spanner Table column constraints.

        The result is assembled in three passes: constraint columns,
        constraint types, and finally indexes.

        :type cursor: :class:`~google.cloud.spanner_dbapi.cursor.Cursor`
        :param cursor: The cursor in the linked database.

        :type table_name: str
        :param table_name: The name of the table.

        :rtype: dict
        :returns: A dictionary with constraints, keyed by constraint or
                  index name.
        """
        constraints = {}
        schema_name = self._get_schema_name(cursor)
        query_params = {"table": table_name, "schema_name": schema_name}

        # Firstly populate all available constraints and their columns.
        constraint_columns = cursor.run_sql_in_snapshot(
            """
            SELECT
                CONSTRAINT_NAME, COLUMN_NAME
            FROM
                INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE
            WHERE TABLE_NAME=@table AND TABLE_SCHEMA=@schema_name""",
            params=query_params,
        )
        for constraint, column_name in constraint_columns:
            if constraint not in constraints:
                constraints[constraint] = self._new_constraint()
            constraints[constraint]["columns"].append(column_name)

        # Add the various constraints by type.
        constraint_types = cursor.run_sql_in_snapshot(
            """
            SELECT
                CONSTRAINT_NAME, CONSTRAINT_TYPE
            FROM
                INFORMATION_SCHEMA.TABLE_CONSTRAINTS
            WHERE
                TABLE_NAME=@table AND TABLE_SCHEMA=@schema_name""",
            params=query_params,
        )
        for constraint, constraint_type in constraint_types:
            if constraint_type == "FOREIGN KEY":
                # We don't yet support anything related to FOREIGN KEY.
                # See https://github.com/googleapis/python-spanner-django/issues/313.
                # Drop the entry if the first pass already recorded it.
                constraints.pop(constraint, None)
                continue

            if constraint not in constraints:
                constraints[constraint] = self._new_constraint()

            entry = constraints[constraint]
            is_primary_key = constraint_type == "PRIMARY KEY"
            entry["check"] = constraint_type == "CHECK"
            entry["index"] = constraint_type == "INDEX"
            # A primary key is reported as unique as well.
            entry["unique"] = constraint_type == "UNIQUE" or is_primary_key
            entry["primary_key"] = is_primary_key

        # Add the indices.
        indexes = cursor.run_sql_in_snapshot(
            """
            SELECT
                idx.INDEX_NAME,
                idx_col.COLUMN_NAME,
                idx_col.COLUMN_ORDERING,
                idx.INDEX_TYPE,
                idx.IS_UNIQUE
            FROM
                INFORMATION_SCHEMA.INDEXES AS idx
            RIGHT JOIN
                INFORMATION_SCHEMA.INDEX_COLUMNS AS idx_col
            ON
                idx_col.INDEX_NAME = idx.INDEX_NAME
                AND idx_col.TABLE_NAME=@table
                AND idx_col.TABLE_SCHEMA=idx.TABLE_SCHEMA
            WHERE
                idx.TABLE_NAME=@table
                AND idx.TABLE_SCHEMA=@schema_name
            ORDER BY
                idx_col.ORDINAL_POSITION
            """,
            params=query_params,
        )
        for (
            index_name,
            column_name,
            ordering,
            index_type,
            is_unique,
        ) in indexes:
            if index_name not in constraints:
                constraints[index_name] = self._new_constraint()

            entry = constraints[index_name]
            entry["columns"].append(column_name)
            entry["index"] = True
            entry["orders"].append(ordering)

            # Index_type for PRIMARY KEY is 'PRIMARY_KEY' and NOT 'PRIMARY KEY'
            is_primary_key = index_type == "PRIMARY_KEY"
            entry["primary_key"] = is_primary_key
            entry["type"] = index_type if is_primary_key else Index.suffix
            entry["unique"] = is_unique

        return constraints

    def get_key_columns(self, cursor, table_name):
        """Return a list of (column_name, referenced_table, referenced_column)
        for all key columns in the given table.

        :type cursor: :class:`~google.cloud.spanner_dbapi.cursor.Cursor`
        :param cursor: A reference to a Spanner Database cursor.

        :type table_name: str
        :param table_name: The name of the table.

        :rtype: list
        :returns: The rows fetched from the cursor, one per key column.
        """
        schema_name = self._get_schema_name(cursor)
        cursor.execute(
            """SELECT
                tc.COLUMN_NAME as column_name,
                ccu.TABLE_NAME as referenced_table,
                ccu.COLUMN_NAME as referenced_column
            FROM
                INFORMATION_SCHEMA.KEY_COLUMN_USAGE AS tc
            JOIN
                INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS as rc
            ON
                tc.CONSTRAINT_NAME = rc.CONSTRAINT_NAME
            JOIN
                INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE as ccu
            ON
                rc.CONSTRAINT_NAME = ccu.CONSTRAINT_NAME
            WHERE
                tc.TABLE_NAME=@table
                AND tc.TABLE_SCHEMA=@schema_name
            """,
            params={"table": table_name, "schema_name": schema_name},
        )
        # Copy into a new list so the caller always receives a list.
        return list(cursor.fetchall())

    def _get_schema_name(self, cursor):
        """Return the current schema of the connection behind ``cursor``.

        :type cursor: :class:`~google.cloud.spanner_dbapi.cursor.Cursor`
        :param cursor: A reference to a Spanner Database cursor.

        :returns: The value of ``cursor.connection.current_schema``.
        """
        return cursor.connection.current_schema