As of January 1, 2020 this library no longer supports Python 2 on the latest released version.
Library versions released prior to that date will continue to be available. For more information please
visit Python 2 support on Google Cloud.
Source code for google.resumable_media.requests.upload
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Support for resumable uploads.

Also supported here are simple (media) uploads and multipart
uploads that contain both metadata and a small file as payload,
as well as XML multipart uploads (MPU) via :class:`XMLMPUContainer`
and :class:`XMLMPUPart`.
"""
from google.resumable_media import _upload
from google.resumable_media.requests import _request_helpers
class SimpleUpload(_request_helpers.RequestsMixin, _upload.SimpleUpload):
    """Upload a resource to a Google API.

    A **simple** media upload sends no metadata and completes the upload
    in a single request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def transmit(
        self,
        transport,
        data,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Send ``data`` to the upload URL in a single (retriable) request.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            data (bytes): The resource content to be uploaded.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_request(data, content_type)
        http_method, target_url, body, request_headers = prepared

        def _send_once():
            # A single attempt: issue the HTTP request, then hand the response
            # to the base-class hook (defined in ``_upload``) for processing.
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_response(response)
            return response

        # ``wait_and_retry`` may call ``_send_once`` several times, depending
        # on ``self._retry_strategy``.
        return _request_helpers.wait_and_retry(
            _send_once, self._get_status_code, self._retry_strategy
        )
class MultipartUpload(_request_helpers.RequestsMixin, _upload.MultipartUpload):
    """Upload a resource together with its metadata to a Google API.

    A **multipart** upload sends both the metadata and the resource in a
    single (multipart) request.

    Args:
        upload_url (str): The URL where the content will be uploaded.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the request, e.g. headers for encrypted data.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. The request metadata will be amended
            to include the computed value. Using this option will override a
            manually-set checksum value. Supported values are "md5",
            "crc32c" and None. The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
    """

    def transmit(
        self,
        transport,
        data,
        metadata,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Send ``data`` plus ``metadata`` in one (retriable) multipart request.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            data (bytes): The resource content to be uploaded.
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_request(data, metadata, content_type)
        http_method, target_url, body, request_headers = prepared

        def _send_once():
            # One attempt; the base-class hook processes the response.
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_response(response)
            return response

        # The same prepared body is re-sent on every retry attempt.
        return _request_helpers.wait_and_retry(
            _send_once, self._get_status_code, self._retry_strategy
        )
class ResumableUpload(_request_helpers.RequestsMixin, _upload.ResumableUpload):
    """Initiate and fulfill a resumable upload to a Google API.

    A **resumable** upload sends an initial request with the resource metadata
    and then gets assigned an upload ID / upload URL to send bytes to.
    Using the upload URL, the upload is then done in chunks (determined by
    the user) until all bytes have been uploaded.

    When constructing a resumable upload, only the resumable upload URL and
    the chunk size are required:

    .. testsetup:: resumable-constructor

       bucket = 'bucket-foo'

    .. doctest:: resumable-constructor

       >>> from google.resumable_media.requests import ResumableUpload
       >>>
       >>> url_template = (
       ...     'https://www.googleapis.com/upload/storage/v1/b/{bucket}/o?'
       ...     'uploadType=resumable')
       >>> upload_url = url_template.format(bucket=bucket)
       >>>
       >>> chunk_size = 3 * 1024 * 1024  # 3MB
       >>> upload = ResumableUpload(upload_url, chunk_size)

    When initiating an upload (via :meth:`initiate`), the caller is expected
    to pass the resource being uploaded as a file-like ``stream``. If the size
    of the resource is explicitly known, it can be passed in directly:

    .. testsetup:: resumable-explicit-size

       import os
       import tempfile

       import mock
       import requests
       import http.client

       from google.resumable_media.requests import ResumableUpload

       upload_url = 'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       file_desc, filename = tempfile.mkstemp()
       os.close(file_desc)

       data = b'some bytes!'
       with open(filename, 'wb') as file_obj:
           file_obj.write(data)

       fake_response = requests.Response()
       fake_response.status_code = int(http.client.OK)
       fake_response._content = b''
       resumable_url = 'http://test.invalid?upload_id=7up'
       fake_response.headers['location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

    .. doctest:: resumable-explicit-size

       >>> import os
       >>>
       >>> upload.total_bytes is None
       True
       >>>
       >>> stream = open(filename, 'rb')
       >>> total_bytes = os.path.getsize(filename)
       >>> metadata = {'name': filename}
       >>> response = upload.initiate(
       ...     transport, stream, metadata, 'text/plain',
       ...     total_bytes=total_bytes)
       >>> response
       <Response [200]>
       >>>
       >>> upload.total_bytes == total_bytes
       True

    .. testcleanup:: resumable-explicit-size

       os.remove(filename)

    If the stream is in a "final" state (i.e. it won't have any more bytes
    written to it), the total number of bytes can be determined implicitly
    from the ``stream`` itself:

    .. testsetup:: resumable-implicit-size

       import io

       import mock
       import requests
       import http.client

       from google.resumable_media.requests import ResumableUpload

       upload_url = 'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       fake_response = requests.Response()
       fake_response.status_code = int(http.client.OK)
       fake_response._content = b''
       resumable_url = 'http://test.invalid?upload_id=7up'
       fake_response.headers['location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

       data = b'some MOAR bytes!'
       metadata = {'name': 'some-file.jpg'}
       content_type = 'image/jpeg'

    .. doctest:: resumable-implicit-size

       >>> stream = io.BytesIO(data)
       >>> response = upload.initiate(
       ...     transport, stream, metadata, content_type)
       >>>
       >>> upload.total_bytes == len(data)
       True

    If the size of the resource is **unknown** when the upload is initiated,
    the ``stream_final`` argument can be used. This might occur if the
    resource is being dynamically created on the client (e.g. application
    logs). To use this argument:

    .. testsetup:: resumable-unknown-size

       import io

       import mock
       import requests
       import http.client

       from google.resumable_media.requests import ResumableUpload

       upload_url = 'http://test.invalid'
       chunk_size = 3 * 1024 * 1024  # 3MB
       upload = ResumableUpload(upload_url, chunk_size)

       fake_response = requests.Response()
       fake_response.status_code = int(http.client.OK)
       fake_response._content = b''
       resumable_url = 'http://test.invalid?upload_id=7up'
       fake_response.headers['location'] = resumable_url

       post_method = mock.Mock(return_value=fake_response, spec=[])
       transport = mock.Mock(request=post_method, spec=['request'])

       metadata = {'name': 'some-file.jpg'}
       content_type = 'application/octet-stream'

       stream = io.BytesIO(b'data')

    .. doctest:: resumable-unknown-size

       >>> response = upload.initiate(
       ...     transport, stream, metadata, content_type,
       ...     stream_final=False)
       >>>
       >>> upload.total_bytes is None
       True

    Args:
        upload_url (str): The URL where the resumable upload will be initiated.
        chunk_size (int): The size of each chunk used to upload the resource.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the :meth:`initiate` request, e.g. headers for
            encrypted data. These **will not** be sent with
            :meth:`transmit_next_chunk` or :meth:`recover` requests.
        checksum (Optional[str]): The type of checksum to compute to verify
            the integrity of the object. After the upload is complete, the
            server-computed checksum of the resulting object will be checked
            and google.resumable_media.common.DataCorruption will be raised on
            a mismatch. The corrupted file will not be deleted from the remote
            host automatically. Supported values are "md5", "crc32c" and None.
            The default is None.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.

    Raises:
        ValueError: If ``chunk_size`` is not a multiple of
            :data:`.UPLOAD_CHUNK_SIZE`.
    """

    def initiate(
        self,
        transport,
        stream,
        metadata,
        content_type,
        total_bytes=None,
        stream_final=True,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Initiate a resumable upload.

        By default, this method assumes your ``stream`` is in a "final"
        state ready to transmit. However, ``stream_final=False`` can be used
        to indicate that the size of the resource is not known. This can happen
        if bytes are being dynamically fed into ``stream``, e.g. if the stream
        is attached to application logs.

        If ``stream_final=False`` is used, :attr:`chunk_size` bytes will be
        read from the stream every time :meth:`transmit_next_chunk` is called.
        If one of those reads produces strictly fewer bytes than the chunk
        size, the upload will be concluded.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            stream (IO[bytes]): The stream (i.e. file-like object) that will
                be uploaded. The stream **must** be at the beginning (i.e.
                ``stream.tell() == 0``).
            metadata (Mapping[str, str]): The resource metadata, such as an
                ACL list.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            total_bytes (Optional[int]): The total number of bytes to be
                uploaded. If specified, the upload size **will not** be
                determined from the stream (even if ``stream_final=True``).
            stream_final (Optional[bool]): Indicates if the ``stream`` is
                "final" (i.e. no more bytes will be added to it). In this case
                we determine the upload size from the size of the stream. If
                ``total_bytes`` is passed, this argument will be ignored.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_initiate_request(
            stream,
            metadata,
            content_type,
            total_bytes=total_bytes,
            stream_final=stream_final,
        )

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_initiate_response(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )

    def transmit_next_chunk(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Transmit the next chunk of the resource to be uploaded.

        If the current upload was initiated with ``stream_final=False``,
        this method will dynamically determine if the upload has completed.
        The upload will be considered complete if the stream produces
        fewer than :attr:`chunk_size` bytes when a chunk is read from it.

        In the case of failure, an exception is thrown that preserves the
        failed response:

        .. testsetup:: bad-response

           import io

           import mock
           import requests
           import http.client

           from google import resumable_media
           import google.resumable_media.requests.upload as upload_mod

           transport = mock.Mock(spec=['request'])
           fake_response = requests.Response()
           fake_response.status_code = int(http.client.BAD_REQUEST)
           transport.request.return_value = fake_response

           upload_url = 'http://test.invalid'
           upload = upload_mod.ResumableUpload(
               upload_url, resumable_media.UPLOAD_CHUNK_SIZE)
           # Fake that the upload has been initiate()-d
           data = b'data is here'
           upload._stream = io.BytesIO(data)
           upload._total_bytes = len(data)
           upload._resumable_url = 'http://test.invalid?upload_id=nope'

        .. doctest:: bad-response
           :options: +NORMALIZE_WHITESPACE

           >>> error = None
           >>> try:
           ...     upload.transmit_next_chunk(transport)
           ... except resumable_media.InvalidResponse as caught_exc:
           ...     error = caught_exc
           ...
           >>> error
           InvalidResponse('Request failed with status code', 400,
                           'Expected one of', <HTTPStatus.OK: 200>, <HTTPStatus.PERMANENT_REDIRECT: 308>)
           >>> error.response
           <Response [400]>

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.

        Raises:
            ~google.resumable_media.common.InvalidResponse: If the status
                code is not 200 or 308 (``http.client.PERMANENT_REDIRECT``).
            ~google.resumable_media.common.DataCorruption: If this is the final
                chunk, a checksum validation was requested, and the checksum
                does not match or is not available.
        """
        method, url, payload, headers = self._prepare_request()

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            # The chunk length is passed so the response handler can account
            # for how many bytes this request carried.
            self._process_resumable_response(result, len(payload))

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )

    def recover(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Recover from a failure and check the status of the current upload.

        This will verify the progress with the server and make sure the
        current upload is in a valid state before :meth:`transmit_next_chunk`
        can be used again. See https://cloud.google.com/storage/docs/performing-resumable-uploads#status-check
        for more information.

        This method can be used when a :class:`ResumableUpload` is in an
        :attr:`~ResumableUpload.invalid` state due to a request failure.

        Args:
            transport (~requests.Session): A ``requests`` object which can
                make authenticated requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.
                Defaults to the same (connect, read) timeout pair used by the
                other methods of this class.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        method, url, payload, headers = self._prepare_recover_request()
        # NOTE: We assume "payload is None" but pass it along anyway.

        # Wrap the request business logic in a function to be retried.
        def retriable_request():
            result = transport.request(
                method, url, data=payload, headers=headers, timeout=timeout
            )

            self._process_recover_response(result)

            return result

        return _request_helpers.wait_and_retry(
            retriable_request, self._get_status_code, self._retry_strategy
        )
class XMLMPUContainer(_request_helpers.RequestsMixin, _upload.XMLMPUContainer):
    """Initiate and close an upload using the XML MPU API.

    An XML MPU sends an initial request and then receives an upload ID.
    Using the upload ID, the upload is then done in numbered parts and the
    parts can be uploaded concurrently.

    In order to avoid concurrency issues with this container object, the
    uploading of individual parts is handled separately, by XMLMPUPart objects
    spawned from this container class. The XMLMPUPart objects are not
    necessarily in the same process as the container, so they do not update the
    container automatically.

    MPUs are sometimes referred to as "Multipart Uploads", which is ambiguous
    given the JSON multipart upload, so the abbreviation "MPU" will be used
    throughout.

    See: https://cloud.google.com/storage/docs/multipart-uploads

    Args:
        upload_url (str): The URL of the object (without query parameters). The
            initiate, PUT, and finalization requests will all use this URL, with
            varying query parameters.
        headers (Optional[Mapping[str, str]]): Extra headers that should
            be sent with the :meth:`initiate` request, e.g. headers for
            encrypted data. These headers will be propagated to individual
            XMLMPUPart objects spawned from this container as well.

    Attributes:
        upload_url (str): The URL where the content will be uploaded.
        upload_id (Optional[int]): The ID of the upload from the initialization
            response.
    """

    def initiate(
        self,
        transport,
        content_type,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Start an MPU and record the upload ID returned by the server.

        Args:
            transport (object): An object which can make authenticated
                requests.
            content_type (str): The content type of the resource, e.g. a JPEG
                image has content type ``image/jpeg``.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_initiate_request(content_type)
        http_method, target_url, body, request_headers = prepared

        def _attempt():
            # One try; the base-class hook handles the initiate response.
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_initiate_response(response)
            return response

        return _request_helpers.wait_and_retry(
            _attempt, self._get_status_code, self._retry_strategy
        )

    def finalize(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Complete the MPU, combining all of the uploaded parts.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_finalize_request()
        http_method, target_url, body, request_headers = prepared

        def _attempt():
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_finalize_response(response)
            return response

        return _request_helpers.wait_and_retry(
            _attempt, self._get_status_code, self._retry_strategy
        )

    def cancel(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Abort the MPU and permanently delete any parts already uploaded.

        This cannot be undone.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_cancel_request()
        http_method, target_url, body, request_headers = prepared

        def _attempt():
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_cancel_response(response)
            return response

        return _request_helpers.wait_and_retry(
            _attempt, self._get_status_code, self._retry_strategy
        )
class XMLMPUPart(_request_helpers.RequestsMixin, _upload.XMLMPUPart):
    """Upload a single numbered part of an XML MPU using ``requests``.

    Part objects are spawned from an :class:`XMLMPUContainer` and upload
    their part independently of it. They do not update the container
    automatically.
    """

    def upload(
        self,
        transport,
        timeout=(
            _request_helpers._DEFAULT_CONNECT_TIMEOUT,
            _request_helpers._DEFAULT_READ_TIMEOUT,
        ),
    ):
        """Send this part to the server, retrying per the retry strategy.

        Args:
            transport (object): An object which can make authenticated
                requests.
            timeout (Optional[Union[float, Tuple[float, float]]]):
                The number of seconds to wait for the server response.
                Depending on the retry strategy, a request may be repeated
                several times using the same timeout each time.

                Can also be passed as a tuple (connect_timeout, read_timeout).
                See :meth:`requests.Session.request` documentation for details.

        Returns:
            ~requests.Response: The HTTP response returned by ``transport``.
        """
        prepared = self._prepare_upload_request()
        http_method, target_url, body, request_headers = prepared

        def _attempt():
            # One try; the base-class hook processes the part-upload response.
            response = transport.request(
                http_method,
                target_url,
                data=body,
                headers=request_headers,
                timeout=timeout,
            )
            self._process_upload_response(response)
            return response

        return _request_helpers.wait_and_retry(
            _attempt, self._get_status_code, self._retry_strategy
        )