Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flight session management #14

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cpp/src/arrow/flight/sql/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -802,6 +802,15 @@ ::arrow::Result<CancelResult> FlightSqlClient::CancelQuery(
return Status::IOError("Server returned unknown result ", result.result());
}

::arrow::Result<std::vector<SetSessionOptionResult>> FlightSqlClient::SetSessionOptions(const FlightCallOptions& options,
const std::vector<SessionOption>& session_options) {

}

::arrow::Result<CloseSessionResult> FlightSqlClient::CloseSession(const FlightCallOptions& options) {

}

Status FlightSqlClient::Close() { return impl_->Close(); }

std::ostream& operator<<(std::ostream& os, CancelResult result) {
Expand Down
17 changes: 17 additions & 0 deletions cpp/src/arrow/flight/sql/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,23 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient {
/// \param[in] info The FlightInfo of the query to cancel.
::arrow::Result<CancelResult> CancelQuery(const FlightCallOptions& options,
const FlightInfo& info);

/// \brief Sets session options.
///
/// \param[in] options RPC-layer hints for this call.
/// \param[in] session_options The session options to set.
::arrow::Result<std::vector<SetSessionOptionResult>> SetSessionOptions(const FlightCallOptions& options,
const std::vector<SessionOption>& session_options);

/// \brief Explicitly closes the session if applicable.
///
/// \param[in] options RPC-layer hints for this call.
::arrow::Result<CloseSessionResult> CloseSession(const FlightCallOptions& options);

/// \brief Explicitly closes the session if applicable.
///
/// \param[in] options RPC-layer hints for this call.
::arrow::Result<CloseSessionResult> CloseSession(const FlightCallOptions& options);

/// \brief Explicitly shut down and clean up the client.
Status Close();
Expand Down
44 changes: 44 additions & 0 deletions cpp/src/arrow/flight/sql/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,17 @@ arrow::Result<ActionCancelQueryRequest> ParseActionCancelQueryRequest(
return result;
}

arrow::Result<ActionCloseSessionRequest> ParseActionCloseSessionRequest(
const google::protobuf::Any& any) {
pb::sql::ActionCloseSessionRequest command;
if (!any.UnpackTo(&command)) {
return Status::Invalid("Unable to unpack ActionCloseSessionRequest");
}

ActionCloseSessionRequest result;
return result;
}

arrow::Result<ActionCreatePreparedStatementRequest>
ParseActionCreatePreparedStatementRequest(const google::protobuf::Any& any) {
pb::sql::ActionCreatePreparedStatementRequest command;
Expand Down Expand Up @@ -359,6 +370,22 @@ arrow::Result<ActionEndTransactionRequest> ParseActionEndTransactionRequest(
return result;
}

arrow::Result<ActionSetSessionOptionRequest> ParseActionSetSessionOptionRequest(
const google::protobuf::Any& any) {
pb::sql::ActionSetSessionOptionRequest command;
if (!any.UnpackTo(&command)) {
return Status::Invalid("Unable to unpack ActionCloseSessionRequest");
}

ActionSetSessionOptionRequest result;
if (command.session_options_size() > 0) {
result.session_options.reserve(command.session_options_size());
result.session_options.assign(command.session_options().begin(), command.session_options().end());
}

return result;
}

arrow::Result<Result> PackActionResult(const google::protobuf::Message& message) {
google::protobuf::Any any;
if (!any.PackFrom(message)) {
Expand Down Expand Up @@ -747,8 +774,10 @@ Status FlightSqlServerBase::ListActions(const ServerCallContext& context,
FlightSqlServerBase::kCreatePreparedStatementActionType,
FlightSqlServerBase::kCreatePreparedSubstraitPlanActionType,
FlightSqlServerBase::kClosePreparedStatementActionType,
FlightSqlServerBase::kCloseSessionActionType,
FlightSqlServerBase::kEndSavepointActionType,
FlightSqlServerBase::kEndTransactionActionType,
FlightSqlServerBase::kSetSessionOptionActionType
};
return Status::OK();
}
Expand Down Expand Up @@ -782,9 +811,17 @@ Status FlightSqlServerBase::DoAction(const ServerCallContext& context,
ARROW_ASSIGN_OR_RAISE(ActionCancelQueryRequest internal_command,
ParseActionCancelQueryRequest(any));
ARROW_ASSIGN_OR_RAISE(CancelResult result, CancelQuery(context, internal_command));
} else if (action.type == FlightSqlServerBase::kCloseSessionActionType.type) {
ARROW_ASSIGN_OR_RAISE(ActionCloseSessionRequest internal_command,
ParseActionCloseSessionRequest(any));
ARROW_ASSIGN_OR_RAISE(CloseSessionResult result, CloseSession(context, internal_command));
ARROW_ASSIGN_OR_RAISE(Result packed_result, PackActionResult(result));

results.push_back(std::move(packed_result));
} else if (action.type == FlightSqlServerBase::kClosePreparedStatementActionType.type) {
ARROW_ASSIGN_OR_RAISE(ActionClosePreparedStatementRequest internal_command,
ParseActionClosePreparedStatementRequest(any));
ARROW_RETURN_NOT_OK(ClosePreparedStatement(context, internal_command));
} else if (action.type ==
FlightSqlServerBase::kCreatePreparedStatementActionType.type) {
ARROW_ASSIGN_OR_RAISE(ActionCreatePreparedStatementRequest internal_command,
Expand Down Expand Up @@ -815,6 +852,13 @@ Status FlightSqlServerBase::DoAction(const ServerCallContext& context,
ARROW_ASSIGN_OR_RAISE(ActionEndTransactionRequest internal_command,
ParseActionEndTransactionRequest(any));
ARROW_RETURN_NOT_OK(EndTransaction(context, internal_command));
} else if (action.type == FlightSqlServerBase::kSetSessionOptionActionType.type) {
ARROW_ASSIGN_OR_RAISE(ActionSetSessionOptionRequest internal_command,
ParseActionSetSessionOptionRequest(any));
ARROW_ASSIGN_OR_RAISE(SetSessionOptionResult result, SetSessionOption(context, internal_command));
ARROW_ASSIGN_OR_RAISE(Result packed_result, PackActionResult(result));

results.push_back(std::move(packed_result));
} else {
return Status::NotImplemented("Action not implemented: ", action.type);
}
Expand Down
15 changes: 15 additions & 0 deletions cpp/src/arrow/flight/sql/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase {
"Closes a reusable prepared statement resource on the server.\n"
"Request Message: ActionClosePreparedStatementRequest\n"
"Response Message: N/A"};
const ActionType kCloseSessionActionType =
ActionType{"CloseSession",
"Explicitly close an open session.\n"
"Request Message: ActionCloseSessionRequest\n"
"Response Message: ActionCloseSessionResult"};
const ActionType kEndSavepointActionType =
ActionType{"EndSavepoint",
"End a savepoint.\n"
Expand All @@ -671,6 +676,16 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase {
"End a savepoint.\n"
"Request Message: ActionEndTransactionRequest\n"
"Response Message: N/A"};
const ActionType kSetSessionOptionActionType =
ActionType{"SetSessionOption",
"Set a series of session options.\n"
"Request Message: ActionSetSessionOptionRequest\n"
"Response Message: ActionSetSessionOptionResult"};
const ActionType kGetSessionOptionActionType =
ActionType{"GetSessionOption",
"Get a series of session options.\n"
"Request Message: ActionGetSessionOptionRequest\n"
"Response Message: ActionGetSessionOptionResult"};

Status ListActions(const ServerCallContext& context,
std::vector<ActionType>* actions) final;
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/flight/sql/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ using SqlInfoResult =
/// \brief Map SQL info identifier to its value.
using SqlInfoResultMap = std::unordered_map<int32_t, SqlInfoResult>;

/// \brief Variant supporting all possible types for SetSessionOption
using SessionOptionValue =
std::variant<std::string, bool, int64_t, int32_t, double, float, std::vector<std::string>>;

struct ARROW_FLIGHT_SQL_EXPORT SessionOption {
std::string option_key;
SessionOptionValue option_value;
}

/// \brief Options to be set in the SqlInfo.
struct ARROW_FLIGHT_SQL_EXPORT SqlInfoOptions {
/// \brief Predefined info values for GetSqlInfo.
Expand Down
28 changes: 28 additions & 0 deletions cpp/src/arrow/flight/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,34 @@ std::string Location::scheme() const {
return scheme;
}

std::string Location::path() const { return uri_->path(); }
arrow::Result<std::vector<std::pair<std::string, std::string>>> Location::query_items()
const {
return uri_->query_items();
}

arrow::Result<std::vector<std::pair<std::string, std::string>>> Location::as_headers()
const {
std::string catalog = path();
if (catalog.empty()) {
return query_items();
}

std::vector<std::pair<std::string, std::string>> headers;

auto query_items_result = query_items();
if (!query_items_result.ok()) {
return query_items_result;
}
std::vector<std::pair<std::string, std::string>> items(*query_items_result);
headers.reserve(items.size() + 1);

headers.emplace_back(std::pair<std::string, std::string>("catalog", catalog));
headers.insert(headers.end(), items.begin(), items.end());

return headers;
}

bool Location::Equals(const Location& other) const {
return ToString() == other.ToString();
}
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/flight/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,15 @@ struct ARROW_FLIGHT_EXPORT Location {
/// \brief Get the scheme of this URI.
std::string scheme() const;

/// \brief Get the path of this URI.
std::string path() const;

/// \brief Get the query parameters of this URI.
arrow::Result<std::vector<std::pair<std::string, std::string>>> query_items() const;

/// \brief Convert URI path and parameters to headers.
arrow::Result<std::vector<std::pair<std::string, std::string>>> as_headers() const;

bool Equals(const Location& other) const;

friend bool operator==(const Location& left, const Location& right) {
Expand Down
66 changes: 66 additions & 0 deletions format/FlightSql.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1842,6 +1842,72 @@ message ActionCancelQueryResult {
CancelResult result = 1;
}

/*
* Request message for the "Close Session" action.
*/
message ActionCloseSessionRequest {
option (experimental) = true;
}

/*
* The result of closing a session.
*
* The result should be wrapped in a google.protobuf.Any message.
*/
message ActionCloseSessionResult {
option (experimental) = true;

enum CloseSessionResult {
// The cancellation status is unknown. Servers should avoid using
// this value (send a NOT_FOUND error if the requested query is
// not known). Clients can retry the request.
CLOSE_RESULT_UNSPECIFIED = 0;
// The session close request is complete. Subsequent requests with
// a NOT_FOUND error.
CLOSE_RESULT_CLOSED = 1;
// The session close request is in progress. The client may retry
// the close request.
CLOSE_RESULT_CLOSING = 2;
// The session is not closeable. The client should not retry the
// close request.
CLOSE_RESULT_NOT_CLOSEABLE = 3;
}

CloseSessionResult result = 1;
}

message SessionOption {
option (experimental) = true;

string option_name = 1;
bytes option_value = 2;
}

message ActionSetSessionOptionRequest {
option (experimental) = true;

repeated SessionOption session_options = 1;
}

message ActionSetSessionOptionResult {
option (experimental) = true;

enum SetSessionOptionResult {
// The status of setting the option is unknown. Servers should avoid using
// this value (send a NOT_FOUND error if the requested query is
// not known). Clients can retry the request.
SET_SESSION_OPTION_RESULT_UNSPECIFIED = 0;
// The session option setting completed successfully.
SET_SESSION_OPTION_RESULT_OK = 1;
// The session cannot be set to the given value.
SET_SESSION_OPTION_RESULT_INVALID_VALUE = 2;
// The session cannot be set.
SET_SESSION_OPTION_RESULT_ERROR = 2;
}

SetSessionOptionResult result = 1;
}

extend google.protobuf.MessageOptions {
bool experimental = 1000;
}