
Source code for pandas_gbq.gbq

# Copyright (c) 2017 pandas-gbq Authors All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.

import logging
import time
import warnings
from datetime import datetime

import numpy as np

# Required dependencies, but treat as optional so that _test_google_api_imports
# can provide a better error message.
try:
    from google.api_core import exceptions as google_exceptions
    from google.cloud import bigquery
except ImportError:  # pragma: NO COVER
    bigquery = None
    google_exceptions = None

from pandas_gbq.exceptions import (
    AccessDenied,
    GenericGBQException,
    PerformanceWarning,
)
from pandas_gbq import features
from pandas_gbq.features import FEATURES
import pandas_gbq.schema
import pandas_gbq.timestamp


logger = logging.getLogger(__name__)

try:
    import tqdm  # noqa
except ImportError:
    tqdm = None


def _test_google_api_imports():
    try:
        import pkg_resources  # noqa
    except ImportError as ex:
        raise ImportError("pandas-gbq requires setuptools") from ex

    try:
        import pydata_google_auth  # noqa
    except ImportError as ex:
        raise ImportError("pandas-gbq requires pydata-google-auth") from ex

    try:
        from google_auth_oauthlib.flow import InstalledAppFlow  # noqa
    except ImportError as ex:
        raise ImportError("pandas-gbq requires google-auth-oauthlib") from ex

    try:
        import google.auth  # noqa
    except ImportError as ex:
        raise ImportError("pandas-gbq requires google-auth") from ex

    try:
        from google.cloud import bigquery  # noqa
    except ImportError as ex:
        raise ImportError("pandas-gbq requires google-cloud-bigquery") from ex


class DatasetCreationError(ValueError):
    """
    Raised when the create dataset method fails
    """

    pass


class InvalidColumnOrder(ValueError):
    """
    Raised when the provided column order for output
    results DataFrame does not match the schema
    returned by BigQuery.
    """

    pass


class InvalidIndexColumn(ValueError):
    """
    Raised when the provided index column for output
    results DataFrame does not match the schema
    returned by BigQuery.
    """

    pass


class InvalidPageToken(ValueError):
    """
    Raised when Google BigQuery fails to return a page token,
    or returns a duplicate page token.
    """

    pass


class InvalidSchema(ValueError):
    """
    Raised when the provided DataFrame does
    not match the schema of the destination
    table in BigQuery.
    """

    pass


class NotFoundException(ValueError):
    """
    Raised when the project_id, table or dataset provided in the query could
    not be found.
    """

    pass


class QueryTimeout(ValueError):
    """
    Raised when the query request exceeds the timeoutMs value specified in the
    BigQuery configuration.
    """

    pass


class TableCreationError(ValueError):
    """
    Raised when the create table method fails
    """

    pass


class Context(object):
    """Storage for objects to be used throughout a session.

    A Context object is initialized when the ``pandas_gbq`` module is
    imported, and can be found at :attr:`pandas_gbq.context`.
    """

    def __init__(self):
        self._credentials = None
        self._project = None
        # dialect defaults to None so that read_gbq can stop warning if set.
        self._dialect = None

    @property
    def credentials(self):
        """
        Credentials to use for Google APIs.

        These credentials are automatically cached in memory by calls to
        :func:`pandas_gbq.read_gbq` and :func:`pandas_gbq.to_gbq`. To
        manually set the credentials, construct an
        :class:`google.auth.credentials.Credentials` object and set it as
        the context credentials as demonstrated in the example below. See
        `auth docs`_ for more information on obtaining credentials.

        .. _auth docs: http://google-auth.readthedocs.io
            /en/latest/user-guide.html#obtaining-credentials

        Returns
        -------
        google.auth.credentials.Credentials

        Examples
        --------

        Manually setting the context credentials:

        >>> import pandas_gbq
        >>> from google.oauth2 import service_account
        >>> credentials = service_account.Credentials.from_service_account_file(
        ...     '/path/to/key.json',
        ... )
        >>> pandas_gbq.context.credentials = credentials
        """
        return self._credentials

    @credentials.setter
    def credentials(self, value):
        self._credentials = value

    @property
    def project(self):
        """Default project to use for calls to Google APIs.

        Returns
        -------
        str

        Examples
        --------

        Manually setting the context project:

        >>> import pandas_gbq
        >>> pandas_gbq.context.project = 'my-project'
        """
        return self._project

    @project.setter
    def project(self, value):
        self._project = value

    @property
    def dialect(self):
        """
        Default dialect to use in :func:`pandas_gbq.read_gbq`.

        Allowed values for the BigQuery SQL syntax dialect:

        ``'legacy'``
            Use BigQuery's legacy SQL dialect. For more information see
            `BigQuery Legacy SQL Reference
            <https://cloud.google.com/bigquery/docs/reference/legacy-sql>`__.
        ``'standard'``
            Use BigQuery's standard SQL, which is
            compliant with the SQL 2011 standard. For more information
            see `BigQuery Standard SQL Reference
            <https://cloud.google.com/bigquery/docs/reference/standard-sql/>`__.

        Returns
        -------
        str

        Examples
        --------

        Setting the default syntax to standard:

        >>> import pandas_gbq
        >>> pandas_gbq.context.dialect = 'standard'
        """
        return self._dialect

    @dialect.setter
    def dialect(self, value):
        self._dialect = value


# Create an empty context, used to cache credentials.
context = Context()
"""A :class:`pandas_gbq.Context` object used to cache credentials.

Credentials automatically are cached in-memory by :func:`pandas_gbq.read_gbq`
and :func:`pandas_gbq.to_gbq`.
"""


class GbqConnector(object):
    def __init__(
        self,
        project_id,
        reauth=False,
        private_key=None,
        auth_local_webserver=False,
        dialect="standard",
        location=None,
        credentials=None,
        use_bqstorage_api=False,
    ):
        global context
        from google.api_core.exceptions import GoogleAPIError
        from google.api_core.exceptions import ClientError
        from pandas_gbq import auth

        self.http_error = (ClientError, GoogleAPIError)
        self.project_id = project_id
        self.location = location
        self.reauth = reauth
        self.private_key = private_key
        self.auth_local_webserver = auth_local_webserver
        self.dialect = dialect
        self.credentials = credentials
        default_project = None

        # Service account credentials have a project associated with them.
        # Prefer that project if none was supplied.
        if self.project_id is None and hasattr(self.credentials, "project_id"):
            self.project_id = credentials.project_id

        # Load credentials from cache.
        if not self.credentials:
            self.credentials = context.credentials
            default_project = context.project

        # Credentials were explicitly asked for, so don't use the cache.
        if private_key or reauth or not self.credentials:
            self.credentials, default_project = auth.get_credentials(
                private_key=private_key,
                project_id=project_id,
                reauth=reauth,
                auth_local_webserver=auth_local_webserver,
            )

        if self.project_id is None:
            self.project_id = default_project

        if self.project_id is None:
            raise ValueError("Could not determine project ID and one was not supplied.")

        # Cache the credentials if they haven't been set yet.
        if context.credentials is None:
            context.credentials = self.credentials
        if context.project is None:
            context.project = self.project_id

        self.client = self.get_client()
        self.use_bqstorage_api = use_bqstorage_api

        # BigQuery queries cost $5 per TB. The first 1 TB per month is free.
        # See here for more: https://cloud.google.com/bigquery/pricing
        self.query_price_for_TB = 5.0 / 2 ** 40  # USD/TB

    def _start_timer(self):
        self.start = time.time()

    def get_elapsed_seconds(self):
        return round(time.time() - self.start, 2)

    def log_elapsed_seconds(self, prefix="Elapsed", postfix="s.", overlong=6):
        sec = self.get_elapsed_seconds()
        if sec > overlong:
            logger.info("{} {} {}".format(prefix, sec, postfix))

    # http://stackoverflow.com/questions/1094841/reusable-library-to-get-human-readable-version-of-file-size
    @staticmethod
    def sizeof_fmt(num, suffix="B"):
        fmt = "%3.1f %s%s"
        for unit in ["", "K", "M", "G", "T", "P", "E", "Z"]:
            if abs(num) < 1024.0:
                return fmt % (num, unit, suffix)
            num /= 1024.0
        return fmt % (num, "Y", suffix)

    def get_client(self):
        import pandas

        try:
            # This module was added in google-api-core 1.11.0.
            # We don't have a hard requirement on that version, so only
            # populate the client_info if available.
            import google.api_core.client_info

            client_info = google.api_core.client_info.ClientInfo(
                user_agent="pandas-{}".format(pandas.__version__)
            )
        except ImportError:
            client_info = None

        # In addition to new enough version of google-api-core, a new enough
        # version of google-cloud-bigquery is required to populate the
        # client_info.
        if FEATURES.bigquery_has_client_info:
            return bigquery.Client(
                project=self.project_id,
                credentials=self.credentials,
                client_info=client_info,
            )

        return bigquery.Client(project=self.project_id, credentials=self.credentials)

    @staticmethod
    def process_http_error(ex):
        # See `BigQuery Troubleshooting Errors
        # <https://cloud.google.com/bigquery/troubleshooting-errors>`__

        raise GenericGBQException("Reason: {0}".format(ex))

    def run_query(self, query, max_results=None, progress_bar_type=None, **kwargs):
        from concurrent.futures import TimeoutError
        from google.auth.exceptions import RefreshError

        job_config = {
            "query": {
                "useLegacySql": self.dialect == "legacy"
                # 'allowLargeResults', 'createDisposition',
                # 'preserveNulls', destinationTable, useQueryCache
            }
        }
        config = kwargs.get("configuration")
        if config is not None:
            job_config.update(config)

            if "query" in config and "query" in config["query"]:
                if query is not None:
                    raise ValueError(
                        "Query statement can't be specified "
                        "inside config while it is specified "
                        "as parameter"
                    )

                query = config["query"].pop("query")

        self._start_timer()

        try:
            logger.debug("Requesting query... ")
            query_reply = self.client.query(
                query,
                job_config=bigquery.QueryJobConfig.from_api_repr(job_config),
                location=self.location,
                project=self.project_id,
            )
            logger.debug("Query running...")
        except (RefreshError, ValueError):
            if self.private_key:
                raise AccessDenied("The service account credentials are not valid")
            else:
                raise AccessDenied(
                    "The credentials have been revoked or expired, "
                    "please re-run the application to re-authorize"
                )
        except self.http_error as ex:
            self.process_http_error(ex)

        job_id = query_reply.job_id
        logger.debug("Job ID: %s" % job_id)

        while query_reply.state != "DONE":
            self.log_elapsed_seconds("  Elapsed", "s. Waiting...")

            timeout_ms = job_config.get("jobTimeoutMs") or job_config["query"].get(
                "timeoutMs"
            )
            timeout_ms = int(timeout_ms) if timeout_ms else None
            if timeout_ms and timeout_ms < self.get_elapsed_seconds() * 1000:
                raise QueryTimeout("Query timeout: {} ms".format(timeout_ms))

            timeout_sec = 1.0
            if timeout_ms:
                # Wait at most 1 second so we can show progress bar
                timeout_sec = min(1.0, timeout_ms / 1000.0)

            try:
                query_reply.result(timeout=timeout_sec)
            except TimeoutError:
                # Use our own timeout logic
                pass
            except self.http_error as ex:
                self.process_http_error(ex)

        if query_reply.cache_hit:
            logger.debug("Query done.\nCache hit.\n")
        else:
            bytes_processed = query_reply.total_bytes_processed or 0
            bytes_billed = query_reply.total_bytes_billed or 0
            logger.debug(
                "Query done.\nProcessed: {} Billed: {}".format(
                    self.sizeof_fmt(bytes_processed),
                    self.sizeof_fmt(bytes_billed),
                )
            )
            logger.debug(
                "Standard price: ${:,.2f} USD\n".format(
                    bytes_billed * self.query_price_for_TB
                )
            )

        dtypes = kwargs.get("dtypes")
        return self._download_results(
            query_reply,
            max_results=max_results,
            progress_bar_type=progress_bar_type,
            user_dtypes=dtypes,
        )

    def _download_results(
        self,
        query_job,
        max_results=None,
        progress_bar_type=None,
        user_dtypes=None,
    ):
        # No results are desired, so don't bother downloading anything.
        if max_results == 0:
            return None

        if user_dtypes is None:
            user_dtypes = {}

        if self.use_bqstorage_api and not FEATURES.bigquery_has_bqstorage:
            warnings.warn(
                (
                    "use_bqstorage_api was set, but have google-cloud-bigquery "
                    "version {}. Requires google-cloud-bigquery version "
                    "{} or later."
                ).format(
                    FEATURES.bigquery_installed_version,
                    features.BIGQUERY_BQSTORAGE_VERSION,
                ),
                PerformanceWarning,
                stacklevel=4,
            )

        create_bqstorage_client = self.use_bqstorage_api
        if max_results is not None:
            create_bqstorage_client = False

        to_dataframe_kwargs = {}
        if FEATURES.bigquery_has_bqstorage:
            to_dataframe_kwargs["create_bqstorage_client"] = create_bqstorage_client

        try:
            query_job.result()
            # Get the table schema, so that we can list rows.
            destination = self.client.get_table(query_job.destination)
            rows_iter = self.client.list_rows(destination, max_results=max_results)

            schema_fields = [field.to_api_repr() for field in rows_iter.schema]
            conversion_dtypes = _bqschema_to_nullsafe_dtypes(schema_fields)
            conversion_dtypes.update(user_dtypes)
            df = rows_iter.to_dataframe(
                dtypes=conversion_dtypes,
                progress_bar_type=progress_bar_type,
                **to_dataframe_kwargs,
            )
        except self.http_error as ex:
            self.process_http_error(ex)

        if df.empty:
            df = _cast_empty_df_dtypes(schema_fields, df)

        # Ensure any TIMESTAMP columns are tz-aware.
        df = pandas_gbq.timestamp.localize_df(df, schema_fields)

        logger.debug("Got {} rows.\n".format(rows_iter.total_rows))
        return df

    def load_data(
        self,
        dataframe,
        destination_table_ref,
        chunksize=None,
        schema=None,
        progress_bar=True,
        api_method: str = "load_parquet",
    ):
        from pandas_gbq import load

        total_rows = len(dataframe)

        try:
            chunks = load.load_chunks(
                self.client,
                dataframe,
                destination_table_ref,
                chunksize=chunksize,
                schema=schema,
                location=self.location,
                api_method=api_method,
            )
            if progress_bar and tqdm:
                chunks = tqdm.tqdm(chunks)
            for remaining_rows in chunks:
                logger.info(
                    "\r{} out of {} rows loaded.".format(
                        total_rows - remaining_rows, total_rows
                    )
                )
        except self.http_error as ex:
            self.process_http_error(ex)

    def delete_and_recreate_table(self, dataset_id, table_id, table_schema):
        table = _Table(self.project_id, dataset_id, credentials=self.credentials)
        table.delete(table_id)
        table.create(table_id, table_schema)


def _bqschema_to_nullsafe_dtypes(schema_fields):
    """Specify explicit dtypes based on BigQuery schema.

    This function only specifies a dtype when the dtype allows nulls.
    Otherwise, use pandas's default dtype choice.

    See: http://pandas.pydata.org/pandas-docs/dev/missing_data.html
    #missing-data-casting-rules-and-indexing
    """
    # If you update this mapping, also update the table at
    # `docs/source/reading.rst`.
    dtype_map = {
        "DATE": "datetime64[ns]",
        "DATETIME": "datetime64[ns]",
        "FLOAT": np.dtype(float),
        "GEOMETRY": "object",
        "RECORD": "object",
        "STRING": "object",
        # datetime.time objects cannot be cast to datetime64.
        # https://github.com/pydata/pandas-gbq/issues/328
        "TIME": "object",
        # pandas doesn't support timezone-aware dtype in DataFrame/Series
        # constructors. It's more idiomatic to localize after construction.
        # https://github.com/pandas-dev/pandas/issues/25843
        "TIMESTAMP": "datetime64[ns]",
    }

    dtypes = {}
    for field in schema_fields:
        name = str(field["name"])
        # Array BigQuery type is represented as an object column containing
        # list objects.
        if field["mode"].upper() == "REPEATED":
            dtypes[name] = "object"
            continue

        dtype = dtype_map.get(field["type"].upper())
        if dtype:
            dtypes[name] = dtype

    return dtypes


def _cast_empty_df_dtypes(schema_fields, df):
    """Cast any columns in an empty dataframe to correct type.

    In an empty dataframe, pandas cannot choose a dtype unless one is
    explicitly provided. The _bqschema_to_nullsafe_dtypes() function only
    provides dtypes when the dtype safely handles null values.
    This means that empty int64 and boolean columns are incorrectly
    classified as ``object``.
    """
    if not df.empty:
        raise ValueError("DataFrame must be empty in order to cast non-nullsafe dtypes")

    dtype_map = {"BOOLEAN": bool, "INTEGER": np.int64}
    for field in schema_fields:
        column = str(field["name"])
        if field["mode"].upper() == "REPEATED":
            continue

        dtype = dtype_map.get(field["type"].upper())
        if dtype:
            df[column] = df[column].astype(dtype)
    return df
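
To illustrate the two helpers above, here is a minimal sketch of the dtype mapping they produce for a hypothetical schema (the field names below are made up, not part of the module): nullable ``INTEGER`` is deliberately left for pandas to choose, ``REPEATED`` fields become ``object`` columns, and the empty-DataFrame case is later fixed up by ``_cast_empty_df_dtypes``.

    >>> from pandas_gbq import gbq
    >>> schema_fields = [
    ...     {"name": "created_at", "type": "TIMESTAMP", "mode": "NULLABLE"},
    ...     {"name": "tags", "type": "STRING", "mode": "REPEATED"},
    ...     {"name": "visits", "type": "INTEGER", "mode": "NULLABLE"},
    ... ]
    >>> gbq._bqschema_to_nullsafe_dtypes(schema_fields)
    {'created_at': 'datetime64[ns]', 'tags': 'object'}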


def read_gbq(
    query,
    project_id=None,
    index_col=None,
    col_order=None,
    reauth=False,
    auth_local_webserver=False,
    dialect=None,
    location=None,
    configuration=None,
    credentials=None,
    use_bqstorage_api=False,
    max_results=None,
    verbose=None,
    private_key=None,
    progress_bar_type="tqdm",
    dtypes=None,
):
    r"""Load data from Google BigQuery using google-cloud-python

    The main method a user calls to execute a Query in Google BigQuery
    and read results into a pandas DataFrame.

    This method uses the Google Cloud client library to make requests to
    Google BigQuery, documented `here
    <https://google-cloud-python.readthedocs.io/en/latest/bigquery/usage.html>`__.

    See the :ref:`How to authenticate with Google BigQuery <authentication>`
    guide for authentication instructions.

    Parameters
    ----------
    query : str
        SQL-Like Query to return data values.
    project_id : str, optional
        Google BigQuery Account project ID. Optional when available from
        the environment.
    index_col : str, optional
        Name of result column to use for index in results DataFrame.
    col_order : list(str), optional
        List of BigQuery column names in the desired order for results
        DataFrame.
    reauth : boolean, default False
        Force Google BigQuery to re-authenticate the user. This is useful
        if multiple accounts are used.
    auth_local_webserver : boolean, default False
        Use the `local webserver flow`_ instead of the `console flow`_
        when getting user credentials.

        .. _local webserver flow:
            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server
        .. _console flow:
            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console

        .. versionadded:: 0.2.0
    dialect : str, default 'standard'
        Note: The default value changed to 'standard' in version 0.10.0.

        SQL syntax dialect to use. Value can be one of:

        ``'legacy'``
            Use BigQuery's legacy SQL dialect. For more information see
            `BigQuery Legacy SQL Reference
            <https://cloud.google.com/bigquery/docs/reference/legacy-sql>`__.
        ``'standard'``
            Use BigQuery's standard SQL, which is
            compliant with the SQL 2011 standard. For more information
            see `BigQuery Standard SQL Reference
            <https://cloud.google.com/bigquery/docs/reference/standard-sql/>`__.
    location : str, optional
        Location where the query job should run. See the `BigQuery locations
        documentation
        <https://cloud.google.com/bigquery/docs/dataset-locations>`__ for a
        list of available locations. The location must match that of any
        datasets used in the query.

        .. versionadded:: 0.5.0
    configuration : dict, optional
        Query config parameters for job processing.
        For example:

            configuration = {'query': {'useQueryCache': False}}

        For more information see `BigQuery REST API Reference
        <https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query>`__.
    credentials : google.auth.credentials.Credentials, optional
        Credentials for accessing Google APIs. Use this parameter to override
        default credentials, such as to use Compute Engine
        :class:`google.auth.compute_engine.Credentials` or Service Account
        :class:`google.oauth2.service_account.Credentials` directly.

        .. versionadded:: 0.8.0
    use_bqstorage_api : bool, default False
        Use the `BigQuery Storage API
        <https://cloud.google.com/bigquery/docs/reference/storage/>`__ to
        download query results quickly, but at an increased cost. To use this
        API, first `enable it in the Cloud Console
        <https://console.cloud.google.com/apis/library/bigquerystorage.googleapis.com>`__.
        You must also have the `bigquery.readsessions.create
        <https://cloud.google.com/bigquery/docs/access-control#roles>`__
        permission on the project you are billing queries to.

        **Note:** Due to a `known issue in the ``google-cloud-bigquery``
        package
        <https://github.com/googleapis/google-cloud-python/pull/7633>`__
        (fixed in version 1.11.0), you must write your query results to a
        destination table. To do this with ``read_gbq``, supply a
        ``configuration`` dictionary.

        This feature requires the ``google-cloud-bigquery-storage`` and
        ``pyarrow`` packages.

        This value is ignored if ``max_results`` is set.

        .. versionadded:: 0.10.0
    max_results : int, optional
        If set, limit the maximum number of rows to fetch from the query
        results.

        .. versionadded:: 0.12.0
    progress_bar_type : Optional[str]
        If set, use the `tqdm <https://tqdm.github.io/>`__ library to
        display a progress bar while the data downloads. Install the
        ``tqdm`` package to use this feature.

        Possible values of ``progress_bar_type`` include:

        ``None``
            No progress bar.
        ``'tqdm'``
            Use the :func:`tqdm.tqdm` function to print a progress bar
            to :data:`sys.stderr`.
        ``'tqdm_notebook'``
            Use the :func:`tqdm.tqdm_notebook` function to display a
            progress bar as a Jupyter notebook widget.
        ``'tqdm_gui'``
            Use the :func:`tqdm.tqdm_gui` function to display a
            progress bar as a graphical dialog box.
    dtypes : dict, optional
        A dictionary of column names to pandas ``dtype``. The provided
        ``dtype`` is used when constructing the series for the column
        specified. Otherwise, a default ``dtype`` is used.
    verbose : None, deprecated
        Deprecated in Pandas-GBQ 0.4.0. Use the `logging module
        to adjust verbosity instead
        <https://pandas-gbq.readthedocs.io/en/latest/intro.html#logging>`__.
    private_key : str, deprecated
        Deprecated in pandas-gbq version 0.8.0. Use the ``credentials``
        parameter and
        :func:`google.oauth2.service_account.Credentials.from_service_account_info`
        or
        :func:`google.oauth2.service_account.Credentials.from_service_account_file`
        instead.

    Returns
    -------
    df: DataFrame
        DataFrame representing results of query.
    """
    global context

    if dialect is None:
        dialect = context.dialect

    if dialect is None:
        dialect = "standard"

    _test_google_api_imports()

    if verbose is not None and FEATURES.pandas_has_deprecated_verbose:
        warnings.warn(
            "verbose is deprecated and will be removed in "
            "a future version. Set logging level in order to vary "
            "verbosity",
            FutureWarning,
            stacklevel=2,
        )

    if dialect not in ("legacy", "standard"):
        raise ValueError("'{0}' is not valid for dialect".format(dialect))

    connector = GbqConnector(
        project_id,
        reauth=reauth,
        dialect=dialect,
        auth_local_webserver=auth_local_webserver,
        location=location,
        credentials=credentials,
        private_key=private_key,
        use_bqstorage_api=use_bqstorage_api,
    )

    final_df = connector.run_query(
        query,
        configuration=configuration,
        max_results=max_results,
        progress_bar_type=progress_bar_type,
        dtypes=dtypes,
    )

    # Reindex the DataFrame on the provided column
    if index_col is not None:
        if index_col in final_df.columns:
            final_df.set_index(index_col, inplace=True)
        else:
            raise InvalidIndexColumn(
                'Index column "{0}" does not exist in DataFrame.'.format(index_col)
            )

    # Change the order of columns in the DataFrame based on provided list
    if col_order is not None:
        if sorted(col_order) == sorted(final_df.columns):
            final_df = final_df[col_order]
        else:
            raise InvalidColumnOrder("Column order does not match this DataFrame.")

    connector.log_elapsed_seconds(
        "Total time taken",
        datetime.now().strftime("s.\nFinished at %Y-%m-%d %H:%M:%S."),
    )

    return final_df
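
A short usage sketch of ``read_gbq`` as defined above; the project ID is a placeholder, and the query targets a well-known public dataset only for illustration.

    >>> import pandas_gbq
    >>> df = pandas_gbq.read_gbq(
    ...     "SELECT name, SUM(number) AS total "
    ...     "FROM `bigquery-public-data.usa_names.usa_1910_2013` "
    ...     "GROUP BY name ORDER BY total DESC LIMIT 10",
    ...     project_id="my-project",  # placeholder: your billing project
    ...     dialect="standard",
    ...     progress_bar_type=None,  # disable tqdm for this sketch
    ... )
    >>> df.head()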


def to_gbq(
    dataframe,
    destination_table,
    project_id=None,
    chunksize=None,
    reauth=False,
    if_exists="fail",
    auth_local_webserver=False,
    table_schema=None,
    location=None,
    progress_bar=True,
    credentials=None,
    api_method: str = "default",
    verbose=None,
    private_key=None,
):
    """Write a DataFrame to a Google BigQuery table.

    The main method a user calls to export pandas DataFrame contents to
    a Google BigQuery table.

    This method uses the Google Cloud client library to make requests to
    Google BigQuery, documented `here
    <https://google-cloud-python.readthedocs.io/en/latest/bigquery/usage.html>`__.

    See the :ref:`How to authenticate with Google BigQuery <authentication>`
    guide for authentication instructions.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        DataFrame to be written to a Google BigQuery table.
    destination_table : str
        Name of table to be written, in the form ``dataset.tablename`` or
        ``project.dataset.tablename``.
    project_id : str, optional
        Google BigQuery Account project ID. Optional when available from
        the environment.
    chunksize : int, optional
        Number of rows to be inserted in each chunk from the dataframe.
        Set to ``None`` to load the whole dataframe at once.
    reauth : bool, default False
        Force Google BigQuery to re-authenticate the user. This is useful
        if multiple accounts are used.
    if_exists : str, default 'fail'
        Behavior when the destination table exists. Value can be one of:

        ``'fail'``
            If table exists, do nothing.
        ``'replace'``
            If table exists, drop it, recreate it, and insert data.
        ``'append'``
            If table exists, insert data. Create if does not exist.
    auth_local_webserver : bool, default False
        Use the `local webserver flow`_ instead of the `console flow`_
        when getting user credentials.

        .. _local webserver flow:
            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_local_server
        .. _console flow:
            http://google-auth-oauthlib.readthedocs.io/en/latest/reference/google_auth_oauthlib.flow.html#google_auth_oauthlib.flow.InstalledAppFlow.run_console

        .. versionadded:: 0.2.0
    table_schema : list of dicts, optional
        List of BigQuery table fields to which the DataFrame columns
        should conform, e.g. ``[{'name': 'col1', 'type': 'STRING'},...]``.
        The ``type`` values must be BigQuery type names.

        - If ``table_schema`` is provided, it may contain all or a subset of
          DataFrame columns. If a subset is provided, the rest will be
          inferred from the DataFrame dtypes. If ``table_schema`` contains
          columns not in the DataFrame, they'll be ignored.
        - If ``table_schema`` is **not** provided, it will be
          generated according to dtypes of DataFrame columns. See
          `Inferring the Table Schema
          <https://pandas-gbq.readthedocs.io/en/latest/writing.html#writing-schema>`__
          for a description of the schema inference.

        See `BigQuery API documentation on valid column names
        <https://cloud.google.com/bigquery/docs/schemas#column_names>`__.

        .. versionadded:: 0.3.1
    location : str, optional
        Location where the load job should run. See the `BigQuery locations
        documentation
        <https://cloud.google.com/bigquery/docs/dataset-locations>`__ for a
        list of available locations. The location must match that of the
        target dataset.

        .. versionadded:: 0.5.0
    progress_bar : bool, default True
        Use the library `tqdm` to show the progress bar for the upload,
        chunk by chunk.

        .. versionadded:: 0.5.0
    credentials : google.auth.credentials.Credentials, optional
        Credentials for accessing Google APIs.
        Use this parameter to override default credentials, such as to use
        Compute Engine :class:`google.auth.compute_engine.Credentials` or
        Service Account :class:`google.oauth2.service_account.Credentials`
        directly.

        .. versionadded:: 0.8.0
    api_method : str, optional
        API method used to upload DataFrame to BigQuery. One of
        "load_parquet", "load_csv". Default "load_parquet" if pandas is
        version 1.1.0+, otherwise "load_csv".

        .. versionadded:: 0.16.0
    verbose : bool, deprecated
        Deprecated in Pandas-GBQ 0.4.0. Use the `logging module
        to adjust verbosity instead
        <https://pandas-gbq.readthedocs.io/en/latest/intro.html#logging>`__.
    private_key : str, deprecated
        Deprecated in pandas-gbq version 0.8.0. Use the ``credentials``
        parameter and
        :func:`google.oauth2.service_account.Credentials.from_service_account_info`
        or
        :func:`google.oauth2.service_account.Credentials.from_service_account_file`
        instead.
    """

    _test_google_api_imports()

    if verbose is not None and FEATURES.pandas_has_deprecated_verbose:
        warnings.warn(
            "verbose is deprecated and will be removed in "
            "a future version. Set logging level in order to vary "
            "verbosity",
            FutureWarning,
            stacklevel=1,
        )

    if api_method == "default":
        # Avoid using parquet if pandas doesn't support lossless conversions to
        # parquet timestamp. See: https://stackoverflow.com/a/69758676/101923
        if FEATURES.pandas_has_parquet_with_lossless_timestamp:
            api_method = "load_parquet"
        else:
            api_method = "load_csv"

    if chunksize is not None:
        if api_method == "load_parquet":
            warnings.warn(
                "chunksize is ignored when using api_method='load_parquet'",
                DeprecationWarning,
                stacklevel=2,
            )
        elif api_method == "load_csv":
            warnings.warn(
                "chunksize will be ignored when using api_method='load_csv' in a future version of pandas-gbq",
                PendingDeprecationWarning,
                stacklevel=2,
            )

    if if_exists not in ("fail", "replace", "append"):
        raise ValueError("'{0}' is not valid for if_exists".format(if_exists))

    if "." not in destination_table:
        raise NotFoundException(
            "Invalid Table Name. Should be of the form 'datasetId.tableId' or "
            "'projectId.datasetId.tableId'"
        )

    connector = GbqConnector(
        project_id,
        reauth=reauth,
        auth_local_webserver=auth_local_webserver,
        location=location,
        credentials=credentials,
        private_key=private_key,
    )
    bqclient = connector.client
    destination_table_ref = bigquery.table.TableReference.from_string(
        destination_table, default_project=connector.project_id
    )
    project_id_table = destination_table_ref.project
    dataset_id = destination_table_ref.dataset_id
    table_id = destination_table_ref.table_id

    default_schema = _generate_bq_schema(dataframe)
    if not table_schema:
        table_schema = default_schema
    else:
        table_schema = pandas_gbq.schema.update_schema(
            default_schema, dict(fields=table_schema)
        )

    # If table exists, check if_exists parameter
    try:
        table = bqclient.get_table(destination_table_ref)
    except google_exceptions.NotFound:
        table_connector = _Table(
            project_id_table,
            dataset_id,
            location=location,
            credentials=connector.credentials,
        )
        table_connector.create(table_id, table_schema)
    else:
        original_schema = pandas_gbq.schema.to_pandas_gbq(table.schema)

        if if_exists == "fail":
            raise TableCreationError(
                "Could not create the table because it "
                "already exists. "
                "Change the if_exists parameter to "
                "'append' or 'replace' data."
            )
        elif if_exists == "replace":
            connector.delete_and_recreate_table(dataset_id, table_id, table_schema)
        elif if_exists == "append":
            if not pandas_gbq.schema.schema_is_subset(original_schema, table_schema):
                raise InvalidSchema(
                    "Please verify that the structure and "
                    "data types in the DataFrame match the "
                    "schema of the destination table."
                )

            # Update the local `table_schema` so mode (NULLABLE/REQUIRED)
            # matches. See: https://github.com/pydata/pandas-gbq/issues/315
            table_schema = pandas_gbq.schema.update_schema(
                table_schema, original_schema
            )

    if dataframe.empty:
        # Create the table (if needed), but don't try to run a load job with an
        # empty file. See: https://github.com/pydata/pandas-gbq/issues/237
        return

    connector.load_data(
        dataframe,
        destination_table_ref,
        chunksize=chunksize,
        schema=table_schema,
        progress_bar=progress_bar,
        api_method=api_method,
    )
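
A short usage sketch of ``to_gbq`` as defined above; the destination table and project ID are placeholders, and the partial ``table_schema`` shows how unspecified columns fall back to dtype inference.

    >>> import pandas as pd
    >>> import pandas_gbq
    >>> df = pd.DataFrame({"name": ["a", "b"], "score": [1.5, 2.5]})
    >>> pandas_gbq.to_gbq(
    ...     df,
    ...     "my_dataset.my_table",  # placeholder dataset.tablename
    ...     project_id="my-project",  # placeholder
    ...     if_exists="append",  # create if missing, append otherwise
    ...     table_schema=[{"name": "name", "type": "STRING"}],  # partial; rest inferred
    ... )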


def generate_bq_schema(df, default_type="STRING"):
    """DEPRECATED: Given a passed df, generate the associated Google BigQuery
    schema.

    Parameters
    ----------
    df : DataFrame
    default_type : string
        The default BigQuery type in case the type of the column
        does not exist in the schema.
    """
    # deprecation TimeSeries, #11121
    warnings.warn(
        "generate_bq_schema is deprecated and will be removed in "
        "a future version",
        FutureWarning,
        stacklevel=2,
    )
    return _generate_bq_schema(df, default_type=default_type)


def _generate_bq_schema(df, default_type="STRING"):
    """DEPRECATED: Given a dataframe, generate a Google BigQuery schema.

    This is a private method, but was used in external code to work around
    issues in the default schema generation. Now that individual columns can
    be overridden: https://github.com/pydata/pandas-gbq/issues/218, this
    method can be removed after there is time to migrate away from this
    method."""
    from pandas_gbq import schema

    return schema.generate_bq_schema(df, default_type=default_type)


class _Table(GbqConnector):
    def __init__(
        self,
        project_id,
        dataset_id,
        reauth=False,
        location=None,
        credentials=None,
        private_key=None,
    ):
        self.dataset_id = dataset_id
        super(_Table, self).__init__(
            project_id,
            reauth,
            location=location,
            credentials=credentials,
            private_key=private_key,
        )

    def _table_ref(self, table_id):
        """Return a BigQuery client library table reference"""
        from google.cloud.bigquery import DatasetReference
        from google.cloud.bigquery import TableReference

        return TableReference(
            DatasetReference(self.project_id, self.dataset_id), table_id
        )

    def exists(self, table_id):
        """Check if a table exists in Google BigQuery

        Parameters
        ----------
        table_id : str
            Name of table to be verified

        Returns
        -------
        boolean
            true if table exists, otherwise false
        """
        from google.api_core.exceptions import NotFound

        table_ref = self._table_ref(table_id)
        try:
            self.client.get_table(table_ref)
            return True
        except NotFound:
            return False
        except self.http_error as ex:
            self.process_http_error(ex)

    def create(self, table_id, schema):
        """Create a table in Google BigQuery given a table and schema

        Parameters
        ----------
        table_id : str
            Name of table to be written
        schema : dict
            Use the generate_bq_schema to generate your table schema from a
            dataframe.
""" from google.cloud.bigquery import DatasetReference from google.cloud.bigquery import Table from google.cloud.bigquery import TableReference if self.exists(table_id): raise TableCreationError("Table {0} already exists".format(table_id)) if not _Dataset(self.project_id, credentials=self.credentials).exists( self.dataset_id ): _Dataset( self.project_id, credentials=self.credentials, location=self.location, ).create(self.dataset_id) table_ref = TableReference( DatasetReference(self.project_id, self.dataset_id), table_id ) table = Table(table_ref) table.schema = pandas_gbq.schema.to_google_cloud_bigquery(schema) try: self.client.create_table(table) except self.http_error as ex: self.process_http_error(ex) def delete(self, table_id): """Delete a table in Google BigQuery Parameters ---------- table : str Name of table to be deleted """ from google.api_core.exceptions import NotFound if not self.exists(table_id): raise NotFoundException("Table does not exist") table_ref = self._table_ref(table_id) try: self.client.delete_table(table_ref) except NotFound: # Ignore 404 error which may occur if table already deleted pass except self.http_error as ex: self.process_http_error(ex) class _Dataset(GbqConnector): def __init__( self, project_id, reauth=False, location=None, credentials=None, private_key=None, ): super(_Dataset, self).__init__( project_id, reauth, credentials=credentials, location=location, private_key=private_key, ) def _dataset_ref(self, dataset_id): """Return a BigQuery client library dataset reference""" from google.cloud.bigquery import DatasetReference return DatasetReference(self.project_id, dataset_id) def exists(self, dataset_id): """Check if a dataset exists in Google BigQuery Parameters ---------- dataset_id : str Name of dataset to be verified Returns ------- boolean true if dataset exists, otherwise false """ from google.api_core.exceptions import NotFound try: self.client.get_dataset(self._dataset_ref(dataset_id)) return True except NotFound: return False except self.http_error as ex: self.process_http_error(ex) def create(self, dataset_id): """Create a dataset in Google BigQuery Parameters ---------- dataset : str Name of dataset to be written """ from google.cloud.bigquery import Dataset if self.exists(dataset_id): raise DatasetCreationError( "Dataset {0} already " "exists".format(dataset_id) ) dataset = Dataset(self._dataset_ref(dataset_id)) if self.location is not None: dataset.location = self.location try: self.client.create_dataset(dataset) except self.http_error as ex: self.process_http_error(ex)