Google Cloud C++ Client  2.5.0
C++ Client Library for Google Cloud Platform
completion_queue.h
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
17 
18 #include "google/cloud/future.h"
19 #include "google/cloud/internal/async_read_stream_impl.h"
20 #include "google/cloud/internal/async_rpc_details.h"
21 #include "google/cloud/internal/completion_queue_impl.h"
22 #include "google/cloud/status_or.h"
23 #include "google/cloud/version.h"
24 #include "absl/memory/memory.h"
25 #include "absl/meta/type_traits.h"
26 #include <chrono>
27 
28 namespace google {
29 namespace cloud {
31 class CompletionQueue;
32 
33 namespace internal {
34 std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
35  CompletionQueue const& cq);
36 } // namespace internal
37 
38 /**
39  * Call the functor associated with asynchronous operations when they complete.
40  */
42  public:
44  explicit CompletionQueue(std::shared_ptr<internal::CompletionQueueImpl> impl)
45  : impl_(std::move(impl)) {}
46 
47  /**
48  * Run the completion queue event loop.
49  *
50  * Note that more than one thread can call this member function, to create a
51  * pool of threads completing asynchronous operations.
52  */
53  void Run() { impl_->Run(); }
54 
55  /// Terminate the completion queue event loop.
56  void Shutdown() { impl_->Shutdown(); }
57 
58  /// Cancel all pending operations.
59  void CancelAll() { impl_->CancelAll(); }
60 
61  /**
62  * Create a timer that fires at @p deadline.
63  *
64  * @param deadline when the timer should expire.
65  *
66  * @return a future that becomes satisfied after @p deadline.
67  * The result of the future is the time at which it expired, or an error
68  * Status if the timer did not run to expiration (e.g. it was cancelled).
69  */
70  google::cloud::future<StatusOr<std::chrono::system_clock::time_point>>
71  MakeDeadlineTimer(std::chrono::system_clock::time_point deadline) {
72  return impl_->MakeDeadlineTimer(deadline);
73  }
74 
75  /**
76  * Create a timer that fires after the @p duration.
77  *
78  * @tparam Rep a placeholder to match the Rep tparam for @p duration type,
79  * the semantics of this template parameter are documented in
80  * `std::chrono::duration<>` (in brief, the underlying arithmetic type
81  * used to store the number of ticks), for our purposes it is simply a
82  * formal parameter.
83  * @tparam Period a placeholder to match the Period tparam for @p duration
84  * type, the semantics of this template parameter are documented in
85  * `std::chrono::duration<>` (in brief, the length of the tick in seconds,
86  * expressed as a `std::ratio<>`), for our purposes it is simply a formal
87  * parameter.
88  *
89  * @param duration when the timer should expire, relative to the current time.
90  *
91  * @return a future that becomes satisfied after @p duration time has elapsed.
92  * The result of the future is the time at which it expired, or an error
93  * Status if the timer did not run to expiration (e.g. it was cancelled).
94  */
95  template <typename Rep, typename Period>
96  future<StatusOr<std::chrono::system_clock::time_point>> MakeRelativeTimer(
97  std::chrono::duration<Rep, Period> duration) {
98  return impl_->MakeRelativeTimer(
99  std::chrono::duration_cast<std::chrono::nanoseconds>(duration));
100  }
101 
102  /**
103  * Make an asynchronous unary RPC.
104  *
105  * @param async_call a callable to start the asynchronous RPC.
106  * @param request the contents of the request.
107  * @param context an initialized request context to make the call.
108  *
109  * @tparam AsyncCallType the type of @a async_call. It must be invocable with
110  * `(grpc::ClientContext*, RequestType const&, grpc::CompletionQueue*)`.
111  * Furthermore, it should return a
112  * `std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<Response>>`.
113  * These requirements are verified by
114  * `internal::CheckAsyncUnaryRpcSignature<>`, and this function is
115  * excluded from overload resolution if the parameters do not meet these
116  * requirements.
117  * @tparam Request the type of the request parameter in the gRPC.
118  *
119  * @return a future that becomes satisfied when the operation completes.
120  */
121  template <
122  typename AsyncCallType, typename Request,
123  typename Sig = internal::AsyncCallResponseType<AsyncCallType, Request>,
124  typename Response = typename Sig::type,
125  typename std::enable_if<Sig::value, int>::type = 0>
126  future<StatusOr<Response>> MakeUnaryRpc(
127  AsyncCallType async_call, Request const& request,
128  std::unique_ptr<grpc::ClientContext> context) {
129  auto op =
130  std::make_shared<internal::AsyncUnaryRpcFuture<Request, Response>>();
131  impl_->StartOperation(op, [&](void* tag) {
132  op->Start(async_call, std::move(context), request, &impl_->cq(), tag);
133  });
134  return op->GetFuture();
135  }
136 
137  /**
138  * Make an asynchronous streaming read RPC.
139  *
140  * Reading from the stream starts automatically, and the handler is notified
141  * of all interesting events in the stream. Note that the handler is called
142  * by any thread blocked on this object's Run() member function. However, only
143  * one callback in the handler is called at a time.
144  *
145  * @param async_call a callable to start the asynchronous RPC.
146  * @param request the contents of the request.
147  * @param context an initialized request context to make the call.
148  * @param on_read the callback to be invoked on each successful Read().
149  * @param on_finish the callback to be invoked when the stream is closed.
150  *
151  * @tparam AsyncCallType the type of @a async_call. It must be invocable with
152  * parameters
153  * `(grpc::ClientContext*, RequestType const&, grpc::CompletionQueue*)`.
154  * Furthermore, it should return a type convertible to
155  * `std::unique_ptr<grpc::ClientAsyncReaderInterface<Response>>`.
156  * These requirements are verified by
157  * `internal::AsyncStreamingReadRpcUnwrap<>`, and this function is
158  * excluded from overload resolution if the parameters do not meet these
159  * requirements.
160  * @tparam Request the type of the request in the streaming RPC.
161  * @tparam Response the type of the response in the streaming RPC.
162  * @tparam OnReadHandler the type of the @p on_read callback.
163  * @tparam OnFinishHandler the type of the @p on_finish callback.
164  */
165  template <typename AsyncCallType, typename Request,
166  typename Response = typename internal::
167  AsyncStreamingReadResponseType<AsyncCallType, Request>::type,
168  typename OnReadHandler, typename OnFinishHandler>
170  AsyncCallType&& async_call, Request const& request,
171  std::unique_ptr<grpc::ClientContext> context, OnReadHandler&& on_read,
172  OnFinishHandler&& on_finish) {
173  auto stream = internal::MakeAsyncReadStreamImpl<Response>(
174  std::forward<OnReadHandler>(on_read),
175  std::forward<OnFinishHandler>(on_finish));
176  stream->Start(std::forward<AsyncCallType>(async_call), request,
177  std::move(context), impl_);
178  return stream;
179  }
180 
181  /**
182  * Asynchronously run a functor on a thread `Run()`ning the `CompletionQueue`.
183  *
184  * @tparam Functor the functor to call on the CompletionQueue thread.
185  * It must satisfy the `void(CompletionQueue&)` signature.
186  * @param functor the value of the functor.
187  */
188  template <typename Functor,
189  typename std::enable_if<
190  internal::CheckRunAsyncCallback<Functor>::value, int>::type = 0>
191  void RunAsync(Functor&& functor) {
192  class Wrapper : public internal::RunAsyncBase {
193  public:
194  Wrapper(std::weak_ptr<internal::CompletionQueueImpl> impl, Functor&& f)
195  : impl_(std::move(impl)), fun_(std::forward<Functor>(f)) {}
196  ~Wrapper() override = default;
197  void exec() override {
198  auto impl = impl_.lock();
199  if (!impl) return;
200  CompletionQueue cq(std::move(impl));
201  fun_(cq);
202  }
203 
204  private:
205  std::weak_ptr<internal::CompletionQueueImpl> impl_;
206  absl::decay_t<Functor> fun_;
207  };
208  impl_->RunAsync(
209  absl::make_unique<Wrapper>(impl_, std::forward<Functor>(functor)));
210  }
211 
212  /**
213  * Asynchronously run a functor on a thread `Run()`ning the `CompletionQueue`.
214  *
215  * @tparam Functor the functor to call on the CompletionQueue thread.
216  * It must satisfy the `void()` signature.
217  * @param functor the value of the functor.
218  */
219  template <typename Functor,
220  typename std::enable_if<internal::is_invocable<Functor>::value,
221  int>::type = 0>
222  void RunAsync(Functor&& functor) {
223  class Wrapper : public internal::RunAsyncBase {
224  public:
225  explicit Wrapper(Functor&& f) : fun_(std::forward<Functor>(f)) {}
226  ~Wrapper() override = default;
227  void exec() override { fun_(); }
228 
229  private:
230  absl::decay_t<Functor> fun_;
231  };
232  impl_->RunAsync(absl::make_unique<Wrapper>(std::forward<Functor>(functor)));
233  }
234 
235  /**
236  * Asynchronously wait for a connection to become ready.
237  *
238  * @param channel the channel on which to wait for state changes
239  * @param deadline give up waiting for the state change if this deadline
240  * passes
241  * @return `future<>` which will be satisfied when either of these events
242  * happen: (a) the connection is ready, (b) the connection permanently
243  * failed, (c) deadline passes before (a) or (b) happen; the future will
244  * be satisfied with `StatusCode::kOk` for (a), `StatusCode::kCancelled`
245  * for (b) and `StatusCode::kDeadlineExceeded` for (c)
246  */
248  std::shared_ptr<grpc::Channel> channel,
249  std::chrono::system_clock::time_point deadline);
250 
251  private:
252  friend std::shared_ptr<internal::CompletionQueueImpl>
253  internal::GetCompletionQueueImpl(CompletionQueue const& cq);
254  std::shared_ptr<internal::CompletionQueueImpl> impl_;
255 };
256 
257 namespace internal {
258 
259 inline std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
260  CompletionQueue const& cq) {
261  return cq.impl_;
262 }
263 
264 } // namespace internal
265 
267 } // namespace cloud
268 } // namespace google
269 
270 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H