Google Cloud C++ Client 2.10.1
C++ Client Library for Google Cloud Platform
Loading...
Searching...
No Matches
completion_queue.h
1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
16#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
17
18#include "google/cloud/future.h"
19#include "google/cloud/internal/async_read_stream_impl.h"
20#include "google/cloud/internal/async_rpc_details.h"
21#include "google/cloud/internal/completion_queue_impl.h"
22#include "google/cloud/status_or.h"
23#include "google/cloud/version.h"
24#include "absl/meta/type_traits.h"
25#include <chrono>
26
27namespace google {
28namespace cloud {
30class CompletionQueue;
31
32namespace internal {
33// Forward declarations; both functions are defined after the `CompletionQueue` class below.
34std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
35 CompletionQueue const& cq);
36// Declared ahead of the class because `CompletionQueue::MakeUnaryRpc()` calls it.
37template <typename Request, typename Response>
38future<StatusOr<Response>> MakeUnaryRpcImpl(
39 CompletionQueue& cq, GrpcAsyncCall<Request, Response> async_call,
40 Request const& request, std::shared_ptr<grpc::ClientContext> context);
41
42} // namespace internal
43
44/**
45 * Call the functor associated with asynchronous operations when they complete.
46 */
47class CompletionQueue {
48 public:
50 explicit CompletionQueue(std::shared_ptr<internal::CompletionQueueImpl> impl)
51 : impl_(std::move(impl)) {}
52
53 /**
54 * Run the completion queue event loop.
55 *
56 * Note that more than one thread can call this member function, to create a
57 * pool of threads completing asynchronous operations.
58 */
59 void Run() { impl_->Run(); }
60
61 /// Terminate the completion queue event loop.
62 void Shutdown() { impl_->Shutdown(); }
63
64 /// Cancel all pending operations.
65 void CancelAll() { impl_->CancelAll(); }
66
67 /**
68 * Create a timer that fires at @p deadline.
69 *
70 * @param deadline when should the timer expire.
71 *
72 * @return a future that becomes satisfied after @p deadline.
73 * The result of the future is the time at which it expired, or an error
74 * Status if the timer did not run to expiration (e.g. it was cancelled).
75 */
76 google::cloud::future<StatusOr<std::chrono::system_clock::time_point>>
77 MakeDeadlineTimer(std::chrono::system_clock::time_point deadline) {
78 return impl_->MakeDeadlineTimer(deadline);
79 }
80
81 /**
82 * Create a timer that fires after the @p duration.
83 *
84 * @tparam Rep a placeholder to match the Rep tparam for @p duration type,
85 * the semantics of this template parameter are documented in
86 * `std::chrono::duration<>` (in brief, the underlying arithmetic type
87 * used to store the number of ticks), for our purposes it is simply a
88 * formal parameter.
89 * @tparam Period a placeholder to match the Period tparam for @p duration
90 * type, the semantics of this template parameter are documented in
91 * `std::chrono::duration<>` (in brief, the length of the tick in seconds,
92 * expressed as a `std::ratio<>`), for our purposes it is simply a formal
93 * parameter.
94 *
95 * @param duration when should the timer expire relative to the current time.
96 *
97 * @return a future that becomes satisfied after @p duration time has elapsed.
98 * The result of the future is the time at which it expired, or an error
99 * Status if the timer did not run to expiration (e.g. it was cancelled).
100 */
101 template <typename Rep, typename Period>
102 future<StatusOr<std::chrono::system_clock::time_point>> MakeRelativeTimer(
103 std::chrono::duration<Rep, Period> duration) {
104 return impl_->MakeRelativeTimer(
105 std::chrono::duration_cast<std::chrono::nanoseconds>(duration));
106 }
107
108 /**
109 * Make an asynchronous unary RPC.
110 *
111 * @param async_call a callable to start the asynchronous RPC.
112 * @param request the contents of the request.
113 * @param context an initialized request context to make the call.
114 *
115 * @tparam AsyncCallType the type of @a async_call. It must be invocable with
116 * `(grpc::ClientContext*, RequestType const&, grpc::CompletionQueue*)`.
117 * Furthermore, it should return a
118 * `std::unique_ptr<grpc::ClientAsyncResponseReaderInterface<Response>>`.
119 * These requirements are verified by
120 * `internal::AsyncCallResponseType<>`, and this function is
121 * excluded from overload resolution if the parameters do not meet these
122 * requirements.
123 * @tparam Request the type of the request parameter in the gRPC.
124 *
125 * @return a future that becomes satisfied when the operation completes.
126 */
127 template <
128 typename AsyncCallType, typename Request,
129 typename Sig = internal::AsyncCallResponseType<AsyncCallType, Request>,
130 typename Response = typename Sig::type,
131 typename std::enable_if<Sig::value, int>::type = 0>
132 future<StatusOr<Response>> MakeUnaryRpc(
133 AsyncCallType async_call, Request const& request,
134 std::unique_ptr<grpc::ClientContext> context) {
135 return internal::MakeUnaryRpcImpl<Request, Response>(
136 *this, std::move(async_call), request, std::move(context));
137 }
138
139 /**
140 * Make an asynchronous streaming read RPC.
141 *
142 * Reading from the stream starts automatically, and the handler is notified
143 * of all interesting events in the stream. Note that the handler is called
144 * by any thread blocked on this object's Run() member function. However, only
145 * one callback in the handler is called at a time.
146 *
147 * @param async_call a callable to start the asynchronous RPC.
148 * @param request the contents of the request.
149 * @param context an initialized request context to make the call.
150 * @param on_read the callback to be invoked on each successful Read().
151 * @param on_finish the callback to be invoked when the stream is closed.
152 *
153 * @tparam AsyncCallType the type of @a async_call. It must be invocable with
154 * parameters
155 * `(grpc::ClientContext*, RequestType const&, grpc::CompletionQueue*)`.
156 * Furthermore, it should return a type convertible to
157 * `std::unique_ptr<grpc::ClientAsyncReaderInterface<Response>>`.
158 * These requirements are verified by
159 * `internal::AsyncStreamingReadResponseType<>`, and this function is
160 * excluded from overload resolution if the parameters do not meet these
161 * requirements.
162 * @tparam Request the type of the request in the streaming RPC.
163 * @tparam Response the type of the response in the streaming RPC.
164 * @tparam OnReadHandler the type of the @p on_read callback.
165 * @tparam OnFinishHandler the type of the @p on_finish callback.
166 */
167 template <typename AsyncCallType, typename Request,
168 typename Response = typename internal::
169 AsyncStreamingReadResponseType<AsyncCallType, Request>::type,
170 typename OnReadHandler, typename OnFinishHandler>
171 std::shared_ptr<AsyncOperation> MakeStreamingReadRpc(
172 AsyncCallType&& async_call, Request const& request,
173 std::unique_ptr<grpc::ClientContext> context, OnReadHandler&& on_read,
174 OnFinishHandler&& on_finish) {
175 auto stream = internal::MakeAsyncReadStreamImpl<Response>(
176 std::forward<OnReadHandler>(on_read),
177 std::forward<OnFinishHandler>(on_finish));
178 stream->Start(std::forward<AsyncCallType>(async_call), request,
179 std::move(context), impl_);
180 return stream;
181 }
182
183 /**
184 * Asynchronously run a functor on a thread `Run()`ning the `CompletionQueue`.
185 *
186 * @tparam Functor the functor to call on the CompletionQueue thread.
187 * It must satisfy the `void(CompletionQueue&)` signature.
188 * @param functor the value of the functor.
189 */
190 template <typename Functor,
191 typename std::enable_if<
192 internal::CheckRunAsyncCallback<Functor>::value, int>::type = 0>
193 void RunAsync(Functor&& functor) {
194 class Wrapper : public internal::RunAsyncBase {
195 public:
196 Wrapper(std::weak_ptr<internal::CompletionQueueImpl> impl, Functor&& f)
197 : impl_(std::move(impl)), fun_(std::forward<Functor>(f)) {}
198 ~Wrapper() override = default;
199 void exec() override {
200 auto impl = impl_.lock();
201 if (!impl) return;  // the implementation no longer exists; skip the functor
202 CompletionQueue cq(std::move(impl));
203 fun_(cq);
204 }
205
206 private:
207 std::weak_ptr<internal::CompletionQueueImpl> impl_;  // weak: the wrapper does not own the queue
208 absl::decay_t<Functor> fun_;
209 };
210 impl_->RunAsync(
211 std::make_unique<Wrapper>(impl_, std::forward<Functor>(functor)));
212 }
213
214 /**
215 * Asynchronously run a functor on a thread `Run()`ning the `CompletionQueue`.
216 *
217 * @tparam Functor the functor to call on the CompletionQueue thread.
218 * It must satisfy the `void()` signature.
219 * @param functor the value of the functor.
220 */
221 template <typename Functor,
222 typename std::enable_if<internal::is_invocable<Functor>::value,
223 int>::type = 0>
224 void RunAsync(Functor&& functor) {
225 class Wrapper : public internal::RunAsyncBase {
226 public:
227 explicit Wrapper(Functor&& f) : fun_(std::forward<Functor>(f)) {}
228 ~Wrapper() override = default;
229 void exec() override { fun_(); }
230
231 private:
232 absl::decay_t<Functor> fun_;
233 };
234 impl_->RunAsync(std::make_unique<Wrapper>(std::forward<Functor>(functor)));
235 }
236
237 /**
238 * Asynchronously wait for a connection to become ready.
239 *
240 * @param channel the channel on which to wait for state changes
241 * @param deadline give up waiting for the state change if this deadline
242 * passes
243 * @return `future<>` which will be satisfied when either of these events
244 * happen: (a) the connection is ready, (b) the connection permanently
245 * failed, (c) deadline passes before (a) or (b) happen; the future will
246 * be satisfied with `StatusCode::kOk` for (a), `StatusCode::kCancelled`
247 * for (b) and `StatusCode::kDeadlineExceeded` for (c)
248 */
249 future<Status> AsyncWaitConnectionReady(
250 std::shared_ptr<grpc::Channel> channel,
251 std::chrono::system_clock::time_point deadline);
252
253 private:
254 friend std::shared_ptr<internal::CompletionQueueImpl>
255 internal::GetCompletionQueueImpl(CompletionQueue const& cq);
256 std::shared_ptr<internal::CompletionQueueImpl> impl_;
257};
258
259namespace internal {
260// Returns a copy of the `shared_ptr` to the implementation held by `cq`; declared a friend of `CompletionQueue`.
261inline std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
262 CompletionQueue const& cq) {
263 return cq.impl_;
264}
265// Creates an `AsyncUnaryRpcFuture`, starts it through the implementation behind `cq`, and returns its future.
266template <typename Request, typename Response>
267future<StatusOr<Response>> MakeUnaryRpcImpl(
268 CompletionQueue& cq, GrpcAsyncCall<Request, Response> async_call,
269 Request const& request, std::shared_ptr<grpc::ClientContext> context) {
270 auto op =
271 std::make_shared<internal::AsyncUnaryRpcFuture<Request, Response>>();
272 auto impl = GetCompletionQueueImpl(cq);
273 impl->StartOperation(op, [&, c = std::move(context)](void* tag) {  // NOTE(review): `[&]` assumes StartOperation() invokes this before returning - confirm
274 op->Start(async_call, std::move(c), request, impl->cq(), tag);  // `c` is const in this non-mutable lambda, so `std::move(c)` copies the pointer
275 });
276 return op->GetFuture();
277}
278
279} // namespace internal
280
282} // namespace cloud
283} // namespace google
284
285#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
Represents a pending asynchronous operation.
Definition: async_operation.h:42
Call the functor associated with asynchronous operations when they complete.
Definition: completion_queue.h:47
future< StatusOr< Response > > MakeUnaryRpc(AsyncCallType async_call, Request const &request, std::unique_ptr< grpc::ClientContext > context)
Make an asynchronous unary RPC.
Definition: completion_queue.h:132
std::shared_ptr< AsyncOperation > MakeStreamingReadRpc(AsyncCallType &&async_call, Request const &request, std::unique_ptr< grpc::ClientContext > context, OnReadHandler &&on_read, OnFinishHandler &&on_finish)
Make an asynchronous streaming read RPC.
Definition: completion_queue.h:171
void CancelAll()
Cancel all pending operations.
Definition: completion_queue.h:65
CompletionQueue(std::shared_ptr< internal::CompletionQueueImpl > impl)
Definition: completion_queue.h:50
future< StatusOr< std::chrono::system_clock::time_point > > MakeRelativeTimer(std::chrono::duration< Rep, Period > duration)
Create a timer that fires after the duration.
Definition: completion_queue.h:102
void RunAsync(Functor &&functor)
Asynchronously run a functor on a thread Run()ning the CompletionQueue.
Definition: completion_queue.h:193
void Run()
Run the completion queue event loop.
Definition: completion_queue.h:59
google::cloud::future< StatusOr< std::chrono::system_clock::time_point > > MakeDeadlineTimer(std::chrono::system_clock::time_point deadline)
Create a timer that fires at deadline.
Definition: completion_queue.h:77
future< Status > AsyncWaitConnectionReady(std::shared_ptr< grpc::Channel > channel, std::chrono::system_clock::time_point deadline)
Asynchronously wait for a connection to become ready.
void Shutdown()
Terminate the completion queue event loop.
Definition: completion_queue.h:62
Represents success or an error with info about the error.
Definition: status.h:295
friend class future
Definition: future_generic.h:137
Contains all the Google Cloud C++ Library APIs.
Definition: async_operation.h:23
Definition: async_operation.h:22
#define GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Definition: version.h:45
#define GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
Definition: version.h:43