Google Cloud Storage C++ Client  1.40.2
A C++ Client Library for Google Cloud Storage
parallel_upload.h
Go to the documentation of this file.
1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
17 
18 #include "google/cloud/storage/client.h"
19 #include "google/cloud/storage/internal/tuple_filter.h"
20 #include "google/cloud/storage/object_stream.h"
21 #include "google/cloud/storage/version.h"
22 #include "google/cloud/future.h"
23 #include "google/cloud/internal/filesystem.h"
24 #include "google/cloud/status_or.h"
25 #include "absl/memory/memory.h"
26 #include "absl/types/optional.h"
27 #include <condition_variable>
28 #include <cstddef>
29 #include <fstream>
30 #include <functional>
31 #include <mutex>
32 #include <string>
33 #include <tuple>
34 #include <utility>
35 #include <vector>
36 
37 namespace google {
38 namespace cloud {
39 namespace storage {
41 /**
42  * A parameter type indicating the maximum number of streams to
43  * `ParallelUploadFile`.
44  */
class MaxStreams {
 public:
  /// Wrap @p v, the requested maximum number of parallel streams.
  explicit MaxStreams(std::size_t v) : value_{v} {}

  /// The wrapped value.
  std::size_t value() const { return value_; }

 private:
  std::size_t value_;
};
53 
/**
 * A parameter type indicating the minimum stream size to `ParallelUploadFile`.
 *
 * If `ParallelUploadFile` receives this option it will attempt to make sure
 * that every shard is at least this long. This might not apply to the last
 * shard because it will be the remainder of the division of the file.
 */
class MinStreamSize {
 public:
  /// Wrap @p value, the minimum number of bytes per upload shard.
  explicit MinStreamSize(std::uintmax_t value) : value_(value) {}

  /// The wrapped value.
  std::uintmax_t value() const { return value_; }

 private:
  std::uintmax_t value_;
};
69 
70 namespace internal {
71 
72 class ParallelUploadFileShard;
73 struct CreateParallelUploadShards;
74 
75 /**
76  * Return the value of the first element of type `T` in the tuple, or an
77  * empty optional if the tuple contains no element of type `T`.
78  */
79 template <typename T, typename Tuple, typename Enable = void>
80 struct ExtractFirstOccurrenceOfTypeImpl {
 // Primary template: selected when no element of (decayed) type `T` is
 // present in `Tuple`; returns a disengaged optional.
81  absl::optional<T> operator()(Tuple const&) { return absl::optional<T>(); }
82 };
83 
 // Specialization: enabled (via SFINAE) when the decayed `T` appears among
 // the decayed `Options...`; returns the first element of type `T`.
84 template <typename T, typename... Options>
85 struct ExtractFirstOccurrenceOfTypeImpl<
86  T, std::tuple<Options...>,
87  typename std::enable_if<
88  Among<typename std::decay<Options>::type...>::template TPred<
89  typename std::decay<T>::type>::value>::type> {
90  absl::optional<T> operator()(std::tuple<Options...> const& tuple) {
 // Keep only the elements of type `T`, then take the first one.
91  return std::get<0>(StaticTupleFilter<Among<T>::template TPred>(tuple));
92  }
93 };
94 
95 template <typename T, typename Tuple>
96 absl::optional<T> ExtractFirstOccurrenceOfType(Tuple const& tuple) {
97  return ExtractFirstOccurrenceOfTypeImpl<T, Tuple>()(tuple);
98 }
99 
100 /**
101  * An option for `PrepareParallelUpload` to associate opaque data with upload.
102  *
103  * This is used by `CreateUploadShards()` to store additional information in the
104  * parallel upload persistent state. The additional information is where each
105  * shard starts in the uploaded file.
106  */
107 class ParallelUploadExtraPersistentState {
108  public:
 // Consume the payload from an rvalue, avoiding a copy of the string.
109  std::string payload() && { return std::move(payload_); }
 // Copy the payload from an lvalue.
110  std::string payload() const& { return payload_; }
111 
112  private:
 // Only `CreateParallelUploadShards` may construct instances of this class.
113  friend struct CreateParallelUploadShards;
114  explicit ParallelUploadExtraPersistentState(std::string payload)
115  : payload_(std::move(payload)) {}
116 
 // Opaque caller data stored with the parallel upload persistent state.
117  std::string payload_;
118 };
119 
120 class ParallelObjectWriteStreambuf;
121 
122 // Type-erased function object to execute ComposeMany with most arguments
123 // bound.
124 using Composer =
125  std::function<StatusOr<ObjectMetadata>(std::vector<ComposeSourceObject>)>;
126 
127 struct ParallelUploadPersistentState {
 // Persistent description of a single shard's upload stream.
128  struct Stream {
129  std::string object_name;
130  std::string resumable_session_id;
131  };
132 
 // Serialize this state to its string (JSON) representation.
133  std::string ToString() const;
 // Parse a state previously produced by `ToString()`.
134  static StatusOr<ParallelUploadPersistentState> FromString(
135  std::string const& json_rep);
136 
 // Name of the final destination object.
137  std::string destination_object_name;
 // Generation of the destination object observed when the upload started.
138  std::int64_t expected_generation;
 // Opaque caller payload (see `ParallelUploadExtraPersistentState`).
139  std::string custom_data;
 // One entry per shard, in composition order.
140  std::vector<Stream> streams;
141 };
142 
143 // The `ObjectWriteStream`s have to hold references to the state of
144 // the parallel upload so that they can update it when finished and trigger
145 // shards composition, hence `ResumableParallelUploadState` has to be
146 // destroyed after the `ObjectWriteStream`s.
147 // `ResumableParallelUploadState` and `ObjectWriteStream`s are passed
148 // around by values, so we don't control their lifetime. In order to
149 // circumvent it, we move the state to something held by a `shared_ptr`.
150 class ParallelUploadStateImpl
151  : public std::enable_shared_from_this<ParallelUploadStateImpl> {
152  public:
153  ParallelUploadStateImpl(bool cleanup_on_failures,
154  std::string destination_object_name,
155  std::int64_t expected_generation,
156  std::shared_ptr<ScopedDeleter> deleter,
157  Composer composer);
158  ~ParallelUploadStateImpl();
159 
 // Create the `ObjectWriteStream` used to upload one shard.
160  StatusOr<ObjectWriteStream> CreateStream(
161  std::shared_ptr<RawClient> raw_client,
162  ResumableUploadRequest const& request);
163 
 // Called once no unfinished streams remain; `lk` must hold `mu_`.
164  void AllStreamsFinished(std::unique_lock<std::mutex>& lk);
 // Record the final upload response of the stream at `stream_idx`.
165  void StreamFinished(std::size_t stream_idx,
166  StatusOr<QueryResumableUploadResponse> const& response);
167 
 // Record that the stream at `stream_idx` was destroyed.
168  void StreamDestroyed(std::size_t stream_idx);
169 
 // Satisfied with the destination object's metadata (or an error) once the
 // whole upload completes.
170  future<StatusOr<ObjectMetadata>> WaitForCompletion() const;
171 
 // Proactively delete the temporary objects; returns the cleanup status.
172  Status EagerCleanup();
173 
 // Fail the whole upload with `status`.
174  void Fail(Status status);
175 
 // Snapshot this state for serialization into the state object.
176  ParallelUploadPersistentState ToPersistentState() const;
177 
178  std::string custom_data() const {
179  std::unique_lock<std::mutex> lk(mu_);
180  return custom_data_;
181  }
182 
183  void set_custom_data(std::string custom_data) {
184  std::unique_lock<std::mutex> lk(mu_);
185  custom_data_ = std::move(custom_data);
186  }
187 
188  std::string resumable_session_id() {
189  std::unique_lock<std::mutex> lk(mu_);
190  return resumable_session_id_;
191  }
192 
193  void set_resumable_session_id(std::string resumable_session_id) {
194  std::unique_lock<std::mutex> lk(mu_);
195  resumable_session_id_ = std::move(resumable_session_id);
196  }
197 
 // Increase the count of unfinished streams, preventing completion.
198  void PreventFromFinishing() {
199  std::unique_lock<std::mutex> lk(mu_);
200  ++num_unfinished_streams_;
201  }
202 
 // Undo `PreventFromFinishing()`; triggers completion when this was the
 // last unfinished stream.
203  void AllowFinishing() {
204  std::unique_lock<std::mutex> lk(mu_);
205  if (--num_unfinished_streams_ == 0) {
206  AllStreamsFinished(lk);
207  }
208  }
209 
210  private:
 // Book-keeping for a single shard's stream.
211  struct StreamInfo {
212  std::string object_name;
213  std::string resumable_session_id;
214  absl::optional<ComposeSourceObject> composition_arg;
215  bool finished;
216  };
217 
 // Guards the mutable state of this class.
218  mutable std::mutex mu_;
219  // Promises made via `WaitForCompletion()`
220  mutable std::vector<promise<StatusOr<ObjectMetadata>>> res_promises_;
221  // Type-erased object for deleting temporary objects.
222  std::shared_ptr<ScopedDeleter> deleter_;
 // Joins the shards into the destination object (see `Composer`).
223  Composer composer_;
224  std::string destination_object_name_;
225  std::int64_t expected_generation_;
226  // Set when all streams are closed and composed but before cleanup.
227  bool finished_ = false;
228  // Tracks how many streams are still written to.
229  std::size_t num_unfinished_streams_ = 0;
230  std::vector<StreamInfo> streams_;
 // The final result; engaged once the upload completes or fails.
231  absl::optional<StatusOr<ObjectMetadata>> res_;
232  Status cleanup_status_;
233  std::string custom_data_;
234  std::string resumable_session_id_;
235 };
236 
 // Function object binding all `ComposeMany()` arguments except the options.
237 struct ComposeManyApplyHelper {
238  template <typename... Options>
239  StatusOr<ObjectMetadata> operator()(Options&&... options) const {
240  return ComposeMany(client, bucket_name, std::move(source_objects), prefix,
241  std::move(destination_object_name), true,
242  std::forward<Options>(options)...);
243  }
244 
 // NOTE: `operator()` moves from `source_objects` and
 // `destination_object_name`, so it should be invoked at most once per
 // helper instance.
245  Client& client;
246  std::string bucket_name;
247  std::vector<ComposeSourceObject> source_objects;
248  std::string prefix;
249  std::string destination_object_name;
250 };
251 
252 class SetOptionsApplyHelper {
253  public:
254  explicit SetOptionsApplyHelper(ResumableUploadRequest& request)
255  : request_(request) {}
256 
257  template <typename... Options>
258  void operator()(Options&&... options) const {
259  request_.set_multiple_options(std::forward<Options>(options)...);
260  }
261 
262  private:
263  ResumableUploadRequest& request_;
264 };
265 
 // Function object binding the `ReadObject()` target, leaving the options
 // open for `google::cloud::internal::apply()`.
266 struct ReadObjectApplyHelper {
267  template <typename... Options>
268  ObjectReadStream operator()(Options&&... options) const {
269  return client.ReadObject(bucket_name, object_name,
270  std::forward<Options>(options)...);
271  }
272 
 // Non-owning references; the caller must keep them alive while this
 // helper is in use.
273  Client& client;
274  std::string const& bucket_name;
275  std::string const& object_name;
276 };
277 
 // Function object binding the `GetObjectMetadata()` target, leaving the
 // options open for `google::cloud::internal::apply()`.
278 struct GetObjectMetadataApplyHelper {
279  template <typename... Options>
280  StatusOr<ObjectMetadata> operator()(Options... options) const {
281  return client.GetObjectMetadata(bucket_name, object_name,
282  std::move(options)...);
283  }
284 
285  Client& client;
286  std::string bucket_name;
287  std::string object_name;
288 };
289 
290 /**
291  * A class representing an individual shard of the parallel upload.
292  *
293  * In order to perform a parallel upload of a file, you should call
294  * `CreateUploadShards()` and it will return a vector of objects of this class.
295  * You should execute the `Upload()` member function on them in parallel to
296  * execute the upload.
297  *
298  * You can then obtain the status of the whole upload via `WaitForCompletion()`.
299  */
300 class ParallelUploadFileShard {
301  public:
 // Shards are move-only; each one owns its write stream.
302  ParallelUploadFileShard(ParallelUploadFileShard const&) = delete;
303  ParallelUploadFileShard& operator=(ParallelUploadFileShard const&) = delete;
304  ParallelUploadFileShard(ParallelUploadFileShard&&) = default;
305  ParallelUploadFileShard& operator=(ParallelUploadFileShard&&) = default;
306  ~ParallelUploadFileShard();
307 
308  /**
309  * Perform the upload of this shard.
310  *
311  * This function will block until the shard is completed, or a permanent
312  * failure is encountered, or the retry policy is exhausted.
313  */
314  Status Upload();
315 
316  /**
317  * Asynchronously wait for completion of the whole upload operation (not only
318  * this shard).
319  *
320  * @return the returned future will become satisfied once the whole upload
321  * operation finishes (i.e. `Upload()` completes on all shards); on
322  * success, it will hold the destination object's metadata
323  */
324  future<StatusOr<ObjectMetadata>> WaitForCompletion() {
325  return state_->WaitForCompletion();
326  }
327 
328  /**
329  * Cleanup all the temporary files
330  *
331  * The destruction of the last of these objects tied to a parallel upload will
332  * cleanup of all the temporary files used in the process of that parallel
333  * upload. If the cleanup fails, it will fail silently not to crash the
334  * program.
335  *
336  * If you want to control the status of the cleanup, use this member function
337  * to do it eagerly, before destruction.
338  *
339  * It is enough to call it on one of the objects, but it is not invalid to
340  * call it on all objects.
341  */
342  Status EagerCleanup() { return state_->EagerCleanup(); }
343 
344  /**
345  * Retrieve resumable session ID to allow for potential future resume.
346  */
347  std::string resumable_session_id() { return resumable_session_id_; }
348 
349  private:
 // Only `CreateParallelUploadShards` can construct shards.
350  friend struct CreateParallelUploadShards;
351  ParallelUploadFileShard(std::shared_ptr<ParallelUploadStateImpl> state,
352  ObjectWriteStream ostream, std::string file_name,
353  std::uintmax_t offset_in_file,
354  std::uintmax_t bytes_to_upload,
355  std::size_t upload_buffer_size)
356  : state_(std::move(state)),
357  ostream_(std::move(ostream)),
358  file_name_(std::move(file_name)),
359  offset_in_file_(offset_in_file),
360  left_to_upload_(bytes_to_upload),
361  upload_buffer_size_(upload_buffer_size),
362  resumable_session_id_(state_->resumable_session_id()) {}
363 
 // Shared state of the whole parallel upload; kept alive by every shard.
364  std::shared_ptr<ParallelUploadStateImpl> state_;
 // The stream this shard writes its slice of the file to.
365  ObjectWriteStream ostream_;
 // The local file this shard reads from.
366  std::string file_name_;
367  // GCS only supports objects up to 5TiB, that fits comfortably in a
368  // `std::int64_t`, allows for any expected growth on that limit (not that we
369  // anticipate any), and plays nicer with the types in the standard C++
370  // library.
371  std::int64_t offset_in_file_;
372  std::int64_t left_to_upload_;
 // Size of the buffer used when copying from the file to the stream.
373  std::size_t upload_buffer_size_;
374  std::string resumable_session_id_;
375 };
376 
377 /**
378  * The state controlling uploading a GCS object via multiple parallel streams.
379  *
380  * To use this class obtain the state via `PrepareParallelUpload` and then write
381  * the data to the streams associated with each shard. Once writing is done,
382  * close or destroy the streams.
383  *
384  * When all the streams are `Close`d or destroyed, this class will join the
385  * them (via `ComposeMany`) into the destination object and set the value in
386  * `future`s returned by `WaitForCompletion`.
387  *
388  * Parallel upload will create temporary files. Upon completion of the whole
389  * operation, this class will attempt to remove them in its destructor, but if
390  * they fail, they fail silently. In order to proactively cleanup these files,
391  * one can call `EagerCleanup()`.
392  */
393 class NonResumableParallelUploadState {
394  public:
 /**
 * Create the upload state with `num_shards` write streams.
 *
 * Temporary objects are created with names starting with `prefix`; when all
 * streams finish they are composed into `object_name`.
 */
395  template <typename... Options>
396  static StatusOr<NonResumableParallelUploadState> Create(
397  Client client, std::string const& bucket_name,
398  std::string const& object_name, std::size_t num_shards,
399  std::string const& prefix, std::tuple<Options...> options);
400 
401  /**
402  * Asynchronously wait for completion of the whole upload operation.
403  *
404  * @return the returned future will have a value set to the destination object
405  * metadata when all the streams are `Close`d or destroyed.
406  */
407  future<StatusOr<ObjectMetadata>> WaitForCompletion() const {
408  return impl_->WaitForCompletion();
409  }
410 
411  /**
412  * Cleanup all the temporary files
413  *
414  * The destruction of this object will perform cleanup of all the temporary
415  * files used in the process of the parallel upload. If the cleanup fails, it
416  * will fail silently not to crash the program.
417  *
418  * If you want to control the status of the cleanup, use this member function
419  * to do it eagerly, before destruction.
420  *
421  * @return the status of the cleanup.
422  */
423  Status EagerCleanup() { return impl_->EagerCleanup(); }
424 
425  /**
426  * The streams to write to.
427  *
428  * When the streams are `Close`d, they will be concatenated into the
429  * destination object in the same order as they appeared in this vector upon
430  * this object's creation.
431  *
432  * It is safe to destroy or `std::move()` these streams.
433  */
434  std::vector<ObjectWriteStream>& shards() { return shards_; }
435 
436  /**
437  * Fail the whole operation.
438  *
439  * If called before all streams are closed or destroyed, calling this
440  * operation will prevent composing the streams into the final destination
441  * object and return a failure via `WaitForCompletion()`.
442  *
443  * @param status the status to fail the operation with.
444  */
445  void Fail(Status status) { return impl_->Fail(std::move(status)); }
446 
447  private:
448  NonResumableParallelUploadState(
449  std::shared_ptr<ParallelUploadStateImpl> state,
450  std::vector<ObjectWriteStream> shards)
451  : impl_(std::move(state)), shards_(std::move(shards)) {}
452 
 // Shared implementation; also referenced by each `ObjectWriteStream`.
453  std::shared_ptr<ParallelUploadStateImpl> impl_;
 // One stream per shard, in composition order.
454  std::vector<ObjectWriteStream> shards_;
455 
456  friend struct CreateParallelUploadShards;
457 };
458 
459 /**
460  * The state controlling uploading a GCS object via multiple parallel streams,
461  * allowing for resuming.
462  *
463  * To use this class obtain the state via `PrepareParallelUpload` (with
464  * `UseResumableUploadSession` option) and then write the data to the streams
465  * associated with each shard. Once writing is done, close or destroy the
466  * streams.
467  *
468  * When all the streams are `Close`d or destroyed, this class will join the
469  * them (via `ComposeMany`) into the destination object and set the value in
470  * `future`s returned by `WaitForCompletion`.
471  *
472  * Parallel upload will create temporary files. Upon successful completion of
473  * the whole operation, this class will attempt to remove them in its
474  * destructor, but if they fail, they fail silently. In order to proactively
475  * cleanup these files, one can call `EagerCleanup()`.
476  *
477  * In order to resume an interrupted upload, provide `UseResumableUploadSession`
478  * to `PrepareParallelUpload` with value set to what `resumable_session_id()`
479  * returns.
480  */
481 class ResumableParallelUploadState {
482  public:
 // Prefix marking a resumable-parallel-upload session id.
483  static std::string session_id_prefix() { return "ParUpl:"; }
484 
 /**
 * Start a brand new resumable parallel upload with `num_shards` streams.
 *
 * `extra_state` is opaque caller data stored in the persistent state
 * object; temporary objects use names starting with `prefix`.
 */
485  template <typename... Options>
486  static StatusOr<ResumableParallelUploadState> CreateNew(
487  Client client, std::string const& bucket_name,
488  std::string const& object_name, std::size_t num_shards,
489  std::string const& prefix, std::string extra_state,
490  std::tuple<Options...> const& options);
491 
 /**
 * Resume an interrupted parallel upload identified by
 * `resumable_session_id` (as returned by `resumable_session_id()`).
 */
492  template <typename... Options>
493  static StatusOr<ResumableParallelUploadState> Resume(
494  Client client, std::string const& bucket_name,
495  std::string const& object_name, std::size_t num_shards,
496  std::string const& prefix, std::string resumable_session_id,
497  std::tuple<Options...> options);
498 
499  /**
500  * Retrieve the resumable session id.
501  *
502  * This value, if passed via `UseResumableUploadSession` option indicates that
503  * an upload should be a continuation of the one which this object represents.
504  */
505  std::string resumable_session_id() { return resumable_session_id_; }
506 
507  /**
508  * Asynchronously wait for completion of the whole upload operation.
509  *
510  * @return the returned future will have a value set to the destination object
511  * metadata when all the streams are `Close`d or destroyed.
512  */
513  future<StatusOr<ObjectMetadata>> WaitForCompletion() const {
514  return impl_->WaitForCompletion();
515  }
516 
517  /**
518  * Cleanup all the temporary files
519  *
520  * The destruction of this object will perform cleanup of all the temporary
521  * files used in the process of the parallel upload. If the cleanup fails, it
522  * will fail silently not to crash the program.
523  *
524  * If you want to control the status of the cleanup, use this member function
525  * to do it eagerly, before destruction.
526  *
527  * @return the status of the cleanup.
528  */
529  Status EagerCleanup() { return impl_->EagerCleanup(); }
530 
531  /**
532  * The streams to write to.
533  *
534  * When the streams are `Close`d, they will be concatenated into the
535  * destination object in the same order as they appeared in this vector upon
536  * this object's creation.
537  *
538  * It is safe to destroy or `std::move()` these streams.
539  */
540  std::vector<ObjectWriteStream>& shards() { return shards_; }
541 
542  /**
543  * Fail the whole operation.
544  *
545  * If called before all streams are closed or destroyed, calling this
546  * operation will prevent composing the streams into the final destination
547  * object and return a failure via `WaitForCompletion()`.
548  *
549  * @param status the status to fail the operation with.
550  */
551  void Fail(Status status) { return impl_->Fail(std::move(status)); }
552 
553  private:
 // Build the `ScopedDeleter` used to remove temporary objects.
554  template <typename... Options>
555  static std::shared_ptr<ScopedDeleter> CreateDeleter(
556  Client client, std::string const& bucket_name,
557  std::tuple<Options...> const& options);
558 
 // Build the `Composer` that joins the shards into `object_name`.
559  template <typename... Options>
560  static Composer CreateComposer(Client client, std::string const& bucket_name,
561  std::string const& object_name,
562  std::int64_t expected_generation,
563  std::string const& prefix,
564  std::tuple<Options...> const& options);
565 
566  ResumableParallelUploadState(std::string resumable_session_id,
567  std::shared_ptr<ParallelUploadStateImpl> state,
568  std::vector<ObjectWriteStream> shards)
569  : resumable_session_id_(std::move(resumable_session_id)),
570  impl_(std::move(state)),
571  shards_(std::move(shards)) {}
572 
573  std::string resumable_session_id_;
 // Shared implementation; also referenced by each `ObjectWriteStream`.
574  std::shared_ptr<ParallelUploadStateImpl> impl_;
 // One stream per shard, in composition order.
575  std::vector<ObjectWriteStream> shards_;
576 
577  friend struct CreateParallelUploadShards;
578 };
579 
580 /**
581  * Prepare a parallel upload state.
582  *
583  * The returned `NonResumableParallelUploadState` will contain streams to which
584  * data can be uploaded in parallel.
585  *
586  * @param client the client on which to perform the operation.
587  * @param bucket_name the name of the bucket that will contain the object.
588  * @param object_name the uploaded object name.
589  * @param num_shards how many streams to upload the object through.
590  * @param prefix the prefix with which temporary objects will be created.
591  * @param options a list of optional query parameters and/or request headers.
592  * Valid types for this operation include `DestinationPredefinedAcl`,
593  * `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
594  * `KmsKeyName`, `QuotaUser`, `UserIp`, `UserProject`, `WithObjectMetadata`.
595  *
596  * @return the state of the parallel upload
597  *
598  * @par Idempotency
599  * This operation is not idempotent. While each request performed by this
600  * function is retried based on the client policies, the operation itself stops
601  * on the first request that fails.
602  */
603 template <typename... Options,
604  typename std::enable_if<
605  NotAmong<typename std::decay<Options>::type...>::template TPred<
607  int>::type EnableIfNotResumable = 0>
608 StatusOr<NonResumableParallelUploadState> PrepareParallelUpload(
609  Client client, std::string const& bucket_name,
610  std::string const& object_name, std::size_t num_shards,
611  std::string const& prefix, Options&&... options) {
612  return NonResumableParallelUploadState::Create(
613  std::move(client), bucket_name, object_name, num_shards, prefix,
614  StaticTupleFilter<NotAmong<ParallelUploadExtraPersistentState>::TPred>(
615  std::forward_as_tuple(std::forward<Options>(options)...)));
616 }
617 
618 template <typename... Options,
619  typename std::enable_if<
620  Among<typename std::decay<Options>::type...>::template TPred<
622  int>::type EnableIfResumable = 0>
623 StatusOr<ResumableParallelUploadState> PrepareParallelUpload(
624  Client client, std::string const& bucket_name,
625  std::string const& object_name, std::size_t num_shards,
626  std::string const& prefix, Options&&... options) {
627  auto resumable_args =
628  StaticTupleFilter<Among<UseResumableUploadSession>::TPred>(
629  std::tie(options...));
630  static_assert(
631  std::tuple_size<decltype(resumable_args)>::value == 1,
632  "There should be exactly one UseResumableUploadSession argument");
633  std::string resumable_session_id = std::get<0>(resumable_args).value();
634  auto extra_state_arg =
635  ExtractFirstOccurrenceOfType<ParallelUploadExtraPersistentState>(
636  std::tie(options...));
637 
638  auto forwarded_args =
639  StaticTupleFilter<NotAmong<UseResumableUploadSession,
640  ParallelUploadExtraPersistentState>::TPred>(
641  std::forward_as_tuple(std::forward<Options>(options)...));
642 
643  if (resumable_session_id.empty()) {
644  return ResumableParallelUploadState::CreateNew(
645  std::move(client), bucket_name, object_name, num_shards, prefix,
646  extra_state_arg ? (*std::move(extra_state_arg)).payload()
647  : std::string(),
648  std::move(forwarded_args));
649  }
650  return ResumableParallelUploadState::Resume(
651  std::move(client), bucket_name, object_name, num_shards, prefix,
652  resumable_session_id, std::move(forwarded_args));
653 }
654 
655 template <typename... Options>
656 StatusOr<NonResumableParallelUploadState>
657 NonResumableParallelUploadState::Create(Client client,
658  std::string const& bucket_name,
659  std::string const& object_name,
660  std::size_t num_shards,
661  std::string const& prefix,
662  std::tuple<Options...> options) {
663  using internal::StaticTupleFilter;
664  auto delete_options =
665  StaticTupleFilter<Among<QuotaUser, UserProject, UserIp>::TPred>(options);
666  auto deleter = std::make_shared<ScopedDeleter>(
667  [client, bucket_name, delete_options](std::string const& object_name,
668  std::int64_t generation) mutable {
669  return google::cloud::internal::apply(
670  DeleteApplyHelper{client, std::move(bucket_name), object_name,
671  generation},
672  std::move(delete_options));
673  });
674 
675  auto compose_options = StaticTupleFilter<
678  WithObjectMetadata>::TPred>(options);
679  auto composer = [client, bucket_name, object_name, compose_options,
680  prefix](std::vector<ComposeSourceObject> sources) mutable {
681  return google::cloud::internal::apply(
682  ComposeManyApplyHelper{client, std::move(bucket_name),
683  std::move(sources), prefix + ".compose_many",
684  std::move(object_name)},
685  std::move(compose_options));
686  };
687 
688  auto lock = internal::LockPrefix(client, bucket_name, prefix, options);
689  if (!lock) {
690  return Status(
691  lock.status().code(),
692  "Failed to lock prefix for ParallelUpload: " + lock.status().message());
693  }
694  deleter->Add(*lock);
695 
696  auto internal_state = std::make_shared<ParallelUploadStateImpl>(
697  true, object_name, 0, std::move(deleter), std::move(composer));
698  std::vector<ObjectWriteStream> streams;
699 
700  auto upload_options = StaticTupleFilter<
703  WithObjectMetadata>::TPred>(std::move(options));
704  for (std::size_t i = 0; i < num_shards; ++i) {
705  ResumableUploadRequest request(
706  bucket_name, prefix + ".upload_shard_" + std::to_string(i));
707  google::cloud::internal::apply(SetOptionsApplyHelper(request),
708  upload_options);
709  auto stream = internal_state->CreateStream(
710  internal::ClientImplDetails::GetRawClient(client), request);
711  if (!stream) {
712  return stream.status();
713  }
714  streams.emplace_back(*std::move(stream));
715  }
716  return NonResumableParallelUploadState(std::move(internal_state),
717  std::move(streams));
718 }
719 
 // Build the `ScopedDeleter` used to remove temporary objects, binding the
 // delete-relevant options (`QuotaUser`, `UserProject`, `UserIp`).
720 template <typename... Options>
721 std::shared_ptr<ScopedDeleter> ResumableParallelUploadState::CreateDeleter(
722  Client client, // NOLINT(performance-unnecessary-value-param)
723  std::string const& bucket_name, std::tuple<Options...> const& options) {
724  using internal::StaticTupleFilter;
725  auto delete_options =
726  StaticTupleFilter<Among<QuotaUser, UserProject, UserIp>::TPred>(options);
727  return std::make_shared<ScopedDeleter>(
 // NOTE(review): this lambda is `mutable` and moves from its captured
 // `bucket_name` and `delete_options`; if `ScopedDeleter` invokes it more
 // than once, subsequent calls see moved-from values — confirm the deleter
 // calls it at most once per capture, or that moved-from use is benign.
728  [client, bucket_name, delete_options](std::string const& object_name,
729  std::int64_t generation) mutable {
730  return google::cloud::internal::apply(
731  DeleteApplyHelper{client, std::move(bucket_name), object_name,
732  generation},
733  std::move(delete_options));
734  });
735 }
736 
737 template <typename... Options>
738 Composer ResumableParallelUploadState::CreateComposer(
739  Client client, // NOLINT(performance-unnecessary-value-param)
740  std::string const& bucket_name, std::string const& object_name,
741  std::int64_t expected_generation, std::string const& prefix,
742  std::tuple<Options...> const& options) {
743  auto compose_options = std::tuple_cat(
744  StaticTupleFilter<
746  UserIp, UserProject, WithObjectMetadata>::TPred>(options),
747  std::make_tuple(IfGenerationMatch(expected_generation)));
748  auto get_metadata_options = StaticTupleFilter<
749  Among<EncryptionKey, QuotaUser, UserIp, UserProject>::TPred>(options);
750  auto composer = [client, bucket_name, object_name, compose_options,
751  get_metadata_options,
752  prefix](std::vector<ComposeSourceObject> sources) mutable
753  -> StatusOr<ObjectMetadata> {
754  auto res = google::cloud::internal::apply(
755  ComposeManyApplyHelper{client, bucket_name, std::move(sources),
756  prefix + ".compose_many", object_name},
757  std::move(compose_options));
758  if (res) {
759  return res;
760  }
761  if (res.status().code() != StatusCode::kFailedPrecondition) {
762  return res.status();
763  }
764  // This means that the object already exists and it is not the object, which
765  // existed upon start of parallel upload. For simplicity, we assume that
766  // it's a result of a previously interrupted ComposeMany invocation.
767  return google::cloud::internal::apply(
768  GetObjectMetadataApplyHelper{client, std::move(bucket_name),
769  std::move(object_name)},
770  std::move(get_metadata_options));
771  };
772  return Composer(std::move(composer));
773 }
774 
775 template <typename... Options>
776 StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::CreateNew(
777  Client client, std::string const& bucket_name,
778  std::string const& object_name, std::size_t num_shards,
779  std::string const& prefix, std::string extra_state,
780  std::tuple<Options...> const& options) {
781  using internal::StaticTupleFilter;
782 
783  auto get_object_meta_options = StaticTupleFilter<
785  IfMetagenerationNotMatch, UserProject>::TPred>(options);
786  auto object_meta = google::cloud::internal::apply(
787  GetObjectMetadataApplyHelper{client, bucket_name, object_name},
788  std::move(get_object_meta_options));
789  if (!object_meta && object_meta.status().code() != StatusCode::kNotFound) {
790  return object_meta.status();
791  }
792  std::int64_t expected_generation =
793  object_meta ? object_meta->generation() : 0;
794 
795  auto deleter = CreateDeleter(client, bucket_name, options);
796  auto composer = CreateComposer(client, bucket_name, object_name,
797  expected_generation, prefix, options);
798  auto internal_state = std::make_shared<ParallelUploadStateImpl>(
799  false, object_name, expected_generation, deleter, std::move(composer));
800  internal_state->set_custom_data(std::move(extra_state));
801 
802  std::vector<ObjectWriteStream> streams;
803 
804  auto upload_options = std::tuple_cat(
805  StaticTupleFilter<
808  UserProject, WithObjectMetadata>::TPred>(options),
809  std::make_tuple(UseResumableUploadSession("")));
810  for (std::size_t i = 0; i < num_shards; ++i) {
811  ResumableUploadRequest request(
812  bucket_name, prefix + ".upload_shard_" + std::to_string(i));
813  google::cloud::internal::apply(SetOptionsApplyHelper(request),
814  upload_options);
815  auto stream = internal_state->CreateStream(
816  internal::ClientImplDetails::GetRawClient(client), request);
817  if (!stream) {
818  return stream.status();
819  }
820  streams.emplace_back(*std::move(stream));
821  }
822 
823  auto state_object_name = prefix + ".upload_state";
824  auto insert_options = std::tuple_cat(
825  std::make_tuple(IfGenerationMatch(0)),
826  StaticTupleFilter<
828  UserProject, WithObjectMetadata>::TPred>(options));
829  auto state_object = google::cloud::internal::apply(
830  InsertObjectApplyHelper{client, bucket_name, state_object_name,
831  internal_state->ToPersistentState().ToString()},
832  std::move(insert_options));
833  if (!state_object) {
834  internal_state->Fail(state_object.status());
835  return std::move(state_object).status();
836  }
837  std::string resumable_session_id = session_id_prefix() + state_object_name +
838  ":" +
839  std::to_string(state_object->generation());
840  internal_state->set_resumable_session_id(resumable_session_id);
841  deleter->Add(std::move(*state_object));
842  return ResumableParallelUploadState(std::move(resumable_session_id),
843  std::move(internal_state),
844  std::move(streams));
845 }
846 
847 StatusOr<std::pair<std::string, std::int64_t>> ParseResumableSessionId(
848  std::string const& session_id);
849 
// Resume a parallel upload from a previously persisted session.
//
// The `resumable_session_id` encodes the name and generation of a "state
// object" stored in GCS.  This function reads that object, decodes the
// persisted upload state, validates it against the caller-supplied
// destination and shard count, and re-attaches one resumable write stream
// per shard recorded in the state.
//
// Returns an error if the session id does not parse, the state object
// cannot be read or decoded, or the persisted state disagrees with
// `object_name` / `num_shards`.
//
// NOTE(review): a few source lines are absent from this rendering (stripped
// cross-reference lines), so some option filter lists and Status
// constructions below are visibly incomplete.
850 template <typename... Options>
851 StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::Resume(
852  Client client, std::string const& bucket_name,
853  std::string const& object_name, std::size_t num_shards,
854  std::string const& prefix, std::string resumable_session_id,
855  std::tuple<Options...> options) {
856  using internal::StaticTupleFilter;
857 
 // Decode the session id into (state object name, state object generation).
858  auto state_and_gen = ParseResumableSessionId(resumable_session_id);
859  if (!state_and_gen) {
860  return state_and_gen.status();
861  }
862 
 // Read the state object, pinning the read to the generation recorded in the
 // session id so a concurrently rewritten state object is not silently used.
863  auto read_options = std::tuple_cat(
864  StaticTupleFilter<Among<DisableCrc32cChecksum, DisableMD5Hash,
866  options),
867  std::make_tuple(IfGenerationMatch(state_and_gen->second)));
868 
869  auto state_stream = google::cloud::internal::apply(
870  ReadObjectApplyHelper{client, bucket_name, state_and_gen->first},
871  std::move(read_options));
872  std::string state_string(std::istreambuf_iterator<char>{state_stream}, {});
873  state_stream.Close();
874 
875  auto persistent_state =
876  ParallelUploadPersistentState::FromString(state_string);
877  if (!persistent_state) {
878  return persistent_state.status();
879  }
880 
 // Sanity checks: the persisted state must match the caller-supplied
 // destination object and (unless num_shards == 0, i.e. "unknown") the shard
 // count.
 // TODO(review): the error messages read "is doesn't match" — fix the
 // grammar in the string literals upstream.
881  if (persistent_state->destination_object_name != object_name) {
883  "Specified resumable session ID is doesn't match the "
884  "destination object name (" +
885  object_name + " vs " +
886  persistent_state->destination_object_name + ")");
887  }
888  if (persistent_state->streams.size() != num_shards && num_shards != 0) {
890  "Specified resumable session ID is doesn't match the "
891  "previously specified number of shards (" +
892  std::to_string(num_shards) + " vs " +
893  std::to_string(persistent_state->streams.size()) + ")");
894  }
895 
 // The state object is itself temporary: register it with the deleter so it
 // is cleaned up alongside the other temporary objects.
896  auto deleter = CreateDeleter(client, bucket_name, options);
897  deleter->Add(state_and_gen->first, state_and_gen->second);
898  auto composer =
899  CreateComposer(client, bucket_name, object_name,
900  persistent_state->expected_generation, prefix, options);
901  auto internal_state = std::make_shared<ParallelUploadStateImpl>(
902  false, object_name, persistent_state->expected_generation, deleter,
903  std::move(composer));
904  internal_state->set_custom_data(std::move(persistent_state->custom_data));
905  internal_state->set_resumable_session_id(resumable_session_id);
906  // If a resumed stream is already finalized, callbacks from streams will be
907  // executed immediately. We don't want them to trigger composition before all
908  // of them are created.
909  internal_state->PreventFromFinishing();
910  std::vector<ObjectWriteStream> streams;
911 
 // Re-attach one write stream per persisted shard, resuming each shard's own
 // resumable upload session id.
912  auto upload_options = StaticTupleFilter<
915  WithObjectMetadata>::TPred>(std::move(options));
916  for (auto& stream_desc : persistent_state->streams) {
917  ResumableUploadRequest request(bucket_name,
918  std::move(stream_desc.object_name));
919  google::cloud::internal::apply(
920  SetOptionsApplyHelper(request),
921  std::tuple_cat(upload_options,
922  std::make_tuple(UseResumableUploadSession(
923  std::move(stream_desc.resumable_session_id)))));
924  auto stream = internal_state->CreateStream(
925  internal::ClientImplDetails::GetRawClient(client), request);
926  if (!stream) {
 // Undo PreventFromFinishing() before bailing out so pending stream
 // callbacks can still complete.
927  internal_state->AllowFinishing();
928  return stream.status();
929  }
930  streams.emplace_back(*std::move(stream));
931  }
932 
933  internal_state->AllowFinishing();
934  return ResumableParallelUploadState(std::move(resumable_session_id),
935  std::move(internal_state),
936  std::move(streams));
937 }
938 
939 template <typename... Options>
940 std::vector<std::uintmax_t> ComputeParallelFileUploadSplitPoints(
941  std::uintmax_t file_size, std::tuple<Options...> const& options) {
942  auto div_ceil = [](std::uintmax_t dividend, std::uintmax_t divisor) {
943  return (dividend + divisor - 1) / divisor;
944  };
945  // These defaults were obtained by experiments summarized in
946  // https://github.com/googleapis/google-cloud-cpp/issues/2951#issuecomment-566237128
947  MaxStreams const default_max_streams(64);
948  MinStreamSize const default_min_stream_size(32 * 1024 * 1024);
949 
950  auto const min_stream_size =
951  (std::max<std::uintmax_t>)(1, ExtractFirstOccurrenceOfType<MinStreamSize>(
952  options)
953  .value_or(default_min_stream_size)
954  .value());
955  auto const max_streams = ExtractFirstOccurrenceOfType<MaxStreams>(options)
956  .value_or(default_max_streams)
957  .value();
958 
959  auto const wanted_num_streams =
960  (std::max<
961  std::uintmax_t>)(1, (std::min<std::uintmax_t>)(max_streams,
962  div_ceil(
963  file_size,
964  min_stream_size)));
965 
966  auto const stream_size =
967  (std::max<std::uintmax_t>)(1, div_ceil(file_size, wanted_num_streams));
968 
969  std::vector<std::uintmax_t> res;
970  for (auto split = stream_size; split < file_size; split += stream_size) {
971  res.push_back(split);
972  }
973  return res;
974 }
975 
976 std::string ParallelFileUploadSplitPointsToString(
977  std::vector<std::uintmax_t> const& split_points);
978 
979 StatusOr<std::vector<std::uintmax_t>> ParallelFileUploadSplitPointsFromString(
980  std::string const& s);
981 
982 /**
983  * Helper functor to call `PrepareParallelUpload` via `apply`.
984  *
985  * This object holds references to some of its arguments (the `client` is
986  * held by value), hence it should not be stored. Instead, it should be used
987  * only as a transient object allowing for calling `PrepareParallelUpload`
988  * via `apply`.
 */
989 struct PrepareParallelUploadApplyHelper {
990  // Some gcc versions crash on using decltype for return type here.
 // The return type selects ResumableParallelUploadState when the option pack
 // contains UseResumableUploadSession, NonResumableParallelUploadState
 // otherwise.  NOTE(review): one line of the std::conditional expression is
 // missing from this rendering.
991  template <typename... Options>
992  StatusOr<typename std::conditional<
993  Among<typename std::decay<Options>::type...>::template TPred<
995  ResumableParallelUploadState, NonResumableParallelUploadState>::type>
996  operator()(Options&&... options) {
997  return PrepareParallelUpload(std::move(client), bucket_name, object_name,
998  num_shards, prefix,
999  std::forward<Options>(options)...);
1000  }
1001 
 // The client is stored by value; the strings below are reference members,
 // which is why this helper must stay transient.
1002  Client client;
1003  std::string const& bucket_name;
1004  std::string const& object_name;
1005  std::size_t num_shards;
1006  std::string const& prefix;
1007 };
1008 
1009 struct CreateParallelUploadShards {
1010  /**
1011  * Prepare a parallel upload of a given file.
1012  *
1013  * The returned opaque objects reflect computed shards of the given file. Each
1014  * of them has an `Upload()` member function which will perform the upload of
1015  * that shard. You should parallelize running this function on them according
1016  * to your needs. You can affect how many shards will be created by using the
1017  * `MaxStreams` and `MinStreamSize` options.
1018  *
1019  * Any of the returned objects can be used for obtaining the metadata of the
1020  * resulting object.
1021  *
1022  * @param client the client on which to perform the operation.
1023  * @param file_name the path to the file to be uploaded
1024  * @param bucket_name the name of the bucket that will contain the object.
1025  * @param object_name the uploaded object name.
1026  * @param prefix the prefix with which temporary objects will be created.
1027  * @param options a list of optional query parameters and/or request headers.
1028  * Valid types for this operation include `DestinationPredefinedAcl`,
1029  * `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
1030  * `KmsKeyName`, `MaxStreams`, `MinStreamSize`, `QuotaUser`, `UserIp`,
1031  * `UserProject`, `WithObjectMetadata`, `UseResumableUploadSession`.
1032  *
1033  * @return the shards of the input file to be uploaded in parallel
1034  *
1035  * @par Idempotency
1036  * This operation is not idempotent. While each request performed by this
1037  * function is retried based on the client policies, the operation itself
1038  * stops on the first request that fails.
1039  *
1040  * @par Example
1041  * @snippet storage_object_file_transfer_samples.cc parallel upload file
1042  */
1043  template <typename... Options>
1044  static StatusOr<std::vector<ParallelUploadFileShard>> Create(
1045  Client client, // NOLINT(performance-unnecessary-value-param)
1046  std::string file_name, std::string const& bucket_name,
1047  std::string const& object_name, std::string const& prefix,
1048  Options&&... options) {
 // The file must exist and be stat-able; its size drives the sharding.
1049  std::error_code size_err;
1050  auto file_size = google::cloud::internal::file_size(file_name, size_err);
1051  if (size_err) {
1052  return Status(StatusCode::kNotFound, size_err.message());
1053  }
1054 
 // A session is "new" unless the caller passed a non-empty
 // UseResumableUploadSession option; a non-empty value means "resume".
1055  auto const resumable_session_id_arg =
1056  ExtractFirstOccurrenceOfType<UseResumableUploadSession>(
1057  std::tie(options...));
1058  bool const new_session = !resumable_session_id_arg ||
1059  resumable_session_id_arg.value().value().empty();
 // MaxStreams/MinStreamSize only affect sharding here; strip them from the
 // options forwarded to the upload machinery.
1060  auto upload_options =
1061  StaticTupleFilter<NotAmong<MaxStreams, MinStreamSize>::TPred>(
1062  std::tie(options...));
1063 
 // For a new session compute split points from the file size; for a resumed
 // session they are recovered from the persisted state further below
 // (num_shards == 0 signals "unknown" to PrepareParallelUpload).
1064  std::vector<uintmax_t> file_split_points;
1065  std::size_t num_shards = 0;
1066  if (new_session) {
1067  file_split_points =
1068  ComputeParallelFileUploadSplitPoints(file_size, std::tie(options...));
1069  num_shards = file_split_points.size() + 1;
1070  }
1071 
1072  // Create the upload state.  The split points are persisted as custom data
 // so that a later Resume() can reconstruct the same shard boundaries.
1073  auto state = google::cloud::internal::apply(
1074  PrepareParallelUploadApplyHelper{client, bucket_name, object_name,
1075  num_shards, prefix},
1076  std::tuple_cat(
1077  std::move(upload_options),
1078  std::make_tuple(ParallelUploadExtraPersistentState(
1079  ParallelFileUploadSplitPointsToString(file_split_points)))));
1080  if (!state) {
1081  return state.status();
1082  }
1083 
1084  if (!new_session) {
1085  // We need to recreate the split points of the file.
1086  auto maybe_split_points =
1087  ParallelFileUploadSplitPointsFromString(state->impl_->custom_data());
1088  if (!maybe_split_points) {
1089  state->Fail(maybe_split_points.status());
1090  return std::move(maybe_split_points).status();
1091  }
1092  file_split_points = *std::move(maybe_split_points);
1093  }
1094 
1095  // Everything ready - we've got the shared state and the files open, let's
1096  // prepare the returned objects.
1097  auto upload_buffer_size =
1098  google::cloud::storage::internal::ClientImplDetails::GetRawClient(
1099  client)
1100  ->client_options()
 // NOTE(review): the accessor called on client_options() is missing from
 // this rendering — presumably the upload buffer size; confirm against the
 // original header.
1102 
 // Appending file_size turns the interior split points into per-shard end
 // offsets; each shard then covers [offset, shard_end).
1103  file_split_points.emplace_back(file_size);
1104  assert(file_split_points.size() == state->shards().size());
1105  std::vector<ParallelUploadFileShard> res;
1106  std::uintmax_t offset = 0;
1107  std::size_t shard_idx = 0;
1108  for (auto shard_end : file_split_points) {
1109  res.emplace_back(ParallelUploadFileShard(
1110  state->impl_, std::move(state->shards()[shard_idx++]), file_name,
1111  offset, shard_end - offset, upload_buffer_size));
1112  offset = shard_end;
1113  }
1114  return res;
1115  }
1116 };
1117 
1118 /// @copydoc CreateParallelUploadShards::Create()
1119 template <typename... Options>
1120 StatusOr<std::vector<ParallelUploadFileShard>> CreateUploadShards(
1121  Client client, std::string file_name, std::string const& bucket_name,
1122  std::string const& object_name, std::string const& prefix,
1123  Options&&... options) {
1124  return CreateParallelUploadShards::Create(
1125  std::move(client), std::move(file_name), bucket_name, object_name, prefix,
1126  std::forward<Options>(options)...);
1127 }
1128 
1129 } // namespace internal
1130 
1131 /**
1132  * Perform a parallel upload of a given file.
1133  *
1134  * You can affect how many shards will be created by using the `MaxStreams` and
1135  * `MinStreamSize` options.
1136  *
1137  * @param client the client on which to perform the operation.
1138  * @param file_name the path to the file to be uploaded
1139  * @param bucket_name the name of the bucket that will contain the object.
1140  * @param object_name the uploaded object name.
1141  * @param prefix the prefix with which temporary objects will be created.
1142  * @param ignore_cleanup_failures treat failures to cleanup the temporary
1143  * objects as not fatal.
1144  * @param options a list of optional query parameters and/or request headers.
1145  * Valid types for this operation include `DestinationPredefinedAcl`,
1146  * `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
1147  * `KmsKeyName`, `MaxStreams`, `MinStreamSize`, `QuotaUser`, `UserIp`,
1148  * `UserProject`, `WithObjectMetadata`, `UseResumableUploadSession`.
1149  *
1150  * @return the metadata of the object created by the upload.
1151  *
1152  * @par Idempotency
1153  * This operation is not idempotent. While each request performed by this
1154  * function is retried based on the client policies, the operation itself stops
1155  * on the first request that fails.
1156  *
1157  * @par Example
1158  * @snippet storage_object_file_transfer_samples.cc parallel upload file
1159  */
1160 template <typename... Options>
 // NOTE(review): the line with the return type and function name is missing
 // from this rendering — per the doc comment above, this is
 // ParallelUploadFile and it returns the metadata of the uploaded object;
 // confirm against the original header.
