Google Cloud Bigtable C++ Client 2.13.0
A C++ Client Library for Google Cloud Bigtable
Loading...
Searching...
No Matches
mutation_batcher.h
1// Copyright 2019 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H
16#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H
17
18#include "google/cloud/bigtable/client_options.h"
19#include "google/cloud/bigtable/completion_queue.h"
20#include "google/cloud/bigtable/mutations.h"
21#include "google/cloud/bigtable/table.h"
22#include "google/cloud/bigtable/version.h"
23#include "google/cloud/status.h"
24#include <google/bigtable/v2/bigtable.pb.h>
25#include <deque>
26#include <functional>
27#include <memory>
28#include <queue>
29#include <vector>
30
31namespace google {
32namespace cloud {
33namespace bigtable {
34GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
35/**
36 * Objects of this class pack single row mutations into bulk mutations.
37 *
38 * In order to maximize throughput when applying a lot of mutations to Cloud
39 * Bigtable, one should pack the mutations in `BulkMutations`. This class helps
40 * in doing so. Create a `MutationBatcher` and use
41 * `MutationBatcher::AsyncApply()` to apply a large stream of mutations to the
42 * same `Table`. Objects of this class will efficiently create batches of
43 * `SingleRowMutations` and maintain multiple batches "in flight".
44 *
45 * This class also offers an easy-to-use flow control mechanism to avoid
46 * unbounded growth in its internal buffers.
47 *
48 * Applications must provide a `CompletionQueue` to (asynchronously) execute
49 * these operations. The application is responsible of executing the
50 * `CompletionQueue` event loop in one or more threads.
51 *
52 * @par Thread-safety
53 * Instances of this class are guaranteed to work when accessed concurrently
54 * from multiple threads.
55 */
56class MutationBatcher {
57 public:
58 /// Configuration for `MutationBatcher`.
59 struct Options {
60 Options();
61
62 /// A single RPC will not have more mutations than this.
63 Options& SetMaxMutationsPerBatch(size_t max_mutations_per_batch_arg);
64
65 /// Sum of mutations' sizes in a single RPC will not be larger than this.
66 Options& SetMaxSizePerBatch(size_t max_size_per_batch_arg) {
67 max_size_per_batch = max_size_per_batch_arg;
68 return *this;
69 }
70
71 /// There will be no more RPCs outstanding (except for retries) than this.
72 Options& SetMaxBatches(size_t max_batches_arg) {
73 max_batches = max_batches_arg;
74 return *this;
75 }
76
77 /// MutationBatcher will at most admit mutations of this total size.
78 Options& SetMaxOutstandingSize(size_t max_outstanding_size_arg) {
79 max_outstanding_size = max_outstanding_size_arg;
80 return *this;
81 }
82
83 /// MutationBatcher will at most admit this many mutations.
84 Options& SetMaxOutstandingMutations(size_t max_outstanding_mutations_arg);
85
86 std::size_t max_mutations_per_batch;
87 std::size_t max_size_per_batch;
88 std::size_t max_batches;
89 std::size_t max_outstanding_size;
90 std::size_t max_outstanding_mutations;
91 };
92
93 explicit MutationBatcher(Table table, Options options = Options())
94 : table_(std::move(table)),
95 options_(options),
96 cur_batch_(std::make_shared<Batch>()) {}
97
98 virtual ~MutationBatcher() = default;
99
100 /**
101 * Asynchronously apply mutation.
102 *
103 * The mutation will most likely be batched together with others to optimize
104 * for throughput. As a result, latency is likely to be worse than
105 * `Table::AsyncApply`.
106 *
107 * @param mut the mutation. Note that this function takes ownership
108 * (and then discards) the data in the mutation. In general, a
109 * `SingleRowMutation` can be used to modify and/or delete
110 * multiple cells, across different columns and column families.
111 * @param cq the completion queue that will execute the asynchronous
112 * calls, the application must ensure that one or more threads are
113 * blocked on `cq.Run()`.
114 *
115 * @return *admission* and *completion* futures
116 *
117 * The *completion* future will report the mutation's status once it
118 * completes.
119 *
120 * The *admission* future should be used for flow control. In order to bound
121 * the memory usage used by `MutationBatcher`, one should not submit more
122 * mutations before the *admission* future is satisfied. Note that while the
123 * future is often already satisfied when the function returns, applications
124 * should not assume that this is always the case.
125 *
126 * One should not make assumptions on which future will be satisfied first.
127 *
128 * This quasi-synchronous example shows the intended use:
129 * @code
130 * bigtable::MutationBatcher batcher(bigtable::Table(...args...));
131 * bigtable::CompletionQueue cq;
132 * std::thread cq_runner([]() { cq.Run(); });
133 *
134 * while (HasMoreMutations()) {
135 * auto admission_completion = batcher.AsyncApply(cq, GenerateMutation());
136 * auto& admission_future = admission_completion.first;
137 * auto& completion_future = admission_completion.second;
138 * completion_future.then([](future<Status> completion_status) {
139 * // handle mutation completion asynchronously
140 * });
141 * // Potentially slow down submission not to make buffers in
142 * // MutationBatcher grow unbounded.
143 * admission_future.get();
144 * }
145 * // Wait for all mutations to complete
146 * batcher.AsyncWaitForNoPendingRequests().get();
147 * cq.Shutdown();
148 * cq_runner.join();
149 * @endcode
150 */
151 std::pair<future<void>, future<Status>> AsyncApply(CompletionQueue& cq,
153
154 /**
155 * Asynchronously wait until all submitted mutations complete.
156 *
157 * @return a future which will be satisfied once all mutations submitted
158 * before calling this function finish; if there are no such operations,
159 * the returned future is already satisfied.
160 */
162
163 protected:
164 // Wrap calling underlying operation in a virtual function to ease testing.
165 virtual future<std::vector<FailedMutation>> AsyncBulkApplyImpl(
166 Table& table, BulkMutation&& mut);
167
168 private:
169 using CompletionPromise = promise<Status>;
170 using AdmissionPromise = promise<void>;
171 using NoMorePendingPromise = promise<void>;
172 struct Batch;
173
174 /**
175 * This structure represents a single mutation before it is admitted.
176 */
177 struct PendingSingleRowMutation {
178 PendingSingleRowMutation(SingleRowMutation mut_arg,
179 CompletionPromise completion_promise,
180 AdmissionPromise admission_promise);
181
183 size_t num_mutations;
184 size_t request_size;
185 CompletionPromise completion_promise;
186 AdmissionPromise admission_promise;
187 };
188
189 /**
190 * A mutation that has been sent to the Cloud Bigtable service.
191 *
192 * We need to save the `CompletionPromise` associated with each mutation.
193 * Because only failures are reported, we need to track whether the mutation
194 * is "done", so we can simulate a success report.
195 */
196 struct MutationData {
197 explicit MutationData(PendingSingleRowMutation pending)
198 : completion_promise(std::move(pending.completion_promise)) {}
199 CompletionPromise completion_promise;
200 bool done = false;
201 };
202
203 /**
204 * This class represents a single batch of mutations sent in one RPC.
205 *
206 * Objects of this class hold the accumulated mutations, their completion
207 * promises and basic statistics.
208 *
209 * Objects of this class don't need separate synchronization.
210 * There are 2 important stages of these objects' lifecycle: when mutations
211 * are accumulated and when the batch is worked on by `AsyncBulkApply`. In the
212 * first stage, `MutationBatcher`'s synchronization ensures that its data is
213 * not accessed from multiple threads. In the second stage we rely on the fact
214 * that `AsyncBulkApply` invokes the callbacks serially. This in turn
215 * relies on the fact that `CompletionQueue` invokes callbacks from a
216 * streaming response in sequence and that `AsyncRetryOp` doesn't schedule
217 * another attempt before invoking callbacks for the previous one.
218 */
219 struct Batch {
220 Batch() = default;
221
222 std::size_t num_mutations = 0;
223 std::size_t requests_size = 0;
224 BulkMutation requests;
225 std::vector<MutationData> mutation_data;
226 };
227
228 /// Check if a mutation doesn't exceed allowed limits.
229 grpc::Status IsValid(PendingSingleRowMutation& mut) const;
230
231 /**
232 * Check whether there is space for the passed mutation in the currently
233 * constructed batch.
234 */
235 bool HasSpaceFor(PendingSingleRowMutation const& mut) const;
236
237 /**
238 * Check if one can append a mutation to the currently constructed batch.
239 * Even if there is space for the mutation, we shouldn't append mutations if
240 * some other are not admitted yet.
241 */
242 bool CanAppendToBatch(PendingSingleRowMutation const& mut) const {
243 // If some mutations are already subject to flow control, don't admit any
244 // new, even if there's space for them. Otherwise we might starve big
245 // mutations.
246 return pending_mutations_.empty() && HasSpaceFor(mut);
247 }
248
249 /**
250 * Send the currently constructed batch if there are not too many outstanding
251 * already. If there are no mutations in the batch, it's a noop.
252 */
253 bool FlushIfPossible(CompletionQueue cq);
254
255 /// Handle a completed batch.
256 void OnBulkApplyDone(CompletionQueue cq, MutationBatcher::Batch batch,
257 std::vector<FailedMutation> const& failed);
258
259 /**
260 * Try to move mutations waiting in `pending_mutations_` to the currently
261 * constructed batch.
262 *
263 * @return the admission promises of the newly admitted mutations.
264 */
265 std::vector<MutationBatcher::AdmissionPromise> TryAdmit(CompletionQueue& cq);
266
267 /**
268 * Append mutation `mut` to the currently constructed batch.
269 */
270 void Admit(PendingSingleRowMutation mut);
271
272 /**
273 * Satisfies passed admission promises and potentially the promises of no more
274 * pending requests. Unlocks `lk`.
275 */
276 void SatisfyPromises(std::vector<AdmissionPromise>,
277 std::unique_lock<std::mutex>& lk);
278
279 std::mutex mu_;
280 Table table_;
281 Options options_;
282
283 /// Num batches sent but not completed.
284 std::size_t num_outstanding_batches_ = 0;
285 /// Size of admitted but uncompleted mutations.
286 std::size_t outstanding_size_ = 0;
287 /// Number of admitted but uncompleted mutations.
288 std::size_t outstanding_mutations_ = 0;
289 // Number of uncompleted SingleRowMutations (including not admitted).
290 std::size_t num_requests_pending_ = 0;
291
292 /// Currently constructed batch of mutations.
293 std::shared_ptr<Batch> cur_batch_;
294
295 /**
296 * These are the mutations which have not been admitted yet. If the user is
297 * properly reacting to `admission_promise`s, there should be very few of
298 * these (likely no more than one).
299 */
300 std::queue<PendingSingleRowMutation> pending_mutations_;
301
302 /**
303 * The list of promises made to this point.
304 *
305 * These promises are satisfied as part of calling
306 * `AsyncWaitForNoPendingRequests()`.
307 */
308 std::vector<NoMorePendingPromise> no_more_pending_promises_;
309};
310
311GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
312} // namespace bigtable
313} // namespace cloud
314} // namespace google
315
316#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H
Represent a set of mutations across multiple rows.
Definition: mutations.h:492
A SingleRowMutation that failed.
Definition: mutations.h:409
Objects of this class pack single row mutations into bulk mutations.
Definition: mutation_batcher.h:56
MutationBatcher(Table table, Options options=Options())
Definition: mutation_batcher.h:93
std::pair< future< void >, future< Status > > AsyncApply(CompletionQueue &cq, SingleRowMutation mut)
Asynchronously apply mutation.
virtual future< std::vector< FailedMutation > > AsyncBulkApplyImpl(Table &table, BulkMutation &&mut)
future< void > AsyncWaitForNoPendingRequests()
Asynchronously wait until all submitted mutations complete.
Represent a single row mutation.
Definition: mutations.h:296
The main interface to interact with data in a Cloud Bigtable table.
Definition: table.h:166
friend class future
promise(promise &&)=default
Contains all the Cloud Bigtable C++ client APIs.
Definition: admin_client.h:28
Configuration for MutationBatcher.
Definition: mutation_batcher.h:59
Options & SetMaxMutationsPerBatch(size_t max_mutations_per_batch_arg)
A single RPC will not have more mutations than this.
std::size_t max_outstanding_mutations
Definition: mutation_batcher.h:90
Options & SetMaxOutstandingMutations(size_t max_outstanding_mutations_arg)
MutationBatcher will at most admit this many mutations.
std::size_t max_batches
Definition: mutation_batcher.h:88
std::size_t max_outstanding_size
Definition: mutation_batcher.h:89
std::size_t max_size_per_batch
Definition: mutation_batcher.h:87
Options & SetMaxBatches(size_t max_batches_arg)
There will be no more RPCs outstanding (except for retries) than this.
Definition: mutation_batcher.h:72
std::size_t max_mutations_per_batch
Definition: mutation_batcher.h:86
Options & SetMaxSizePerBatch(size_t max_size_per_batch_arg)
Sum of mutations' sizes in a single RPC will not be larger than this.
Definition: mutation_batcher.h:66
Options & SetMaxOutstandingSize(size_t max_outstanding_size_arg)
MutationBatcher will at most admit mutations of this total size.
Definition: mutation_batcher.h:78