Google Cloud Spanner C++ Client  1.32.0
A C++ Client Library for Google Cloud Spanner
client.cc
Go to the documentation of this file.
1 // Copyright 2019 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // http://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #include "google/cloud/spanner/client.h"
16 #include "google/cloud/spanner/internal/connection_impl.h"
17 #include "google/cloud/spanner/internal/spanner_stub.h"
18 #include "google/cloud/spanner/internal/status_utils.h"
19 #include "google/cloud/spanner/options.h"
20 #include "google/cloud/spanner/retry_policy.h"
21 #include "google/cloud/spanner/transaction.h"
22 #include "google/cloud/backoff_policy.h"
23 #include "google/cloud/internal/getenv.h"
24 #include "google/cloud/internal/retry_loop.h"
25 #include "google/cloud/log.h"
26 #include <grpcpp/grpcpp.h>
27 #include <thread>
28 
29 namespace google {
30 namespace cloud {
31 namespace spanner {
32 inline namespace SPANNER_CLIENT_NS {
33 
34 RowStream Client::Read(std::string table, KeySet keys,
35  std::vector<std::string> columns,
36  ReadOptions read_options) {
37  return conn_->Read({spanner_internal::MakeSingleUseTransaction(
39  std::move(table),
40  std::move(keys),
41  std::move(columns),
42  std::move(read_options),
43  {}});
44 }
45 
47  std::string table, KeySet keys,
48  std::vector<std::string> columns,
49  ReadOptions read_options) {
50  return conn_->Read({spanner_internal::MakeSingleUseTransaction(
51  std::move(transaction_options)),
52  std::move(table),
53  std::move(keys),
54  std::move(columns),
55  std::move(read_options),
56  {}});
57 }
58 
59 RowStream Client::Read(Transaction transaction, std::string table, KeySet keys,
60  std::vector<std::string> columns,
61  ReadOptions read_options) {
62  return conn_->Read({std::move(transaction),
63  std::move(table),
64  std::move(keys),
65  std::move(columns),
66  std::move(read_options),
67  {}});
68 }
69 
70 RowStream Client::Read(ReadPartition const& read_partition) {
71  return conn_->Read(spanner_internal::MakeReadParams(read_partition));
72 }
73 
74 StatusOr<std::vector<ReadPartition>> Client::PartitionRead(
75  Transaction transaction, std::string table, KeySet keys,
76  std::vector<std::string> columns, ReadOptions read_options,
77  PartitionOptions const& partition_options) {
78  return conn_->PartitionRead({{std::move(transaction),
79  std::move(table),
80  std::move(keys),
81  std::move(columns),
82  std::move(read_options),
83  {}},
84  partition_options});
85 }
86 
88  QueryOptions const& opts) {
89  return conn_->ExecuteQuery({spanner_internal::MakeSingleUseTransaction(
91  std::move(statement),
92  OverlayQueryOptions(opts),
93  {}});
94 }
95 
97  Transaction::SingleUseOptions transaction_options, SqlStatement statement,
98  QueryOptions const& opts) {
99  return conn_->ExecuteQuery({spanner_internal::MakeSingleUseTransaction(
100  std::move(transaction_options)),
101  std::move(statement),
102  OverlayQueryOptions(opts),
103  {}});
104 }
105 
107  QueryOptions const& opts) {
108  return conn_->ExecuteQuery({std::move(transaction),
109  std::move(statement),
110  OverlayQueryOptions(opts),
111  {}});
112 }
113 
115  QueryOptions const& opts) {
116  auto params = spanner_internal::MakeSqlParams(partition);
117  params.query_options = OverlayQueryOptions(opts);
118  return conn_->ExecuteQuery(std::move(params));
119 }
120 
122  QueryOptions const& opts) {
123  return conn_->ProfileQuery({spanner_internal::MakeSingleUseTransaction(
125  std::move(statement),
126  OverlayQueryOptions(opts),
127  {}});
128 }
129 
131  Transaction::SingleUseOptions transaction_options, SqlStatement statement,
132  QueryOptions const& opts) {
133  return conn_->ProfileQuery({spanner_internal::MakeSingleUseTransaction(
134  std::move(transaction_options)),
135  std::move(statement),
136  OverlayQueryOptions(opts),
137  {}});
138 }
139 
141  SqlStatement statement,
142  QueryOptions const& opts) {
143  return conn_->ProfileQuery({std::move(transaction),
144  std::move(statement),
145  OverlayQueryOptions(opts),
146  {}});
147 }
148 
149 StatusOr<std::vector<QueryPartition>> Client::PartitionQuery(
150  Transaction transaction, SqlStatement statement,
151  PartitionOptions const& partition_options) {
152  return conn_->PartitionQuery(
153  {std::move(transaction), std::move(statement), partition_options});
154 }
155 
156 StatusOr<DmlResult> Client::ExecuteDml(Transaction transaction,
157  SqlStatement statement,
158  QueryOptions const& opts) {
159  return conn_->ExecuteDml({std::move(transaction),
160  std::move(statement),
161  OverlayQueryOptions(opts),
162  {}});
163 }
164 
166  SqlStatement statement,
167  QueryOptions const& opts) {
168  return conn_->ProfileDml({std::move(transaction),
169  std::move(statement),
170  OverlayQueryOptions(opts),
171  {}});
172 }
173 
174 StatusOr<ExecutionPlan> Client::AnalyzeSql(Transaction transaction,
175  SqlStatement statement,
176  QueryOptions const& opts) {
177  return conn_->AnalyzeSql({std::move(transaction),
178  std::move(statement),
179  OverlayQueryOptions(opts),
180  {}});
181 }
182 
184  Transaction transaction, std::vector<SqlStatement> statements,
185  Options opts) {
186  internal::CheckExpectedOptions<RequestOptionList>(opts, __func__);
187  return conn_->ExecuteBatchDml(
188  {std::move(transaction), std::move(statements), std::move(opts)});
189 }
190 
192  std::function<StatusOr<Mutations>(Transaction)> const& mutator,
193  std::unique_ptr<TransactionRerunPolicy> rerun_policy,
194  std::unique_ptr<BackoffPolicy> backoff_policy,
195  CommitOptions const& options) {
196  // The status-code discriminator of TransactionRerunPolicy.
197  using RerunnablePolicy = spanner_internal::SafeTransactionRerun;
198 
199  auto const opts =
202  for (int rerun = 0;; ++rerun) {
203  StatusOr<Mutations> mutations;
204 #if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
205  try {
206 #endif
207  mutations = mutator(txn);
208 #if GOOGLE_CLOUD_CPP_HAVE_EXCEPTIONS
209  } catch (RuntimeStatusError const& error) {
210  // Treat this like mutator() returned a bad Status.
211  Status status = error.status();
212  if (status.ok()) {
213  status = Status(StatusCode::kUnknown, "OK Status thrown from mutator");
214  }
215  mutations = status;
216  } catch (...) {
217  auto rb_status = Rollback(txn);
218  if (!RerunnablePolicy::IsOk(rb_status)) {
219  GCP_LOG(WARNING) << "Rollback() failure in Client::Commit(): "
220  << rb_status.message();
221  }
222  throw;
223  }
224 #endif
225  auto status = mutations.status();
226  if (RerunnablePolicy::IsOk(status)) {
227  auto result = Commit(txn, *mutations, options);
228  status = result.status();
229  if (!RerunnablePolicy::IsTransientFailure(status)) {
230  return result;
231  }
232  } else {
233  if (!RerunnablePolicy::IsTransientFailure(status)) {
234  auto rb_status = Rollback(txn);
235  if (!RerunnablePolicy::IsOk(rb_status)) {
236  GCP_LOG(WARNING) << "Rollback() failure in Client::Commit(): "
237  << rb_status.message();
238  }
239  return status;
240  }
241  }
242  // A transient failure (e.g., kAborted), so consider rerunning.
243  if (!rerun_policy->OnFailure(status)) {
244  return status; // reruns exhausted
245  }
246  if (spanner_internal::IsSessionNotFound(status)) {
247  // Marks the session bad and creates a new Transaction for the next loop.
248  spanner_internal::Visit(
249  txn, [](spanner_internal::SessionHolder& s,
250  StatusOr<google::spanner::v1::TransactionSelector> const&,
251  std::string const&, std::int64_t) {
252  if (s) s->set_bad();
253  return true;
254  });
256  } else {
257  // Create a new transaction for the next loop, but reuse the session
258  // so that we have a slightly better chance of avoiding another abort.
259  txn = MakeReadWriteTransaction(txn, opts);
260  }
261  std::this_thread::sleep_for(backoff_policy->OnCompletion());
262  }
263 }
264 
266  std::function<StatusOr<Mutations>(Transaction)> const& mutator,
267  CommitOptions const& options) {
268  auto const rerun_maximum_duration = std::chrono::minutes(10);
269  auto default_commit_rerun_policy =
270  LimitedTimeTransactionRerunPolicy(rerun_maximum_duration).clone();
271 
272  auto const backoff_initial_delay = std::chrono::milliseconds(100);
273  auto const backoff_maximum_delay = std::chrono::minutes(5);
274  auto const backoff_scaling = 2.0;
275  auto default_commit_backoff_policy =
276  ExponentialBackoffPolicy(backoff_initial_delay, backoff_maximum_delay,
277  backoff_scaling)
278  .clone();
279 
280  return Commit(mutator, std::move(default_commit_rerun_policy),
281  std::move(default_commit_backoff_policy), options);
282 }
283 
284 StatusOr<CommitResult> Client::Commit(Mutations mutations,
285  CommitOptions const& options) {
286  return Commit([&mutations](Transaction const&) { return mutations; },
287  options);
288 }
289 
290 StatusOr<CommitResult> Client::Commit(Transaction transaction,
291  Mutations mutations,
292  CommitOptions const& options) {
293  return conn_->Commit({std::move(transaction), std::move(mutations), options});
294 }
295 
297  return conn_->Rollback({std::move(transaction)});
298 }
299 
301  SqlStatement statement, QueryOptions const& opts) {
302  return conn_->ExecutePartitionedDml(
303  {std::move(statement), OverlayQueryOptions(opts)});
304 }
305 
306 // Returns a QueryOptions that has each attribute set according to
307 // the hierarchy that options specified at the function call (i.e.,
308 // `preferred`) are preferred, followed by options set at the Client
309 // level (i.e., `opts_.query_options()`), followed by some environment
310 // variables. If none are set, the attribute's optional will be unset
311 // and nothing will be included in the proto sent to Spanner, in which
312 // case the Database default will be used.
313 QueryOptions Client::OverlayQueryOptions(QueryOptions const& preferred) {
314  // GetEnv() is not super fast, so we look it up once and cache it.
315  static auto const* const kOptimizerVersionEnvValue =
316  new auto(google::cloud::internal::GetEnv("SPANNER_OPTIMIZER_VERSION"));
317  static auto const* const kOptimizerStatisticsPackageEnvValue = new auto(
318  google::cloud::internal::GetEnv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE"));
319 
320  return spanner_internal::OverlayQueryOptions(
321  preferred, opts_.query_options(), *kOptimizerVersionEnvValue,
322  *kOptimizerStatisticsPackageEnvValue);
323 }
324 
325 std::shared_ptr<spanner::Connection> MakeConnection(spanner::Database const& db,
326  Options opts) {
327  internal::CheckExpectedOptions<
328  CommonOptionList, GrpcOptionList, SessionPoolOptionList,
329  spanner_internal::SessionPoolClockOption, SpannerPolicyOptionList>(
330  opts, __func__);
331  opts = spanner_internal::DefaultOptions(std::move(opts));
332 
333  std::vector<std::shared_ptr<spanner_internal::SpannerStub>> stubs(
335  int id = 0;
336  std::generate(stubs.begin(), stubs.end(), [&id, db, opts] {
337  return spanner_internal::CreateDefaultSpannerStub(db, opts, id++);
338  });
339  return std::make_shared<spanner_internal::ConnectionImpl>(
340  std::move(db), std::move(stubs), opts);
341 }
342 
343 std::shared_ptr<Connection> MakeConnection(
344  Database const& db, ConnectionOptions const& connection_options,
345  SessionPoolOptions session_pool_options) {
346  auto opts = internal::MergeOptions(
347  internal::MakeOptions(connection_options),
348  spanner_internal::MakeOptions(std::move(session_pool_options)));
349  return MakeConnection(db, std::move(opts));
350 }
351 
352 std::shared_ptr<Connection> MakeConnection(
353  Database const& db, ConnectionOptions const& connection_options,
354  SessionPoolOptions session_pool_options,
355  std::unique_ptr<RetryPolicy> retry_policy,
356  std::unique_ptr<BackoffPolicy> backoff_policy) {
357  auto opts = internal::MergeOptions(
358  internal::MakeOptions(connection_options),
359  spanner_internal::MakeOptions(std::move(session_pool_options)));
360  opts.set<SpannerRetryPolicyOption>(retry_policy->clone());
361  opts.set<SpannerBackoffPolicyOption>(backoff_policy->clone());
362  return MakeConnection(db, std::move(opts));
363 }
364 
365 } // namespace SPANNER_CLIENT_NS
366 } // namespace spanner
367 
368 namespace spanner_internal {
369 inline namespace SPANNER_CLIENT_NS {
370 
371 // Returns a QueryOptions that has each attribute set according to the
372 // preferred/fallback/environment hierarchy.
373 spanner::QueryOptions OverlayQueryOptions(
374  spanner::QueryOptions const& preferred,
375  spanner::QueryOptions const& fallback,
376  absl::optional<std::string> const& optimizer_version_env,
377  absl::optional<std::string> const& optimizer_statistics_package_env) {
378  spanner::QueryOptions opts;
379 
380  // Choose the `optimizer_version` option.
381  if (preferred.optimizer_version().has_value()) {
383  } else if (fallback.optimizer_version().has_value()) {
385  } else if (optimizer_version_env.has_value()) {
386  opts.set_optimizer_version(*optimizer_version_env);
387  }
388 
389  // Choose the `optimizer_statistics_package` option.
390  if (preferred.optimizer_statistics_package().has_value()) {
393  } else if (fallback.optimizer_statistics_package().has_value()) {
396  } else if (optimizer_statistics_package_env.has_value()) {
397  opts.set_optimizer_statistics_package(*optimizer_statistics_package_env);
398  }
399 
400  // Choose the `request_priority` option.
401  if (preferred.request_priority().has_value()) {
403  } else if (fallback.request_priority().has_value()) {
405  }
406 
407  // Choose the `request_tag` option.
408  if (preferred.request_tag().has_value()) {
410  } else if (fallback.request_tag().has_value()) {
412  }
413 
414  return opts;
415 }
416 
417 } // namespace SPANNER_CLIENT_NS
418 } // namespace spanner_internal
419 } // namespace cloud
420 } // namespace google