Google Cloud C++ Client  1.42.0
C++ Client Library for Google Cloud Platform
stream_range.h
Go to the documentation of this file.
1 // Copyright 2020 Google LLC
2 //
3 // Licensed under the Apache License, Version 2.0 (the "License");
4 // you may not use this file except in compliance with the License.
5 // You may obtain a copy of the License at
6 //
7 // https://www.apache.org/licenses/LICENSE-2.0
8 //
9 // Unless required by applicable law or agreed to in writing, software
10 // distributed under the License is distributed on an "AS IS" BASIS,
11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 // See the License for the specific language governing permissions and
13 // limitations under the License.
14 
15 #ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STREAM_RANGE_H
16 #define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STREAM_RANGE_H
17 
18 #include "google/cloud/options.h"
19 #include "google/cloud/status.h"
20 #include "google/cloud/status_or.h"
21 #include "google/cloud/version.h"
22 #include "absl/types/variant.h"
23 #include <functional>
24 #include <iterator>
25 #include <utility>
26 
27 namespace google {
28 namespace cloud {
30 
31 // Defined below.
32 template <typename T>
33 class StreamRange;
34 
35 namespace internal {
36 
37 /**
38  * A function that repeatedly returns `T`s, and ends with a `Status`.
39  *
40  * This function should return instances of `T` from its underlying stream
41  * until there are no more. The end-of-stream is indicated by returning a
42  * `Status` indicating either success or an error. This function will not be
43  * invoked any more after it returns any `Status`.
44  *
45  * @par Example `StreamReader` that returns the integers from 1-10.
46  *
47  * @code
48  * int counter = 0;
49  * auto reader = [&counter]() -> StreamReader<int>::result_type {
50  * if (++counter <= 10) return counter;
51  * return Status{}; // OK
52  * };
53  * @endcode
54  */
55 template <typename T>
56 using StreamReader = std::function<absl::variant<Status, T>()>;
57 
58 // Defined below.
59 template <typename T>
60 StreamRange<T> MakeStreamRange(StreamReader<T>);
61 
62 } // namespace internal
63 
64 /**
65  * A `StreamRange<T>` is a range of `StatusOr<T>` where the end-of-stream is
66  * indicated by a non-OK `Status`.
67  *
68  * Callers can iterate the range using its `begin()` and `end()` members to
69  * access iterators that will work with any normal C++ constructs and
70  * algorithms that accept [Input Iterators][input-iter-link].
71  *
72  * Callers should only consume/iterate this range. There is no public way for a
73  * caller to construct a non-empty instance.
74  *
75  * @par Example: Iterating a range of 10 integers
76  *
77  * @code
78  * // Some function that returns a StreamRange<int>
79  * StreamRange<int> MakeRangeFromOneTo(int n);
80  *
81  * StreamRange<int> sr = MakeRangeFromOneTo(10);
82  * for (StatusOr<int> const& x : sr) {
83  * std::cout << *x << "\n";
84  * }
85  * @endcode
86  *
87  * [input-iter-link]: https://en.cppreference.com/w/cpp/named_req/InputIterator
88  */
89 template <typename T>
90 class StreamRange {
91  public:
92  /**
93  * An input iterator for a `StreamRange<T>` -- DO NOT USE DIRECTLY.
94  *
95  * Use `StreamRange::iterator` instead.
96  */
97  template <typename U>
98  class IteratorImpl {
99  public:
100  using iterator_category = std::input_iterator_tag;
101  using value_type = U;
102  using difference_type = std::size_t;
103  using reference = value_type&;
104  using pointer = value_type*;
105  using const_reference = value_type const&;
106  using const_pointer = value_type const*;
107 
108  /// Constructs an "end" iterator.
109  explicit IteratorImpl() = default;
110 
111  reference operator*() { return owner_->current_; }
112  pointer operator->() { return &owner_->current_; }
113  const_reference operator*() const { return owner_->current_; }
114  const_pointer operator->() const { return &owner_->current_; }
115 
117  owner_->Next();
118  is_end_ = owner_->is_end_;
119  return *this;
120  }
121 
123  auto copy = *this;
124  ++*this;
125  return copy;
126  }
127 
128  friend bool operator==(IteratorImpl const& a, IteratorImpl const& b) {
129  return a.is_end_ == b.is_end_;
130  }
131 
132  friend bool operator!=(IteratorImpl const& a, IteratorImpl const& b) {
133  return !(a == b);
134  }
135 
136  private:
137  friend class StreamRange;
138  explicit IteratorImpl(StreamRange* owner)
139  : owner_(owner), is_end_(owner_->is_end_) {}
140  StreamRange* owner_;
141  bool is_end_ = true;
142  };
143 
144  using value_type = StatusOr<T>;
145  using iterator = IteratorImpl<value_type>;
146  using difference_type = typename iterator::difference_type;
147  using reference = typename iterator::reference;
148  using pointer = typename iterator::pointer;
149  using const_reference = typename iterator::const_reference;
150  using const_pointer = typename iterator::const_pointer;
151 
152  /**
153  * Default-constructs an empty range.
154  */
155  StreamRange() = default;
156 
158  internal::OptionsSpan span(options_);
159  reader_ = nullptr;
160  }
161 
162  //@{
163  // @name Move-only
164  StreamRange(StreamRange const&) = delete;
165  StreamRange& operator=(StreamRange const&) = delete;
166  // NOLINTNEXTLINE(performance-noexcept-move-constructor)
167  StreamRange(StreamRange&&) = default;
168  // NOLINTNEXTLINE(performance-noexcept-move-constructor)
169  StreamRange& operator=(StreamRange&&) = default;
170  //@}
171 
172  iterator begin() { return iterator(this); }
173  iterator end() { return iterator(); }
174 
175  private:
176  void Next() {
177  // Jump to the end if we previously got an error.
178  if (!is_end_ && !current_ok_) {
179  is_end_ = true;
180  return;
181  }
182  struct UnpackVariant {
183  StreamRange& sr;
184  void operator()(Status&& status) {
185  sr.is_end_ = status.ok();
186  sr.current_ok_ = status.ok();
187  if (!status.ok()) sr.current_ = std::move(status);
188  }
189  void operator()(T&& t) {
190  sr.is_end_ = false;
191  sr.current_ok_ = true;
192  sr.current_ = std::move(t);
193  }
194  };
195  internal::OptionsSpan span(options_);
196  auto v = reader_();
197  absl::visit(UnpackVariant{*this}, std::move(v));
198  }
199 
200  template <typename U>
201  friend StreamRange<U> internal::MakeStreamRange(internal::StreamReader<U>);
202 
203  /**
204  * Constructs a `StreamRange<T>` that will use the given @p reader.
205  *
206  * The `T` objects are read from the caller-provided `internal::StreamReader`
207  * functor, which is invoked repeatedly as the range is iterated. The
208  * `internal::StreamReader` can return an OK `Status` to indicate a successful
209  * end of stream, or a non-OK `Status` to indicate an error, or a `T`. The
210  * `internal::StreamReader` will not be invoked again after it returns a
211  * `Status`.
212  *
213  * @par Example: Printing integers from 1-10.
214  *
215  * @code
216  * int counter = 0;
217  * auto reader = [&counter]() -> internal::StreamReader<int>::result_type {
218  * if (++counter <= 10) return counter;
219  * return Status{};
220  * };
221  * StreamRange<int> sr(std::move(reader));
222  * for (StatusOr<int> const& x : sr) {
223  * std::cout << *x << "\n";
224  * }
225  * @endcode
226  *
227  * @param reader must not be nullptr.
228  */
229  explicit StreamRange(internal::StreamReader<T> reader)
230  : reader_(std::move(reader)) {
231  Next();
232  }
233 
234  Options options_ = internal::CurrentOptions();
235  internal::StreamReader<T> reader_;
236  StatusOr<T> current_;
237  bool current_ok_ = false;
238  bool is_end_ = true;
239 };
240 
241 namespace internal {
242 
243 /**
244  * Factory to construct a `StreamRange<T>` with the given `StreamReader<T>`.
245  *
246  * Callers should explicitly specify the `T` parameter when calling this
247  * function template so that lambdas will implicitly convert to the underlying
248  * `StreamReader<T>` (i.e., std::function). For example:
249  *
250  * @code
251  * StreamReader<int> empty = MakeStreamReader<int>([] { return Status{}; });
252  * @endcode
253  */
254 template <typename T>
255 StreamRange<T> MakeStreamRange(StreamReader<T> reader) {
256  return StreamRange<T>{std::move(reader)};
257 }
258 
259 } // namespace internal
260 
262 } // namespace cloud
263 } // namespace google
264 
265 #endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_STREAM_RANGE_H