Google Cloud Storage C++ Client  1.32.1
A C++ Client Library for Google Cloud Storage
parallel_upload.h
Go to the documentation of this file.
1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
17 
18 #include "google/cloud/storage/client.h"
19 #include "google/cloud/storage/internal/tuple_filter.h"
20 #include "google/cloud/storage/object_stream.h"
21 #include "google/cloud/storage/version.h"
22 #include "google/cloud/future.h"
23 #include "google/cloud/internal/filesystem.h"
24 #include "google/cloud/status_or.h"
25 #include "absl/memory/memory.h"
26 #include "absl/types/optional.h"
27 #include <condition_variable>
28 #include <cstddef>
29 #include <fstream>
30 #include <functional>
31 #include <mutex>
32 #include <string>
33 #include <tuple>
34 #include <utility>
35 #include <vector>
36 
37 namespace google {
38 namespace cloud {
39 namespace storage {
40 inline namespace STORAGE_CLIENT_NS {
/**
 * A parameter type indicating the maximum number of streams to
 * `ParallelUploadFile`.
 */
class MaxStreams {
 public:
  // NOLINTNEXTLINE(google-explicit-constructor)
  MaxStreams(std::size_t max_streams) : value_(max_streams) {}

  /// The maximum number of streams requested by the caller.
  std::size_t value() const { return value_; }

 private:
  std::size_t value_;
};
54 
/**
 * A parameter type indicating the minimum stream size to `ParallelUploadFile`.
 *
 * If `ParallelUploadFile` receives this option it will attempt to make sure
 * that every shard is at least this long. This might not apply to the last
 * shard because it will be the remainder of the division of the file.
 */
class MinStreamSize {
 public:
  // NOLINTNEXTLINE(google-explicit-constructor)
  MinStreamSize(std::uintmax_t value) : value_(value) {}

  /// The minimum shard size requested by the caller.
  std::uintmax_t value() const { return value_; }

 private:
  std::uintmax_t value_;
};
71 
72 namespace internal {
73 
74 class ParallelUploadFileShard;
75 struct CreateParallelUploadShards;
76 
77 /**
78  * Return an empty option if Tuple contains an element of type T, otherwise
79  * return the value of the first element of type T
80  */
81 template <typename T, typename Tuple, typename Enable = void>
82 struct ExtractFirstOccurrenceOfTypeImpl {
83  absl::optional<T> operator()(Tuple const&) { return absl::optional<T>(); }
84 };
85 
// Specialization selected when `std::tuple<Options...>` contains at least one
// element whose decayed type matches (decayed) `T`; returns the first such
// element.
template <typename T, typename... Options>
struct ExtractFirstOccurrenceOfTypeImpl<
    T, std::tuple<Options...>,
    typename std::enable_if<
        Among<typename std::decay<Options>::type...>::template TPred<
            typename std::decay<T>::type>::value>::type> {
  absl::optional<T> operator()(std::tuple<Options...> const& tuple) {
    // Keep only the elements of type `T`, then take the first one.
    return std::get<0>(StaticTupleFilter<Among<T>::template TPred>(tuple));
  }
};
96 
97 template <typename T, typename Tuple>
98 absl::optional<T> ExtractFirstOccurrenceOfType(Tuple const& tuple) {
99  return ExtractFirstOccurrenceOfTypeImpl<T, Tuple>()(tuple);
100 }
101 
/**
 * An option for `PrepareParallelUpload` to associate opaque data with upload.
 *
 * This is used by `CreateUploadShards()` to store additional information in the
 * parallel upload persistent state. The additional information is where each
 * shard starts in the uploaded file.
 */
class ParallelUploadExtraPersistentState {
 public:
  /// Access the payload without consuming this object.
  std::string payload() const& { return payload_; }
  /// Move the payload out of an rvalue of this type.
  std::string payload() && { return std::move(payload_); }

 private:
  // Only `CreateParallelUploadShards` may create instances of this option.
  friend struct CreateParallelUploadShards;
  explicit ParallelUploadExtraPersistentState(std::string payload)
      : payload_(std::move(payload)) {}

  std::string payload_;  // opaque data stored in the persistent state
};
121 
122 class ParallelObjectWriteStreambuf;
123 
124 // Type-erased function object to execute ComposeMany with most arguments
125 // bound.
126 using Composer =
127  std::function<StatusOr<ObjectMetadata>(std::vector<ComposeSourceObject>)>;
128 
/**
 * The serializable state of a parallel upload.
 *
 * Instances are serialized via `ToString()` and stored in a GCS object so an
 * interrupted parallel upload can later be resumed via `FromString()`.
 */
struct ParallelUploadPersistentState {
  /// Persistent state of a single upload shard.
  struct Stream {
    std::string object_name;           // temporary object written by the shard
    std::string resumable_session_id;  // the shard's upload session
  };

  /// Serialize this state to its string representation.
  std::string ToString() const;
  /// Parse @p json_rep; returns an error `Status` on malformed input.
  static StatusOr<ParallelUploadPersistentState> FromString(
      std::string const& json_rep);

  std::string destination_object_name;  // name of the final, composed object
  // Generation of the destination object when the upload started
  // (0 when it did not exist) -- used to detect concurrent overwrites.
  std::int64_t expected_generation;
  // Opaque payload, see `ParallelUploadExtraPersistentState`.
  std::string custom_data;
  std::vector<Stream> streams;  // one entry per shard
};
144 
// The `ObjectWriteStream`s have to hold references to the state of
// the parallel upload so that they can update it when finished and trigger
// shards composition, hence `ResumableParallelUploadState` has to be
// destroyed after the `ObjectWriteStream`s.
// `ResumableParallelUploadState` and `ObjectWriteStream`s are passed
// around by values, so we don't control their lifetime. In order to
// circumvent it, we move the state to something held by a `shared_ptr`.
class ParallelUploadStateImpl
    : public std::enable_shared_from_this<ParallelUploadStateImpl> {
 public:
  /**
   * @param cleanup_on_failures if true, temporary objects are removed even
   *     when the upload fails.
   * @param destination_object_name name of the final, composed object.
   * @param expected_generation generation of the destination object when the
   *     upload started (0 if it did not exist).
   * @param deleter type-erased deleter for the temporary objects.
   * @param composer function joining the shards into the destination object.
   */
  ParallelUploadStateImpl(bool cleanup_on_failures,
                          std::string destination_object_name,
                          std::int64_t expected_generation,
                          std::shared_ptr<ScopedDeleter> deleter,
                          Composer composer);
  ~ParallelUploadStateImpl();

  /// Create the write stream for one shard and register it in this state.
  StatusOr<ObjectWriteStream> CreateStream(
      RawClient& raw_client, ResumableUploadRequest const& request);

  /// Invoked, with the lock on `mu_` held via @p lk, once no unfinished
  /// streams remain.
  void AllStreamsFinished(std::unique_lock<std::mutex>& lk);
  /// Record the final upload response of the stream at @p stream_idx.
  void StreamFinished(std::size_t stream_idx,
                      StatusOr<ResumableUploadResponse> const& response);

  /// Record that the stream at @p stream_idx was destroyed.
  void StreamDestroyed(std::size_t stream_idx);

  /// A future satisfied with the destination object's metadata (or an error)
  /// once the whole upload finishes.
  future<StatusOr<ObjectMetadata>> WaitForCompletion() const;

  /// Proactively delete the temporary objects; returns the cleanup status.
  Status EagerCleanup();

  /// Fail the whole upload with @p status.
  void Fail(Status status);

  /// Snapshot this upload's state for serialization (used for resuming).
  ParallelUploadPersistentState ToPersistentState() const;

  /// Opaque data stored alongside the upload state. Thread-safe.
  std::string custom_data() const {
    std::unique_lock<std::mutex> lk(mu_);
    return custom_data_;
  }

  /// Thread-safe setter for the opaque data.
  void set_custom_data(std::string custom_data) {
    std::unique_lock<std::mutex> lk(mu_);
    custom_data_ = std::move(custom_data);
  }

  /// The session id identifying this parallel upload. Thread-safe.
  std::string resumable_session_id() {
    std::unique_lock<std::mutex> lk(mu_);
    return resumable_session_id_;
  }

  /// Thread-safe setter for the session id.
  void set_resumable_session_id(std::string resumable_session_id) {
    std::unique_lock<std::mutex> lk(mu_);
    resumable_session_id_ = std::move(resumable_session_id);
  }

  /// Artificially bump the unfinished-stream count to delay composition.
  void PreventFromFinishing() {
    std::unique_lock<std::mutex> lk(mu_);
    ++num_unfinished_streams_;
  }

  /// Undo `PreventFromFinishing()`; triggers `AllStreamsFinished()` when this
  /// releases the last outstanding stream.
  void AllowFinishing() {
    std::unique_lock<std::mutex> lk(mu_);
    if (--num_unfinished_streams_ == 0) {
      AllStreamsFinished(lk);
    }
  }

 private:
  /// Book-keeping for a single shard's stream.
  struct StreamInfo {
    std::string object_name;           // temporary object for this shard
    std::string resumable_session_id;  // the shard's upload session
    // Argument used when composing this shard, when available.
    absl::optional<ComposeSourceObject> composition_arg;
    bool finished;
  };

  mutable std::mutex mu_;  // guards all the members below
  // Promises made via `WaitForCompletion()`
  mutable std::vector<promise<StatusOr<ObjectMetadata>>> res_promises_;
  // Type-erased object for deleting temporary objects.
  std::shared_ptr<ScopedDeleter> deleter_;
  Composer composer_;
  std::string destination_object_name_;
  std::int64_t expected_generation_;
  // Set when all streams are closed and composed but before cleanup.
  bool finished_;
  // Tracks how many streams are still written to.
  std::size_t num_unfinished_streams_;
  std::vector<StreamInfo> streams_;
  absl::optional<StatusOr<ObjectMetadata>> res_;
  Status cleanup_status_;
  std::string custom_data_;
  std::string resumable_session_id_;
};
237 
238 struct ComposeManyApplyHelper {
239  template <typename... Options>
240  StatusOr<ObjectMetadata> operator()(Options&&... options) const {
241  return ComposeMany(client, bucket_name, std::move(source_objects), prefix,
242  std::move(destination_object_name), true,
243  std::forward<Options>(options)...);
244  }
245 
246  Client& client;
247  std::string bucket_name;
248  std::vector<ComposeSourceObject> source_objects;
249  std::string prefix;
250  std::string destination_object_name;
251 };
252 
253 class SetOptionsApplyHelper {
254  public:
255  // NOLINTNEXTLINE(google-explicit-constructor)
256  SetOptionsApplyHelper(ResumableUploadRequest& request) : request_(request) {}
257 
258  template <typename... Options>
259  void operator()(Options&&... options) const {
260  request_.set_multiple_options(std::forward<Options>(options)...);
261  }
262 
263  private:
264  ResumableUploadRequest& request_;
265 };
266 
267 struct ReadObjectApplyHelper {
268  template <typename... Options>
269  ObjectReadStream operator()(Options&&... options) const {
270  return client.ReadObject(bucket_name, object_name,
271  std::forward<Options>(options)...);
272  }
273 
274  Client& client;
275  std::string const& bucket_name;
276  std::string const& object_name;
277 };
278 
279 struct GetObjectMetadataApplyHelper {
280  template <typename... Options>
281  StatusOr<ObjectMetadata> operator()(Options... options) const {
282  return client.GetObjectMetadata(bucket_name, object_name,
283  std::move(options)...);
284  }
285 
286  Client& client;
287  std::string bucket_name;
288  std::string object_name;
289 };
290 
/**
 * A class representing an individual shard of the parallel upload.
 *
 * In order to perform a parallel upload of a file, you should call
 * `CreateUploadShards()` and it will return a vector of objects of this class.
 * You should execute the `Upload()` member function on them in parallel to
 * execute the upload.
 *
 * You can then obtain the status of the whole upload via `WaitForCompletion()`.
 */
class ParallelUploadFileShard {
 public:
  // Shards own a write stream and a reference to shared state, so they are
  // move-only.
  ParallelUploadFileShard(ParallelUploadFileShard const&) = delete;
  ParallelUploadFileShard& operator=(ParallelUploadFileShard const&) = delete;
  ParallelUploadFileShard(ParallelUploadFileShard&&) = default;
  ParallelUploadFileShard& operator=(ParallelUploadFileShard&&) = default;
  ~ParallelUploadFileShard();

  /**
   * Perform the upload of this shard.
   *
   * This function will block until the shard is completed, or a permanent
   * failure is encountered, or the retry policy is exhausted.
   */
  Status Upload();

  /**
   * Asynchronously wait for completion of the whole upload operation (not only
   * this shard).
   *
   * @return the returned future will become satisfied once the whole upload
   *     operation finishes (i.e. `Upload()` completes on all shards); on
   *     success, it will hold the destination object's metadata
   */
  future<StatusOr<ObjectMetadata>> WaitForCompletion() {
    return state_->WaitForCompletion();
  }

  /**
   * Cleanup all the temporary files
   *
   * The destruction of the last of these objects tied to a parallel upload will
   * cleanup of all the temporary files used in the process of that parallel
   * upload. If the cleanup fails, it will fail silently not to crash the
   * program.
   *
   * If you want to control the status of the cleanup, use this member function
   * to do it eagerly, before destruction.
   *
   * It is enough to call it on one of the objects, but it is not invalid to
   * call it on all objects.
   */
  Status EagerCleanup() { return state_->EagerCleanup(); }

  /**
   * Retrieve resumable session ID to allow for potential future resume.
   */
  std::string resumable_session_id() { return resumable_session_id_; }

 private:
  friend struct CreateParallelUploadShards;
  /**
   * @param state state shared by all shards of one parallel upload.
   * @param ostream the stream this shard writes to.
   * @param file_name the file being uploaded.
   * @param offset_in_file offset of the first byte this shard uploads.
   * @param bytes_to_upload how many bytes this shard should upload.
   * @param upload_buffer_size buffer size used by `Upload()` (presumably for
   *     reading the file -- `Upload()` is defined elsewhere).
   */
  ParallelUploadFileShard(std::shared_ptr<ParallelUploadStateImpl> state,
                          ObjectWriteStream ostream, std::string file_name,
                          std::uintmax_t offset_in_file,
                          std::uintmax_t bytes_to_upload,
                          std::size_t upload_buffer_size)
      : state_(std::move(state)),
        ostream_(std::move(ostream)),
        file_name_(std::move(file_name)),
        offset_in_file_(offset_in_file),
        left_to_upload_(bytes_to_upload),
        upload_buffer_size_(upload_buffer_size),
        resumable_session_id_(state_->resumable_session_id()) {}

  std::shared_ptr<ParallelUploadStateImpl> state_;  // shared upload state
  ObjectWriteStream ostream_;         // destination stream of this shard
  std::string file_name_;             // source file
  std::uintmax_t offset_in_file_;     // first byte this shard uploads
  std::uintmax_t left_to_upload_;     // bytes remaining for this shard
  std::size_t upload_buffer_size_;    // see the constructor's documentation
  std::string resumable_session_id_;  // id of the whole parallel upload
};
373 
/**
 * The state controlling uploading a GCS object via multiple parallel streams.
 *
 * To use this class obtain the state via `PrepareParallelUpload` and then write
 * the data to the streams associated with each shard. Once writing is done,
 * close or destroy the streams.
 *
 * When all the streams are `Close`d or destroyed, this class will join the
 * them (via `ComposeMany`) into the destination object and set the value in
 * `future`s returned by `WaitForCompletion`.
 *
 * Parallel upload will create temporary files. Upon completion of the whole
 * operation, this class will attempt to remove them in its destructor, but if
 * they fail, they fail silently. In order to proactively cleanup these files,
 * one can call `EagerCleanup()`.
 */
class NonResumableParallelUploadState {
 public:
  /**
   * Create the upload state and one write stream per shard.
   *
   * @param options the (already filtered) tuple of request options; see
   *     `PrepareParallelUpload` for the supported option types.
   */
  template <typename... Options>
  static StatusOr<NonResumableParallelUploadState> Create(
      Client client, std::string const& bucket_name,
      std::string const& object_name, std::size_t num_shards,
      std::string const& prefix, std::tuple<Options...> options);

  /**
   * Asynchronously wait for completion of the whole upload operation.
   *
   * @return the returned future will have a value set to the destination object
   *     metadata when all the streams are `Close`d or destroyed.
   */
  future<StatusOr<ObjectMetadata>> WaitForCompletion() const {
    return impl_->WaitForCompletion();
  }

  /**
   * Cleanup all the temporary files
   *
   * The destruction of this object will perform cleanup of all the temporary
   * files used in the process of the parallel upload. If the cleanup fails, it
   * will fail silently not to crash the program.
   *
   * If you want to control the status of the cleanup, use this member function
   * to do it eagerly, before destruction.
   *
   * @return the status of the cleanup.
   */
  Status EagerCleanup() { return impl_->EagerCleanup(); }

  /**
   * The streams to write to.
   *
   * When the streams are `Close`d, they will be concatenated into the
   * destination object in the same order as they appeared in this vector upon
   * this object's creation.
   *
   * It is safe to destroy or `std::move()` these streams.
   */
  std::vector<ObjectWriteStream>& shards() { return shards_; }

  /**
   * Fail the whole operation.
   *
   * If called before all streams are closed or destroyed, calling this
   * operation will prevent composing the streams into the final destination
   * object and return a failure via `WaitForCompletion()`.
   *
   * @param status the status to fail the operation with.
   */
  void Fail(Status status) { return impl_->Fail(std::move(status)); }

 private:
  NonResumableParallelUploadState(
      std::shared_ptr<ParallelUploadStateImpl> state,
      std::vector<ObjectWriteStream> shards)
      : impl_(std::move(state)), shards_(std::move(shards)) {}

  std::shared_ptr<ParallelUploadStateImpl> impl_;  // shared upload state
  std::vector<ObjectWriteStream> shards_;          // one stream per shard

  friend struct CreateParallelUploadShards;
};
455 
/**
 * The state controlling uploading a GCS object via multiple parallel streams,
 * allowing for resuming.
 *
 * To use this class obtain the state via `PrepareParallelUpload` (with
 * `UseResumableUploadSession` option) and then write the data to the streams
 * associated with each shard. Once writing is done, close or destroy the
 * streams.
 *
 * When all the streams are `Close`d or destroyed, this class will join the
 * them (via `ComposeMany`) into the destination object and set the value in
 * `future`s returned by `WaitForCompletion`.
 *
 * Parallel upload will create temporary files. Upon successful completion of
 * the whole operation, this class will attempt to remove them in its
 * destructor, but if they fail, they fail silently. In order to proactively
 * cleanup these files, one can call `EagerCleanup()`.
 *
 * In order to resume an interrupted upload, provide
 * `UseResumableUploadSession` to `PrepareParallelUpload` with value set to
 * what `resumable_session_id()` returns.
 */
class ResumableParallelUploadState {
 public:
  /// Prefix which marks a session id as a parallel upload session.
  static std::string session_id_prefix() { return "ParUpl:"; }

  /**
   * Start a new parallel upload with `num_shards` streams.
   *
   * @param extra_state opaque data stored in the upload's persistent state,
   *     see `ParallelUploadExtraPersistentState`.
   */
  template <typename... Options>
  static StatusOr<ResumableParallelUploadState> CreateNew(
      Client client, std::string const& bucket_name,
      std::string const& object_name, std::size_t num_shards,
      std::string const& prefix, std::string extra_state,
      std::tuple<Options...> const& options);

  /**
   * Resume a previously interrupted parallel upload.
   *
   * @param resumable_session_id the value returned by the interrupted
   *     upload's `resumable_session_id()`.
   */
  template <typename... Options>
  static StatusOr<ResumableParallelUploadState> Resume(
      Client client, std::string const& bucket_name,
      std::string const& object_name, std::size_t num_shards,
      std::string const& prefix, std::string resumable_session_id,
      std::tuple<Options...> options);

  /**
   * Retrieve the resumable session id.
   *
   * This value, if passed via `UseResumableUploadSession` option indicates that
   * an upload should be a continuation of the one which this object represents.
   */
  std::string resumable_session_id() { return resumable_session_id_; }

  /**
   * Asynchronously wait for completion of the whole upload operation.
   *
   * @return the returned future will have a value set to the destination object
   *     metadata when all the streams are `Close`d or destroyed.
   */
  future<StatusOr<ObjectMetadata>> WaitForCompletion() const {
    return impl_->WaitForCompletion();
  }

  /**
   * Cleanup all the temporary files
   *
   * The destruction of this object will perform cleanup of all the temporary
   * files used in the process of the parallel upload. If the cleanup fails, it
   * will fail silently not to crash the program.
   *
   * If you want to control the status of the cleanup, use this member function
   * to do it eagerly, before destruction.
   *
   * @return the status of the cleanup.
   */
  Status EagerCleanup() { return impl_->EagerCleanup(); }

  /**
   * The streams to write to.
   *
   * When the streams are `Close`d, they will be concatenated into the
   * destination object in the same order as they appeared in this vector upon
   * this object's creation.
   *
   * It is safe to destroy or `std::move()` these streams.
   */
  std::vector<ObjectWriteStream>& shards() { return shards_; }

  /**
   * Fail the whole operation.
   *
   * If called before all streams are closed or destroyed, calling this
   * operation will prevent composing the streams into the final destination
   * object and return a failure via `WaitForCompletion()`.
   *
   * @param status the status to fail the operation with.
   */
  void Fail(Status status) { return impl_->Fail(std::move(status)); }

 private:
  /// Create the deleter used to remove this upload's temporary objects.
  template <typename... Options>
  static std::shared_ptr<ScopedDeleter> CreateDeleter(
      Client client, std::string const& bucket_name,
      std::tuple<Options...> const& options);

  /// Create the function object which composes the shards into the
  /// destination object.
  template <typename... Options>
  static Composer CreateComposer(Client client, std::string const& bucket_name,
                                 std::string const& object_name,
                                 std::int64_t expected_generation,
                                 std::string const& prefix,
                                 std::tuple<Options...> const& options);

  ResumableParallelUploadState(std::string resumable_session_id,
                               std::shared_ptr<ParallelUploadStateImpl> state,
                               std::vector<ObjectWriteStream> shards)
      : resumable_session_id_(std::move(resumable_session_id)),
        impl_(std::move(state)),
        shards_(std::move(shards)) {}

  std::string resumable_session_id_;  // identifies this upload for resuming
  std::shared_ptr<ParallelUploadStateImpl> impl_;  // shared upload state
  std::vector<ObjectWriteStream> shards_;          // one stream per shard

  friend struct CreateParallelUploadShards;
};
576 
577 /**
578  * Prepare a parallel upload state.
579  *
580  * The returned `NonResumableParallelUploadState` will contain streams to which
581  * data can be uploaded in parallel.
582  *
583  * @param client the client on which to perform the operation.
584  * @param bucket_name the name of the bucket that will contain the object.
585  * @param object_name the uploaded object name.
586  * @param num_shards how many streams to upload the object through.
587  * @param prefix the prefix with which temporary objects will be created.
588  * @param options a list of optional query parameters and/or request headers.
589  * Valid types for this operation include `DestinationPredefinedAcl`,
590  * `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
591  * `KmsKeyName`, `QuotaUser`, `UserIp`, `UserProject`, `WithObjectMetadata`.
592  *
593  * @return the state of the parallel upload
594  *
595  * @par Idempotency
596  * This operation is not idempotent. While each request performed by this
597  * function is retried based on the client policies, the operation itself stops
598  * on the first request that fails.
599  */
600 template <typename... Options,
601  typename std::enable_if<
602  NotAmong<typename std::decay<Options>::type...>::template TPred<
604  int>::type EnableIfNotResumable = 0>
605 StatusOr<NonResumableParallelUploadState> PrepareParallelUpload(
606  Client client, std::string const& bucket_name,
607  std::string const& object_name, std::size_t num_shards,
608  std::string const& prefix, Options&&... options) {
609  return NonResumableParallelUploadState::Create(
610  std::move(client), bucket_name, object_name, num_shards, prefix,
611  StaticTupleFilter<NotAmong<ParallelUploadExtraPersistentState>::TPred>(
612  std::forward_as_tuple(std::forward<Options>(options)...)));
613 }
614 
615 template <typename... Options,
616  typename std::enable_if<
617  Among<typename std::decay<Options>::type...>::template TPred<
619  int>::type EnableIfResumable = 0>
620 StatusOr<ResumableParallelUploadState> PrepareParallelUpload(
621  Client client, std::string const& bucket_name,
622  std::string const& object_name, std::size_t num_shards,
623  std::string const& prefix, Options&&... options) {
624  auto resumable_args =
625  StaticTupleFilter<Among<UseResumableUploadSession>::TPred>(
626  std::tie(options...));
627  static_assert(
628  std::tuple_size<decltype(resumable_args)>::value == 1,
629  "There should be exactly one UseResumableUploadSession argument");
630  std::string resumable_session_id = std::get<0>(resumable_args).value();
631  auto extra_state_arg =
632  ExtractFirstOccurrenceOfType<ParallelUploadExtraPersistentState>(
633  std::tie(options...));
634 
635  auto forwarded_args =
636  StaticTupleFilter<NotAmong<UseResumableUploadSession,
637  ParallelUploadExtraPersistentState>::TPred>(
638  std::forward_as_tuple(std::forward<Options>(options)...));
639 
640  if (resumable_session_id.empty()) {
641  return ResumableParallelUploadState::CreateNew(
642  std::move(client), bucket_name, object_name, num_shards, prefix,
643  extra_state_arg ? std::move(extra_state_arg).value().payload()
644  : std::string(),
645  std::move(forwarded_args));
646  }
647  return ResumableParallelUploadState::Resume(
648  std::move(client), bucket_name, object_name, num_shards, prefix,
649  resumable_session_id, std::move(forwarded_args));
650 }
651 
652 template <typename... Options>
653 StatusOr<NonResumableParallelUploadState>
654 NonResumableParallelUploadState::Create(Client client,
655  std::string const& bucket_name,
656  std::string const& object_name,
657  std::size_t num_shards,
658  std::string const& prefix,
659  std::tuple<Options...> options) {
660  using internal::StaticTupleFilter;
661  auto delete_options =
662  StaticTupleFilter<Among<QuotaUser, UserProject, UserIp>::TPred>(options);
663  auto deleter = std::make_shared<ScopedDeleter>(
664  [client, bucket_name, delete_options](std::string const& object_name,
665  std::int64_t generation) mutable {
666  return google::cloud::internal::apply(
667  DeleteApplyHelper{client, std::move(bucket_name), object_name,
668  generation},
669  std::move(delete_options));
670  });
671 
672  auto compose_options = StaticTupleFilter<
675  WithObjectMetadata>::TPred>(options);
676  auto composer = [client, bucket_name, object_name, compose_options,
677  prefix](std::vector<ComposeSourceObject> sources) mutable {
678  return google::cloud::internal::apply(
679  ComposeManyApplyHelper{client, std::move(bucket_name),
680  std::move(sources), prefix + ".compose_many",
681  std::move(object_name)},
682  std::move(compose_options));
683  };
684 
685  auto lock = internal::LockPrefix(client, bucket_name, prefix, options);
686  if (!lock) {
687  return Status(
688  lock.status().code(),
689  "Failed to lock prefix for ParallelUpload: " + lock.status().message());
690  }
691  deleter->Add(*lock);
692 
693  auto internal_state = std::make_shared<ParallelUploadStateImpl>(
694  true, object_name, 0, std::move(deleter), std::move(composer));
695  std::vector<ObjectWriteStream> streams;
696 
697  auto upload_options = StaticTupleFilter<
700  WithObjectMetadata>::TPred>(std::move(options));
701  auto& raw_client = *client.raw_client_;
702  for (std::size_t i = 0; i < num_shards; ++i) {
703  ResumableUploadRequest request(
704  bucket_name, prefix + ".upload_shard_" + std::to_string(i));
705  google::cloud::internal::apply(SetOptionsApplyHelper(request),
706  upload_options);
707  auto stream = internal_state->CreateStream(raw_client, request);
708  if (!stream) {
709  return stream.status();
710  }
711  streams.emplace_back(*std::move(stream));
712  }
713  return NonResumableParallelUploadState(std::move(internal_state),
714  std::move(streams));
715 }
716 
717 template <typename... Options>
718 std::shared_ptr<ScopedDeleter> ResumableParallelUploadState::CreateDeleter(
719  Client client, // NOLINT(performance-unnecessary-value-param)
720  std::string const& bucket_name, std::tuple<Options...> const& options) {
721  using internal::StaticTupleFilter;
722  auto delete_options =
723  StaticTupleFilter<Among<QuotaUser, UserProject, UserIp>::TPred>(options);
724  return std::make_shared<ScopedDeleter>(
725  [client, bucket_name, delete_options](std::string const& object_name,
726  std::int64_t generation) mutable {
727  return google::cloud::internal::apply(
728  DeleteApplyHelper{client, std::move(bucket_name), object_name,
729  generation},
730  std::move(delete_options));
731  });
732 }
733 
734 template <typename... Options>
735 Composer ResumableParallelUploadState::CreateComposer(
736  Client client, // NOLINT(performance-unnecessary-value-param)
737  std::string const& bucket_name, std::string const& object_name,
738  std::int64_t expected_generation, std::string const& prefix,
739  std::tuple<Options...> const& options) {
740  auto compose_options = std::tuple_cat(
741  StaticTupleFilter<
743  UserIp, UserProject, WithObjectMetadata>::TPred>(options),
744  std::make_tuple(IfGenerationMatch(expected_generation)));
745  auto get_metadata_options = StaticTupleFilter<
747  UserIp, UserProject, WithObjectMetadata>::TPred>(options);
748  auto composer = [client, bucket_name, object_name, compose_options,
749  get_metadata_options,
750  prefix](std::vector<ComposeSourceObject> sources) mutable
751  -> StatusOr<ObjectMetadata> {
752  auto res = google::cloud::internal::apply(
753  ComposeManyApplyHelper{client, bucket_name, std::move(sources),
754  prefix + ".compose_many", object_name},
755  std::move(compose_options));
756  if (res) {
757  return res;
758  }
759  if (res.status().code() != StatusCode::kFailedPrecondition) {
760  return res.status();
761  }
762  // This means that the object already exists and it is not the object, which
763  // existed upon start of parallel upload. For simplicity, we assume that
764  // it's a result of a previously interrupted ComposeMany invocation.
765  return google::cloud::internal::apply(
766  GetObjectMetadataApplyHelper{client, std::move(bucket_name),
767  std::move(object_name)},
768  std::move(get_metadata_options));
769  };
770  return Composer(std::move(composer));
771 }
772 
773 template <typename... Options>
774 StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::CreateNew(
775  Client client, std::string const& bucket_name,
776  std::string const& object_name, std::size_t num_shards,
777  std::string const& prefix, std::string extra_state,
778  std::tuple<Options...> const& options) {
779  using internal::StaticTupleFilter;
780 
781  auto get_object_meta_options = StaticTupleFilter<
783  IfMetagenerationNotMatch, UserProject>::TPred>(options);
784  auto object_meta = google::cloud::internal::apply(
785  GetObjectMetadataApplyHelper{client, bucket_name, object_name},
786  std::move(get_object_meta_options));
787  if (!object_meta && object_meta.status().code() != StatusCode::kNotFound) {
788  return object_meta.status();
789  }
790  std::int64_t expected_generation =
791  object_meta ? object_meta->generation() : 0;
792 
793  auto deleter = CreateDeleter(client, bucket_name, options);
794  auto composer = CreateComposer(client, bucket_name, object_name,
795  expected_generation, prefix, options);
796  auto internal_state = std::make_shared<ParallelUploadStateImpl>(
797  false, object_name, expected_generation, deleter, std::move(composer));
798  internal_state->set_custom_data(std::move(extra_state));
799 
800  std::vector<ObjectWriteStream> streams;
801 
802  auto upload_options = std::tuple_cat(
803  StaticTupleFilter<
806  UserProject, WithObjectMetadata>::TPred>(options),
807  std::make_tuple(UseResumableUploadSession("")));
808  auto& raw_client = *client.raw_client_;
809  for (std::size_t i = 0; i < num_shards; ++i) {
810  ResumableUploadRequest request(
811  bucket_name, prefix + ".upload_shard_" + std::to_string(i));
812  google::cloud::internal::apply(SetOptionsApplyHelper(request),
813  upload_options);
814  auto stream = internal_state->CreateStream(raw_client, request);
815  if (!stream) {
816  return stream.status();
817  }
818  streams.emplace_back(*std::move(stream));
819  }
820 
821  auto state_object_name = prefix + ".upload_state";
822  auto insert_options = std::tuple_cat(
823  std::make_tuple(IfGenerationMatch(0)),
824  StaticTupleFilter<
826  UserProject, WithObjectMetadata>::TPred>(options));
827  auto state_object = google::cloud::internal::apply(
828  InsertObjectApplyHelper{client, bucket_name, state_object_name,
829  internal_state->ToPersistentState().ToString()},
830  std::move(insert_options));
831  if (!state_object) {
832  internal_state->Fail(state_object.status());
833  return std::move(state_object).status();
834  }
835  std::string resumable_session_id = session_id_prefix() + state_object_name +
836  ":" +
837  std::to_string(state_object->generation());
838  internal_state->set_resumable_session_id(resumable_session_id);
839  deleter->Add(std::move(*state_object));
840  return ResumableParallelUploadState(std::move(resumable_session_id),
841  std::move(internal_state),
842  std::move(streams));
843 }
844 
845 StatusOr<std::pair<std::string, std::int64_t>> ParseResumableSessionId(
846  std::string const& session_id);
847 
848 template <typename... Options>
849 StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::Resume(
850  Client client, std::string const& bucket_name,
851  std::string const& object_name, std::size_t num_shards,
852  std::string const& prefix, std::string resumable_session_id,
853  std::tuple<Options...> options) {
854  using internal::StaticTupleFilter;
855 
856  auto state_and_gen = ParseResumableSessionId(resumable_session_id);
857  if (!state_and_gen) {
858  return state_and_gen.status();
859  }
860 
861  auto read_options = std::tuple_cat(
862  StaticTupleFilter<Among<DisableCrc32cChecksum, DisableMD5Hash,
864  options),
865  std::make_tuple(IfGenerationMatch(state_and_gen->second)));
866 
867  auto state_stream = google::cloud::internal::apply(
868  ReadObjectApplyHelper{client, bucket_name, state_and_gen->first},
869  std::move(read_options));
870  std::string state_string(std::istreambuf_iterator<char>{state_stream}, {});
871  state_stream.Close();
872 
873  auto persistent_state =
874  ParallelUploadPersistentState::FromString(state_string);
875  if (!persistent_state) {
876  return persistent_state.status();
877  }
878 
879  if (persistent_state->destination_object_name != object_name) {
881  "Specified resumable session ID is doesn't match the "
882  "destination object name (" +
883  object_name + " vs " +
884  persistent_state->destination_object_name + ")");
885  }
886  if (persistent_state->streams.size() != num_shards && num_shards != 0) {
888  "Specified resumable session ID is doesn't match the "
889  "previously specified number of shards (" +
890  std::to_string(num_shards) + " vs " +
891  std::to_string(persistent_state->streams.size()) + ")");
892  }
893 
894  auto deleter = CreateDeleter(client, bucket_name, options);
895  deleter->Add(state_and_gen->first, state_and_gen->second);
896  auto composer =
897  CreateComposer(client, bucket_name, object_name,
898  persistent_state->expected_generation, prefix, options);
899  auto internal_state = std::make_shared<ParallelUploadStateImpl>(
900  false, object_name, persistent_state->expected_generation, deleter,
901  std::move(composer));
902  internal_state->set_custom_data(std::move(persistent_state->custom_data));
903  internal_state->set_resumable_session_id(resumable_session_id);
904  // If a resumed stream is already finalized, callbacks from streams will be
905  // executed immediately. We don't want them to trigger composition before all
906  // of them are created.
907  internal_state->PreventFromFinishing();
908  std::vector<ObjectWriteStream> streams;
909 
910  auto upload_options = StaticTupleFilter<
913  WithObjectMetadata>::TPred>(std::move(options));
914  auto& raw_client = *client.raw_client_;
915  for (auto& stream_desc : persistent_state->streams) {
916  ResumableUploadRequest request(bucket_name,
917  std::move(stream_desc.object_name));
918  google::cloud::internal::apply(
919  SetOptionsApplyHelper(request),
920  std::tuple_cat(upload_options,
921  std::make_tuple(UseResumableUploadSession(
922  std::move(stream_desc.resumable_session_id)))));
923  auto stream = internal_state->CreateStream(raw_client, request);
924  if (!stream) {
925  internal_state->AllowFinishing();
926  return stream.status();
927  }
928  streams.emplace_back(*std::move(stream));
929  }
930 
931  internal_state->AllowFinishing();
932  return ResumableParallelUploadState(std::move(resumable_session_id),
933  std::move(internal_state),
934  std::move(streams));
935 }
936 
937 template <typename... Options>
938 std::vector<std::uintmax_t> ComputeParallelFileUploadSplitPoints(
939  std::uintmax_t file_size, std::tuple<Options...> const& options) {
940  auto div_ceil = [](std::uintmax_t dividend, std::uintmax_t divisor) {
941  return (dividend + divisor - 1) / divisor;
942  };
943  // These defaults were obtained by experiments summarized in
944  // https://github.com/googleapis/google-cloud-cpp/issues/2951#issuecomment-566237128
945  MaxStreams const default_max_streams(64);
946  MinStreamSize const default_min_stream_size(32 * 1024 * 1024);
947 
948  auto const min_stream_size =
949  (std::max<std::uintmax_t>)(1, ExtractFirstOccurrenceOfType<MinStreamSize>(
950  options)
951  .value_or(default_min_stream_size)
952  .value());
953  auto const max_streams = ExtractFirstOccurrenceOfType<MaxStreams>(options)
954  .value_or(default_max_streams)
955  .value();
956 
957  auto const wanted_num_streams =
958  (std::max<
959  std::uintmax_t>)(1, (std::min<std::uintmax_t>)(max_streams,
960  div_ceil(
961  file_size,
962  min_stream_size)));
963 
964  auto const stream_size =
965  (std::max<std::uintmax_t>)(1, div_ceil(file_size, wanted_num_streams));
966 
967  std::vector<std::uintmax_t> res;
968  for (auto split = stream_size; split < file_size; split += stream_size) {
969  res.push_back(split);
970  }
971  return res;
972 }
973 
974 std::string ParallelFileUploadSplitPointsToString(
975  std::vector<std::uintmax_t> const& split_points);
976 
977 StatusOr<std::vector<std::uintmax_t>> ParallelFileUploadSplitPointsFromString(
978  std::string const& s);
979 
980 /**
981  * Helper functor to call `PrepareParallelUpload` via `apply`.
982  *
983  * This object holds only references to objects, hence it should not be stored.
984  * Instead, it should be used only as a transient object allowing for calling
985  * `PrepareParallelUpload` via `apply`.
986  */
987 struct PrepareParallelUploadApplyHelper {
988  // Some gcc versions crash on using decltype for return type here.
989  template <typename... Options>
990  StatusOr<typename std::conditional<
991  Among<typename std::decay<Options>::type...>::template TPred<
993  ResumableParallelUploadState, NonResumableParallelUploadState>::type>
994  operator()(Options&&... options) {
995  return PrepareParallelUpload(std::move(client), bucket_name, object_name,
996  num_shards, prefix,
997  std::forward<Options>(options)...);
998  }
999 
1000  Client client;
1001  std::string const& bucket_name;
1002  std::string const& object_name;
1003  std::size_t num_shards;
1004  std::string const& prefix;
1005 };
1006 
1007 struct CreateParallelUploadShards {
1008  /**
1009  * Prepare a parallel upload of a given file.
1010  *
1011  * The returned opaque objects reflect computed shards of the given file. Each
1012  * of them has an `Upload()` member function which will perform the upload of
1013  * that shard. You should parallelize running this function on them according
1014  * to your needs. You can affect how many shards will be created by using the
1015  * `MaxStreams` and `MinStreamSize` options.
1016  *
1017  * Any of the returned objects can be used for obtaining the metadata of the
1018  * resulting object.
1019  *
1020  * @param client the client on which to perform the operation.
1021  * @param file_name the path to the file to be uploaded
1022  * @param bucket_name the name of the bucket that will contain the object.
1023  * @param object_name the uploaded object name.
1024  * @param prefix the prefix with which temporary objects will be created.
1025  * @param options a list of optional query parameters and/or request headers.
1026  * Valid types for this operation include `DestinationPredefinedAcl`,
1027  * `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
1028  * `KmsKeyName`, `MaxStreams, `MinStreamSize`, `QuotaUser`, `UserIp`,
1029  * `UserProject`, `WithObjectMetadata`, `UseResumableUploadSession`.
1030  *
1031  * @return the shards of the input file to be uploaded in parallel
1032  *
1033  * @par Idempotency
1034  * This operation is not idempotent. While each request performed by this
1035  * function is retried based on the client policies, the operation itself
1036  * stops on the first request that fails.
1037  *
1038  * @par Example
1039  * @snippet storage_object_file_transfer_samples.cc parallel upload file
1040  */
1041  template <typename... Options>
1042  static StatusOr<std::vector<ParallelUploadFileShard>> Create(
1043  Client client, // NOLINT(performance-unnecessary-value-param)
1044  std::string file_name, std::string const& bucket_name,
1045  std::string const& object_name, std::string const& prefix,
1046  Options&&... options) {
1047  std::error_code size_err;
1048  auto file_size = google::cloud::internal::file_size(file_name, size_err);
1049  if (size_err) {
1050  return Status(StatusCode::kNotFound, size_err.message());
1051  }
1052 
1053  auto const resumable_session_id_arg =
1054  ExtractFirstOccurrenceOfType<UseResumableUploadSession>(
1055  std::tie(options...));
1056  bool const new_session = !resumable_session_id_arg ||
1057  resumable_session_id_arg.value().value().empty();
1058  auto upload_options =
1059  StaticTupleFilter<NotAmong<MaxStreams, MinStreamSize>::TPred>(
1060  std::tie(options...));
1061 
1062  std::vector<uintmax_t> file_split_points;
1063  std::size_t num_shards = 0;
1064  if (new_session) {
1065  file_split_points =
1066  ComputeParallelFileUploadSplitPoints(file_size, std::tie(options...));
1067  num_shards = file_split_points.size() + 1;
1068  }
1069 
1070  // Create the upload state.
1071  auto state = google::cloud::internal::apply(
1072  PrepareParallelUploadApplyHelper{client, bucket_name, object_name,
1073  num_shards, prefix},
1074  std::tuple_cat(
1075  std::move(upload_options),
1076  std::make_tuple(ParallelUploadExtraPersistentState(
1077  ParallelFileUploadSplitPointsToString(file_split_points)))));
1078  if (!state) {
1079  return state.status();
1080  }
1081 
1082  if (!new_session) {
1083  // We need to recreate the split points of the file.
1084  auto maybe_split_points =
1085  ParallelFileUploadSplitPointsFromString(state->impl_->custom_data());
1086  if (!maybe_split_points) {
1087  state->Fail(maybe_split_points.status());
1088  return std::move(maybe_split_points).status();
1089  }
1090  file_split_points = *std::move(maybe_split_points);
1091  }
1092 
1093  // Everything ready - we've got the shared state and the files open, let's
1094  // prepare the returned objects.
1095  auto upload_buffer_size =
1096  google::cloud::storage::internal::ClientImplDetails::GetRawClient(
1097  client)
1098  ->client_options()
1100 
1101  file_split_points.emplace_back(file_size);
1102  assert(file_split_points.size() == state->shards().size());
1103  std::vector<ParallelUploadFileShard> res;
1104  std::uintmax_t offset = 0;
1105  std::size_t shard_idx = 0;
1106  for (auto shard_end : file_split_points) {
1107  res.emplace_back(ParallelUploadFileShard(
1108  state->impl_, std::move(state->shards()[shard_idx++]), file_name,
1109  offset, shard_end - offset, upload_buffer_size));
1110  offset = shard_end;
1111  }
1112  return res;
1113  }
1114 };
1115 
1116 /// @copydoc CreateParallelUploadShards::Create()
1117 template <typename... Options>
1118 StatusOr<std::vector<ParallelUploadFileShard>> CreateUploadShards(
1119  Client client, std::string file_name, std::string const& bucket_name,
1120  std::string const& object_name, std::string const& prefix,
1121  Options&&... options) {
1122  return CreateParallelUploadShards::Create(
1123  std::move(client), std::move(file_name), bucket_name, object_name, prefix,
1124  std::forward<Options>(options)...);
1125 }
1126 
1127 } // namespace internal
1128 
1129 /**
1130  * Perform a parallel upload of a given file.
1131  *
1132  * You can affect how many shards will be created by using the `MaxStreams` and
1133  * `MinStreamSize` options.
1134  *
1135  * @param client the client on which to perform the operation.
1136  * @param file_name the path to the file to be uploaded
1137  * @param bucket_name the name of the bucket that will contain the object.
1138  * @param object_name the uploaded object name.
1139  * @param prefix the prefix with which temporary objects will be created.
1140  * @param ignore_cleanup_failures treat failures to cleanup the temporary
1141  * objects as not fatal.
1142  * @param options a list of optional query parameters and/or request headers.
1143  * Valid types for this operation include `DestinationPredefinedAcl`,
1144  * `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
 1145  * `KmsKeyName`, `MaxStreams`, `MinStreamSize`, `QuotaUser`, `UserIp`,
1146  * `UserProject`, `WithObjectMetadata`, `UseResumableUploadSession`.
1147  *
1148  * @return the metadata of the object created by the upload.
1149  *
1150  * @par Idempotency
1151  * This operation is not idempotent. While each request performed by this
1152  * function is retried based on the client policies, the operation itself stops
1153  * on the first request that fails.
1154  *
1155  * @par Example
1156  * @snippet storage_object_file_transfer_samples.cc parallel upload file
1157  */
1158 template <typename... Options>
1160  Client client, std::string file_name, std::string bucket_name,
1161  std::string object_name, std::string prefix, bool ignore_cleanup_failures,
1162  Options&&... options) {
1163  auto shards = internal::CreateParallelUploadShards::Create(
1164  std::move(client), std::move(file_name), std::move(bucket_name),
1165  std::move(object_name), std::move(prefix),
1166  std::forward<Options>(options)...);
1167  if (!shards) {
1168  return shards.status();
1169  }
1170 
1171  std::vector<std::thread> threads;
1172  threads.reserve(shards->size());
1173  for (auto& shard : *shards) {
1174  threads.emplace_back([&shard] {
1175  // We can safely ignore the status - if something fails we'll know
1176  // when obtaining final metadata.
1177  shard.Upload();
1178  });
1179  }
1180  for (auto& thread : threads) {
1181  thread.join();
1182  }
1183  auto res = (*shards)[0].WaitForCompletion().get();
1184  auto cleanup_res = (*shards)[0].EagerCleanup();
1185  if (!cleanup_res.ok() && !ignore_cleanup_failures) {
1186  return cleanup_res;
1187  }
1188  return res;
1189 }
1190 
1191 } // namespace STORAGE_CLIENT_NS
1192 } // namespace storage
1193 } // namespace cloud
1194 } // namespace google
1195 
1196 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H