1162  Client client, std::string file_name, std::string bucket_name,
1163  std::string object_name, std::string prefix, bool ignore_cleanup_failures,
1164  Options&&... options) {
 // Split the file into shards; each shard knows how to upload its own byte
 // range.
1165  auto shards = internal::CreateParallelUploadShards::Create(
1166  std::move(client), std::move(file_name), std::move(bucket_name),
1167  std::move(object_name), std::move(prefix),
1168  std::forward<Options>(options)...);
1169  if (!shards) {
1170  return shards.status();
1171  }
1172 
 // Upload all shards concurrently, one thread per shard, and wait for all of
 // them to finish before composing the result.
1173  std::vector<std::thread> threads;
1174  threads.reserve(shards->size());
1175  for (auto& shard : *shards) {
1176  threads.emplace_back([&shard] {
1177  // We can safely ignore the status - if something fails we'll know
1178  // when obtaining final metadata.
1179  shard.Upload();
1180  });
1181  }
1182  for (auto& thread : threads) {
1183  thread.join();
1184  }
 // Any shard can report the final object's metadata; use the first.
1185  auto res = (*shards)[0].WaitForCompletion().get();
 // Best-effort cleanup of the temporary objects; a failure here is only
 // fatal when the caller did not opt into ignoring cleanup failures.
1186  auto cleanup_res = (*shards)[0].EagerCleanup();
1187  if (!cleanup_res.ok() && !ignore_cleanup_failures) {
1188  return cleanup_res;
1189  }
1190  return res;
1191 }
1192 
1194 } // namespace storage
1195 } // namespace cloud
1196 } // namespace google
1197 
1198 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H