Google Cloud Bigtable C++ Client  1.33.0
A C++ Client Library for Google Cloud Bigtable
mutation_batcher.cc
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/bigtable/mutation_batcher.h"
#include "google/cloud/bigtable/internal/client_options_defaults.h"
#include "google/cloud/grpc_error_delegate.h"
#include "google/cloud/internal/throw_delegate.h"
#include <algorithm>
#include <sstream>

namespace google {
namespace cloud {
namespace bigtable {
inline namespace BIGTABLE_CLIENT_NS {

// Cloud Bigtable doesn't accept more than this in a single request.
auto constexpr kBigtableMutationLimit = 100000;
auto constexpr kDefaultMutationLimit = 1000;
// Maximum mutations that can be outstanding in the Cloud Bigtable front end.
// NOTE: this is a system-wide limit, but it is only enforced per process.
auto constexpr kBigtableOutstandingMutationLimit = 300000;
// Let's make the default slightly smaller, so that overheads or
// miscalculations don't tip us over.
auto constexpr kDefaultMaxSizePerBatch =
    (BIGTABLE_CLIENT_DEFAULT_MAX_MESSAGE_LENGTH * 90LL) / 100LL;
auto constexpr kDefaultMaxBatches = 4;
auto constexpr kDefaultMaxOutstandingSize =
    kDefaultMaxSizePerBatch * kDefaultMaxBatches;

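// The default Options stay below the hard Cloud Bigtable limits defined
// above, leaving some headroom for overheads and miscalculations.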
MutationBatcher::Options::Options()
    : max_mutations_per_batch(kDefaultMutationLimit),
      max_size_per_batch(kDefaultMaxSizePerBatch),
      max_batches(kDefaultMaxBatches),
      max_outstanding_size(kDefaultMaxOutstandingSize),
      max_outstanding_mutations(kBigtableOutstandingMutationLimit) {}

MutationBatcher::Options& MutationBatcher::Options::SetMaxMutationsPerBatch(
    size_t max_mutations_per_batch_arg) {
  max_mutations_per_batch = std::min<std::size_t>(max_mutations_per_batch_arg,
                                                  kBigtableMutationLimit);
  return *this;
}

MutationBatcher::Options& MutationBatcher::Options::SetMaxOutstandingMutations(
    size_t max_outstanding_mutations_arg) {
  max_outstanding_mutations = std::min<std::size_t>(
      max_outstanding_mutations_arg, kBigtableOutstandingMutationLimit);
  return *this;
}

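// Schedule `mut` for batched execution and return two futures: the first
// ("admission") is satisfied once the mutation has been admitted into the
// batcher's buffers, the second ("completion") is satisfied with the final
// Status once the batch containing the mutation completes.
//
// A minimal usage sketch (illustrative only; `batcher`, `cq`, and `mut` are
// assumed to be a MutationBatcher, a running CompletionQueue, and a
// SingleRowMutation created by the caller):
//
//   auto handles = batcher.AsyncApply(cq, std::move(mut));
//   handles.first.get();  // block until admitted, to throttle the caller
//   google::cloud::Status status = handles.second.get();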
std::pair<future<void>, future<Status>> MutationBatcher::AsyncApply(
    CompletionQueue& cq, SingleRowMutation mut) {
  AdmissionPromise admission_promise;
  CompletionPromise completion_promise;
  auto res = std::make_pair(admission_promise.get_future(),
                            completion_promise.get_future());
  PendingSingleRowMutation pending(std::move(mut),
                                   std::move(completion_promise),
                                   std::move(admission_promise));
  std::unique_lock<std::mutex> lk(mu_);

  grpc::Status mutation_status = IsValid(pending);
  if (!mutation_status.ok()) {
    lk.unlock();
    // Destroy the mutation before satisfying the admission promise so that we
    // can limit the memory usage.
    pending.mut.Clear();
    pending.completion_promise.set_value(
        MakeStatusFromRpcError(mutation_status));
    // No need to consider no_more_pending_promises because this operation
    // didn't lower the number of pending operations.
    pending.admission_promise.set_value();
    return res;
  }
  ++num_requests_pending_;

  if (!CanAppendToBatch(pending)) {
    pending_mutations_.push(std::move(pending));
    return res;
  }
  std::vector<AdmissionPromise> admission_promises_to_satisfy;
  admission_promises_to_satisfy.emplace_back(
      std::move(pending.admission_promise));
  Admit(std::move(pending));
  FlushIfPossible(cq);
  SatisfyPromises(std::move(admission_promises_to_satisfy), lk);
  return res;
}

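// Return a future satisfied once all mutations admitted so far have completed
// and no batches remain outstanding. If nothing is pending, the returned
// future is already satisfied.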
future<void> MutationBatcher::AsyncWaitForNoPendingRequests() {
  std::unique_lock<std::mutex> lk(mu_);
  if (num_requests_pending_ == 0) {
    return make_ready_future();
  }
  no_more_pending_promises_.emplace_back();
  return no_more_pending_promises_.back().get_future();
}

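// Wrap a SingleRowMutation together with its promises, caching its request
// size and mutation count so that admission decisions don't need to recompute
// them.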
MutationBatcher::PendingSingleRowMutation::PendingSingleRowMutation(
    SingleRowMutation mut_arg, CompletionPromise completion_promise,
    AdmissionPromise admission_promise)
    : mut(std::move(mut_arg)),
      completion_promise(std::move(completion_promise)),
      admission_promise(std::move(admission_promise)) {
  ::google::bigtable::v2::MutateRowsRequest::Entry tmp;
  mut.MoveTo(&tmp);
  // This operation might not be cheap, so let's cache it.
  request_size = tmp.ByteSizeLong();
  num_mutations = static_cast<std::size_t>(tmp.mutations_size());
  mut = SingleRowMutation(std::move(tmp));
}

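// Reject mutations that could never be sent: too many mutations for a single
// batch, no mutations at all, or a serialized size above the per-batch limit.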
grpc::Status MutationBatcher::IsValid(PendingSingleRowMutation& mut) const {
  // Objects of this class need to be aware of the maximum allowed number of
  // mutations in a batch because they should not pack more. Given that we
  // have this knowledge, we might as well simplify everything and not admit
  // larger mutations.
  auto mutation_limit = (std::min)(options_.max_mutations_per_batch,
                                   std::size_t(kBigtableMutationLimit));
  if (mut.num_mutations > mutation_limit) {
    std::stringstream stream;
    stream << "Too many (" << mut.num_mutations
           << ") mutations in a SingleRowMutations request. " << mutation_limit
           << " is the limit.";
    return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, stream.str());
  }
  if (mut.num_mutations == 0) {
    return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT,
                        "Supplied SingleRowMutations has no entries");
  }
  if (mut.request_size > options_.max_size_per_batch) {
    std::stringstream stream;
    stream << "Too large (" << mut.request_size
           << " bytes) mutation in a SingleRowMutations request. "
           << options_.max_size_per_batch << " bytes is the limit.";
    return grpc::Status(grpc::StatusCode::INVALID_ARGUMENT, stream.str());
  }
  return grpc::Status();
}

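// Check whether `mut` fits within both the global outstanding limits and the
// limits of the batch currently being assembled.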
bool MutationBatcher::HasSpaceFor(PendingSingleRowMutation const& mut) const {
  return outstanding_size_ + mut.request_size <=
             options_.max_outstanding_size &&
         outstanding_mutations_ + mut.num_mutations <=
             options_.max_outstanding_mutations &&
         cur_batch_->requests_size + mut.request_size <=
             options_.max_size_per_batch &&
         cur_batch_->num_mutations + mut.num_mutations <=
             options_.max_mutations_per_batch;
}

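// Send one batch of mutations to the Cloud Bigtable service via
// Table::AsyncBulkApply.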
future<std::vector<FailedMutation>> MutationBatcher::AsyncBulkApplyImpl(
    Table& table, BulkMutation&& mut) {
  return table.AsyncBulkApply(std::move(mut));
}

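// If the current batch is non-empty and fewer than `max_batches` batches are
// outstanding, send the current batch and start assembling a new one. Returns
// whether a batch was sent. Expects `mu_` to be held by the caller.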
bool MutationBatcher::FlushIfPossible(CompletionQueue cq) {
  if (cur_batch_->num_mutations > 0 &&
      num_outstanding_batches_ < options_.max_batches) {
    ++num_outstanding_batches_;

    auto batch = std::make_shared<Batch>();
    cur_batch_.swap(batch);
    AsyncBulkApplyImpl(table_, std::move(batch->requests))
        .then([this, cq,
               batch](future<std::vector<FailedMutation>> failed) mutable {
          // Calling OnBulkApplyDone directly here might lead to a deadlock if
          // the underlying operation completes very quickly, making the outer
          // `.then()` call synchronous. The deadlock would occur because the
          // mutex is held here and OnBulkApplyDone would try to reacquire it.
          //
          // We're not using a lambda here because in C++11 that would mean
          // copying the `failed` vector.
          struct Functor {
            void operator()(CompletionQueue& cq) {
              self->OnBulkApplyDone(cq, std::move(*batch), std::move(failed));
            }

            MutationBatcher* self;
            std::shared_ptr<Batch> batch;
            std::vector<FailedMutation> failed;
          };
          cq.RunAsync(Functor{this, std::move(batch), failed.get()});
        });
    return true;
  }
  return false;
}

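// Invoked on the CompletionQueue when a batch finishes: satisfy the completion
// promise of every mutation in the batch, release its capacity, and admit as
// many pending mutations as will now fit.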
void MutationBatcher::OnBulkApplyDone(
    CompletionQueue cq, MutationBatcher::Batch batch,
    std::vector<FailedMutation> const& failed) {
  // First process all the failures, marking the mutations as done after
  // processing them.
  for (auto const& f : failed) {
    int const idx = f.original_index();
    if (idx < 0 ||
        static_cast<std::size_t>(idx) >= batch.mutation_data.size()) {
      // This is a bug in either the server or the client; either terminate
      // (when compiled with -fno-exceptions) or throw an exception.
      std::ostringstream os;
      os << "Index " << idx << " is out of range [0,"
         << batch.mutation_data.size() << ")";
      google::cloud::internal::ThrowRuntimeError(std::move(os).str());
    }
    MutationData& data = batch.mutation_data[idx];
    data.completion_promise.set_value(f.status());
    data.done = true;
  }
  // Any remaining mutations are treated as successful.
  for (auto& data : batch.mutation_data) {
    if (!data.done) {
      data.completion_promise.set_value(Status());
      data.done = true;
    }
  }
  auto const num_mutations = batch.mutation_data.size();
  batch.mutation_data.clear();

  std::unique_lock<std::mutex> lk(mu_);
  outstanding_size_ -= batch.requests_size;
  outstanding_mutations_ -= batch.num_mutations;
  num_requests_pending_ -= num_mutations;
  num_outstanding_batches_--;
  SatisfyPromises(TryAdmit(cq), lk);  // Unlocks the lock.
}

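// Admit as many pending mutations as currently fit, flushing batches along the
// way. The admission promises are returned so the caller can satisfy them
// after releasing the lock.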
std::vector<MutationBatcher::AdmissionPromise> MutationBatcher::TryAdmit(
    CompletionQueue& cq) {
  // Defer satisfying promises until we release the lock.
  std::vector<AdmissionPromise> admission_promises;

  do {
    while (!pending_mutations_.empty() &&
           HasSpaceFor(pending_mutations_.front())) {
      auto& mut = pending_mutations_.front();
      admission_promises.emplace_back(std::move(mut.admission_promise));
      Admit(std::move(mut));
      pending_mutations_.pop();
    }
  } while (FlushIfPossible(cq));
  return admission_promises;
}

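// Add `mut` to the batch currently being assembled and update the outstanding
// size and mutation counters. The caller is responsible for satisfying the
// admission promise.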
void MutationBatcher::Admit(PendingSingleRowMutation mut) {
  outstanding_size_ += mut.request_size;
  outstanding_mutations_ += mut.num_mutations;
  cur_batch_->requests_size += mut.request_size;
  cur_batch_->num_mutations += mut.num_mutations;
  cur_batch_->requests.emplace_back(std::move(mut.mut));
  cur_batch_->mutation_data.emplace_back(MutationData(std::move(mut)));
}

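// Satisfy the given admission promises and, if nothing is pending or
// outstanding anymore, the promises returned by AsyncWaitForNoPendingRequests.
// Releases `lk` before setting any promise values.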
void MutationBatcher::SatisfyPromises(
    std::vector<AdmissionPromise> admission_promises,
    std::unique_lock<std::mutex>& lk) {
  std::vector<NoMorePendingPromise> no_more_pending_promises;
  if (num_requests_pending_ == 0 && num_outstanding_batches_ == 0) {
    // We should wait not only on num_requests_pending_ being zero but also on
    // num_outstanding_batches_, because we want to allow the user to shut
    // down the completion queue after this promise is fulfilled. Otherwise,
    // the user could destroy the completion queue while the last batch is
    // still being processed - we've had this bug before (#2140).
    no_more_pending_promises_.swap(no_more_pending_promises);
  }
  lk.unlock();

  // Inform the user that we've admitted these mutations and that there might
  // finally be some space in the buffer.
  for (auto& promise : admission_promises) {
    promise.set_value();
  }
  for (auto& promise : no_more_pending_promises) {
    promise.set_value();
  }
}


}  // namespace BIGTABLE_CLIENT_NS
}  // namespace bigtable
}  // namespace cloud
}  // namespace google