Google Cloud Bigtable C++ Client  1.42.0
A C++ Client Library for Google Cloud Bigtable
mutation_batcher.h
Go to the documentation of this file.
1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H
17 
#include "google/cloud/bigtable/client_options.h"
#include "google/cloud/bigtable/completion_queue.h"
#include "google/cloud/bigtable/mutations.h"
#include "google/cloud/bigtable/table.h"
#include "google/cloud/bigtable/version.h"
#include "google/cloud/status.h"
#include "absl/memory/memory.h"
#include <google/bigtable/v2/bigtable.pb.h>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <utility>
#include <vector>
31 
32 namespace google {
33 namespace cloud {
34 namespace bigtable {
36 /**
37  * Objects of this class pack single row mutations into bulk mutations.
38  *
39  * In order to maximize throughput when applying a lot of mutations to Cloud
40  * Bigtable, one should pack the mutations in `BulkMutations`. This class helps
41  * in doing so. Create a `MutationBatcher` and use
42  * `MutationBatcher::AsyncApply()` to apply a large stream of mutations to the
43  * same `Table`. Objects of this class will efficiently create batches of
44  * `SingleRowMutations` and maintain multiple batches "in flight".
45  *
46  * This class also offers an easy-to-use flow control mechanism to avoid
47  * unbounded growth in its internal buffers.
48  *
49  * Applications must provide a `CompletionQueue` to (asynchronously) execute
50  * these operations. The application is responsible for executing the
51  * `CompletionQueue` event loop in one or more threads.
52  *
53  * @par Thread-safety
54  * Instances of this class are guaranteed to work when accessed concurrently
55  * from multiple threads.
56  */
58  public:
59  /// Configuration for `MutationBatcher`.
60  struct Options {
62 
63  /// A single RPC will not have more mutations than this.
64  Options& SetMaxMutationsPerBatch(size_t max_mutations_per_batch_arg);
65 
66  /// Sum of mutations' sizes in a single RPC will not be larger than this.
67  Options& SetMaxSizePerBatch(size_t max_size_per_batch_arg) {
68  max_size_per_batch = max_size_per_batch_arg;
69  return *this;
70  }
71 
72  /// There will be no more RPCs outstanding (except for retries) than this.
73  Options& SetMaxBatches(size_t max_batches_arg) {
74  max_batches = max_batches_arg;
75  return *this;
76  }
77 
78  /// MutationBatcher will at most admit mutations of this total size.
79  Options& SetMaxOutstandingSize(size_t max_outstanding_size_arg) {
80  max_outstanding_size = max_outstanding_size_arg;
81  return *this;
82  }
83 
84  /// MutationBatcher will at most admit this many mutations.
85  Options& SetMaxOutstandingMutations(size_t max_outstanding_mutations_arg);
86 
88  std::size_t max_size_per_batch;
89  std::size_t max_batches;
90  std::size_t max_outstanding_size;
92  };
93 
94  explicit MutationBatcher(Table table, Options options = Options())
95  : table_(std::move(table)),
96  options_(options),
97  cur_batch_(std::make_shared<Batch>()) {}
98 
99  virtual ~MutationBatcher() = default;
100 
101  /**
102  * Asynchronously apply mutation.
103  *
104  * The mutation will most likely be batched together with others to optimize
105  * for throughput. As a result, latency is likely to be worse than
106  * `Table::AsyncApply`.
107  *
108  * @param mut the mutation. Note that this function takes ownership of
109  * (and then discards) the data in the mutation. In general, a
110  * `SingleRowMutation` can be used to modify and/or delete
111  * multiple cells, across different columns and column families.
112  * @param cq the completion queue that will execute the asynchronous
113  * calls, the application must ensure that one or more threads are
114  * blocked on `cq.Run()`.
115  *
116  * @return *admission* and *completion* futures
117  *
118  * The *completion* future will report the mutation's status once it
119  * completes.
120  *
121  * The *admission* future should be used for flow control. In order to bound
122  * the memory used by `MutationBatcher`, one should not submit more
123  * mutations before the *admission* future is satisfied. Note that while the
124  * future is often already satisfied when the function returns, applications
125  * should not assume that this is always the case.
126  *
127  * One should not make assumptions on which future will be satisfied first.
128  *
129  * This quasi-synchronous example shows the intended use:
130  * @code
131  * bigtable::MutationBatcher batcher(bigtable::Table(...args...));
132  * bigtable::CompletionQueue cq;
133  * std::thread cq_runner([&cq]() { cq.Run(); });
134  *
135  * while (HasMoreMutations()) {
136  * auto admission_completion = batcher.AsyncApply(cq, GenerateMutation());
137  * auto& admission_future = admission_completion.first;
138  * auto& completion_future = admission_completion.second;
139  * completion_future.then([](future<Status> completion_status) {
140  * // handle mutation completion asynchronously
141  * });
142  * // Potentially slow down submission not to make buffers in
143  * // MutationBatcher grow unbounded.
144  * admission_future.get();
145  * }
146  * // Wait for all mutations to complete
147  * batcher.AsyncWaitForNoPendingRequests().get();
148  * cq.Shutdown();
149  * cq_runner.join();
150  * @endcode
151  */
153  SingleRowMutation mut);
154 
155  /**
156  * Asynchronously wait until all submitted mutations complete.
157  *
158  * @return a future which will be satisfied once all mutations submitted
159  * before calling this function finish; if there are no such operations,
160  * the returned future is already satisfied.
161  */
163 
164  protected:
165  // Wrap calling underlying operation in a virtual function to ease testing.
166  virtual future<std::vector<FailedMutation>> AsyncBulkApplyImpl(
167  Table& table, BulkMutation&& mut);
168 
169  private:
170  using CompletionPromise = promise<Status>;
171  using AdmissionPromise = promise<void>;
172  using NoMorePendingPromise = promise<void>;
173  struct Batch;
174 
175  /**
176  * This structure represents a single mutation before it is admitted.
177  */
178  struct PendingSingleRowMutation {
179  PendingSingleRowMutation(SingleRowMutation mut_arg,
180  CompletionPromise completion_promise,
181  AdmissionPromise admission_promise);
182 
183  SingleRowMutation mut;
184  size_t num_mutations;
185  size_t request_size;
186  CompletionPromise completion_promise;
187  AdmissionPromise admission_promise;
188  };
189 
190  /**
191  * A mutation that has been sent to the Cloud Bigtable service.
192  *
193  * We need to save the `CompletionPromise` associated with each mutation.
194  * Because only failures are reported, we need to track whether the mutation
195  * is "done", so we can simulate a success report.
196  */
197  struct MutationData {
198  explicit MutationData(PendingSingleRowMutation pending)
199  : completion_promise(std::move(pending.completion_promise)) {}
200  CompletionPromise completion_promise;
201  bool done = false;
202  };
203 
204  /**
205  * This class represents a single batch of mutations sent in one RPC.
206  *
207  * Objects of this class hold the accumulated mutations, their completion
208  * promises and basic statistics.
209  *
210  * Objects of this class don't need separate synchronization.
211  * There are 2 important stages of these objects' lifecycle: when mutations
212  * are accumulated and when the batch is worked on by `AsyncBulkApply`. In the
213  * first stage, `MutationBatcher`'s synchronization ensures that its data is
214  * not accessed from multiple threads. In the second stage we rely on the fact
215  * that `AsyncBulkApply` invokes the callbacks serially. This in turn
216  * relies on the fact that `CompletionQueue` invokes callbacks from a
217  * streaming response in sequence and that `AsyncRetryOp` doesn't schedule
218  * another attempt before invoking callbacks for the previous one.
219  */
220  struct Batch {
221  Batch() = default;
222 
223  std::size_t num_mutations = 0;
224  std::size_t requests_size = 0;
225  BulkMutation requests;
226  std::vector<MutationData> mutation_data;
227  };
228 
229  /// Check if a mutation doesn't exceed allowed limits.
230  grpc::Status IsValid(PendingSingleRowMutation& mut) const;
231 
232  /**
233  * Check whether there is space for the passed mutation in the currently
234  * constructed batch.
235  */
236  bool HasSpaceFor(PendingSingleRowMutation const& mut) const;
237 
238  /**
239  * Check if one can append a mutation to the currently constructed batch.
240  * Even if there is space for the mutation, we shouldn't append mutations if
241  * some other are not admitted yet.
242  */
243  bool CanAppendToBatch(PendingSingleRowMutation const& mut) const {
244  // If some mutations are already subject to flow control, don't admit any
245  // new, even if there's space for them. Otherwise we might starve big
246  // mutations.
247  return pending_mutations_.empty() && HasSpaceFor(mut);
248  }
249 
250  /**
251  * Send the currently constructed batch if there are not too many outstanding
252  * already. If there are no mutations in the batch, it's a noop.
253  */
254  bool FlushIfPossible(CompletionQueue cq);
255 
256  /// Handle a completed batch.
257  void OnBulkApplyDone(CompletionQueue cq, MutationBatcher::Batch batch,
258  std::vector<FailedMutation> const& failed);
259 
260  /**
261  * Try to move mutations waiting in `pending_mutations_` to the currently
262  * constructed batch.
263  *
264  * @return the admission promises of the newly admitted mutations.
265  */
266  std::vector<MutationBatcher::AdmissionPromise> TryAdmit(CompletionQueue& cq);
267 
268  /**
269  * Append mutation `mut` to the currently constructed batch.
270  */
271  void Admit(PendingSingleRowMutation mut);
272 
273  /**
274  * Satisfies passed admission promises and potentially the promises of no more
275  * pending requests. Unlocks `lk`.
276  */
277  void SatisfyPromises(std::vector<AdmissionPromise>,
278  std::unique_lock<std::mutex>& lk);
279 
280  std::mutex mu_;
281  Table table_;
282  Options options_;
283 
284  /// Num batches sent but not completed.
285  std::size_t num_outstanding_batches_ = 0;
286  /// Size of admitted but uncompleted mutations.
287  std::size_t outstanding_size_ = 0;
288  /// Number of admitted but uncompleted mutations.
289  std::size_t outstanding_mutations_ = 0;
290  // Number of uncompleted SingleRowMutations (including not admitted).
291  std::size_t num_requests_pending_ = 0;
292 
293  /// Currently constructed batch of mutations.
294  std::shared_ptr<Batch> cur_batch_;
295 
296  /**
297  * These are the mutations which have not been admitted yet. If the user is
298  * properly reacting to `admission_promise`s, there should be very few of
299  * these (likely no more than one).
300  */
301  std::queue<PendingSingleRowMutation> pending_mutations_;
302 
303  /**
304  * The list of promises made to this point.
305  *
306  * These promises are satisfied as part of calling
307  * `AsyncWaitForNoPendingRequests()`.
308  */
309  std::vector<NoMorePendingPromise> no_more_pending_promises_;
310 };
311 
313 } // namespace bigtable
314 } // namespace cloud
315 } // namespace google
316 
317 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H