Google Cloud C++ Client  2.7.0
C++ Client Library for Google Cloud Platform
stream_range.h
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STREAM_RANGE_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STREAM_RANGE_H
17 
18 #include "google/cloud/options.h"
19 #include "google/cloud/status.h"
20 #include "google/cloud/status_or.h"
21 #include "google/cloud/version.h"
22 #include "absl/types/variant.h"
23 #include <functional>
24 #include <iterator>
25 #include <utility>
26 
27 namespace google {
28 namespace cloud {
30 
31 // Defined below.
32 template <typename T>
33 class StreamRange;
34 
35 namespace internal {
36 
37 /**
38  * A function that repeatedly returns `T`s, and ends with a `Status`.
39  *
40  * This function should return instances of `T` from its underlying stream
41  * until there are no more. The end-of-stream is indicated by returning a
42  * `Status` indicating either success or an error. This function will not be
43  * invoked any more after it returns any `Status`.
44  *
45  * @par Example `StreamReader` that returns the integers from 1-10.
46  *
47  * @code
48  * int counter = 0;
49  * auto reader = [&counter]() -> StreamReader<int>::result_type {
50  * if (++counter <= 10) return counter;
51  * return Status{}; // OK
52  * };
53  * @endcode
54  */
55 template <typename T>
56 using StreamReader = std::function<absl::variant<Status, T>()>;
57 
58 // Defined below.
59 template <typename T>
60 StreamRange<T> MakeStreamRange(StreamReader<T>);
61 
62 } // namespace internal
63 
64 /**
65  * A `StreamRange<T>` is a range of `StatusOr<T>` where the end-of-stream is
66  * indicated by a non-OK `Status`.
67  *
68  * Callers can iterate the range using its `begin()` and `end()` members to
69  * access iterators that will work with any normal C++ constructs and
70  * algorithms that accept [Input Iterators][input-iter-link].
71  *
72  * Callers should only consume/iterate this range.
73  *
74  * @par Example: Iterating a range of 10 integers
75  *
76  * @code
77  * // Some function that returns a StreamRange<int>
78  * StreamRange<int> MakeRangeFromOneTo(int n);
79  *
80  * StreamRange<int> sr = MakeRangeFromOneTo(10);
81  * for (StatusOr<int> const& x : sr) {
82  * if (!x) {
83  * std::cerr << "Fail: " << x.status() << "\n";
84  * } else {
85  * std::cout << *x << "\n";
86  * }
87  * }
88  * @endcode
89  *
90  * @note To construct a `StreamRange<T>` for testing (e.g. to mock a
91  * `Connection::ListFoo` call), see #google::cloud::mocks::MakeStreamRange.
92  *
93  * [input-iter-link]: https://en.cppreference.com/w/cpp/named_req/InputIterator
94  */
95 template <typename T>
96 class StreamRange {
97  public:
98  /**
99  * An input iterator for a `StreamRange<T>` -- DO NOT USE DIRECTLY.
100  *
101  * Use `StreamRange::iterator` instead.
102  */
103  template <typename U>
104  class IteratorImpl {
105  public:
106  using iterator_category = std::input_iterator_tag;
107  using value_type = U;
108  using difference_type = std::size_t;
109  using reference = value_type&;
110  using pointer = value_type*;
111  using const_reference = value_type const&;
112  using const_pointer = value_type const*;
113 
114  /// Constructs an "end" iterator.
115  explicit IteratorImpl() = default;
116 
117  reference operator*() { return owner_->current_; }
118  pointer operator->() { return &owner_->current_; }
119  const_reference operator*() const { return owner_->current_; }
120  const_pointer operator->() const { return &owner_->current_; }
121 
123  owner_->Next();
124  is_end_ = owner_->is_end_;
125  return *this;
126  }
127 
129  auto copy = *this;
130  ++*this;
131  return copy;
132  }
133 
134  friend bool operator==(IteratorImpl const& a, IteratorImpl const& b) {
135  return a.is_end_ == b.is_end_;
136  }
137 
138  friend bool operator!=(IteratorImpl const& a, IteratorImpl const& b) {
139  return !(a == b);
140  }
141 
142  private:
143  friend class StreamRange;
144  explicit IteratorImpl(StreamRange* owner)
145  : owner_(owner), is_end_(owner_->is_end_) {}
146  StreamRange* owner_;
147  bool is_end_ = true;
148  };
149 
150  using value_type = StatusOr<T>;
151  using iterator = IteratorImpl<value_type>;
152  using difference_type = typename iterator::difference_type;
153  using reference = typename iterator::reference;
154  using pointer = typename iterator::pointer;
155  using const_reference = typename iterator::const_reference;
156  using const_pointer = typename iterator::const_pointer;
157 
158  /**
159  * Default-constructs an empty range.
160  */
161  StreamRange() = default;
162 
164  internal::OptionsSpan span(options_);
165  reader_ = nullptr;
166  }
167 
168  ///@{
169  /// @name Move-only
170  StreamRange(StreamRange const&) = delete;
171  StreamRange& operator=(StreamRange const&) = delete;
172  // NOLINTNEXTLINE(performance-noexcept-move-constructor)
173  StreamRange(StreamRange&&) = default;
174  // NOLINTNEXTLINE(performance-noexcept-move-constructor)
175  StreamRange& operator=(StreamRange&&) = default;
176  ///@}
177 
178  iterator begin() { return iterator(this); }
179  iterator end() { return iterator(); }
180 
181  private:
182  void Next() {
183  // Jump to the end if we previously got an error.
184  if (!is_end_ && !current_ok_) {
185  is_end_ = true;
186  return;
187  }
188  struct UnpackVariant {
189  StreamRange& sr;
190  void operator()(Status&& status) {
191  sr.is_end_ = status.ok();
192  sr.current_ok_ = status.ok();
193  if (!status.ok()) sr.current_ = std::move(status);
194  }
195  void operator()(T&& t) {
196  sr.is_end_ = false;
197  sr.current_ok_ = true;
198  sr.current_ = std::move(t);
199  }
200  };
201  internal::OptionsSpan span(options_);
202  auto v = reader_();
203  absl::visit(UnpackVariant{*this}, std::move(v));
204  }
205 
206  template <typename U>
207  friend StreamRange<U> internal::MakeStreamRange(internal::StreamReader<U>);
208 
209  /**
210  * Constructs a `StreamRange<T>` that will use the given @p reader.
211  *
212  * The `T` objects are read from the caller-provided `internal::StreamReader`
213  * functor, which is invoked repeatedly as the range is iterated. The
214  * `internal::StreamReader` can return an OK `Status` to indicate a successful
215  * end of stream, or a non-OK `Status` to indicate an error, or a `T`. The
216  * `internal::StreamReader` will not be invoked again after it returns a
217  * `Status`.
218  *
219  * @par Example: Printing integers from 1-10.
220  *
221  * @code
222  * int counter = 0;
223  * auto reader = [&counter]() -> internal::StreamReader<int>::result_type {
224  * if (++counter <= 10) return counter;
225  * return Status{};
226  * };
227  * StreamRange<int> sr(std::move(reader));
228  * for (StatusOr<int> const& x : sr) {
229  * std::cout << *x << "\n";
230  * }
231  * @endcode
232  *
233  * @param reader must not be nullptr.
234  */
235  explicit StreamRange(internal::StreamReader<T> reader)
236  : reader_(std::move(reader)) {
237  Next();
238  }
239 
240  Options options_ = internal::CurrentOptions();
241  internal::StreamReader<T> reader_;
242  StatusOr<T> current_;
243  bool current_ok_ = false;
244  bool is_end_ = true;
245 };
246 
247 namespace internal {
248 
249 /**
250  * Factory to construct a `StreamRange<T>` with the given `StreamReader<T>`.
251  *
252  * Callers should explicitly specify the `T` parameter when calling this
253  * function template so that lambdas will implicitly convert to the underlying
254  * `StreamReader<T>` (i.e., std::function). For example:
255  *
256  * @code
257  * StreamReader<int> empty = MakeStreamReader<int>([] { return Status{}; });
258  * @endcode
259  */
260 template <typename T>
261 StreamRange<T> MakeStreamRange(StreamReader<T> reader) {
262  return StreamRange<T>{std::move(reader)};
263 }
264 
265 } // namespace internal
266 
268 } // namespace cloud
269 } // namespace google
270 
271 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STREAM_RANGE_H