15#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H
16#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_BIGTABLE_MUTATION_BATCHER_H
18#include "google/cloud/bigtable/client_options.h"
19#include "google/cloud/bigtable/completion_queue.h"
20#include "google/cloud/bigtable/mutations.h"
21#include "google/cloud/bigtable/table.h"
22#include "google/cloud/bigtable/version.h"
23#include "google/cloud/status.h"
24#include <google/bigtable/v2/bigtable.pb.h>
34GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
94 : table_(std::move(table)),
96 cur_batch_(std::make_shared<Batch>()) {}
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
155
156
157
158
159
160
169 using CompletionPromise = promise<
Status>;
170 using AdmissionPromise = promise<
void>;
171 using NoMorePendingPromise = promise<
void>;
175
176
177 struct PendingSingleRowMutation {
179 CompletionPromise completion_promise,
180 AdmissionPromise admission_promise);
183 size_t num_mutations;
185 CompletionPromise completion_promise;
186 AdmissionPromise admission_promise;
190
191
192
193
194
195
196 struct MutationData {
197 explicit MutationData(PendingSingleRowMutation pending)
198 : completion_promise
(std::move(pending.completion_promise)
) {}
199 CompletionPromise completion_promise;
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
222 std::size_t num_mutations = 0;
223 std::size_t requests_size = 0;
225 std::vector<MutationData> mutation_data;
229 grpc::Status IsValid(PendingSingleRowMutation& mut)
const;
232
233
234
235 bool HasSpaceFor(PendingSingleRowMutation
const& mut)
const;
238
239
240
241
242 bool CanAppendToBatch(PendingSingleRowMutation
const& mut)
const {
246 return pending_mutations_.empty() && HasSpaceFor(mut);
250
251
252
253 bool FlushIfPossible(CompletionQueue cq);
260
261
262
263
264
265 std::vector<
MutationBatcher::AdmissionPromise> TryAdmit(CompletionQueue& cq);
268
269
270 void Admit(PendingSingleRowMutation mut);
273
274
275
276 void SatisfyPromises(std::vector<AdmissionPromise>,
277 std::unique_lock<std::mutex>& lk);
284 std::size_t num_outstanding_batches_ = 0;
286 std::size_t outstanding_size_ = 0;
288 std::size_t outstanding_mutations_ = 0;
290 std::size_t num_requests_pending_ = 0;
293 std::shared_ptr<Batch> cur_batch_;
296
297
298
299
300 std::queue<PendingSingleRowMutation> pending_mutations_;
303
304
305
306
307
308 std::vector<NoMorePendingPromise> no_more_pending_promises_;
311GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Represent a set of mutations across multiple rows.
Definition: mutations.h:492
A SingleRowMutation that failed.
Definition: mutations.h:409
Objects of this class pack single row mutations into bulk mutations.
Definition: mutation_batcher.h:56
MutationBatcher(Table table, Options options=Options())
Definition: mutation_batcher.h:93
std::pair< future< void >, future< Status > > AsyncApply(CompletionQueue &cq, SingleRowMutation mut)
Asynchronously apply mutation.
virtual future< std::vector< FailedMutation > > AsyncBulkApplyImpl(Table &table, BulkMutation &&mut)
future< void > AsyncWaitForNoPendingRequests()
Asynchronously wait until all submitted mutations complete.
virtual ~MutationBatcher()=default
Represent a single row mutation.
Definition: mutations.h:296
The main interface to interact with data in a Cloud Bigtable table.
Definition: table.h:166
friend friend class future
promise(promise &&)=default
Contains all the Cloud Bigtable C++ client APIs.
Definition: admin_client.h:28
Configuration for MutationBatcher.
Definition: mutation_batcher.h:59
Options & SetMaxMutationsPerBatch(size_t max_mutations_per_batch_arg)
A single RPC will not have more mutations than this.
std::size_t max_outstanding_mutations
Definition: mutation_batcher.h:90
Options & SetMaxOutstandingMutations(size_t max_outstanding_mutations_arg)
MutationBatcher will at most admit this many mutations.
std::size_t max_batches
Definition: mutation_batcher.h:88
std::size_t max_outstanding_size
Definition: mutation_batcher.h:89
std::size_t max_size_per_batch
Definition: mutation_batcher.h:87
Options & SetMaxBatches(size_t max_batches_arg)
There will be no more RPCs outstanding (except for retries) than this.
Definition: mutation_batcher.h:72
std::size_t max_mutations_per_batch
Definition: mutation_batcher.h:86
Options & SetMaxSizePerBatch(size_t max_size_per_batch_arg)
Sum of mutations' sizes in a single RPC will not be larger than this.
Definition: mutation_batcher.h:66
Options & SetMaxOutstandingSize(size_t max_outstanding_size_arg)
MutationBatcher will at most admit mutations of this total size.
Definition: mutation_batcher.h:78