Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

refactor: copy PaginationRange from spanner #168

Merged
merged 2 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions google/cloud/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,8 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC_UTILS)
internal/background_threads_impl.cc
internal/background_threads_impl.h
internal/completion_queue_impl.cc
internal/completion_queue_impl.h)
internal/completion_queue_impl.h
internal/pagination_range.h)
target_link_libraries(
google_cloud_cpp_grpc_utils
PUBLIC googleapis-c++::rpc_status_protos google_cloud_cpp_common
Expand Down Expand Up @@ -360,7 +361,8 @@ if (GOOGLE_CLOUD_CPP_ENABLE_GRPC_UTILS)
completion_queue_test.cc
connection_options_test.cc
grpc_error_delegate_test.cc
internal/background_threads_impl_test.cc)
internal/background_threads_impl_test.cc
internal/pagination_range_test.cc)

# Export the list of unit tests so the Bazel BUILD file can pick it up.
export_list_to_bazel("google_cloud_cpp_grpc_utils_unit_tests.bzl"
Expand Down
1 change: 1 addition & 0 deletions google/cloud/google_cloud_cpp_grpc_utils.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ google_cloud_cpp_grpc_utils_hdrs = [
"internal/async_read_stream_impl.h",
"internal/background_threads_impl.h",
"internal/completion_queue_impl.h",
"internal/pagination_range.h",
]

google_cloud_cpp_grpc_utils_srcs = [
Expand Down
1 change: 1 addition & 0 deletions google/cloud/google_cloud_cpp_grpc_utils_unit_tests.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,5 @@ google_cloud_cpp_grpc_utils_unit_tests = [
"connection_options_test.cc",
"grpc_error_delegate_test.cc",
"internal/background_threads_impl_test.cc",
"internal/pagination_range_test.cc",
]
214 changes: 214 additions & 0 deletions google/cloud/internal/pagination_range.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_PAGINATION_RANGE_H
#define GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_PAGINATION_RANGE_H

#include "google/cloud/status_or.h"
#include "google/cloud/version.h"
#include <google/protobuf/util/message_differencer.h>
#include <functional>
#include <iterator>
#include <string>
#include <utility>
#include <vector>

namespace google {
namespace cloud {
inline namespace GOOGLE_CLOUD_CPP_NS {
namespace internal {

/**
* An input iterator for a class with the same interface as `PaginationRange`.
*/
template <typename T, typename Range>
class PaginationIterator {
public:
//@{
/// @name Iterator traits
using iterator_category = std::input_iterator_tag;
using value_type = StatusOr<T>;
using difference_type = std::ptrdiff_t;
using pointer = value_type*;
using reference = value_type&;
//@}

PaginationIterator() : owner_(nullptr) {}

PaginationIterator& operator++() {
*this = owner_->GetNext();
return *this;
}

PaginationIterator operator++(int) {
PaginationIterator tmp(*this);
operator++();
return tmp;
}

value_type const* operator->() const { return &value_; }
value_type* operator->() { return &value_; }

value_type const& operator*() const& { return value_; }
value_type& operator*() & { return value_; }
#if GOOGLE_CLOUD_CPP_HAVE_CONST_REF_REF
value_type const&& operator*() const&& { return std::move(value_); }
#endif // GOOGLE_CLOUD_CPP_HAVE_CONST_REF_REF
value_type&& operator*() && { return std::move(value_); }

private:
friend Range;

friend bool operator==(PaginationIterator const& lhs,
PaginationIterator const& rhs) {
// Iterators on different streams are always different.
if (lhs.owner_ != rhs.owner_) {
return false;
}
// All end iterators are equal.
if (lhs.owner_ == nullptr) {
return true;
}
// Iterators on the same stream are equal if they point to the same object.
if (lhs.value_.ok() && rhs.value_.ok()) {
return google::protobuf::util::MessageDifferencer::Equals(*lhs.value_,
*rhs.value_);
}
// If one is an error and the other is not then they must be different,
// because only one iterator per range can have an error status. For the
// same reason, if both have an error they both are pointing to the same
// element.
return lhs.value_.ok() == rhs.value_.ok();
}

friend bool operator!=(PaginationIterator const& lhs,
PaginationIterator const& rhs) {
return !(lhs == rhs);
}

PaginationIterator(Range* owner, value_type value)
: owner_(owner), value_(std::move(value)) {}

Range* owner_;
value_type value_;
};

/**
* Adapt pagination APIs to look like input ranges.
*
* A number of gRPC APIs iterate over the elements in a "collection" using
* pagination APIs. The application calls a `List*()` RPC which returns
* a "page" of elements and a token, calling the same `List*()` RPC with the
* token returns the next "page". We want to expose these APIs as input ranges
* in the C++ client libraries. This class performs that work.
*
* @tparam T the type of the items, typically a proto describing the resources
* @tparam Request the type of the request object for the `List` RPC.
* @tparam Response the type of the response object for the `List` RPC.
*/
template <typename T, typename Request, typename Response>
class PaginationRange {
public:
/**
* Create a new range to paginate over some elements.
*
* @param request the first request to start the iteration, the library may
* initialize this request with any filtering constraints.
* @param loader makes the RPC request to fetch a new page of items.
* @param get_items extracts the items from the response using native C++
* types (as opposed to the proto types used in `Response`).
*/
PaginationRange(Request request,
std::function<StatusOr<Response>(Request const& r)> loader,
std::function<std::vector<T>(Response r)> get_items)
: request_(std::move(request)),
next_page_loader_(std::move(loader)),
get_items_(std::move(get_items)),
on_last_page_(false) {
current_ = current_page_.begin();
}

/// The iterator type for this Range.
using iterator = PaginationIterator<T, PaginationRange>;

/**
* Return an iterator over the range of `T` objects.
*
* The returned iterator is a single-pass input iterator that reads new `T`
* objects from the underlying `PaginationRange` when incremented.
*
* Creating, and particularly incrementing, multiple iterators on the same
* PaginationRange<> is unsupported and can produce incorrect results.
*/
iterator begin() { return GetNext(); }

/// Return an iterator pointing to the end of the stream.
iterator end() { return PaginationIterator<T, PaginationRange>{}; }

protected:
friend class PaginationIterator<T, PaginationRange>;

/**
* Fetches (or returns if already fetched) the next object from the stream.
*
* @return An iterator pointing to the next element in the stream. On error,
* it returns an iterator that is different from `.end()`, but has an error
* status. If the stream is exhausted, it returns the `.end()` iterator.
*/
iterator GetNext() {
static Status const kPastTheEndError(
StatusCode::kFailedPrecondition,
"Cannot iterating past the end of ListObjectReader");
if (current_page_.end() == current_) {
if (on_last_page_) {
return iterator(nullptr, kPastTheEndError);
}
request_.set_page_token(std::move(next_page_token_));
auto response = next_page_loader_(request_);
if (!response.ok()) {
next_page_token_.clear();
current_page_.clear();
on_last_page_ = true;
current_ = current_page_.begin();
return iterator(this, std::move(response).status());
}
next_page_token_ = std::move(*response->mutable_next_page_token());
current_page_ = get_items_(*std::move(response));
current_ = current_page_.begin();
if (next_page_token_.empty()) {
on_last_page_ = true;
}
if (current_page_.end() == current_) {
return iterator(nullptr, kPastTheEndError);
}
}
return iterator(this, std::move(*current_++));
}

private:
Request request_;
std::function<StatusOr<Response>(Request const& r)> next_page_loader_;
std::function<std::vector<T>(Response r)> get_items_;
std::vector<T> current_page_;
typename std::vector<T>::iterator current_;
std::string next_page_token_;
bool on_last_page_;
};

} // namespace internal
} // namespace GOOGLE_CLOUD_CPP_NS
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_GOOGLE_CLOUD_INTERNAL_PAGINATION_RANGE_H
Loading