15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
18 #include "google/cloud/storage/client.h"
19 #include "google/cloud/storage/internal/tuple_filter.h"
20 #include "google/cloud/storage/object_stream.h"
21 #include "google/cloud/storage/version.h"
22 #include "google/cloud/future.h"
23 #include "google/cloud/internal/filesystem.h"
24 #include "google/cloud/status_or.h"
25 #include "absl/memory/memory.h"
26 #include "absl/types/optional.h"
27 #include <condition_variable>
47 explicit MaxStreams(std::size_t value) : value_(value) {}
48 std::size_t
value()
const {
return value_; }
64 std::uintmax_t
value()
const {
return value_; }
67 std::uintmax_t value_;
72 class ParallelUploadFileShard;
73 struct CreateParallelUploadShards;
79 template <
typename T,
typename Tuple,
typename Enable =
void>
80 struct ExtractFirstOccurrenceOfTypeImpl {
81 absl::optional<T> operator()(Tuple
const&) {
return absl::optional<T>(); }
84 template <
typename T,
typename... Options>
85 struct ExtractFirstOccurrenceOfTypeImpl<
86 T, std::tuple<Options...>,
87 typename std::enable_if<
88 Among<
typename std::decay<Options>::type...>::
template TPred<
89 typename std::decay<T>::type>::value>::type> {
90 absl::optional<T> operator()(std::tuple<Options...>
const& tuple) {
91 return std::get<0>(StaticTupleFilter<Among<T>::
template TPred>(tuple));
95 template <
typename T,
typename Tuple>
96 absl::optional<T> ExtractFirstOccurrenceOfType(Tuple
const& tuple) {
97 return ExtractFirstOccurrenceOfTypeImpl<T, Tuple>()(tuple);
107 class ParallelUploadExtraPersistentState {
109 std::string payload() && {
return std::move(payload_); }
110 std::string payload()
const& {
return payload_; }
113 friend struct CreateParallelUploadShards;
114 explicit ParallelUploadExtraPersistentState(std::string payload)
115 : payload_(std::move(payload)) {}
117 std::string payload_;
120 class ParallelObjectWriteStreambuf;
127 struct ParallelUploadPersistentState {
129 std::string object_name;
130 std::string resumable_session_id;
133 std::string ToString()
const;
134 static StatusOr<ParallelUploadPersistentState> FromString(
135 std::string
const& json_rep);
137 std::string destination_object_name;
138 std::int64_t expected_generation;
139 std::string custom_data;
140 std::vector<Stream> streams;
150 class ParallelUploadStateImpl
151 :
public std::enable_shared_from_this<ParallelUploadStateImpl> {
153 ParallelUploadStateImpl(
bool cleanup_on_failures,
154 std::string destination_object_name,
155 std::int64_t expected_generation,
156 std::shared_ptr<ScopedDeleter> deleter,
158 ~ParallelUploadStateImpl();
161 std::shared_ptr<RawClient> raw_client,
162 ResumableUploadRequest
const& request);
164 void AllStreamsFinished(std::unique_lock<std::mutex>& lk);
165 void StreamFinished(std::size_t stream_idx,
166 StatusOr<QueryResumableUploadResponse>
const& response);
168 void StreamDestroyed(std::size_t stream_idx);
176 ParallelUploadPersistentState ToPersistentState()
const;
178 std::string custom_data()
const {
179 std::unique_lock<std::mutex> lk(mu_);
183 void set_custom_data(std::string custom_data) {
184 std::unique_lock<std::mutex> lk(mu_);
185 custom_data_ = std::move(custom_data);
188 std::string resumable_session_id() {
189 std::unique_lock<std::mutex> lk(mu_);
190 return resumable_session_id_;
193 void set_resumable_session_id(std::string resumable_session_id) {
194 std::unique_lock<std::mutex> lk(mu_);
195 resumable_session_id_ = std::move(resumable_session_id);
198 void PreventFromFinishing() {
199 std::unique_lock<std::mutex> lk(mu_);
200 ++num_unfinished_streams_;
203 void AllowFinishing() {
204 std::unique_lock<std::mutex> lk(mu_);
205 if (--num_unfinished_streams_ == 0) {
206 AllStreamsFinished(lk);
212 std::string object_name;
213 std::string resumable_session_id;
218 mutable std::mutex mu_;
220 mutable std::vector<promise<StatusOr<
ObjectMetadata>>> res_promises_;
222 std::shared_ptr<ScopedDeleter> deleter_;
224 std::string destination_object_name_;
225 std::int64_t expected_generation_;
227 bool finished_ =
false;
229 std::size_t num_unfinished_streams_ = 0;
230 std::vector<StreamInfo> streams_;
233 std::string custom_data_;
234 std::string resumable_session_id_;
237 struct ComposeManyApplyHelper {
238 template <
typename... Options>
240 return ComposeMany(client, bucket_name, std::move(source_objects), prefix,
241 std::move(destination_object_name),
true,
242 std::forward<Options>(options)...);
246 std::string bucket_name;
249 std::string destination_object_name;
252 class SetOptionsApplyHelper {
254 explicit SetOptionsApplyHelper(ResumableUploadRequest& request)
255 : request_(request) {}
257 template <
typename... Options>
258 void operator()(Options&&... options)
const {
259 request_.set_multiple_options(std::forward<Options>(options)...);
263 ResumableUploadRequest& request_;
266 struct ReadObjectApplyHelper {
267 template <
typename... Options>
269 return client.ReadObject(bucket_name, object_name,
270 std::forward<Options>(options)...);
274 std::string
const& bucket_name;
275 std::string
const& object_name;
278 struct GetObjectMetadataApplyHelper {
279 template <
typename... Options>
281 return client.GetObjectMetadata(bucket_name, object_name,
282 std::move(options)...);
286 std::string bucket_name;
287 std::string object_name;
300 class ParallelUploadFileShard {
302 ParallelUploadFileShard(ParallelUploadFileShard
const&) =
delete;
303 ParallelUploadFileShard& operator=(ParallelUploadFileShard
const&) =
delete;
304 ParallelUploadFileShard(ParallelUploadFileShard&&) =
default;
305 ParallelUploadFileShard& operator=(ParallelUploadFileShard&&) =
default;
306 ~ParallelUploadFileShard();
325 return state_->WaitForCompletion();
342 Status EagerCleanup() {
return state_->EagerCleanup(); }
347 std::string resumable_session_id() {
return resumable_session_id_; }
350 friend struct CreateParallelUploadShards;
351 ParallelUploadFileShard(std::shared_ptr<ParallelUploadStateImpl> state,
353 std::uintmax_t offset_in_file,
354 std::uintmax_t bytes_to_upload,
355 std::size_t upload_buffer_size)
356 : state_(std::move(state)),
357 ostream_
(std::move(ostream)
),
358 file_name_(std::move(file_name)),
359 offset_in_file_(offset_in_file),
360 left_to_upload_(bytes_to_upload),
361 upload_buffer_size_(upload_buffer_size),
362 resumable_session_id_(state_->resumable_session_id()) {}
364 std::shared_ptr<ParallelUploadStateImpl> state_;
366 std::string file_name_;
371 std::int64_t offset_in_file_;
372 std::int64_t left_to_upload_;
373 std::size_t upload_buffer_size_;
374 std::string resumable_session_id_;
393 class NonResumableParallelUploadState {
395 template <
typename... Options>
396 static StatusOr<NonResumableParallelUploadState> Create(
397 Client client, std::string
const& bucket_name,
398 std::string
const& object_name, std::size_t num_shards,
399 std::string
const& prefix, std::tuple<Options...> options);
408 return impl_->WaitForCompletion();
423 Status EagerCleanup() {
return impl_->EagerCleanup(); }
445 void Fail(
Status status) {
return impl_->Fail(std::move(status)); }
448 NonResumableParallelUploadState(
449 std::shared_ptr<ParallelUploadStateImpl> state,
451 : impl_(std::move(state)), shards_(std::move(shards)) {}
453 std::shared_ptr<ParallelUploadStateImpl> impl_;
456 friend struct CreateParallelUploadShards;
481 class ResumableParallelUploadState {
483 static std::string session_id_prefix() {
return "ParUpl:"; }
485 template <
typename... Options>
486 static StatusOr<ResumableParallelUploadState> CreateNew(
487 Client client, std::string
const& bucket_name,
488 std::string
const& object_name, std::size_t num_shards,
489 std::string
const& prefix, std::string extra_state,
490 std::tuple<Options...>
const& options);
492 template <
typename... Options>
493 static StatusOr<ResumableParallelUploadState> Resume(
494 Client client, std::string
const& bucket_name,
495 std::string
const& object_name, std::size_t num_shards,
496 std::string
const& prefix, std::string resumable_session_id,
497 std::tuple<Options...> options);
505 std::string resumable_session_id() {
return resumable_session_id_; }
514 return impl_->WaitForCompletion();
529 Status EagerCleanup() {
return impl_->EagerCleanup(); }
551 void Fail(
Status status) {
return impl_->Fail(std::move(status)); }
554 template <
typename... Options>
555 static std::shared_ptr<ScopedDeleter> CreateDeleter(
556 Client client, std::string
const& bucket_name,
557 std::tuple<Options...>
const& options);
559 template <
typename... Options>
560 static Composer CreateComposer(
Client client, std::string
const& bucket_name,
561 std::string
const& object_name,
562 std::int64_t expected_generation,
563 std::string
const& prefix,
564 std::tuple<Options...>
const& options);
566 ResumableParallelUploadState(std::string resumable_session_id,
567 std::shared_ptr<ParallelUploadStateImpl> state,
569 : resumable_session_id_(std::move(resumable_session_id)),
570 impl_(std::move(state)),
571 shards_(std::move(shards)) {}
573 std::string resumable_session_id_;
574 std::shared_ptr<ParallelUploadStateImpl> impl_;
577 friend struct CreateParallelUploadShards;
603 template <
typename... Options,
604 typename std::enable_if<
605 NotAmong<
typename std::decay<Options>::type...>::
template TPred<
607 int>::type EnableIfNotResumable = 0>
608 StatusOr<NonResumableParallelUploadState> PrepareParallelUpload(
609 Client client, std::string
const& bucket_name,
610 std::string
const& object_name, std::size_t num_shards,
611 std::string
const& prefix, Options&&... options) {
612 return NonResumableParallelUploadState::Create(
613 std::move(client), bucket_name, object_name, num_shards, prefix,
614 StaticTupleFilter<NotAmong<ParallelUploadExtraPersistentState>::TPred>(
615 std::forward_as_tuple(std::forward<Options>(options)...)));
618 template <
typename... Options,
619 typename std::enable_if<
620 Among<
typename std::decay<Options>::type...>::
template TPred<
622 int>::type EnableIfResumable = 0>
623 StatusOr<ResumableParallelUploadState> PrepareParallelUpload(
624 Client client, std::string
const& bucket_name,
625 std::string
const& object_name, std::size_t num_shards,
626 std::string
const& prefix, Options&&... options) {
627 auto resumable_args =
629 std::tie(options...));
631 std::tuple_size<
decltype(resumable_args)>::value == 1,
632 "There should be exactly one UseResumableUploadSession argument");
633 std::string resumable_session_id = std::get<0>(resumable_args).value();
634 auto extra_state_arg =
635 ExtractFirstOccurrenceOfType<ParallelUploadExtraPersistentState>(
636 std::tie(options...));
638 auto forwarded_args =
640 ParallelUploadExtraPersistentState>::TPred>(
641 std::forward_as_tuple(std::forward<Options>(options)...));
643 if (resumable_session_id.empty()) {
644 return ResumableParallelUploadState::CreateNew(
645 std::move(client), bucket_name, object_name, num_shards, prefix,
646 extra_state_arg ? (*std::move(extra_state_arg)).payload()
648 std::move(forwarded_args));
650 return ResumableParallelUploadState::Resume(
651 std::move(client), bucket_name, object_name, num_shards, prefix,
652 resumable_session_id, std::move(forwarded_args));
655 template <
typename... Options>
656 StatusOr<NonResumableParallelUploadState>
657 NonResumableParallelUploadState::Create(
Client client,
658 std::string
const& bucket_name,
659 std::string
const& object_name,
660 std::size_t num_shards,
661 std::string
const& prefix,
662 std::tuple<Options...> options) {
663 using internal::StaticTupleFilter;
664 auto delete_options =
666 auto deleter = std::make_shared<ScopedDeleter>(
667 [client, bucket_name, delete_options](std::string
const& object_name,
668 std::int64_t generation)
mutable {
670 DeleteApplyHelper{client, std::move(bucket_name), object_name,
672 std::move(delete_options));
675 auto compose_options = StaticTupleFilter<
679 auto composer = [client, bucket_name, object_name, compose_options,
682 ComposeManyApplyHelper{client, std::move(bucket_name),
683 std::move(sources), prefix +
".compose_many",
684 std::move(object_name)},
685 std::move(compose_options));
688 auto lock = internal::LockPrefix(client, bucket_name, prefix, options);
691 lock.status().code(),
692 "Failed to lock prefix for ParallelUpload: " + lock.status().message());
696 auto internal_state = std::make_shared<ParallelUploadStateImpl>(
697 true, object_name, 0, std::move(deleter), std::move(composer));
700 auto upload_options = StaticTupleFilter<
704 for (std::size_t i = 0; i < num_shards; ++i) {
705 ResumableUploadRequest request(
706 bucket_name, prefix +
".upload_shard_" + std::to_string(i));
707 google::
cloud::internal::apply(SetOptionsApplyHelper(request),
709 auto stream = internal_state->CreateStream(
710 internal::ClientImplDetails::GetRawClient(client), request);
712 return stream.status();
714 streams.emplace_back(*std::move(stream));
716 return NonResumableParallelUploadState(std::move(internal_state),
720 template <
typename... Options>
721 std::shared_ptr<ScopedDeleter> ResumableParallelUploadState::CreateDeleter(
723 std::string
const& bucket_name, std::tuple<Options...>
const& options) {
724 using internal::StaticTupleFilter;
725 auto delete_options =
727 return std::make_shared<ScopedDeleter>(
728 [client, bucket_name, delete_options](std::string
const& object_name,
729 std::int64_t generation)
mutable {
731 DeleteApplyHelper{client, std::move(bucket_name), object_name,
733 std::move(delete_options));
737 template <
typename... Options>
738 Composer ResumableParallelUploadState::CreateComposer(
740 std::string
const& bucket_name, std::string
const& object_name,
741 std::int64_t expected_generation, std::string
const& prefix,
742 std::tuple<Options...>
const& options) {
743 auto compose_options = std::tuple_cat(
748 auto get_metadata_options = StaticTupleFilter<
750 auto composer = [client, bucket_name, object_name, compose_options,
751 get_metadata_options,
755 ComposeManyApplyHelper{client, bucket_name, std::move(sources),
756 prefix +
".compose_many", object_name},
757 std::move(compose_options));
768 GetObjectMetadataApplyHelper{client, std::move(bucket_name),
769 std::move(object_name)},
770 std::move(get_metadata_options));
772 return Composer(std::move(composer));
775 template <
typename... Options>
776 StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::CreateNew(
777 Client client, std::string
const& bucket_name,
778 std::string
const& object_name, std::size_t num_shards,
779 std::string
const& prefix, std::string extra_state,
780 std::tuple<Options...>
const& options) {
781 using internal::StaticTupleFilter;
783 auto get_object_meta_options = StaticTupleFilter<
787 GetObjectMetadataApplyHelper{client, bucket_name, object_name},
788 std::move(get_object_meta_options));
790 return object_meta.status();
792 std::int64_t expected_generation =
793 object_meta ? object_meta->generation() : 0;
795 auto deleter = CreateDeleter(client, bucket_name, options);
796 auto composer = CreateComposer(client, bucket_name, object_name,
797 expected_generation, prefix, options);
798 auto internal_state = std::make_shared<ParallelUploadStateImpl>(
799 false, object_name, expected_generation, deleter, std::move(composer));
800 internal_state->set_custom_data(std::move(extra_state));
804 auto upload_options = std::tuple_cat(
810 for (std::size_t i = 0; i < num_shards; ++i) {
811 ResumableUploadRequest request(
812 bucket_name, prefix +
".upload_shard_" + std::to_string(i));
813 google::
cloud::internal::apply(SetOptionsApplyHelper(request),
815 auto stream = internal_state->CreateStream(
816 internal::ClientImplDetails::GetRawClient(client), request);
818 return stream.status();
820 streams.emplace_back(*std::move(stream));
823 auto state_object_name = prefix +
".upload_state";
824 auto insert_options = std::tuple_cat(
829 auto state_object =
google::
cloud::internal::apply(
830 InsertObjectApplyHelper{client, bucket_name, state_object_name,
831 internal_state->ToPersistentState().ToString()},
832 std::move(insert_options));
834 internal_state->Fail(state_object.status());
835 return std::move(state_object).status();
837 std::string resumable_session_id = session_id_prefix() + state_object_name +
839 std::to_string(state_object->generation());
840 internal_state->set_resumable_session_id(resumable_session_id);
841 deleter->Add(std::move(*state_object));
842 return ResumableParallelUploadState(std::move(resumable_session_id),
843 std::move(internal_state),
847 StatusOr<std::pair<std::string, std::int64_t>> ParseResumableSessionId(
848 std::string
const& session_id);
850 template <
typename... Options>
851 StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::Resume(
852 Client client, std::string
const& bucket_name,
853 std::string
const& object_name, std::size_t num_shards,
854 std::string
const& prefix, std::string resumable_session_id,
855 std::tuple<Options...> options) {
856 using internal::StaticTupleFilter;
858 auto state_and_gen = ParseResumableSessionId(resumable_session_id);
859 if (!state_and_gen) {
863 auto read_options = std::tuple_cat(
869 auto state_stream =
google::
cloud::internal::apply(
870 ReadObjectApplyHelper{client, bucket_name, state_and_gen
->first},
871 std::move(read_options));
872 std::string state_string(std::istreambuf_iterator<
char>{state_stream}, {});
873 state_stream.Close();
875 auto persistent_state =
876 ParallelUploadPersistentState::FromString(state_string);
877 if (!persistent_state) {
881 if (persistent_state
->destination_object_name != object_name) {
883 "Specified resumable session ID is doesn't match the "
884 "destination object name (" +
885 object_name +
" vs " +
886 persistent_state
->destination_object_name +
")");
888 if (persistent_state
->streams.size() != num_shards && num_shards != 0) {
890 "Specified resumable session ID is doesn't match the "
891 "previously specified number of shards (" +
892 std::to_string(num_shards) +
" vs " +
893 std::to_string(persistent_state
->streams.size()) +
")");
896 auto deleter = CreateDeleter(client, bucket_name, options);
897 deleter->Add(state_and_gen
->first, state_and_gen
->second);
899 CreateComposer(client, bucket_name, object_name,
900 persistent_state
->expected_generation, prefix, options);
901 auto internal_state = std::make_shared<ParallelUploadStateImpl>(
902 false, object_name, persistent_state
->expected_generation, deleter,
903 std::move(composer));
904 internal_state->set_custom_data(std::move(persistent_state
->custom_data));
905 internal_state->set_resumable_session_id(resumable_session_id);
909 internal_state->PreventFromFinishing();
912 auto upload_options = StaticTupleFilter<
916 for (
auto& stream_desc : persistent_state
->streams) {
917 ResumableUploadRequest request(bucket_name,
918 std::move(stream_desc.object_name));
920 SetOptionsApplyHelper(request),
921 std::tuple_cat(upload_options,
923 std::move(stream_desc.resumable_session_id)))));
924 auto stream = internal_state->CreateStream(
925 internal::ClientImplDetails::GetRawClient(client), request);
927 internal_state->AllowFinishing();
928 return stream.status();
930 streams.emplace_back(*std::move(stream));
933 internal_state->AllowFinishing();
934 return ResumableParallelUploadState(std::move(resumable_session_id),
935 std::move(internal_state),
939 template <
typename... Options>
940 std::vector<std::uintmax_t> ComputeParallelFileUploadSplitPoints(
941 std::uintmax_t file_size, std::tuple<Options...>
const& options) {
942 auto div_ceil = [](std::uintmax_t dividend, std::uintmax_t divisor) {
943 return (dividend + divisor - 1) / divisor;
950 auto const min_stream_size =
951 (std::max<std::uintmax_t>)(1, ExtractFirstOccurrenceOfType<
MinStreamSize>(
953 .value_or(default_min_stream_size)
955 auto const max_streams = ExtractFirstOccurrenceOfType<
MaxStreams>(options)
956 .value_or(default_max_streams)
959 auto const wanted_num_streams =
961 std::uintmax_t>)(1, (std::min<std::uintmax_t>)(max_streams,
966 auto const stream_size =
967 (std::max<std::uintmax_t>)(1, div_ceil(file_size, wanted_num_streams));
969 std::vector<std::uintmax_t> res;
970 for (
auto split = stream_size; split < file_size; split += stream_size) {
971 res.push_back(split);
976 std::string ParallelFileUploadSplitPointsToString(
977 std::vector<std::uintmax_t>
const& split_points);
979 StatusOr<std::vector<std::uintmax_t>> ParallelFileUploadSplitPointsFromString(
980 std::string
const& s);
989 struct PrepareParallelUploadApplyHelper {
991 template <
typename... Options>
992 StatusOr<
typename std::conditional<
993 Among<
typename std::decay<Options>::type...>::
template TPred<
995 ResumableParallelUploadState, NonResumableParallelUploadState>::type>
996 operator()(Options&&... options) {
997 return PrepareParallelUpload(std::move(client), bucket_name, object_name,
999 std::forward<Options>(options)...);
1003 std::string
const& bucket_name;
1004 std::string
const& object_name;
1005 std::size_t num_shards;
1006 std::string
const& prefix;
1009 struct CreateParallelUploadShards {
1043 template <
typename... Options>
1044 static StatusOr<std::vector<ParallelUploadFileShard>> Create(
1046 std::string file_name, std::string
const& bucket_name,
1047 std::string
const& object_name, std::string
const& prefix,
1048 Options&&... options) {
1049 std::error_code size_err;
1050 auto file_size =
google::
cloud::internal::file_size(file_name, size_err);
1055 auto const resumable_session_id_arg =
1057 std::tie(options...));
1058 bool const new_session = !resumable_session_id_arg ||
1059 resumable_session_id_arg.value().value().empty();
1060 auto upload_options =
1062 std::tie(options...));
1064 std::vector<uintmax_t> file_split_points;
1065 std::size_t num_shards = 0;
1068 ComputeParallelFileUploadSplitPoints(file_size, std::tie(options...));
1069 num_shards = file_split_points.size() + 1;
1074 PrepareParallelUploadApplyHelper{client, bucket_name, object_name,
1075 num_shards, prefix},
1077 std::move(upload_options),
1078 std::make_tuple(ParallelUploadExtraPersistentState(
1079 ParallelFileUploadSplitPointsToString(file_split_points)))));
1081 return state.status();
1086 auto maybe_split_points =
1087 ParallelFileUploadSplitPointsFromString(state->impl_->custom_data());
1088 if (!maybe_split_points) {
1089 state->Fail(maybe_split_points.status());
1090 return std::move(maybe_split_points).status();
1092 file_split_points = *std::move(maybe_split_points);
1097 auto upload_buffer_size =
1103 file_split_points.emplace_back(file_size);
1104 assert(file_split_points.size() == state->shards().size());
1105 std::vector<ParallelUploadFileShard> res;
1106 std::uintmax_t offset = 0;
1107 std::size_t shard_idx = 0;
1108 for (
auto shard_end : file_split_points) {
1109 res.emplace_back(ParallelUploadFileShard(
1110 state->impl_, std::move(state->shards()[shard_idx++]), file_name,
1111 offset, shard_end - offset, upload_buffer_size));
1119 template <
typename... Options>
1120 StatusOr<std::vector<ParallelUploadFileShard>> CreateUploadShards(
1121 Client client, std::string file_name, std::string
const& bucket_name,
1122 std::string
const& object_name, std::string
const& prefix,
1123 Options&&... options) {
1124 return CreateParallelUploadShards::Create(
1125 std::move(client), std::move(file_name), bucket_name, object_name, prefix,
1126 std::forward<Options>(options)...);
1160 template <
typename... Options>
1162 Client client, std::string file_name, std::string bucket_name,
1163 std::string object_name, std::string prefix,
bool ignore_cleanup_failures,
1164 Options&&... options) {
1165 auto shards = internal::CreateParallelUploadShards::Create(
1166 std::move(client), std::move(file_name), std::move(bucket_name),
1167 std::move(object_name), std::move(prefix),
1168 std::forward<Options>(options)...);
1170 return shards.status();
1173 std::vector<std::thread> threads;
1174 threads.reserve(shards->size());
1175 for (
auto& shard : *shards) {
1176 threads.emplace_back([&shard] {
1182 for (
auto& thread : threads) {
1185 auto res = (*shards)[0].WaitForCompletion().get();
1186 auto cleanup_res = (*shards)[0].EagerCleanup();
1187 if (!cleanup_res.ok() && !ignore_cleanup_failures) {