15#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
16#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
18#include "google/cloud/storage/client.h"
19#include "google/cloud/storage/internal/tuple_filter.h"
20#include "google/cloud/storage/object_stream.h"
21#include "google/cloud/storage/version.h"
22#include "google/cloud/future.h"
23#include "google/cloud/internal/filesystem.h"
24#include "google/cloud/internal/type_list.h"
25#include "google/cloud/status_or.h"
26#include "absl/types/optional.h"
27#include <condition_variable>
41GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
43
44
45
48 explicit MaxStreams(std::size_t value) : value_(value) {}
49 std::size_t
value()
const {
return value_; }
56
57
58
59
60
61
64 explicit MinStreamSize(std::uintmax_t value) : value_(value) {}
68 std::uintmax_t value_;
73class ParallelUploadFileShard;
74struct CreateParallelUploadShards;
77
78
79
80template <
typename T,
typename Tuple,
typename Enable =
void>
81struct ExtractFirstOccurrenceOfTypeImpl {
82 absl::optional<T> operator()(Tuple
const&) {
return absl::optional<T>(); }
85template <
typename T,
typename... Options>
86struct ExtractFirstOccurrenceOfTypeImpl<
87 T, std::tuple<Options...>,
88 typename std::enable_if<
89 Among<
typename std::decay<Options>::type...>::
template TPred<
90 typename std::decay<T>::type>::value>::type> {
91 absl::optional<T> operator()(std::tuple<Options...>
const& tuple) {
92 return std::get<0>(StaticTupleFilter<Among<T>::
template TPred>(tuple));
96template <
typename T,
typename Tuple>
97absl::optional<T> ExtractFirstOccurrenceOfType(Tuple
const& tuple) {
98 return ExtractFirstOccurrenceOfTypeImpl<T, Tuple>()(tuple);
102
103
104
105
106
107
108class ParallelUploadExtraPersistentState {
110 std::string payload() && {
return std::move(payload_); }
111 std::string payload()
const& {
return payload_; }
114 friend struct CreateParallelUploadShards;
115 explicit ParallelUploadExtraPersistentState(std::string payload)
116 : payload_(std::move(payload)) {}
118 std::string payload_;
121class ParallelObjectWriteStreambuf;
128struct ParallelUploadPersistentState {
130 std::string object_name;
131 std::string resumable_session_id;
134 std::string ToString()
const;
135 static StatusOr<ParallelUploadPersistentState> FromString(
136 std::string
const& json_rep);
138 std::string destination_object_name;
139 std::int64_t expected_generation;
140 std::string custom_data;
141 std::vector<Stream> streams;
151class ParallelUploadStateImpl
152 :
public std::enable_shared_from_this<ParallelUploadStateImpl> {
154 ParallelUploadStateImpl(
bool cleanup_on_failures,
155 std::string destination_object_name,
156 std::int64_t expected_generation,
157 std::shared_ptr<ScopedDeleter> deleter,
159 ~ParallelUploadStateImpl();
162 std::shared_ptr<RawClient> raw_client,
163 ResumableUploadRequest
const& request);
165 void AllStreamsFinished(std::unique_lock<std::mutex>& lk);
166 void StreamFinished(std::size_t stream_idx,
167 StatusOr<QueryResumableUploadResponse>
const& response);
169 void StreamDestroyed(std::size_t stream_idx);
177 ParallelUploadPersistentState ToPersistentState()
const;
179 std::string custom_data()
const {
180 std::unique_lock<std::mutex> lk(mu_);
184 void set_custom_data(std::string custom_data) {
185 std::unique_lock<std::mutex> lk(mu_);
186 custom_data_ = std::move(custom_data);
189 std::string resumable_session_id() {
190 std::unique_lock<std::mutex> lk(mu_);
191 return resumable_session_id_;
194 void set_resumable_session_id(std::string resumable_session_id) {
195 std::unique_lock<std::mutex> lk(mu_);
196 resumable_session_id_ = std::move(resumable_session_id);
199 void PreventFromFinishing() {
200 std::unique_lock<std::mutex> lk(mu_);
201 ++num_unfinished_streams_;
204 void AllowFinishing() {
205 std::unique_lock<std::mutex> lk(mu_);
206 if (--num_unfinished_streams_ == 0) {
207 AllStreamsFinished(lk);
213 std::string object_name;
214 std::string resumable_session_id;
219 mutable std::mutex mu_;
221 mutable std::vector<promise<StatusOr<
ObjectMetadata>>> res_promises_;
223 std::shared_ptr<ScopedDeleter> deleter_;
225 std::string destination_object_name_;
226 std::int64_t expected_generation_;
228 bool finished_ =
false;
230 std::size_t num_unfinished_streams_ = 0;
231 std::vector<StreamInfo> streams_;
234 std::string custom_data_;
235 std::string resumable_session_id_;
238struct ComposeManyApplyHelper {
239 template <
typename... Options>
241 return ComposeMany(client, bucket_name, std::move(source_objects), prefix,
242 std::move(destination_object_name),
true,
243 std::forward<Options>(options)...);
247 std::string bucket_name;
250 std::string destination_object_name;
253class SetOptionsApplyHelper {
255 explicit SetOptionsApplyHelper(ResumableUploadRequest& request)
256 : request_(request) {}
258 template <
typename... Options>
259 void operator()(Options&&... options)
const {
260 request_.set_multiple_options(std::forward<Options>(options)...);
264 ResumableUploadRequest& request_;
267struct ReadObjectApplyHelper {
268 template <
typename... Options>
270 return client.ReadObject(bucket_name, object_name,
271 std::forward<Options>(options)...);
275 std::string
const& bucket_name;
276 std::string
const& object_name;
279struct GetObjectMetadataApplyHelper {
280 template <
typename... Options>
282 return client.GetObjectMetadata(bucket_name, object_name,
283 std::move(options)...);
287 std::string bucket_name;
288 std::string object_name;
292
293
294
295
296
297
298
299
300
301class ParallelUploadFileShard {
303 ParallelUploadFileShard(ParallelUploadFileShard
const&) =
delete;
304 ParallelUploadFileShard& operator=(ParallelUploadFileShard
const&) =
delete;
305 ParallelUploadFileShard(ParallelUploadFileShard&&) =
default;
306 ParallelUploadFileShard& operator=(ParallelUploadFileShard&&) =
default;
307 ~ParallelUploadFileShard();
310
311
312
313
314
318
319
320
321
322
323
324
326 return state_->WaitForCompletion();
330
331
332
333
334
335
336
337
338
339
340
341
342
343 Status EagerCleanup() {
return state_->EagerCleanup(); }
346
347
348 std::string resumable_session_id() {
return resumable_session_id_; }
351 friend struct CreateParallelUploadShards;
352 ParallelUploadFileShard(std::shared_ptr<ParallelUploadStateImpl> state,
354 std::uintmax_t offset_in_file,
355 std::uintmax_t bytes_to_upload,
356 std::size_t upload_buffer_size)
357 : state_(std::move(state)),
358 ostream_
(std::move(ostream)
),
359 file_name_(std::move(file_name)),
360 offset_in_file_(offset_in_file),
361 left_to_upload_(bytes_to_upload),
362 upload_buffer_size_(upload_buffer_size),
363 resumable_session_id_(state_->resumable_session_id()) {}
365 std::shared_ptr<ParallelUploadStateImpl> state_;
367 std::string file_name_;
372 std::int64_t offset_in_file_;
373 std::int64_t left_to_upload_;
374 std::size_t upload_buffer_size_;
375 std::string resumable_session_id_;
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394class NonResumableParallelUploadState {
396 template <
typename... Options>
397 static StatusOr<NonResumableParallelUploadState> Create(
398 Client client, std::string
const& bucket_name,
399 std::string
const& object_name, std::size_t num_shards,
400 std::string
const& prefix, std::tuple<Options...> options);
403
404
405
406
407
409 return impl_->WaitForCompletion();
413
414
415
416
417
418
419
420
421
422
423
424 Status EagerCleanup() {
return impl_->EagerCleanup(); }
427
428
429
430
431
432
433
434
438
439
440
441
442
443
444
445
446 void Fail(
Status status) {
return impl_->Fail(std::move(status)); }
449 NonResumableParallelUploadState(
450 std::shared_ptr<ParallelUploadStateImpl> state,
452 : impl_(std::move(state)), shards_(std::move(shards)) {}
454 std::shared_ptr<ParallelUploadStateImpl> impl_;
457 friend struct CreateParallelUploadShards;
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482class ResumableParallelUploadState {
484 static std::string session_id_prefix() {
return "ParUpl:"; }
486 template <
typename... Options>
487 static StatusOr<ResumableParallelUploadState> CreateNew(
488 Client client, std::string
const& bucket_name,
489 std::string
const& object_name, std::size_t num_shards,
490 std::string
const& prefix, std::string extra_state,
491 std::tuple<Options...>
const& options);
493 template <
typename... Options>
494 static StatusOr<ResumableParallelUploadState> Resume(
495 Client client, std::string
const& bucket_name,
496 std::string
const& object_name, std::size_t num_shards,
497 std::string
const& prefix, std::string resumable_session_id,
498 std::tuple<Options...> options);
501
502
503
504
505
506 std::string resumable_session_id() {
return resumable_session_id_; }
509
510
511
512
513
515 return impl_->WaitForCompletion();
519
520
521
522
523
524
525
526
527
528
529
530 Status EagerCleanup() {
return impl_->EagerCleanup(); }
533
534
535
536
537
538
539
540
544
545
546
547
548
549
550
551
552 void Fail(
Status status) {
return impl_->Fail(std::move(status)); }
555 template <
typename... Options>
556 static std::shared_ptr<ScopedDeleter> CreateDeleter(
557 Client client, std::string
const& bucket_name,
558 std::tuple<Options...>
const& options);
560 template <
typename... Options>
561 static Composer CreateComposer(
Client client, std::string
const& bucket_name,
562 std::string
const& object_name,
563 std::int64_t expected_generation,
564 std::string
const& prefix,
565 std::tuple<Options...>
const& options);
567 ResumableParallelUploadState(std::string resumable_session_id,
568 std::shared_ptr<ParallelUploadStateImpl> state,
570 : resumable_session_id_(std::move(resumable_session_id)),
571 impl_(std::move(state)),
572 shards_(std::move(shards)) {}
574 std::string resumable_session_id_;
575 std::shared_ptr<ParallelUploadStateImpl> impl_;
578 friend struct CreateParallelUploadShards;
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604template <
typename... Options,
605 typename std::enable_if<
606 NotAmong<
typename std::decay<Options>::type...>::
template TPred<
608 int>::type EnableIfNotResumable = 0>
609StatusOr<NonResumableParallelUploadState> PrepareParallelUpload(
610 Client client, std::string
const& bucket_name,
611 std::string
const& object_name, std::size_t num_shards,
612 std::string
const& prefix, Options&&... options) {
613 return NonResumableParallelUploadState::Create(
614 std::move(client), bucket_name, object_name, num_shards, prefix,
615 StaticTupleFilter<NotAmong<ParallelUploadExtraPersistentState>::TPred>(
616 std::forward_as_tuple(std::forward<Options>(options)...)));
619template <
typename... Options,
620 typename std::enable_if<
621 Among<
typename std::decay<Options>::type...>::
template TPred<
623 int>::type EnableIfResumable = 0>
624StatusOr<ResumableParallelUploadState> PrepareParallelUpload(
625 Client client, std::string
const& bucket_name,
626 std::string
const& object_name, std::size_t num_shards,
627 std::string
const& prefix, Options&&... options) {
628 auto resumable_args =
630 std::tie(options...));
632 std::tuple_size<
decltype(resumable_args)>::value == 1,
633 "There should be exactly one UseResumableUploadSession argument");
634 std::string resumable_session_id = std::get<0>(resumable_args).value();
635 auto extra_state_arg =
636 ExtractFirstOccurrenceOfType<ParallelUploadExtraPersistentState>(
637 std::tie(options...));
639 auto forwarded_args =
641 ParallelUploadExtraPersistentState>::TPred>(
642 std::forward_as_tuple(std::forward<Options>(options)...));
644 if (resumable_session_id.empty()) {
645 return ResumableParallelUploadState::CreateNew(
646 std::move(client), bucket_name, object_name, num_shards, prefix,
647 extra_state_arg ? (*std::move(extra_state_arg)).payload()
649 std::move(forwarded_args));
651 return ResumableParallelUploadState::Resume(
652 std::move(client), bucket_name, object_name, num_shards, prefix,
653 resumable_session_id, std::move(forwarded_args));
656template <
typename... Options>
657StatusOr<NonResumableParallelUploadState>
658NonResumableParallelUploadState::Create(
Client client,
659 std::string
const& bucket_name,
660 std::string
const& object_name,
661 std::size_t num_shards,
662 std::string
const& prefix,
663 std::tuple<Options...> options) {
664 using internal::StaticTupleFilter;
665 auto delete_options =
667 auto deleter = std::make_shared<ScopedDeleter>(
668 [client, bucket_name, delete_options](std::string
const& object_name,
669 std::int64_t generation)
mutable {
671 DeleteApplyHelper{client, std::move(bucket_name), object_name,
673 std::move(delete_options));
676 auto compose_options = StaticTupleFilter<
680 auto composer = [client, bucket_name, object_name, compose_options,
683 ComposeManyApplyHelper{client, std::move(bucket_name),
684 std::move(sources), prefix +
".compose_many",
685 std::move(object_name)},
686 std::move(compose_options));
689 auto lock = internal::LockPrefix(client, bucket_name, prefix, options);
692 lock.status().code(),
693 "Failed to lock prefix for ParallelUpload: " + lock.status().message());
697 auto internal_state = std::make_shared<ParallelUploadStateImpl>(
698 true, object_name, 0, std::move(deleter), std::move(composer));
701 auto upload_options = StaticTupleFilter<
705 for (std::size_t i = 0; i < num_shards; ++i) {
706 ResumableUploadRequest request(
707 bucket_name, prefix +
".upload_shard_" + std::to_string(i));
708 google::
cloud::internal::apply(SetOptionsApplyHelper(request),
710 auto stream = internal_state->CreateStream(
711 internal::ClientImplDetails::GetRawClient(client), request);
713 return stream.status();
715 streams.emplace_back(*std::move(stream));
717 return NonResumableParallelUploadState(std::move(internal_state),
721template <
typename... Options>
722std::shared_ptr<ScopedDeleter> ResumableParallelUploadState::CreateDeleter(
724 std::string
const& bucket_name, std::tuple<Options...>
const& options) {
725 using internal::StaticTupleFilter;
726 auto delete_options =
728 return std::make_shared<ScopedDeleter>(
729 [client, bucket_name, delete_options](std::string
const& object_name,
730 std::int64_t generation)
mutable {
732 DeleteApplyHelper{client, std::move(bucket_name), object_name,
734 std::move(delete_options));
738template <
typename... Options>
739Composer ResumableParallelUploadState::CreateComposer(
741 std::string
const& bucket_name, std::string
const& object_name,
742 std::int64_t expected_generation, std::string
const& prefix,
743 std::tuple<Options...>
const& options) {
744 auto compose_options = std::tuple_cat(
749 auto get_metadata_options = StaticTupleFilter<
751 auto composer = [client, bucket_name, object_name, compose_options,
752 get_metadata_options,
756 ComposeManyApplyHelper{client, bucket_name, std::move(sources),
757 prefix +
".compose_many", object_name},
758 std::move(compose_options));
762 if (res.status().code() !=
StatusCode::kFailedPrecondition) {
769 GetObjectMetadataApplyHelper{client, std::move(bucket_name),
770 std::move(object_name)},
771 std::move(get_metadata_options));
773 return Composer(std::move(composer));
776template <
typename... Options>
777StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::CreateNew(
778 Client client, std::string
const& bucket_name,
779 std::string
const& object_name, std::size_t num_shards,
780 std::string
const& prefix, std::string extra_state,
781 std::tuple<Options...>
const& options) {
782 using internal::StaticTupleFilter;
784 auto get_object_meta_options = StaticTupleFilter<
788 GetObjectMetadataApplyHelper{client, bucket_name, object_name},
789 std::move(get_object_meta_options));
790 if (!object_meta && object_meta.status().code() !=
StatusCode::kNotFound) {
791 return object_meta.status();
793 std::int64_t expected_generation =
794 object_meta ? object_meta->generation() : 0;
796 auto deleter = CreateDeleter(client, bucket_name, options);
797 auto composer = CreateComposer(client, bucket_name, object_name,
798 expected_generation, prefix, options);
799 auto internal_state = std::make_shared<ParallelUploadStateImpl>(
800 false, object_name, expected_generation, deleter, std::move(composer));
801 internal_state->set_custom_data(std::move(extra_state));
805 auto upload_options = std::tuple_cat(
811 for (std::size_t i = 0; i < num_shards; ++i) {
812 ResumableUploadRequest request(
813 bucket_name, prefix +
".upload_shard_" + std::to_string(i));
814 google::
cloud::internal::apply(SetOptionsApplyHelper(request),
816 auto stream = internal_state->CreateStream(
817 internal::ClientImplDetails::GetRawClient(client), request);
819 return stream.status();
821 streams.emplace_back(*std::move(stream));
824 auto state_object_name = prefix +
".upload_state";
825 auto insert_options = std::tuple_cat(
830 auto state_object =
google::
cloud::internal::apply(
831 InsertObjectApplyHelper{client, bucket_name, state_object_name,
832 internal_state->ToPersistentState().ToString()},
833 std::move(insert_options));
835 internal_state->Fail(state_object.status());
836 return std::move(state_object).status();
838 std::string resumable_session_id = session_id_prefix() + state_object_name +
840 std::to_string(state_object->generation());
841 internal_state->set_resumable_session_id(resumable_session_id);
842 deleter->Add(std::move(*state_object));
843 return ResumableParallelUploadState(std::move(resumable_session_id),
844 std::move(internal_state),
848StatusOr<std::pair<std::string, std::int64_t>> ParseResumableSessionId(
849 std::string
const& session_id);
851template <
typename... Options>
852StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::Resume(
853 Client client, std::string
const& bucket_name,
854 std::string
const& object_name, std::size_t num_shards,
855 std::string
const& prefix, std::string resumable_session_id,
856 std::tuple<Options...> options) {
857 using internal::StaticTupleFilter;
859 auto state_and_gen = ParseResumableSessionId(resumable_session_id);
860 if (!state_and_gen) {
864 auto read_options = std::tuple_cat(
870 auto state_stream =
google::
cloud::internal::apply(
871 ReadObjectApplyHelper{client, bucket_name, state_and_gen
->first},
872 std::move(read_options));
873 std::string state_string(std::istreambuf_iterator<
char>{state_stream}, {});
874 state_stream.Close();
876 auto persistent_state =
877 ParallelUploadPersistentState::FromString(state_string);
878 if (!persistent_state) {
882 if (persistent_state
->destination_object_name != object_name) {
884 "Specified resumable session ID is doesn't match the "
885 "destination object name (" +
886 object_name +
" vs " +
887 persistent_state
->destination_object_name +
")");
889 if (persistent_state
->streams.size() != num_shards && num_shards != 0) {
891 "Specified resumable session ID is doesn't match the "
892 "previously specified number of shards (" +
893 std::to_string(num_shards) +
" vs " +
894 std::to_string(persistent_state
->streams.size()) +
")");
897 auto deleter = CreateDeleter(client, bucket_name, options);
898 deleter->Add(state_and_gen
->first, state_and_gen
->second);
900 CreateComposer(client, bucket_name, object_name,
901 persistent_state
->expected_generation, prefix, options);
902 auto internal_state = std::make_shared<ParallelUploadStateImpl>(
903 false, object_name, persistent_state
->expected_generation, deleter,
904 std::move(composer));
905 internal_state->set_custom_data(std::move(persistent_state
->custom_data));
906 internal_state->set_resumable_session_id(resumable_session_id);
910 internal_state->PreventFromFinishing();
913 auto upload_options = StaticTupleFilter<
917 for (
auto& stream_desc : persistent_state
->streams) {
918 ResumableUploadRequest request(bucket_name,
919 std::move(stream_desc.object_name));
921 SetOptionsApplyHelper(request),
922 std::tuple_cat(upload_options,
924 std::move(stream_desc.resumable_session_id)))));
925 auto stream = internal_state->CreateStream(
926 internal::ClientImplDetails::GetRawClient(client), request);
928 internal_state->AllowFinishing();
929 return stream.status();
931 streams.emplace_back(*std::move(stream));
934 internal_state->AllowFinishing();
935 return ResumableParallelUploadState(std::move(resumable_session_id),
936 std::move(internal_state),
940template <
typename... Options>
941std::vector<std::uintmax_t> ComputeParallelFileUploadSplitPoints(
942 std::uintmax_t file_size, std::tuple<Options...>
const& options) {
943 auto div_ceil = [](std::uintmax_t dividend, std::uintmax_t divisor) {
944 return (dividend + divisor - 1) / divisor;
951 auto const min_stream_size =
952 (std::max<std::uintmax_t>)(1, ExtractFirstOccurrenceOfType<
MinStreamSize>(
954 .value_or(default_min_stream_size)
956 auto const max_streams = ExtractFirstOccurrenceOfType<
MaxStreams>(options)
957 .value_or(default_max_streams)
960 auto const wanted_num_streams =
962 std::uintmax_t>)(1, (std::min<std::uintmax_t>)(max_streams,
967 auto const stream_size =
968 (std::max<std::uintmax_t>)(1, div_ceil(file_size, wanted_num_streams));
970 std::vector<std::uintmax_t> res;
971 for (
auto split = stream_size; split < file_size; split += stream_size) {
972 res.push_back(split);
977std::string ParallelFileUploadSplitPointsToString(
978 std::vector<std::uintmax_t>
const& split_points);
980StatusOr<std::vector<std::uintmax_t>> ParallelFileUploadSplitPointsFromString(
981 std::string
const& s);
984
985
986
987
988
989
990struct PrepareParallelUploadApplyHelper {
992 template <
typename... Options>
993 StatusOr<
typename std::conditional<
994 Among<
typename std::decay<Options>::type...>::
template TPred<
996 ResumableParallelUploadState, NonResumableParallelUploadState>::type>
997 operator()(Options&&... options) {
998 return PrepareParallelUpload(std::move(client), bucket_name, object_name,
1000 std::forward<Options>(options)...);
1004 std::string
const& bucket_name;
1005 std::string
const& object_name;
1006 std::size_t num_shards;
1007 std::string
const& prefix;
1010using ParallelUploadFileSupportedOptions =
google::
cloud::internal::TypeList<
1015template <
typename T>
1016using SupportsParallelOption =
1017 google::
cloud::internal::TypeListHasType<ParallelUploadFileSupportedOptions,
1020template <
typename... Provided>
1021struct IsOptionSupportedWithParallelUpload
1022 : std::integral_constant<
1025 std::tuple_size<std::tuple<Provided...>>,
1026 std::tuple_size<
typename google::
cloud::internal::TypeListFilter<
1027 SupportsParallelOption, std::tuple<Provided...>>::type>>::
1030struct CreateParallelUploadShards {
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064 template <
typename... Options>
1065 static StatusOr<std::vector<ParallelUploadFileShard>> Create(
1067 std::string file_name, std::string
const& bucket_name,
1068 std::string
const& object_name, std::string
const& prefix,
1069 Options&&... options) {
1070 std::error_code size_err;
1071 auto file_size =
google::
cloud::internal::file_size(file_name, size_err);
1076 auto const resumable_session_id_arg =
1078 std::tie(options...));
1079 bool const new_session = !resumable_session_id_arg ||
1080 resumable_session_id_arg.value().value().empty();
1081 auto upload_options =
1083 std::tie(options...));
1085 std::vector<uintmax_t> file_split_points;
1086 std::size_t num_shards = 0;
1089 ComputeParallelFileUploadSplitPoints(file_size, std::tie(options...));
1090 num_shards = file_split_points.size() + 1;
1095 PrepareParallelUploadApplyHelper{client, bucket_name, object_name,
1096 num_shards, prefix},
1098 std::move(upload_options),
1099 std::make_tuple(ParallelUploadExtraPersistentState(
1100 ParallelFileUploadSplitPointsToString(file_split_points)))));
1102 return state.status();
1107 auto maybe_split_points =
1108 ParallelFileUploadSplitPointsFromString(state->impl_->custom_data());
1109 if (!maybe_split_points) {
1110 state->Fail(maybe_split_points.status());
1111 return std::move(maybe_split_points).status();
1113 file_split_points = *std::move(maybe_split_points);
1118 auto upload_buffer_size =
1124 file_split_points.emplace_back(file_size);
1125 assert(file_split_points.size() == state->shards().size());
1126 std::vector<ParallelUploadFileShard> res;
1127 std::uintmax_t offset = 0;
1128 std::size_t shard_idx = 0;
1129 for (
auto shard_end : file_split_points) {
1130 res.emplace_back(ParallelUploadFileShard(
1131 state->impl_, std::move(state->shards()[shard_idx++]), file_name,
1132 offset, shard_end - offset, upload_buffer_size));
1140template <
typename... Options>
1141StatusOr<std::vector<ParallelUploadFileShard>> CreateUploadShards(
1142 Client client, std::string file_name, std::string
const& bucket_name,
1143 std::string
const& object_name, std::string
const& prefix,
1144 Options&&... options) {
1145 return CreateParallelUploadShards::Create(
1146 std::move(client), std::move(file_name), bucket_name, object_name, prefix,
1147 std::forward<Options>(options)...);
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181template <
typename... Options>
1183 Client client, std::string file_name, std::string bucket_name,
1184 std::string object_name, std::string prefix,
bool ignore_cleanup_failures,
1185 Options&&... options) {
1187 internal::IsOptionSupportedWithParallelUpload<Options...>::value,
1188 "Provided Option not found in ParallelUploadFileSupportedOptions.");
1190 auto shards = internal::CreateParallelUploadShards::Create(
1191 std::move(client), std::move(file_name), std::move(bucket_name),
1192 std::move(object_name), std::move(prefix),
1193 std::forward<Options>(options)...);
1195 return shards.status();
1198 std::vector<std::thread> threads;
1199 threads.reserve(shards->size());
1200 for (
auto& shard : *shards) {
1201 threads.emplace_back([&shard] {
1207 for (
auto& thread : threads) {
1210 auto res = (*shards)[0].WaitForCompletion().get();
1211 auto cleanup_res = (*shards)[0].EagerCleanup();
1212 if (!cleanup_res.ok() && !ignore_cleanup_failures) {
1218GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
ValueTypeT< T > const & get() const
Status const & status() const &
Status(StatusCode code, std::string message, ErrorInfo info={})
friend friend class future
The Google Cloud Storage (GCS) Client.
Definition: client.h:263
A parameter type indicating the maximum number of streams to ParallelUploadFile.
Definition: parallel_upload.h:46
std::size_t value() const
Definition: parallel_upload.h:49
MaxStreams(std::size_t value)
Definition: parallel_upload.h:48
A parameter type indicating the minimum stream size to ParallelUploadFile.
Definition: parallel_upload.h:62
std::uintmax_t value() const
Definition: parallel_upload.h:65
MinStreamSize(std::uintmax_t value)
Definition: parallel_upload.h:64
Defines a std::basic_istream<char> to read from a GCS Object.
Definition: object_read_stream.h:33
Defines a std::basic_ostream<char> to write to a GCS object.
Definition: object_write_stream.h:131
ObjectWriteStream(ObjectWriteStream &&rhs) noexcept
Contains all the Google Cloud Storage C++ client APIs.
Definition: auto_finalize.h:24
StatusOr< ObjectMetadata > ParallelUploadFile(Client client, std::string file_name, std::string bucket_name, std::string object_name, std::string prefix, bool ignore_cleanup_failures, Options &&... options)
Perform a parallel upload of a given file.
Definition: parallel_upload.h:1182
Defines one of the source objects for a compose operation.
Definition: object_metadata.h:44
Sets the contentEncoding option for object uploads.
Definition: well_known_parameters.h:83
Set the MIME content type of an object.
Definition: well_known_headers.h:74
Set the ACL to a predefined value when copying Objects.
Definition: well_known_parameters.h:357
Disable CRC32C checksum computations.
Definition: hashing_options.h:139
Disable or enable MD5 Hashing computations.
Definition: hashing_options.h:74
An optional parameter to set the Customer-Supplied Encryption key.
Definition: well_known_headers.h:183
Set the version of an object to operate on.
Definition: well_known_parameters.h:145
A pre-condition: the request succeeds only if the object generation matches.
Definition: well_known_parameters.h:157
A pre-condition: the request succeeds unless the object generation matches.
Definition: well_known_parameters.h:169
Configure the Customer-Managed Encryption Key (CMEK) for an upload.
Definition: well_known_parameters.h:274
Set the ACL to predefined values when creating Buckets or Objects.
Definition: well_known_parameters.h:320
Sets the user for this operation for quota enforcement purposes.
Definition: well_known_parameters.h:524
Control the formatted I/O upload buffer.
Definition: options.h:179
Request a resumable upload, restoring a previous session if necessary.
Definition: upload_options.h:38
Sets the user IP on an operation for quota enforcement purposes.
Definition: user_ip_option.h:43
Set the project used for billing in "requester pays" Buckets.
Definition: well_known_parameters.h:573