Skip to content

Commit

Permalink
DX-61034: Arrow changes from apacheGH-34865 PR
Browse files Browse the repository at this point in the history
  • Loading branch information
stevelorddremio committed Jan 23, 2024
1 parent 31cf166 commit 2b478fe
Show file tree
Hide file tree
Showing 7 changed files with 16 additions and 164 deletions.
25 changes: 0 additions & 25 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -703,31 +703,6 @@ Status FlightClient::DoExchange(const FlightCallOptions& options,
return Status::OK();
}

// PHOXME implement these...
arrow::Result<SetSessionOptionsResult> FlightClient::SetSessionOptions(
const FlightCallOptions& options, const SetSessionOptionsRequest& request) {
RETURN_NOT_OK(CheckOpen());
/*
*/
}

arrow::Result<GetSessionOptionsResult> FlightClient::GetSessionOptions(
const FlightCallOptions& options) {
RETURN_NOT_OK(CheckOpen());
/*
*/
}

arrow::Result<> FlightClient::CloseSession(const FlightCallOptions& options) {
RETURN_NOT_OK(CheckOpen());
/*
*/
}

::arrow::Result<SetSessionOptionsResult>
FlightClient::SetSessionOptions(
const FlightCallOptions& options,
const SetSessionOptionsRequest& request) {
::arrow::Result<SetSessionOptionsResult> FlightClient::SetSessionOptions(
const FlightCallOptions& options, const SetSessionOptionsRequest& request) {
RETURN_NOT_OK(CheckOpen());
Expand Down
8 changes: 1 addition & 7 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,7 @@ class ARROW_FLIGHT_EXPORT FlightClient {
*reader = std::move(output.reader);
return Status::OK();
}
/// \\brief Set server session option(s) by key/value. Sessions are generally
/// persisted via HTTP cookies.
/// \param[in] options Per-RPC options
/// \param[in] request The server session options to set
::arrow::Result<SetSessionOptionsResult> SetSessionOptions(
const FlightCallOptions& options,
const SetSessionOptionsRequest& request);

/// \\brief Set server session option(s) by key/value. Sessions are generally
/// persisted via HTTP cookies.
/// \param[in] options Per-RPC options
Expand Down
4 changes: 1 addition & 3 deletions cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -755,9 +755,7 @@ Status FlightSqlServerBase::DoPut(const ServerCallContext& context,

Status FlightSqlServerBase::ListActions(const ServerCallContext& context,
std::vector<ActionType>* actions) {
*actions = {ActionType::kCancelFlightInfo,
ActionType::kRenewFlightEndpoint,
FlightSqlServerBase::kBeginSavepointActionType,
*actions = {FlightSqlServerBase::kBeginSavepointActionType,
FlightSqlServerBase::kBeginTransactionActionType,
FlightSqlServerBase::kCancelQueryActionType,
FlightSqlServerBase::kCreatePreparedStatementActionType,
Expand Down
7 changes: 0 additions & 7 deletions cpp/src/arrow/flight/sql/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -595,13 +595,6 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase {
virtual Status EndTransaction(const ServerCallContext& context,
const ActionEndTransactionRequest& request);

/// \brief Attempt to explicitly cancel a FlightInfo.
/// \param[in] context The call context.
/// \param[in] request The CancelFlightInfoRequest.
/// \return The cancellation result.
virtual arrow::Result<CancelFlightInfoResult> CancelFlightInfo(
const ServerCallContext& context, const CancelFlightInfoRequest& request);

/// \brief Set server session option(s).
/// \param[in] context The call context.
/// \param[in] request The session options to set.
Expand Down
37 changes: 8 additions & 29 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -831,35 +831,6 @@ std::string ActionType::ToString() const {
description, "'>");
}

<<<<<<< HEAD
=======
const ActionType ActionType::kCancelFlightInfo =
ActionType{"CancelFlightInfo",
"Explicitly cancel a running FlightInfo.\n"
"Request Message: CancelFlightInfoRequest\n"
"Response Message: CancelFlightInfoResult"};
const ActionType ActionType::kRenewFlightEndpoint =
ActionType{"RenewFlightEndpoint",
"Extend expiration time of the given FlightEndpoint.\n"
"Request Message: RenewFlightEndpointRequest\n"
"Response Message: Renewed FlightEndpoint"};
const ActionType ActionType::kSetSessionOptions =
ActionType{"SetSessionOptions",
"Set client session options by name/value pairs.\n"
"Request Message: SetSessionOptionsRequest\n"
"Response Message: SetSessionOptionsResult"};
const ActionType ActionType::kGetSessionOptions =
ActionType{"GetSessionOptions",
"Get current client session options\n"
"Request Message: GetSessionOptionsRequest\n"
"Response Message: GetSessionOptionsResult"};
const ActionType ActionType::kCloseSession =
ActionType{"CloseSession",
"Explicitly close/invalidate the cookie-specified client session.\n"
"Request Message: CloseSessionRequest\n"
"Response Message: CloseSessionResult"};

>>>>>>> 5df37ac44 (WIP: High-level design review ONLY)
bool ActionType::Equals(const ActionType& other) const {
return type == other.type && description == other.description;
}
Expand Down Expand Up @@ -1015,6 +986,14 @@ Status MetadataRecordBatchReader::Next(FlightStreamChunk* next) {
return Next().Value(next);
}

Status ResultStream::Drain() {
while (true) {
ARROW_ASSIGN_OR_RAISE(auto result, Next());
if (!result) break;
}
return Status::OK();
}

arrow::Result<std::vector<std::shared_ptr<RecordBatch>>>
MetadataRecordBatchReader::ToRecordBatches() {
std::vector<std::shared_ptr<RecordBatch>> batches;
Expand Down
94 changes: 6 additions & 88 deletions cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>
#include <string_view>
Expand Down Expand Up @@ -601,94 +602,6 @@ class ARROW_FLIGHT_EXPORT FlightInfo {
mutable bool reconstructed_schema_;
};

/// \brief The information to process a long-running query.
class ARROW_FLIGHT_EXPORT PollInfo {
public:
/// The currently available results so far.
std::unique_ptr<FlightInfo> info = NULLPTR;
/// The descriptor the client should use on the next try. If unset,
/// the query is complete.
std::optional<FlightDescriptor> descriptor = std::nullopt;
/// Query progress. Must be in [0.0, 1.0] but need not be
/// monotonic or nondecreasing. If unknown, do not set.
std::optional<double> progress = std::nullopt;
/// Expiration time for this request. After this passes, the server
/// might not accept the poll descriptor anymore (and the query may
/// be cancelled). This may be updated on a call to PollFlightInfo.
std::optional<Timestamp> expiration_time = std::nullopt;

PollInfo()
: info(NULLPTR),
descriptor(std::nullopt),
progress(std::nullopt),
expiration_time(std::nullopt) {}

explicit PollInfo(std::unique_ptr<FlightInfo> info,
std::optional<FlightDescriptor> descriptor,
std::optional<double> progress,
std::optional<Timestamp> expiration_time)
: info(std::move(info)),
descriptor(std::move(descriptor)),
progress(progress),
expiration_time(expiration_time) {}

explicit PollInfo(const PollInfo& other)
: info(other.info ? std::make_unique<FlightInfo>(*other.info) : NULLPTR),
descriptor(other.descriptor),
progress(other.progress),
expiration_time(other.expiration_time) {}

/// \brief Get the wire-format representation of this type.
///
/// Useful when interoperating with non-Flight systems (e.g. REST
/// services) that may want to return Flight types.
arrow::Result<std::string> SerializeToString() const;

/// \brief Parse the wire-format representation of this type.
///
/// Useful when interoperating with non-Flight systems (e.g. REST
/// services) that may want to return Flight types.
static arrow::Result<std::unique_ptr<PollInfo>> Deserialize(
std::string_view serialized);

std::string ToString() const;

/// Compare two PollInfo for equality. This will compare the
/// serialized schema representations, NOT the logical equality of
/// the schemas.
bool Equals(const PollInfo& other) const;

friend bool operator==(const PollInfo& left, const PollInfo& right) {
return left.Equals(right);
}
friend bool operator!=(const PollInfo& left, const PollInfo& right) {
return !(left == right);
}
};

/// \brief The request of the CancelFlightInfoRequest action.
struct ARROW_FLIGHT_EXPORT CancelFlightInfoRequest {
std::unique_ptr<FlightInfo> info;

std::string ToString() const;
bool Equals(const CancelFlightInfoRequest& other) const;

friend bool operator==(const CancelFlightInfoRequest& left,
const CancelFlightInfoRequest& right) {
return left.Equals(right);
}
friend bool operator!=(const CancelFlightInfoRequest& left,
const CancelFlightInfoRequest& right) {
return !(left == right);
}

/// \brief Serialize this message to its wire-format representation.
arrow::Result<std::string> SerializeToString() const;

/// \brief Deserialize this message from its wire-format representation.
static arrow::Result<CancelFlightInfoRequest> Deserialize(std::string_view serialized);
};

/// \brief Variant supporting all possible value types for {Set,Get}SessionOptions
///
/// By convention, an attempt to set a valueless (std::monostate) SessionOptionValue
Expand Down Expand Up @@ -907,6 +820,11 @@ class ARROW_FLIGHT_EXPORT ResultStream {

ARROW_DEPRECATED("Deprecated in 8.0.0. Use Result-returning overload instead.")
Status Next(std::unique_ptr<Result>* info);

/// \brief Read and drop the remaining messages to get the error (if any) from a server.
/// \return Status OK if this is no error from a server, any other status if a
/// server returns an error.
Status Drain();
};

/// \brief A holder for a RecordBatch with associated Flight metadata.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,6 @@ void flightSqlExtension() throws Exception {
testScenario("flight_sql:extension");
}

@Test
void appMetadataFlightInfoEndpoint() throws Exception {
testScenario("app_metadata_flight_info_endpoint");
}

@Test
void sessionOptions() throws Exception {
testScenario("session_options");
Expand Down

0 comments on commit 2b478fe

Please sign in to comment.