Google Cloud C++ Client 2.10.1
C++ Client Library for Google Cloud Platform
Loading...
Searching...
No Matches
stream_range.h
1// Copyright 2020 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// https://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STREAM_RANGE_H
16#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STREAM_RANGE_H
17
18#include "google/cloud/internal/call_context.h"
19#include "google/cloud/status.h"
20#include "google/cloud/status_or.h"
21#include "google/cloud/version.h"
22#include "absl/types/variant.h"
23#include <functional>
24#include <iterator>
25#include <utility>
26
27namespace google {
28namespace cloud {
30
// Forward declaration. The `StreamRange` class template is defined later in
// this header; it is declared here so that `internal::MakeStreamRange()` can
// be declared with a `StreamRange<T>` return type before the class itself.
template <typename T>
class StreamRange;
34
35namespace internal {
36
/**
 * A function that repeatedly returns `T`s, and ends with a `Status`.
 *
 * This function should return instances of `T` from its underlying stream
 * until there are no more. The end-of-stream is indicated by returning a
 * `Status` indicating either success or an error. This function will not be
 * invoked any more after it returns any `Status`.
 *
 * The alias is a `std::function` that takes no arguments and returns an
 * `absl::variant<Status, T>`, so each call yields exactly one of the two
 * alternatives.
 *
 * @par Example: returning the integers from 1-10.
 *
 * @code
 * int counter = 0;
 * auto reader = [&counter]() -> StreamReader<int>::result_type {
 *   if (++counter <= 10) return counter;
 *   return Status{};  // OK
 * };
 * @endcode
 */
template <typename T>
using StreamReader = std::function<absl::variant<Status, T>()>;
57
// Forward declaration of the factory; its definition is at the end of this
// header. Declaring it here lets `StreamRange<T>` name it in a friend
// declaration, which gives it access to the private constructor.
template <typename T>
StreamRange<T> MakeStreamRange(StreamReader<T>);
61
62} // namespace internal
63
64/**
65 * A `StreamRange<T>` is a range of `StatusOr<T>` where the end-of-stream is
66 * indicated by a non-OK `Status`.
67 *
68 * Callers can iterate the range using its `begin()` and `end()` members to
69 * access iterators that will work with any normal C++ constructs and
70 * algorithms that accept [Input Iterators][input-iter-link].
71 *
72 * Callers should only consume/iterate this range.
73 *
74 * @par Example: Iterating a range of 10 integers
75 *
76 * @code
77 * // Some function that returns a StreamRange<int>
78 * StreamRange<int> MakeRangeFromOneTo(int n);
79 *
80 * StreamRange<int> sr = MakeRangeFromOneTo(10);
81 * for (StatusOr<int> const& x : sr) {
82 * if (!x) {
83 * std::cerr << "Fail: " << x.status() << "\n";
84 * } else {
85 * std::cout << *x << "\n";
86 * }
87 * }
88 * @endcode
89 *
90 * @note To construct a `StreamRange<T>` for testing (e.g. to mock a
91 * `Connection::ListFoo` call), see #google::cloud::mocks::MakeStreamRange.
92 *
93 * [input-iter-link]: https://en.cppreference.com/w/cpp/named_req/InputIterator
94 */
95template <typename T>
96class StreamRange {
97 public:
98 /**
99 * An input iterator for a `StreamRange<T>` -- DO NOT USE DIRECTLY.
100 *
101 * Use `StreamRange::iterator` instead.
102 */
103 template <typename U>
104 class IteratorImpl {
105 public:
106 using iterator_category = std::input_iterator_tag;
107 using value_type = U;
108 using difference_type = std::size_t;
109 using reference = value_type&;
110 using pointer = value_type*;
111 using const_reference = value_type const&;
112 using const_pointer = value_type const*;
113
114 /// Constructs an "end" iterator.
115 explicit IteratorImpl() = default;
116
117 reference operator*() { return owner_->current_; }
118 pointer operator->() { return &owner_->current_; }
119 const_reference operator*() const { return owner_->current_; }
120 const_pointer operator->() const { return &owner_->current_; }
121
123 owner_->Next();
124 is_end_ = owner_->is_end_;
125 return *this;
126 }
127
128 IteratorImpl operator++(int) {
129 auto copy = *this;
130 ++*this;
131 return copy;
132 }
133
134 friend bool operator==(IteratorImpl const& a, IteratorImpl const& b) {
135 return a.is_end_ == b.is_end_;
136 }
137
138 friend bool operator!=(IteratorImpl const& a, IteratorImpl const& b) {
139 return !(a == b);
140 }
141
142 private:
143 friend class StreamRange;
144 explicit IteratorImpl(StreamRange* owner)
145 : owner_(owner), is_end_(owner_->is_end_) {}
146 StreamRange* owner_;
147 bool is_end_ = true;
148 };
149
150 using value_type = StatusOr<T>;
151 using iterator = IteratorImpl<value_type>;
152 using difference_type = typename iterator::difference_type;
153 using reference = typename iterator::reference;
154 using pointer = typename iterator::pointer;
155 using const_reference = typename iterator::const_reference;
156 using const_pointer = typename iterator::const_pointer;
157
158 /**
159 * Default-constructs an empty range.
160 */
161 StreamRange() = default;
162
163 ~StreamRange() {
164 internal::ScopedCallContext scope(call_context_);
165 reader_ = nullptr;
166 }
167
168 ///@{
169 /// @name Move-only
170 StreamRange(StreamRange const&) = delete;
171 StreamRange& operator=(StreamRange const&) = delete;
172 // NOLINTNEXTLINE(performance-noexcept-move-constructor)
173 StreamRange(StreamRange&&) = default;
174 // NOLINTNEXTLINE(performance-noexcept-move-constructor)
175 StreamRange& operator=(StreamRange&&) = default;
176 ///@}
177
178 iterator begin() { return iterator(this); }
179 iterator end() { return iterator(); }
180
181 private:
182 void Next() {
183 // Jump to the end if we previously got an error.
184 if (!is_end_ && !current_ok_) {
185 is_end_ = true;
186 return;
187 }
188 struct UnpackVariant {
189 StreamRange& sr;
190 void operator()(Status&& status) {
191 sr.is_end_ = status.ok();
192 sr.current_ok_ = status.ok();
193 if (!status.ok()) sr.current_ = std::move(status);
194 }
195 void operator()(T&& t) {
196 sr.is_end_ = false;
197 sr.current_ok_ = true;
198 sr.current_ = std::move(t);
199 }
200 };
201 internal::ScopedCallContext scope(call_context_);
202 auto v = reader_();
203 absl::visit(UnpackVariant{*this}, std::move(v));
204 }
205
206 template <typename U>
207 friend StreamRange<U> internal::MakeStreamRange(internal::StreamReader<U>);
208
209 /**
210 * Constructs a `StreamRange<T>` that will use the given @p reader.
211 *
212 * The `T` objects are read from the caller-provided `internal::StreamReader`
213 * functor, which is invoked repeatedly as the range is iterated. The
214 * `internal::StreamReader` can return an OK `Status` to indicate a successful
215 * end of stream, or a non-OK `Status` to indicate an error, or a `T`. The
216 * `internal::StreamReader` will not be invoked again after it returns a
217 * `Status`.
218 *
219 * @par Example: Printing integers from 1-10.
220 *
221 * @code
222 * int counter = 0;
223 * auto reader = [&counter]() -> internal::StreamReader<int>::result_type {
224 * if (++counter <= 10) return counter;
225 * return Status{};
226 * };
227 * StreamRange<int> sr(std::move(reader));
228 * for (StatusOr<int> const& x : sr) {
229 * std::cout << *x << "\n";
230 * }
231 * @endcode
232 *
233 * @param reader must not be nullptr.
234 */
235 explicit StreamRange(internal::StreamReader<T> reader)
236 : reader_(std::move(reader)) {
237 Next();
238 }
239
240 internal::CallContext call_context_;
241 internal::StreamReader<T> reader_;
242 StatusOr<T> current_;
243 bool current_ok_ = false;
244 bool is_end_ = true;
245};
246
247namespace internal {
248
249/**
250 * Factory to construct a `StreamRange<T>` with the given `StreamReader<T>`.
251 *
252 * Callers should explicitly specify the `T` parameter when calling this
253 * function template so that lambdas will implicitly convert to the underlying
254 * `StreamReader<T>` (i.e., std::function). For example:
255 *
256 * @code
257 * StreamReader<int> empty = MakeStreamReader<int>([] { return Status{}; });
258 * @endcode
259 */
260template <typename T>
261StreamRange<T> MakeStreamRange(StreamReader<T> reader) {
262 return StreamRange<T>{std::move(reader)};
263}
264
265} // namespace internal
266
268} // namespace cloud
269} // namespace google
270
271#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STREAM_RANGE_H
Represents success or an error with info about the error.
Definition: status.h:295
bool ok() const
Returns true if the status code is StatusCode::kOk.
Definition: status.h:328
An input iterator for a StreamRange<T> – DO NOT USE DIRECTLY.
Definition: stream_range.h:104
friend bool operator==(IteratorImpl const &a, IteratorImpl const &b)
Definition: stream_range.h:134
pointer operator->()
Definition: stream_range.h:118
IteratorImpl & operator++()
Definition: stream_range.h:122
IteratorImpl operator++(int)
Definition: stream_range.h:128
const_pointer operator->() const
Definition: stream_range.h:120
IteratorImpl()=default
Constructs an "end" iterator.
const_reference operator*() const
Definition: stream_range.h:119
reference operator*()
Definition: stream_range.h:117
friend bool operator!=(IteratorImpl const &a, IteratorImpl const &b)
Definition: stream_range.h:138
StreamRange & operator=(StreamRange &&)=default
iterator begin()
Definition: stream_range.h:178
StreamRange()=default
Default-constructs an empty range.
StreamRange(StreamRange const &)=delete
StreamRange(StreamRange &&)=default
iterator end()
Definition: stream_range.h:179
StreamRange & operator=(StreamRange const &)=delete
~StreamRange()
Definition: stream_range.h:163
Contains all the Google Cloud C++ Library APIs.
Definition: async_operation.h:23
Definition: async_operation.h:22
#define GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_END
Definition: version.h:45
#define GOOGLE_CLOUD_CPP_INLINE_NAMESPACE_BEGIN
Definition: version.h:43