Google Cloud Storage C++ Client 2.13.0
A C++ Client Library for Google Cloud Storage
Loading...
Searching...
No Matches
parallel_upload.h
1// Copyright 2019 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
16#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
17
18#include "google/cloud/storage/client.h"
19#include "google/cloud/storage/internal/tuple_filter.h"
20#include "google/cloud/storage/object_stream.h"
21#include "google/cloud/storage/version.h"
22#include "google/cloud/future.h"
23#include "google/cloud/internal/filesystem.h"
24#include "google/cloud/internal/type_list.h"
25#include "google/cloud/status_or.h"
26#include "absl/types/optional.h"
27#include <condition_variable>
28#include <cstddef>
29#include <fstream>
30#include <functional>
31#include <mutex>
32#include <string>
33#include <tuple>
34#include <type_traits>
35#include <utility>
36#include <vector>
37
38namespace google {
39namespace cloud {
40namespace storage {
41GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
/**
 * A parameter type indicating the maximum number of streams to
 * `ParallelUploadFile`.
 */
class MaxStreams {
 public:
  explicit MaxStreams(std::size_t v) : value_(v) {}

  /// The maximum number of parallel streams requested by the caller.
  std::size_t value() const { return value_; }

 private:
  std::size_t value_;
};
54
/**
 * A parameter type indicating the minimum stream size to `ParallelUploadFile`.
 *
 * When `ParallelUploadFile` receives this option it attempts to make every
 * shard at least this long. The last shard may be shorter, because it holds
 * whatever remains after dividing the file.
 */
class MinStreamSize {
 public:
  explicit MinStreamSize(std::uintmax_t v) : value_(v) {}

  /// The minimum shard length requested by the caller.
  std::uintmax_t value() const { return value_; }

 private:
  std::uintmax_t value_;
};
70
71namespace internal {
72
73class ParallelUploadFileShard;
74struct CreateParallelUploadShards;
75
76/**
77 * Return an empty option if Tuple contains an element of type T, otherwise
78 * return the value of the first element of type T
79 */
80template <typename T, typename Tuple, typename Enable = void>
81struct ExtractFirstOccurrenceOfTypeImpl {
82 absl::optional<T> operator()(Tuple const&) { return absl::optional<T>(); }
83};
84
// Specialization selected (via SFINAE on `Among<...>::TPred`) when the tuple
// does contain an element whose decayed type matches `T`: keep only the `T`
// elements and return the first one.
85template <typename T, typename... Options>
86struct ExtractFirstOccurrenceOfTypeImpl<
87 T, std::tuple<Options...>,
88 typename std::enable_if<
89 Among<typename std::decay<Options>::type...>::template TPred<
90 typename std::decay<T>::type>::value>::type> {
91 absl::optional<T> operator()(std::tuple<Options...> const& tuple) {
92 return std::get<0>(StaticTupleFilter<Among<T>::template TPred>(tuple));
93 }
94};
95
96template <typename T, typename Tuple>
97absl::optional<T> ExtractFirstOccurrenceOfType(Tuple const& tuple) {
98 return ExtractFirstOccurrenceOfTypeImpl<T, Tuple>()(tuple);
99}
100
/**
 * An option for `PrepareParallelUpload` to associate opaque data with upload.
 *
 * This is used by `CreateUploadShards()` to store additional information in
 * the parallel upload persistent state. The additional information is where
 * each shard starts in the uploaded file.
 */
class ParallelUploadExtraPersistentState {
 public:
  /// Consume the stored payload (rvalue overload avoids a copy).
  std::string payload() && { return std::move(payload_); }
  /// Copy the stored payload.
  std::string payload() const& { return payload_; }

 private:
  // Only `CreateParallelUploadShards` may construct this option.
  friend struct CreateParallelUploadShards;
  explicit ParallelUploadExtraPersistentState(std::string payload)
      : payload_(std::move(payload)) {}

  std::string payload_;
};
120
121class ParallelObjectWriteStreambuf;
122
// The callable receives the temporary (shard) objects to concatenate and
// returns the composed destination object's metadata, or an error `Status`.
123// Type-erased function object to execute ComposeMany with most arguments
124// bound.
125using Composer =
126 std::function<StatusOr<ObjectMetadata>(std::vector<ComposeSourceObject>)>;
127
128struct ParallelUploadPersistentState {
129 struct Stream {
130 std::string object_name;
131 std::string resumable_session_id;
132 };
133
134 std::string ToString() const;
135 static StatusOr<ParallelUploadPersistentState> FromString(
136 std::string const& json_rep);
137
138 std::string destination_object_name;
139 std::int64_t expected_generation;
140 std::string custom_data;
141 std::vector<Stream> streams;
142};
143
144// The `ObjectWriteStream`s have to hold references to the state of
145// the parallel upload so that they can update it when finished and trigger
146// shards composition, hence `ResumableParallelUploadState` has to be
147// destroyed after the `ObjectWriteStream`s.
148// `ResumableParallelUploadState` and `ObjectWriteStream`s are passed
149// around by values, so we don't control their lifetime. In order to
150// circumvent it, we move the state to something held by a `shared_ptr`.
151class ParallelUploadStateImpl
152 : public std::enable_shared_from_this<ParallelUploadStateImpl> {
153 public:
154 ParallelUploadStateImpl(bool cleanup_on_failures,
155 std::string destination_object_name,
156 std::int64_t expected_generation,
157 std::shared_ptr<ScopedDeleter> deleter,
158 Composer composer);
159 ~ParallelUploadStateImpl();
160
161 StatusOr<ObjectWriteStream> CreateStream(
162 std::shared_ptr<RawClient> raw_client,
163 ResumableUploadRequest const& request);
164
165 void AllStreamsFinished(std::unique_lock<std::mutex>& lk);
166 void StreamFinished(std::size_t stream_idx,
167 StatusOr<QueryResumableUploadResponse> const& response);
168
169 void StreamDestroyed(std::size_t stream_idx);
170
171 future<StatusOr<ObjectMetadata>> WaitForCompletion() const;
172
173 Status EagerCleanup();
174
175 void Fail(Status status);
176
177 ParallelUploadPersistentState ToPersistentState() const;
178
179 std::string custom_data() const {
180 std::unique_lock<std::mutex> lk(mu_);
181 return custom_data_;
182 }
183
184 void set_custom_data(std::string custom_data) {
185 std::unique_lock<std::mutex> lk(mu_);
186 custom_data_ = std::move(custom_data);
187 }
188
189 std::string resumable_session_id() {
190 std::unique_lock<std::mutex> lk(mu_);
191 return resumable_session_id_;
192 }
193
194 void set_resumable_session_id(std::string resumable_session_id) {
195 std::unique_lock<std::mutex> lk(mu_);
196 resumable_session_id_ = std::move(resumable_session_id);
197 }
198
199 void PreventFromFinishing() {
200 std::unique_lock<std::mutex> lk(mu_);
201 ++num_unfinished_streams_;
202 }
203
204 void AllowFinishing() {
205 std::unique_lock<std::mutex> lk(mu_);
206 if (--num_unfinished_streams_ == 0) {
207 AllStreamsFinished(lk);
208 }
209 }
210
211 private:
212 struct StreamInfo {
213 std::string object_name;
214 std::string resumable_session_id;
215 absl::optional<ComposeSourceObject> composition_arg;
216 bool finished;
217 };
218
219 mutable std::mutex mu_;
220 // Promises made via `WaitForCompletion()`
221 mutable std::vector<promise<StatusOr<ObjectMetadata>>> res_promises_;
222 // Type-erased object for deleting temporary objects.
223 std::shared_ptr<ScopedDeleter> deleter_;
224 Composer composer_;
225 std::string destination_object_name_;
226 std::int64_t expected_generation_;
227 // Set when all streams are closed and composed but before cleanup.
228 bool finished_ = false;
229 // Tracks how many streams are still written to.
230 std::size_t num_unfinished_streams_ = 0;
231 std::vector<StreamInfo> streams_;
232 absl::optional<StatusOr<ObjectMetadata>> res_;
233 Status cleanup_status_;
234 std::string custom_data_;
235 std::string resumable_session_id_;
236};
237
238struct ComposeManyApplyHelper {
239 template <typename... Options>
240 StatusOr<ObjectMetadata> operator()(Options&&... options) const {
241 return ComposeMany(client, bucket_name, std::move(source_objects), prefix,
242 std::move(destination_object_name), true,
243 std::forward<Options>(options)...);
244 }
245
246 Client& client;
247 std::string bucket_name;
248 std::vector<ComposeSourceObject> source_objects;
249 std::string prefix;
250 std::string destination_object_name;
251};
252
253class SetOptionsApplyHelper {
254 public:
255 explicit SetOptionsApplyHelper(ResumableUploadRequest& request)
256 : request_(request) {}
257
258 template <typename... Options>
259 void operator()(Options&&... options) const {
260 request_.set_multiple_options(std::forward<Options>(options)...);
261 }
262
263 private:
264 ResumableUploadRequest& request_;
265};
266
267struct ReadObjectApplyHelper {
268 template <typename... Options>
269 ObjectReadStream operator()(Options&&... options) const {
270 return client.ReadObject(bucket_name, object_name,
271 std::forward<Options>(options)...);
272 }
273
274 Client& client;
275 std::string const& bucket_name;
276 std::string const& object_name;
277};
278
279struct GetObjectMetadataApplyHelper {
280 template <typename... Options>
281 StatusOr<ObjectMetadata> operator()(Options... options) const {
282 return client.GetObjectMetadata(bucket_name, object_name,
283 std::move(options)...);
284 }
285
286 Client& client;
287 std::string bucket_name;
288 std::string object_name;
289};
290
291/**
292 * A class representing an individual shard of the parallel upload.
293 *
294 * In order to perform a parallel upload of a file, you should call
295 * `CreateUploadShards()` and it will return a vector of objects of this class.
296 * You should execute the `Upload()` member function on them in parallel to
297 * execute the upload.
298 *
299 * You can then obtain the status of the whole upload via `WaitForCompletion()`.
300 */
301class ParallelUploadFileShard {
302 public:
303 ParallelUploadFileShard(ParallelUploadFileShard const&) = delete;
304 ParallelUploadFileShard& operator=(ParallelUploadFileShard const&) = delete;
305 ParallelUploadFileShard(ParallelUploadFileShard&&) = default;
306 ParallelUploadFileShard& operator=(ParallelUploadFileShard&&) = default;
307 ~ParallelUploadFileShard();
308
309 /**
310 * Perform the upload of this shard.
311 *
312 * This function will block until the shard is completed, or a permanent
313 * failure is encountered, or the retry policy is exhausted.
314 */
315 Status Upload();
316
317 /**
318 * Asynchronously wait for completion of the whole upload operation (not only
319 * this shard).
320 *
321 * @return the returned future will become satisfied once the whole upload
322 * operation finishes (i.e. `Upload()` completes on all shards); on
323 * success, it will hold the destination object's metadata
324 */
325 future<StatusOr<ObjectMetadata>> WaitForCompletion() {
326 return state_->WaitForCompletion();
327 }
328
329 /**
330 * Cleanup all the temporary files
331 *
332 * The destruction of the last of these objects tied to a parallel upload will
333 * cleanup of all the temporary files used in the process of that parallel
334 * upload. If the cleanup fails, it will fail silently not to crash the
335 * program.
336 *
337 * If you want to control the status of the cleanup, use this member function
338 * to do it eagerly, before destruction.
339 *
340 * It is enough to call it on one of the objects, but it is not invalid to
341 * call it on all objects.
342 */
343 Status EagerCleanup() { return state_->EagerCleanup(); }
344
345 /**
346 * Retrieve resumable session ID to allow for potential future resume.
347 */
348 std::string resumable_session_id() { return resumable_session_id_; }
349
350 private:
351 friend struct CreateParallelUploadShards;
352 ParallelUploadFileShard(std::shared_ptr<ParallelUploadStateImpl> state,
353 ObjectWriteStream ostream, std::string file_name,
354 std::uintmax_t offset_in_file,
355 std::uintmax_t bytes_to_upload,
356 std::size_t upload_buffer_size)
357 : state_(std::move(state)),
358 ostream_(std::move(ostream)),
359 file_name_(std::move(file_name)),
360 offset_in_file_(offset_in_file),
361 left_to_upload_(bytes_to_upload),
362 upload_buffer_size_(upload_buffer_size),
363 resumable_session_id_(state_->resumable_session_id()) {}
364
365 std::shared_ptr<ParallelUploadStateImpl> state_;
366 ObjectWriteStream ostream_;
367 std::string file_name_;
368 // GCS only supports objects up to 5TiB, that fits comfortably in a
369 // `std::int64_t`, allows for any expected growth on that limit (not that we
370 // anticipate any), and plays nicer with the types in the standard C++
371 // library.
372 std::int64_t offset_in_file_;
373 std::int64_t left_to_upload_;
374 std::size_t upload_buffer_size_;
375 std::string resumable_session_id_;
376};
377
378/**
379 * The state controlling uploading a GCS object via multiple parallel streams.
380 *
381 * To use this class obtain the state via `PrepareParallelUpload` and then write
382 * the data to the streams associated with each shard. Once writing is done,
383 * close or destroy the streams.
384 *
385 * When all the streams are `Close`d or destroyed, this class will join the
386 * them (via `ComposeMany`) into the destination object and set the value in
387 * `future`s returned by `WaitForCompletion`.
388 *
389 * Parallel upload will create temporary files. Upon completion of the whole
390 * operation, this class will attempt to remove them in its destructor, but if
391 * they fail, they fail silently. In order to proactively cleanup these files,
392 * one can call `EagerCleanup()`.
393 */
394class NonResumableParallelUploadState {
395 public:
396 template <typename... Options>
397 static StatusOr<NonResumableParallelUploadState> Create(
398 Client client, std::string const& bucket_name,
399 std::string const& object_name, std::size_t num_shards,
400 std::string const& prefix, std::tuple<Options...> options);
401
402 /**
403 * Asynchronously wait for completion of the whole upload operation.
404 *
405 * @return the returned future will have a value set to the destination object
406 * metadata when all the streams are `Close`d or destroyed.
407 */
408 future<StatusOr<ObjectMetadata>> WaitForCompletion() const {
409 return impl_->WaitForCompletion();
410 }
411
412 /**
413 * Cleanup all the temporary files
414 *
415 * The destruction of this object will perform cleanup of all the temporary
416 * files used in the process of the parallel upload. If the cleanup fails, it
417 * will fail silently not to crash the program.
418 *
419 * If you want to control the status of the cleanup, use this member function
420 * to do it eagerly, before destruction.
421 *
422 * @return the status of the cleanup.
423 */
424 Status EagerCleanup() { return impl_->EagerCleanup(); }
425
426 /**
427 * The streams to write to.
428 *
429 * When the streams are `Close`d, they will be concatenated into the
430 * destination object in the same order as they appeared in this vector upon
431 * this object's creation.
432 *
433 * It is safe to destroy or `std::move()` these streams.
434 */
435 std::vector<ObjectWriteStream>& shards() { return shards_; }
436
437 /**
438 * Fail the whole operation.
439 *
440 * If called before all streams are closed or destroyed, calling this
441 * operation will prevent composing the streams into the final destination
442 * object and return a failure via `WaitForCompletion()`.
443 *
444 * @param status the status to fail the operation with.
445 */
446 void Fail(Status status) { return impl_->Fail(std::move(status)); }
447
448 private:
449 NonResumableParallelUploadState(
450 std::shared_ptr<ParallelUploadStateImpl> state,
451 std::vector<ObjectWriteStream> shards)
452 : impl_(std::move(state)), shards_(std::move(shards)) {}
453
454 std::shared_ptr<ParallelUploadStateImpl> impl_;
455 std::vector<ObjectWriteStream> shards_;
456
457 friend struct CreateParallelUploadShards;
458};
459
460/**
461 * The state controlling uploading a GCS object via multiple parallel streams,
462 * allowing for resuming.
463 *
464 * To use this class obtain the state via `PrepareParallelUpload` (with
465 * `UseResumableUploadSession` option) and then write the data to the streams
466 * associated with each shard. Once writing is done, close or destroy the
467 * streams.
468 *
469 * When all the streams are `Close`d or destroyed, this class will join the
470 * them (via `ComposeMany`) into the destination object and set the value in
471 * `future`s returned by `WaitForCompletion`.
472 *
473 * Parallel upload will create temporary files. Upon successful completion of
474 * the whole operation, this class will attempt to remove them in its
475 * destructor, but if they fail, they fail silently. In order to proactively
476 * cleanup these files, one can call `EagerCleanup()`.
477 *
478 * In order to resume an interrupted upload, provide `UseResumableUploadSession`
479 * to `PrepareParallelUpload` with value set to what `resumable_session_id()`
480 * returns.
481 */
482class ResumableParallelUploadState {
483 public:
484 static std::string session_id_prefix() { return "ParUpl:"; }
485
486 template <typename... Options>
487 static StatusOr<ResumableParallelUploadState> CreateNew(
488 Client client, std::string const& bucket_name,
489 std::string const& object_name, std::size_t num_shards,
490 std::string const& prefix, std::string extra_state,
491 std::tuple<Options...> const& options);
492
493 template <typename... Options>
494 static StatusOr<ResumableParallelUploadState> Resume(
495 Client client, std::string const& bucket_name,
496 std::string const& object_name, std::size_t num_shards,
497 std::string const& prefix, std::string resumable_session_id,
498 std::tuple<Options...> options);
499
500 /**
501 * Retrieve the resumable session id.
502 *
503 * This value, if passed via `UseResumableUploadSession` option indicates that
504 * an upload should be a continuation of the one which this object represents.
505 */
506 std::string resumable_session_id() { return resumable_session_id_; }
507
508 /**
509 * Asynchronously wait for completion of the whole upload operation.
510 *
511 * @return the returned future will have a value set to the destination object
512 * metadata when all the streams are `Close`d or destroyed.
513 */
514 future<StatusOr<ObjectMetadata>> WaitForCompletion() const {
515 return impl_->WaitForCompletion();
516 }
517
518 /**
519 * Cleanup all the temporary files
520 *
521 * The destruction of this object will perform cleanup of all the temporary
522 * files used in the process of the parallel upload. If the cleanup fails, it
523 * will fail silently not to crash the program.
524 *
525 * If you want to control the status of the cleanup, use this member function
526 * to do it eagerly, before destruction.
527 *
528 * @return the status of the cleanup.
529 */
530 Status EagerCleanup() { return impl_->EagerCleanup(); }
531
532 /**
533 * The streams to write to.
534 *
535 * When the streams are `Close`d, they will be concatenated into the
536 * destination object in the same order as they appeared in this vector upon
537 * this object's creation.
538 *
539 * It is safe to destroy or `std::move()` these streams.
540 */
541 std::vector<ObjectWriteStream>& shards() { return shards_; }
542
543 /**
544 * Fail the whole operation.
545 *
546 * If called before all streams are closed or destroyed, calling this
547 * operation will prevent composing the streams into the final destination
548 * object and return a failure via `WaitForCompletion()`.
549 *
550 * @param status the status to fail the operation with.
551 */
552 void Fail(Status status) { return impl_->Fail(std::move(status)); }
553
554 private:
555 template <typename... Options>
556 static std::shared_ptr<ScopedDeleter> CreateDeleter(
557 Client client, std::string const& bucket_name,
558 std::tuple<Options...> const& options);
559
560 template <typename... Options>
561 static Composer CreateComposer(Client client, std::string const& bucket_name,
562 std::string const& object_name,
563 std::int64_t expected_generation,
564 std::string const& prefix,
565 std::tuple<Options...> const& options);
566
567 ResumableParallelUploadState(std::string resumable_session_id,
568 std::shared_ptr<ParallelUploadStateImpl> state,
569 std::vector<ObjectWriteStream> shards)
570 : resumable_session_id_(std::move(resumable_session_id)),
571 impl_(std::move(state)),
572 shards_(std::move(shards)) {}
573
574 std::string resumable_session_id_;
575 std::shared_ptr<ParallelUploadStateImpl> impl_;
576 std::vector<ObjectWriteStream> shards_;
577
578 friend struct CreateParallelUploadShards;
579};
580
581/**
582 * Prepare a parallel upload state.
583 *
584 * The returned `NonResumableParallelUploadState` will contain streams to which
585 * data can be uploaded in parallel.
586 *
587 * @param client the client on which to perform the operation.
588 * @param bucket_name the name of the bucket that will contain the object.
589 * @param object_name the uploaded object name.
590 * @param num_shards how many streams to upload the object through.
591 * @param prefix the prefix with which temporary objects will be created.
592 * @param options a list of optional query parameters and/or request headers.
593 * Valid types for this operation include `DestinationPredefinedAcl`,
594 * `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
595 * `KmsKeyName`, `QuotaUser`, `UserIp`, `UserProject`, `WithObjectMetadata`.
596 *
597 * @return the state of the parallel upload
598 *
599 * @par Idempotency
600 * This operation is not idempotent. While each request performed by this
601 * function is retried based on the client policies, the operation itself stops
602 * on the first request that fails.
603 */
604template <typename... Options,
605 typename std::enable_if<
606 NotAmong<typename std::decay<Options>::type...>::template TPred<
608 int>::type EnableIfNotResumable = 0>
609StatusOr<NonResumableParallelUploadState> PrepareParallelUpload(
610 Client client, std::string const& bucket_name,
611 std::string const& object_name, std::size_t num_shards,
612 std::string const& prefix, Options&&... options) {
613 return NonResumableParallelUploadState::Create(
614 std::move(client), bucket_name, object_name, num_shards, prefix,
615 StaticTupleFilter<NotAmong<ParallelUploadExtraPersistentState>::TPred>(
616 std::forward_as_tuple(std::forward<Options>(options)...)));
617}
618
619template <typename... Options,
620 typename std::enable_if<
621 Among<typename std::decay<Options>::type...>::template TPred<
623 int>::type EnableIfResumable = 0>
624StatusOr<ResumableParallelUploadState> PrepareParallelUpload(
625 Client client, std::string const& bucket_name,
626 std::string const& object_name, std::size_t num_shards,
627 std::string const& prefix, Options&&... options) {
628 auto resumable_args =
629 StaticTupleFilter<Among<UseResumableUploadSession>::TPred>(
630 std::tie(options...));
631 static_assert(
632 std::tuple_size<decltype(resumable_args)>::value == 1,
633 "There should be exactly one UseResumableUploadSession argument");
634 std::string resumable_session_id = std::get<0>(resumable_args).value();
635 auto extra_state_arg =
636 ExtractFirstOccurrenceOfType<ParallelUploadExtraPersistentState>(
637 std::tie(options...));
638
639 auto forwarded_args =
640 StaticTupleFilter<NotAmong<UseResumableUploadSession,
641 ParallelUploadExtraPersistentState>::TPred>(
642 std::forward_as_tuple(std::forward<Options>(options)...));
643
644 if (resumable_session_id.empty()) {
645 return ResumableParallelUploadState::CreateNew(
646 std::move(client), bucket_name, object_name, num_shards, prefix,
647 extra_state_arg ? (*std::move(extra_state_arg)).payload()
648 : std::string(),
649 std::move(forwarded_args));
650 }
651 return ResumableParallelUploadState::Resume(
652 std::move(client), bucket_name, object_name, num_shards, prefix,
653 resumable_session_id, std::move(forwarded_args));
654}
655
656template <typename... Options>
657StatusOr<NonResumableParallelUploadState>
658NonResumableParallelUploadState::Create(Client client,
659 std::string const& bucket_name,
660 std::string const& object_name,
661 std::size_t num_shards,
662 std::string const& prefix,
663 std::tuple<Options...> options) {
664 using internal::StaticTupleFilter;
665 auto delete_options =
666 StaticTupleFilter<Among<QuotaUser, UserProject, UserIp>::TPred>(options);
667 auto deleter = std::make_shared<ScopedDeleter>(
668 [client, bucket_name, delete_options](std::string const& object_name,
669 std::int64_t generation) mutable {
670 return google::cloud::internal::apply(
671 DeleteApplyHelper{client, std::move(bucket_name), object_name,
672 generation},
673 std::move(delete_options));
674 });
675
676 auto compose_options = StaticTupleFilter<
679 WithObjectMetadata>::TPred>(options);
680 auto composer = [client, bucket_name, object_name, compose_options,
681 prefix](std::vector<ComposeSourceObject> sources) mutable {
682 return google::cloud::internal::apply(
683 ComposeManyApplyHelper{client, std::move(bucket_name),
684 std::move(sources), prefix + ".compose_many",
685 std::move(object_name)},
686 std::move(compose_options));
687 };
688
689 auto lock = internal::LockPrefix(client, bucket_name, prefix, options);
690 if (!lock) {
691 return Status(
692 lock.status().code(),
693 "Failed to lock prefix for ParallelUpload: " + lock.status().message());
694 }
695 deleter->Add(*lock);
696
697 auto internal_state = std::make_shared<ParallelUploadStateImpl>(
698 true, object_name, 0, std::move(deleter), std::move(composer));
699 std::vector<ObjectWriteStream> streams;
700
701 auto upload_options = StaticTupleFilter<
704 WithObjectMetadata>::TPred>(std::move(options));
705 for (std::size_t i = 0; i < num_shards; ++i) {
706 ResumableUploadRequest request(
707 bucket_name, prefix + ".upload_shard_" + std::to_string(i));
708 google::cloud::internal::apply(SetOptionsApplyHelper(request),
709 upload_options);
710 auto stream = internal_state->CreateStream(
711 internal::ClientImplDetails::GetRawClient(client), request);
712 if (!stream) {
713 return stream.status();
714 }
715 streams.emplace_back(*std::move(stream));
716 }
717 return NonResumableParallelUploadState(std::move(internal_state),
718 std::move(streams));
719}
720
721template <typename... Options>
722std::shared_ptr<ScopedDeleter> ResumableParallelUploadState::CreateDeleter(
723 Client client, // NOLINT(performance-unnecessary-value-param)
724 std::string const& bucket_name, std::tuple<Options...> const& options) {
725 using internal::StaticTupleFilter;
726 auto delete_options =
727 StaticTupleFilter<Among<QuotaUser, UserProject, UserIp>::TPred>(options);
728 return std::make_shared<ScopedDeleter>(
729 [client, bucket_name, delete_options](std::string const& object_name,
730 std::int64_t generation) mutable {
731 return google::cloud::internal::apply(
732 DeleteApplyHelper{client, std::move(bucket_name), object_name,
733 generation},
734 std::move(delete_options));
735 });
736}
737
738template <typename... Options>
739Composer ResumableParallelUploadState::CreateComposer(
740 Client client, // NOLINT(performance-unnecessary-value-param)
741 std::string const& bucket_name, std::string const& object_name,
742 std::int64_t expected_generation, std::string const& prefix,
743 std::tuple<Options...> const& options) {
744 auto compose_options = std::tuple_cat(
745 StaticTupleFilter<
747 UserIp, UserProject, WithObjectMetadata>::TPred>(options),
748 std::make_tuple(IfGenerationMatch(expected_generation)));
749 auto get_metadata_options = StaticTupleFilter<
750 Among<EncryptionKey, QuotaUser, UserIp, UserProject>::TPred>(options);
751 auto composer = [client, bucket_name, object_name, compose_options,
752 get_metadata_options,
753 prefix](std::vector<ComposeSourceObject> sources) mutable
754 -> StatusOr<ObjectMetadata> {
755 auto res = google::cloud::internal::apply(
756 ComposeManyApplyHelper{client, bucket_name, std::move(sources),
757 prefix + ".compose_many", object_name},
758 std::move(compose_options));
759 if (res) {
760 return res;
761 }
762 if (res.status().code() != StatusCode::kFailedPrecondition) {
763 return res.status();
764 }
765 // This means that the object already exists and it is not the object, which
766 // existed upon start of parallel upload. For simplicity, we assume that
767 // it's a result of a previously interrupted ComposeMany invocation.
768 return google::cloud::internal::apply(
769 GetObjectMetadataApplyHelper{client, std::move(bucket_name),
770 std::move(object_name)},
771 std::move(get_metadata_options));
772 };
773 return Composer(std::move(composer));
774}
775
776template <typename... Options>
777StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::CreateNew(
778 Client client, std::string const& bucket_name,
779 std::string const& object_name, std::size_t num_shards,
780 std::string const& prefix, std::string extra_state,
781 std::tuple<Options...> const& options) {
782 using internal::StaticTupleFilter;
783
784 auto get_object_meta_options = StaticTupleFilter<
786 IfMetagenerationNotMatch, UserProject>::TPred>(options);
787 auto object_meta = google::cloud::internal::apply(
788 GetObjectMetadataApplyHelper{client, bucket_name, object_name},
789 std::move(get_object_meta_options));
790 if (!object_meta && object_meta.status().code() != StatusCode::kNotFound) {
791 return object_meta.status();
792 }
793 std::int64_t expected_generation =
794 object_meta ? object_meta->generation() : 0;
795
796 auto deleter = CreateDeleter(client, bucket_name, options);
797 auto composer = CreateComposer(client, bucket_name, object_name,
798 expected_generation, prefix, options);
799 auto internal_state = std::make_shared<ParallelUploadStateImpl>(
800 false, object_name, expected_generation, deleter, std::move(composer));
801 internal_state->set_custom_data(std::move(extra_state));
802
803 std::vector<ObjectWriteStream> streams;
804
805 auto upload_options = std::tuple_cat(
806 StaticTupleFilter<
809 UserProject, WithObjectMetadata>::TPred>(options),
810 std::make_tuple(UseResumableUploadSession("")));
811 for (std::size_t i = 0; i < num_shards; ++i) {
812 ResumableUploadRequest request(
813 bucket_name, prefix + ".upload_shard_" + std::to_string(i));
814 google::cloud::internal::apply(SetOptionsApplyHelper(request),
815 upload_options);
816 auto stream = internal_state->CreateStream(
817 internal::ClientImplDetails::GetRawClient(client), request);
818 if (!stream) {
819 return stream.status();
820 }
821 streams.emplace_back(*std::move(stream));
822 }
823
824 auto state_object_name = prefix + ".upload_state";
825 auto insert_options = std::tuple_cat(
826 std::make_tuple(IfGenerationMatch(0)),
827 StaticTupleFilter<
829 UserProject, WithObjectMetadata>::TPred>(options));
830 auto state_object = google::cloud::internal::apply(
831 InsertObjectApplyHelper{client, bucket_name, state_object_name,
832 internal_state->ToPersistentState().ToString()},
833 std::move(insert_options));
834 if (!state_object) {
835 internal_state->Fail(state_object.status());
836 return std::move(state_object).status();
837 }
838 std::string resumable_session_id = session_id_prefix() + state_object_name +
839 ":" +
840 std::to_string(state_object->generation());
841 internal_state->set_resumable_session_id(resumable_session_id);
842 deleter->Add(std::move(*state_object));
843 return ResumableParallelUploadState(std::move(resumable_session_id),
844 std::move(internal_state),
845 std::move(streams));
846}
847
// Split a session id of the form "ParUpl:<state-object-name>:<generation>"
// (see `session_id_prefix()` and `CreateNew`) into the state object's name
// and its generation; returns an error `Status` on malformed input.
848StatusOr<std::pair<std::string, std::int64_t>> ParseResumableSessionId(
849 std::string const& session_id);
850
// Resume a parallel upload from a `resumable_session_id` previously returned
// by `PrepareParallelUpload`: read back the persisted state object, validate
// it against the caller-supplied arguments, and re-create one write stream
// per persisted shard.
851template <typename... Options>
852StatusOr<ResumableParallelUploadState> ResumableParallelUploadState::Resume(
853    Client client, std::string const& bucket_name,
854    std::string const& object_name, std::size_t num_shards,
855    std::string const& prefix, std::string resumable_session_id,
856    std::tuple<Options...> options) {
857  using internal::StaticTupleFilter;
858
  // The session id encodes the state object's name and generation.
859  auto state_and_gen = ParseResumableSessionId(resumable_session_id);
860  if (!state_and_gen) {
861    return state_and_gen.status();
862  }
863
  // Read the persisted state; `IfGenerationMatch` pins the generation parsed
  // from the session id so a concurrently rewritten state object is rejected.
  // NOTE(review): original line 866 (the remaining arguments of the `Among<>`
  // filter) was lost in extraction — restore it from the upstream sources.
864  auto read_options = std::tuple_cat(
865      StaticTupleFilter<Among<DisableCrc32cChecksum, DisableMD5Hash,
867          options),
868      std::make_tuple(IfGenerationMatch(state_and_gen->second)));
869
870  auto state_stream = google::cloud::internal::apply(
871      ReadObjectApplyHelper{client, bucket_name, state_and_gen->first},
872      std::move(read_options));
873  std::string state_string(std::istreambuf_iterator<char>{state_stream}, {});
874  state_stream.Close();
875
876  auto persistent_state =
877      ParallelUploadPersistentState::FromString(state_string);
878  if (!persistent_state) {
879    return persistent_state.status();
880  }
881
  // The persisted state must describe the same destination object the caller
  // asked for; otherwise the session id belongs to a different upload.
882  if (persistent_state->destination_object_name != object_name) {
883    return Status(StatusCode::kInternal,
884                  "Specified resumable session ID is doesn't match the "
885                  "destination object name (" +
886                      object_name + " vs " +
887                      persistent_state->destination_object_name + ")");
888  }
  // `num_shards == 0` means "accept whatever shard count was persisted".
889  if (persistent_state->streams.size() != num_shards && num_shards != 0) {
890    return Status(StatusCode::kInternal,
891                  "Specified resumable session ID is doesn't match the "
892                  "previously specified number of shards (" +
893                      std::to_string(num_shards) + " vs " +
894                      std::to_string(persistent_state->streams.size()) + ")");
895  }
896
  // The deleter also owns cleaning up the state object itself (added at the
  // generation we just validated).
897  auto deleter = CreateDeleter(client, bucket_name, options);
898  deleter->Add(state_and_gen->first, state_and_gen->second);
899  auto composer =
900      CreateComposer(client, bucket_name, object_name,
901                     persistent_state->expected_generation, prefix, options);
902  auto internal_state = std::make_shared<ParallelUploadStateImpl>(
903      false, object_name, persistent_state->expected_generation, deleter,
904      std::move(composer));
905  internal_state->set_custom_data(std::move(persistent_state->custom_data));
906  internal_state->set_resumable_session_id(resumable_session_id);
907  // If a resumed stream is already finalized, callbacks from streams will be
908  // executed immediately. We don't want them to trigger composition before all
909  // of them are created.
910  internal_state->PreventFromFinishing();
911  std::vector<ObjectWriteStream> streams;
912
  // NOTE(review): original lines 914-915 (the first arguments of this filter's
  // predicate, presumably an `Among<...>` list ending in `WithObjectMetadata`)
  // were lost in extraction — restore them from the upstream sources.
913  auto upload_options = StaticTupleFilter<
916      WithObjectMetadata>::TPred>(std::move(options));
  // Re-attach one resumable write stream per shard recorded in the state.
917  for (auto& stream_desc : persistent_state->streams) {
918    ResumableUploadRequest request(bucket_name,
919                                   std::move(stream_desc.object_name));
920    google::cloud::internal::apply(
921        SetOptionsApplyHelper(request),
922        std::tuple_cat(upload_options,
923                       std::make_tuple(UseResumableUploadSession(
924                           std::move(stream_desc.resumable_session_id)))));
925    auto stream = internal_state->CreateStream(
926        internal::ClientImplDetails::GetRawClient(client), request);
927    if (!stream) {
928      internal_state->AllowFinishing();
929      return stream.status();
930    }
931    streams.emplace_back(*std::move(stream));
932  }
933
934  internal_state->AllowFinishing();
935  return ResumableParallelUploadState(std::move(resumable_session_id),
936                                      std::move(internal_state),
937                                      std::move(streams));
938}
939
940template <typename... Options>
941std::vector<std::uintmax_t> ComputeParallelFileUploadSplitPoints(
942 std::uintmax_t file_size, std::tuple<Options...> const& options) {
943 auto div_ceil = [](std::uintmax_t dividend, std::uintmax_t divisor) {
944 return (dividend + divisor - 1) / divisor;
945 };
946 // These defaults were obtained by experiments summarized in
947 // https://github.com/googleapis/google-cloud-cpp/issues/2951#issuecomment-566237128
948 MaxStreams const default_max_streams(64);
949 MinStreamSize const default_min_stream_size(32 * 1024 * 1024);
950
951 auto const min_stream_size =
952 (std::max<std::uintmax_t>)(1, ExtractFirstOccurrenceOfType<MinStreamSize>(
953 options)
954 .value_or(default_min_stream_size)
955 .value());
956 auto const max_streams = ExtractFirstOccurrenceOfType<MaxStreams>(options)
957 .value_or(default_max_streams)
958 .value();
959
960 auto const wanted_num_streams =
961 (std::max<
962 std::uintmax_t>)(1, (std::min<std::uintmax_t>)(max_streams,
963 div_ceil(
964 file_size,
965 min_stream_size)));
966
967 auto const stream_size =
968 (std::max<std::uintmax_t>)(1, div_ceil(file_size, wanted_num_streams));
969
970 std::vector<std::uintmax_t> res;
971 for (auto split = stream_size; split < file_size; split += stream_size) {
972 res.push_back(split);
973 }
974 return res;
975}
976
// Serializes the computed file split points so they can be stored in the
// parallel upload state object's custom data (see
// `CreateParallelUploadShards::Create`).
977std::string ParallelFileUploadSplitPointsToString(
978    std::vector<std::uintmax_t> const& split_points);
979
// Inverse of `ParallelFileUploadSplitPointsToString`; returns an error status
// when the persisted string cannot be parsed.
980StatusOr<std::vector<std::uintmax_t>> ParallelFileUploadSplitPointsFromString(
981    std::string const& s);
982
983/**
984 * Helper functor to call `PrepareParallelUpload` via `apply`.
985 *
986 * This object holds only references to objects, hence it should not be stored.
987 * Instead, it should be used only as a transient object allowing for calling
988 * `PrepareParallelUpload` via `apply`.
989 */
990struct PrepareParallelUploadApplyHelper {
991  // Some gcc versions crash on using decltype for return type here.
  // The return type is `StatusOr<ResumableParallelUploadState>` when the
  // options contain the session-selecting option, otherwise
  // `StatusOr<NonResumableParallelUploadState>`.
  // NOTE(review): original line 995 (the argument of `TPred<...>`, presumably
  // `UseResumableUploadSession`) was lost in extraction — confirm against the
  // upstream sources.
992  template <typename... Options>
993  StatusOr<typename std::conditional<
994      Among<typename std::decay<Options>::type...>::template TPred<
996      ResumableParallelUploadState, NonResumableParallelUploadState>::type>
997  operator()(Options&&... options) {
998    return PrepareParallelUpload(std::move(client), bucket_name, object_name,
999                                 num_shards, prefix,
1000                                 std::forward<Options>(options)...);
1001  }
1002
  // Reference members: the referred-to strings must outlive this helper.
1003  Client client;
1004  std::string const& bucket_name;
1005  std::string const& object_name;
1006  std::size_t num_shards;
1007  std::string const& prefix;
1008};
1009
// The closed list of option types accepted by `ParallelUploadFile` (used by
// the `static_assert` there).
// NOTE(review): original lines 1011-1013 (the list's elements and the closing
// `>;`) were lost in extraction — restore them from the upstream sources.
1010using ParallelUploadFileSupportedOptions = google::cloud::internal::TypeList<
1014
// Predicate: is (decayed) `T` one of the supported options above?
1015template <typename T>
1016using SupportsParallelOption =
1017    google::cloud::internal::TypeListHasType<ParallelUploadFileSupportedOptions,
1018                                             std::decay_t<T>>;
1019
1020template <typename... Provided>
1021struct IsOptionSupportedWithParallelUpload
1022 : std::integral_constant<
1023 bool,
1024 std::is_same<
1025 std::tuple_size<std::tuple<Provided...>>,
1026 std::tuple_size<typename google::cloud::internal::TypeListFilter<
1027 SupportsParallelOption, std::tuple<Provided...>>::type>>::
1028 value> {};
1029
1030struct CreateParallelUploadShards {
1031  /**
1032   * Prepare a parallel upload of a given file.
1033   *
1034   * The returned opaque objects reflect computed shards of the given file. Each
1035   * of them has an `Upload()` member function which will perform the upload of
1036   * that shard. You should parallelize running this function on them according
1037   * to your needs. You can affect how many shards will be created by using the
1038   * `MaxStreams` and `MinStreamSize` options.
1039   *
1040   * Any of the returned objects can be used for obtaining the metadata of the
1041   * resulting object.
1042   *
1043   * @param client the client on which to perform the operation.
1044   * @param file_name the path to the file to be uploaded
1045   * @param bucket_name the name of the bucket that will contain the object.
1046   * @param object_name the uploaded object name.
1047   * @param prefix the prefix with which temporary objects will be created.
1048   * @param options a list of optional query parameters and/or request headers.
1049   *     Valid types for this operation include `DestinationPredefinedAcl`,
1050   *     `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
1051   *     `KmsKeyName`, `MaxStreams`, `MinStreamSize`, `QuotaUser`, `UserIp`,
1052   *     `UserProject`, `WithObjectMetadata`, `UseResumableUploadSession`.
1053   *
1054   * @return the shards of the input file to be uploaded in parallel
1055   *
1056   * @par Idempotency
1057   * This operation is not idempotent. While each request performed by this
1058   * function is retried based on the client policies, the operation itself
1059   * stops on the first request that fails.
1060   *
1061   * @par Example
1062   * @snippet storage_object_file_transfer_samples.cc parallel upload file
1063   */
1064  template <typename... Options>
1065  static StatusOr<std::vector<ParallelUploadFileShard>> Create(
1066      Client client,  // NOLINT(performance-unnecessary-value-param)
1067      std::string file_name, std::string const& bucket_name,
1068      std::string const& object_name, std::string const& prefix,
1069      Options&&... options) {
1070    std::error_code size_err;
1071    auto file_size = google::cloud::internal::file_size(file_name, size_err);
1072    if (size_err) {
1073      return Status(StatusCode::kNotFound, size_err.message());
1074    }
1075
    // A non-empty `UseResumableUploadSession` value means we are resuming a
    // previous upload rather than starting a new one.
1076    auto const resumable_session_id_arg =
1077        ExtractFirstOccurrenceOfType<UseResumableUploadSession>(
1078            std::tie(options...));
1079    bool const new_session = !resumable_session_id_arg ||
1080                             resumable_session_id_arg.value().value().empty();
    // `MaxStreams`/`MinStreamSize` only steer the split computation below;
    // they must not be forwarded to the actual upload requests.
1081    auto upload_options =
1082        StaticTupleFilter<NotAmong<MaxStreams, MinStreamSize>::TPred>(
1083            std::tie(options...));
1084
    // For a resumed session `num_shards` stays 0, signalling that the shard
    // count should be taken from the persisted state.
1085    std::vector<uintmax_t> file_split_points;
1086    std::size_t num_shards = 0;
1087    if (new_session) {
1088      file_split_points =
1089          ComputeParallelFileUploadSplitPoints(file_size, std::tie(options...));
1090      num_shards = file_split_points.size() + 1;
1091    }
1092
1093    // Create the upload state.
1094    auto state = google::cloud::internal::apply(
1095        PrepareParallelUploadApplyHelper{client, bucket_name, object_name,
1096                                         num_shards, prefix},
1097        std::tuple_cat(
1098            std::move(upload_options),
1099            std::make_tuple(ParallelUploadExtraPersistentState(
1100                ParallelFileUploadSplitPointsToString(file_split_points)))));
1101    if (!state) {
1102      return state.status();
1103    }
1104
1105    if (!new_session) {
1106      // We need to recreate the split points of the file.
1107      auto maybe_split_points =
1108          ParallelFileUploadSplitPointsFromString(state->impl_->custom_data());
1109      if (!maybe_split_points) {
1110        state->Fail(maybe_split_points.status());
1111        return std::move(maybe_split_points).status();
1112      }
1113      file_split_points = *std::move(maybe_split_points);
1114    }
1115
1116    // Everything ready - we've got the shared state and the files open, let's
1117    // prepare the returned objects.
    // NOTE(review): original line 1122 was lost in extraction — presumably the
    // `.get<UploadBufferSizeOption>();` accessor completing this statement;
    // confirm against the upstream sources.
1118    auto upload_buffer_size =
1119        google::cloud::storage::internal::ClientImplDetails::GetRawClient(
1120            client)
1121            ->options()
1123
    // Append the end-of-file sentinel so each shard is described by the
    // half-open range [offset, shard_end).
1124    file_split_points.emplace_back(file_size);
1125    assert(file_split_points.size() == state->shards().size());
1126    std::vector<ParallelUploadFileShard> res;
1127    std::uintmax_t offset = 0;
1128    std::size_t shard_idx = 0;
1129    for (auto shard_end : file_split_points) {
1130      res.emplace_back(ParallelUploadFileShard(
1131          state->impl_, std::move(state->shards()[shard_idx++]), file_name,
1132          offset, shard_end - offset, upload_buffer_size));
1133      offset = shard_end;
1134    }
1135    return res;
1136  }
1137};
1138
1139/// @copydoc CreateParallelUploadShards::Create()
1140template <typename... Options>
1141StatusOr<std::vector<ParallelUploadFileShard>> CreateUploadShards(
1142 Client client, std::string file_name, std::string const& bucket_name,
1143 std::string const& object_name, std::string const& prefix,
1144 Options&&... options) {
1145 return CreateParallelUploadShards::Create(
1146 std::move(client), std::move(file_name), bucket_name, object_name, prefix,
1147 std::forward<Options>(options)...);
1148}
1149
1150} // namespace internal
1151
1152/**
1153 * Perform a parallel upload of a given file.
1154 *
1155 * You can affect how many shards will be created by using the `MaxStreams` and
1156 * `MinStreamSize` options.
1157 *
1158 * @param client the client on which to perform the operation.
1159 * @param file_name the path to the file to be uploaded
1160 * @param bucket_name the name of the bucket that will contain the object.
1161 * @param object_name the uploaded object name.
1162 * @param prefix the prefix with which temporary objects will be created.
1163 * @param ignore_cleanup_failures treat failures to cleanup the temporary
1164 *     objects as not fatal.
1165 * @param options a list of optional query parameters and/or request headers.
1166 *     Valid types for this operation include `DestinationPredefinedAcl`,
1167 *     `EncryptionKey`, `IfGenerationMatch`, `IfMetagenerationMatch`,
1168 *     `KmsKeyName`, `MaxStreams`, `MinStreamSize`, `QuotaUser`, `UserIp`,
1169 *     `UserProject`, `WithObjectMetadata`, `UseResumableUploadSession`.
1170 *
1171 * @return the metadata of the object created by the upload.
1172 *
1173 * @par Idempotency
1174 * This operation is not idempotent. While each request performed by this
1175 * function is retried based on the client policies, the operation itself stops
1176 * on the first request that fails.
1177 *
1178 * @par Example
1179 * @snippet storage_object_file_transfer_samples.cc parallel upload file
1180 */
// NOTE(review): original line 1182 was lost in extraction; per this page's
// own index it read `StatusOr<ObjectMetadata> ParallelUploadFile(` — restore
// it from the upstream sources.
1181template <typename... Options>
1183    Client client, std::string file_name, std::string bucket_name,
1184    std::string object_name, std::string prefix, bool ignore_cleanup_failures,
1185    Options&&... options) {
  // Reject unsupported option types at compile time with a readable message.
1186  static_assert(
1187      internal::IsOptionSupportedWithParallelUpload<Options...>::value,
1188      "Provided Option not found in ParallelUploadFileSupportedOptions.");
1189
1190  auto shards = internal::CreateParallelUploadShards::Create(
1191      std::move(client), std::move(file_name), std::move(bucket_name),
1192      std::move(object_name), std::move(prefix),
1193      std::forward<Options>(options)...);
1194  if (!shards) {
1195    return shards.status();
1196  }
1197
  // One upload thread per shard; all are joined before results are read.
  // NOTE(review): `<thread>` is not among the includes visible at the top of
  // this header — verify it is included (directly or transitively) upstream.
1198  std::vector<std::thread> threads;
1199  threads.reserve(shards->size());
1200  for (auto& shard : *shards) {
1201    threads.emplace_back([&shard] {
1202      // We can safely ignore the status - if something fails we'll know
1203      // when obtaining final metadata.
1204      shard.Upload();
1205    });
1206  }
1207  for (auto& thread : threads) {
1208    thread.join();
1209  }
  // Any shard can report the composed object's metadata; use the first.
  // `EagerCleanup()` deletes the temporary shard/state objects now.
1210  auto res = (*shards)[0].WaitForCompletion().get();
1211  auto cleanup_res = (*shards)[0].EagerCleanup();
1212  if (!cleanup_res.ok() && !ignore_cleanup_failures) {
1213    return cleanup_res;
1214  }
1215  return res;
1216}
1217
1218GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
1219} // namespace storage
1220} // namespace cloud
1221} // namespace google
1222
1223#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STORAGE_PARALLEL_UPLOAD_H
ValueTypeT< T > const & get() const
Status const & status() const &
Status(StatusCode code, std::string message, ErrorInfo info={})
friend friend class future
The Google Cloud Storage (GCS) Client.
Definition: client.h:263
A parameter type indicating the maximum number of streams to ParallelUploadFile.
Definition: parallel_upload.h:46
std::size_t value() const
Definition: parallel_upload.h:49
MaxStreams(std::size_t value)
Definition: parallel_upload.h:48
A parameter type indicating the minimum stream size to ParallelUploadFile.
Definition: parallel_upload.h:62
std::uintmax_t value() const
Definition: parallel_upload.h:65
MinStreamSize(std::uintmax_t value)
Definition: parallel_upload.h:64
Represents the metadata for a Google Cloud Storage Object.
Definition: object_metadata.h:94
Defines a std::basic_istream<char> to read from a GCS Object.
Definition: object_read_stream.h:33
Defines a std::basic_ostream<char> to write to a GCS object.
Definition: object_write_stream.h:131
ObjectWriteStream(ObjectWriteStream &&rhs) noexcept
Contains all the Google Cloud Storage C++ client APIs.
Definition: auto_finalize.h:24
StatusOr< ObjectMetadata > ParallelUploadFile(Client client, std::string file_name, std::string bucket_name, std::string object_name, std::string prefix, bool ignore_cleanup_failures, Options &&... options)
Perform a parallel upload of a given file.
Definition: parallel_upload.h:1182
Defines one of the source objects for a compose operation.
Definition: object_metadata.h:44
Sets the contentEncoding option for object uploads.
Definition: well_known_parameters.h:83
Set the MIME content type of an object.
Definition: well_known_headers.h:74
Set the ACL to a predefined value when copying Objects.
Definition: well_known_parameters.h:357
Disable CRC32C checksum computations.
Definition: hashing_options.h:139
Disable or enable MD5 Hashing computations.
Definition: hashing_options.h:74
An optional parameter to set the Customer-Supplied Encryption key.
Definition: well_known_headers.h:183
Set the version of an object to operate on.
Definition: well_known_parameters.h:145
A pre-condition: the request succeeds only if the object generation matches.
Definition: well_known_parameters.h:157
A pre-condition: the request succeeds unless the object generation matches.
Definition: well_known_parameters.h:169
A pre-condition: the request succeeds if the metadata generation matches.
Definition: well_known_parameters.h:181
A pre-condition: the request succeeds unless the metadata generation matches.
Definition: well_known_parameters.h:194
Configure the Customer-Managed Encryption Key (CMEK) for an upload.
Definition: well_known_parameters.h:274
Set the ACL to predefined values when creating Buckets or Objects.
Definition: well_known_parameters.h:320
Sets the user for this operation for quota enforcement purposes.
Definition: well_known_parameters.h:524
Control the formatted I/O upload buffer.
Definition: options.h:179
Request a resumable upload, restoring a previous session if necessary.
Definition: upload_options.h:38
Sets the user IP on an operation for quota enforcement purposes.
Definition: user_ip_option.h:43
Set the project used for billing in "requester pays" Buckets.
Definition: well_known_parameters.h:573
A request option to define the object metadata attributes.
Definition: object_metadata.h:622