15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_COMPLETION_QUEUE_H
18 #include "google/cloud/future.h"
19 #include "google/cloud/internal/async_read_stream_impl.h"
20 #include "google/cloud/internal/async_rpc_details.h"
21 #include "google/cloud/internal/completion_queue_impl.h"
22 #include "google/cloud/status_or.h"
23 #include "google/cloud/version.h"
24 #include "absl/memory/memory.h"
25 #include "absl/meta/type_traits.h"
34 std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(
// Constructor member-initializer (the signature line is not visible in this
// excerpt): moves the supplied `impl` into the impl_ member; the body is empty.
45 : impl_(std::move(impl)) {}
// Delegates to impl_->Run(); this wrapper adds no logic of its own.
53 void Run() { impl_->Run(); }
// Forwards `deadline` unchanged to impl_->MakeDeadlineTimer() and returns
// whatever the implementation produces.
72 return impl_->MakeDeadlineTimer(deadline);
// Relative-timer helper (the function-name line is not visible in this
// excerpt). Templated on Rep/Period so it accepts any
// std::chrono::duration<Rep, Period> -- seconds, milliseconds, and so on.
95 template <
typename Rep,
typename Period>
97 std::chrono::duration<Rep, Period> duration) {
// Normalize to nanoseconds before handing off to the implementation.
// Note: std::chrono::duration_cast truncates toward zero, so an input with a
// finer-than-nanosecond period loses its fractional part.
98 return impl_->MakeRelativeTimer(
99 std::chrono::duration_cast<std::chrono::nanoseconds>(duration));
// Unary-RPC helper (the `template <` opener, the function-name line, and the
// declaration of `op` are not visible in this excerpt).
//
// Template parameters:
//   AsyncCallType - type of `async_call`, which is forwarded to op->Start().
//   Request       - type of the request passed by const reference.
//   Sig           - internal::AsyncCallResponseType<AsyncCallType, Request>;
//                   provides both Sig::type and Sig::value used below.
//   Response      - defaults to Sig::type.
// The trailing enable_if parameter removes this template from overload
// resolution when Sig::value is false.
122 typename AsyncCallType,
typename Request,
123 typename Sig = internal::AsyncCallResponseType<AsyncCallType, Request>,
124 typename Response =
typename Sig::type,
125 typename std::enable_if<Sig::value,
int>::type = 0>
// `context` is taken by value as a unique_ptr and later moved into
// op->Start(), so ownership leaves the caller.
127 AsyncCallType async_call, Request
const& request,
128 std::unique_ptr<grpc::ClientContext> context) {
// Shared operation object; it is handed to StartOperation() and also supplies
// the future returned at the end.
130 std::make_shared<internal::AsyncUnaryRpcFuture<Request, Response>>();
// The lambda captures everything by reference ([&]): it reads `op`,
// `async_call` and `request`, and moves from `context`.
// NOTE(review): that is only safe if StartOperation() invokes the callback
// before this function returns -- confirm against CompletionQueueImpl.
131 impl_->StartOperation(op, [&](
void* tag) {
// `tag` is supplied by StartOperation() and passed through to op->Start()
// together with the queue obtained from impl_->cq().
132 op->Start(async_call, std::move(context), request, impl_->cq(), tag);
// Hand the caller the future obtained from `op`.
134 return op->GetFuture();
// Streaming-read helper (the function-name line and anything after the
// Start() call are not visible in this excerpt).
//
// Template parameters:
//   AsyncCallType   - type of `async_call`, perfectly forwarded to
//                     stream->Start().
//   Request         - type of the request passed by const reference.
//   Response        - defaults to
//                     internal::AsyncStreamingReadResponseType<...>::type.
//   OnReadHandler   - type of the `on_read` callback.
//   OnFinishHandler - type of the `on_finish` callback.
165 template <
typename AsyncCallType,
typename Request,
166 typename Response =
typename internal::
167 AsyncStreamingReadResponseType<AsyncCallType, Request>::type,
168 typename OnReadHandler,
typename OnFinishHandler>
// `async_call`, `on_read` and `on_finish` are forwarding references;
// `context` is owned (unique_ptr by value) and moved into stream->Start().
170 AsyncCallType&& async_call, Request
const& request,
171 std::unique_ptr<grpc::ClientContext> context, OnReadHandler&& on_read,
172 OnFinishHandler&& on_finish) {
// Build the stream object around the two callbacks, preserving their value
// category via std::forward.
173 auto stream = internal::MakeAsyncReadStreamImpl<Response>(
174 std::forward<OnReadHandler>(on_read),
175 std::forward<OnFinishHandler>(on_finish));
// Start the stream, passing the call, the request, the moved context, and
// this object's impl_.
176 stream->Start(std::forward<AsyncCallType>(async_call), request,
177 std::move(context), impl_);
// Functor-scheduling helper, variant whose wrapper also carries a handle to
// the implementation (the function-name line, most of exec(), and the call
// that receives the make_unique result are not visible in this excerpt).
// The enable_if parameter removes this template from overload resolution
// unless internal::CheckRunAsyncCallback<Functor>::value is true.
188 template <
typename Functor,
189 typename std::enable_if<
190 internal::CheckRunAsyncCallback<Functor>::value,
int>::type = 0>
// Local adapter deriving from internal::RunAsyncBase; it stores the functor
// together with a weak reference to the implementation.
192 class Wrapper :
public internal::RunAsyncBase {
// Stores `impl` by move and the functor via std::forward.
194 Wrapper(std::weak_ptr<internal::CompletionQueueImpl> impl, Functor&& f)
195 : impl_(std::move(impl)), fun_(std::forward<Functor>(f)) {}
196 ~Wrapper()
override =
default;
197 void exec()
override {
// Promote the weak reference; lock() yields an empty shared_ptr if the
// implementation has already been destroyed. The code that uses `impl` is
// not visible here.
198 auto impl = impl_.lock();
// NOTE(review): weak_ptr presumably keeps a queued wrapper from extending the
// implementation's lifetime -- confirm against the missing exec() body.
205 std::weak_ptr<internal::CompletionQueueImpl> impl_;
// absl::decay_t strips references and cv-qualifiers, so the wrapper holds its
// own functor object rather than a reference to the caller's.
206 absl::decay_t<Functor> fun_;
// Allocate the wrapper with the enclosing object's impl_ (converted to
// weak_ptr by the constructor parameter) and the forwarded functor.
209 absl::make_unique<Wrapper>(impl_, std::forward<Functor>(functor)));
// Functor-scheduling helper, variant for functors invoked with no arguments
// (the end of the enable_if clause and the function-name line are not visible
// in this excerpt). The visible constraint tests
// internal::is_invocable<Functor>::value.
219 template <
typename Functor,
220 typename std::enable_if<internal::is_invocable<Functor>::value,
// Local adapter deriving from internal::RunAsyncBase; it stores only the
// functor.
223 class Wrapper :
public internal::RunAsyncBase {
// `explicit` prevents implicit conversion from a functor to a Wrapper.
225 explicit Wrapper(Functor&& f) : fun_(std::forward<Functor>(f)) {}
226 ~Wrapper()
override =
default;
// exec() simply calls the stored functor with no arguments.
227 void exec()
override { fun_(); }
// absl::decay_t strips references and cv-qualifiers, so the wrapper holds its
// own functor object rather than a reference to the caller's.
230 absl::decay_t<Functor> fun_;
// Transfer ownership of a newly allocated wrapper to impl_->RunAsync().
232 impl_->RunAsync(absl::make_unique<Wrapper>(std::forward<Functor>(functor)));
248 std::shared_ptr<grpc::Channel> channel,
249 std::chrono::system_clock::time_point deadline);
252 friend std::shared_ptr<internal::CompletionQueueImpl>
// Shared handle to the implementation object; the members visible in this
// excerpt forward their work to it.
254 std::shared_ptr<internal::CompletionQueueImpl> impl_;
259 inline std::shared_ptr<CompletionQueueImpl> GetCompletionQueueImpl(