15#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
16#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
18#include "google/cloud/future.h"
19#include "google/cloud/internal/async_read_stream_impl.h"
20#include "google/cloud/internal/async_rpc_details.h"
21#include "google/cloud/internal/completion_queue_impl.h"
22#include "google/cloud/status_or.h"
23#include "google/cloud/version.h"
24#include "absl/meta/type_traits.h"
34std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
37template <
typename Request,
typename Response>
38future<StatusOr<Response>> MakeUnaryRpcImpl(
40 Request
const& request, std::shared_ptr<grpc::ClientContext> context);
45
46
50 explicit CompletionQueue(std::shared_ptr<internal::CompletionQueueImpl> impl)
51 : impl_(std::move(impl)) {}
54
55
56
57
58
59 void Run() { impl_->Run(); }
62 void Shutdown() { impl_->Shutdown(); }
68
69
70
71
72
73
74
75
78 return impl_->MakeDeadlineTimer(deadline);
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 template <
typename Rep,
typename Period>
103 std::chrono::duration<Rep, Period> duration) {
104 return impl_->MakeRelativeTimer(
105 std::chrono::duration_cast<std::chrono::nanoseconds>(duration));
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
128 typename AsyncCallType,
typename Request,
129 typename Sig = internal::AsyncCallResponseType<AsyncCallType, Request>,
130 typename Response =
typename Sig::type,
131 typename std::enable_if<Sig::value,
int>::type = 0>
133 AsyncCallType async_call, Request
const& request,
134 std::unique_ptr<grpc::ClientContext> context) {
135 return internal::MakeUnaryRpcImpl<Request, Response>(
136 *
this, std::move(async_call), request, std::move(context));
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167 template <
typename AsyncCallType,
typename Request,
168 typename Response =
typename internal::
169 AsyncStreamingReadResponseType<AsyncCallType, Request>::type,
170 typename OnReadHandler,
typename OnFinishHandler>
172 AsyncCallType&& async_call, Request
const& request,
173 std::unique_ptr<grpc::ClientContext> context, OnReadHandler&& on_read,
174 OnFinishHandler&& on_finish) {
175 auto stream = internal::MakeAsyncReadStreamImpl<Response>(
176 std::forward<OnReadHandler>(on_read),
177 std::forward<OnFinishHandler>(on_finish));
178 stream->Start(std::forward<AsyncCallType>(async_call), request,
179 std::move(context), impl_);
184
185
186
187
188
189
190 template <
typename Functor,
191 typename std::enable_if<
192 internal::CheckRunAsyncCallback<Functor>::value,
int>::type = 0>
194 class Wrapper :
public internal::RunAsyncBase {
196 Wrapper(std::weak_ptr<internal::CompletionQueueImpl> impl, Functor&& f)
197 : impl_(std::move(impl)), fun_(std::forward<Functor>(f)) {}
198 ~Wrapper()
override =
default;
199 void exec()
override {
200 auto impl = impl_.lock();
207 std::weak_ptr<internal::CompletionQueueImpl> impl_;
208 absl::decay_t<Functor> fun_;
211 std::make_unique<Wrapper>(impl_, std::forward<Functor>(functor)));
215
216
217
218
219
220
221 template <
typename Functor,
222 typename std::enable_if<internal::is_invocable<Functor>::value,
225 class Wrapper :
public internal::RunAsyncBase {
227 explicit Wrapper(Functor&& f) : fun_(std::forward<Functor>(f)) {}
228 ~Wrapper()
override =
default;
229 void exec()
override { fun_(); }
232 absl::decay_t<Functor> fun_;
234 impl_->RunAsync(std::make_unique<Wrapper>(std::forward<Functor>(functor)));
238
239
240
241
242
243
244
245
246
247
248
250 std::shared_ptr<grpc::Channel> channel,
251 std::chrono::system_clock::time_point deadline);
254 friend std::shared_ptr<internal::CompletionQueueImpl>
256 std::shared_ptr<internal::CompletionQueueImpl> impl_;
261inline std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
266template <
typename Request,
typename Response>
267future<StatusOr<Response>> MakeUnaryRpcImpl(
269 Request
const& request, std::shared_ptr<grpc::ClientContext> context) {
271 std::make_shared<internal::AsyncUnaryRpcFuture<Request, Response>>();
272 auto impl = GetCompletionQueueImpl(cq);
273 impl->StartOperation(op, [&, c = std::move(context)](
void* tag) {
274 op->Start(async_call, std::move(c), request, impl->cq(), tag);
276 return op->GetFuture();
Represents a pending asynchronous operation.
Definition: async_operation.h:42
Call the functor associated with asynchronous operations when they complete.
Definition: completion_queue.h:47
future< StatusOr< Response > > MakeUnaryRpc(AsyncCallType async_call, Request const &request, std::unique_ptr< grpc::ClientContext > context)
Make an asynchronous unary RPC.
Definition: completion_queue.h:132
std::shared_ptr< AsyncOperation > MakeStreamingReadRpc(AsyncCallType &&async_call, Request const &request, std::unique_ptr< grpc::ClientContext > context, OnReadHandler &&on_read, OnFinishHandler &&on_finish)
Make an asynchronous streaming read RPC.
Definition: completion_queue.h:171
void CancelAll()
Cancel all pending operations.
Definition: completion_queue.h:65
CompletionQueue(std::shared_ptr< internal::CompletionQueueImpl > impl)
Definition: completion_queue.h:50
future< StatusOr< std::chrono::system_clock::time_point > > MakeRelativeTimer(std::chrono::duration< Rep, Period > duration)
Create a timer that fires after the duration.
Definition: completion_queue.h:102
void RunAsync(Functor &&functor)
Asynchronously run a functor on a thread Run()ning the CompletionQueue.
Definition: completion_queue.h:193
void Run()
Run the completion queue event loop.
Definition: completion_queue.h:59
google::cloud::future< StatusOr< std::chrono::system_clock::time_point > > MakeDeadlineTimer(std::chrono::system_clock::time_point deadline)
Create a timer that fires at deadline.
Definition: completion_queue.h:77
future< Status > AsyncWaitConnectionReady(std::shared_ptr< grpc::Channel > channel, std::chrono::system_clock::time_point deadline)
Asynchronously wait for a connection to become ready.
void Shutdown()
Terminate the completion queue event loop.
Definition: completion_queue.h:62
Represents success or an error with info about the error.
Definition: status.h:295
friend class future
Definition: future_generic.h:137
Contains all the Google Cloud C++ Library APIs.
Definition: async_operation.h:23
Definition: async_operation.h:22
#define GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Definition: version.h:45
#define GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
Definition: version.h:43