diff --git a/cpp/src/arrow/flight/sql/client.cc b/cpp/src/arrow/flight/sql/client.cc index 25bf8e384ef06..f59e30741bd36 100644 --- a/cpp/src/arrow/flight/sql/client.cc +++ b/cpp/src/arrow/flight/sql/client.cc @@ -802,6 +802,15 @@ ::arrow::Result FlightSqlClient::CancelQuery( return Status::IOError("Server returned unknown result ", result.result()); } +::arrow::Result> FlightSqlClient::SetSessionOptions(const FlightCallOptions& options, + const std::vector& session_options) { + + } + +::arrow::Result FlightSqlClient::CloseSession(const FlightCallOptions& options) { + +} + Status FlightSqlClient::Close() { return impl_->Close(); } std::ostream& operator<<(std::ostream& os, CancelResult result) { diff --git a/cpp/src/arrow/flight/sql/client.h b/cpp/src/arrow/flight/sql/client.h index 648f71563e9c7..4784e048d1613 100644 --- a/cpp/src/arrow/flight/sql/client.h +++ b/cpp/src/arrow/flight/sql/client.h @@ -329,6 +329,23 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlClient { /// \param[in] info The FlightInfo of the query to cancel. ::arrow::Result CancelQuery(const FlightCallOptions& options, const FlightInfo& info); + + /// \brief Sets session options. + /// + /// \param[in] options RPC-layer hints for this call. + /// \param[in] session_options The session options to set. + ::arrow::Result> SetSessionOptions(const FlightCallOptions& options, + const std::vector& session_options); + + /// \brief Explicitly closes the session if applicable. + /// + /// \param[in] options RPC-layer hints for this call. + ::arrow::Result CloseSession(const FlightCallOptions& options); + + /// \brief Explicitly closes the session if applicable. + /// + /// \param[in] options RPC-layer hints for this call. + ::arrow::Result CloseSession(const FlightCallOptions& options); /// \brief Explicitly shut down and clean up the client. Status Close(); diff --git a/cpp/src/arrow/flight/sql/server.cc b/cpp/src/arrow/flight/sql/server.cc index 7f6d9b75a88f7..5ea4dd7323a63 100644 --- a/cpp/src/arrow/flight/sql/server.cc +++ b/cpp/src/arrow/flight/sql/server.cc @@ -269,6 +269,17 @@ arrow::Result ParseActionCancelQueryRequest( return result; } +arrow::Result ParseActionCloseSessionRequest( + const google::protobuf::Any& any) { + pb::sql::ActionCloseSessionRequest command; + if (!any.UnpackTo(&command)) { + return Status::Invalid("Unable to unpack ActionCloseSessionRequest"); + } + + ActionCloseSessionRequest result; + return result; +} + arrow::Result ParseActionCreatePreparedStatementRequest(const google::protobuf::Any& any) { pb::sql::ActionCreatePreparedStatementRequest command; @@ -359,6 +370,22 @@ arrow::Result ParseActionEndTransactionRequest( return result; } +arrow::Result ParseActionSetSessionOptionRequest( + const google::protobuf::Any& any) { + pb::sql::ActionSetSessionOptionRequest command; + if (!any.UnpackTo(&command)) { + return Status::Invalid("Unable to unpack ActionCloseSessionRequest"); + } + + ActionSetSessionOptionRequest result; + if (command.session_options_size() > 0) { + result.session_options.reserve(command.session_options_size()); + result.session_options.assign(command.session_options().begin(), command.session_options().end()); + } + + return result; +} + arrow::Result PackActionResult(const google::protobuf::Message& message) { google::protobuf::Any any; if (!any.PackFrom(message)) { @@ -747,8 +774,10 @@ Status FlightSqlServerBase::ListActions(const ServerCallContext& context, FlightSqlServerBase::kCreatePreparedStatementActionType, FlightSqlServerBase::kCreatePreparedSubstraitPlanActionType, FlightSqlServerBase::kClosePreparedStatementActionType, + FlightSqlServerBase::kCloseSessionActionType, FlightSqlServerBase::kEndSavepointActionType, FlightSqlServerBase::kEndTransactionActionType, + FlightSqlServerBase::kSetSessionOptionActionType }; return Status::OK(); } @@ -782,9 +811,17 @@ Status FlightSqlServerBase::DoAction(const ServerCallContext& context, ARROW_ASSIGN_OR_RAISE(ActionCancelQueryRequest internal_command, ParseActionCancelQueryRequest(any)); ARROW_ASSIGN_OR_RAISE(CancelResult result, CancelQuery(context, internal_command)); + } else if (action.type == FlightSqlServerBase::kCloseSessionActionType.type) { + ARROW_ASSIGN_OR_RAISE(ActionCloseSessionRequest internal_command, + ParseActionCloseSessionRequest(any)); + ARROW_ASSIGN_OR_RAISE(CloseSessionResult result, CloseSession(context, internal_command)); ARROW_ASSIGN_OR_RAISE(Result packed_result, PackActionResult(result)); results.push_back(std::move(packed_result)); + } else if (action.type == FlightSqlServerBase::kClosePreparedStatementActionType.type) { + ARROW_ASSIGN_OR_RAISE(ActionClosePreparedStatementRequest internal_command, + ParseActionClosePreparedStatementRequest(any)); + ARROW_RETURN_NOT_OK(ClosePreparedStatement(context, internal_command)); } else if (action.type == FlightSqlServerBase::kCreatePreparedStatementActionType.type) { ARROW_ASSIGN_OR_RAISE(ActionCreatePreparedStatementRequest internal_command, @@ -815,6 +852,13 @@ Status FlightSqlServerBase::DoAction(const ServerCallContext& context, ARROW_ASSIGN_OR_RAISE(ActionEndTransactionRequest internal_command, ParseActionEndTransactionRequest(any)); ARROW_RETURN_NOT_OK(EndTransaction(context, internal_command)); + } else if (action.type == FlightSqlServerBase::kSetSessionOptionActionType.type) { + ARROW_ASSIGN_OR_RAISE(ActionSetSessionOptionRequest internal_command, + ParseActionSetSessionOptionRequest(any)); + ARROW_ASSIGN_OR_RAISE(SetSessionOptionResult result, SetSessionOption(context, internal_command)); + ARROW_ASSIGN_OR_RAISE(Result packed_result, PackActionResult(result)); + + results.push_back(std::move(packed_result)); } else { return Status::NotImplemented("Action not implemented: ", action.type); } diff --git a/cpp/src/arrow/flight/sql/server.h b/cpp/src/arrow/flight/sql/server.h index 65f6670171dfd..fae573cccf40b 100644 --- a/cpp/src/arrow/flight/sql/server.h +++ b/cpp/src/arrow/flight/sql/server.h @@ -661,6 +661,11 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase { "Closes a reusable prepared statement resource on the server.\n" "Request Message: ActionClosePreparedStatementRequest\n" "Response Message: N/A"}; + const ActionType kCloseSessionActionType = + ActionType{"CloseSession", + "Explicitly close an open session.\n" + "Request Message: ActionCloseSessionRequest\n" + "Response Message: ActionCloseSessionResult"}; const ActionType kEndSavepointActionType = ActionType{"EndSavepoint", "End a savepoint.\n" @@ -671,6 +676,16 @@ class ARROW_FLIGHT_SQL_EXPORT FlightSqlServerBase : public FlightServerBase { "End a savepoint.\n" "Request Message: ActionEndTransactionRequest\n" "Response Message: N/A"}; + const ActionType kSetSessionOptionActionType = + ActionType{"SetSessionOption", + "Set a series of session options.\n" + "Request Message: ActionSetSessionOptionRequest\n" + "Response Message: ActionSetSessionOptionResult"}; + const ActionType kGetSessionOptionActionType = + ActionType{"GetSessionOption", + "Get a series of session options.\n" + "Request Message: ActionGetSessionOptionRequest\n" + "Response Message: ActionGetSessionOptionResult"}; Status ListActions(const ServerCallContext& context, std::vector* actions) final; diff --git a/cpp/src/arrow/flight/sql/types.h b/cpp/src/arrow/flight/sql/types.h index 293b1d5579ec0..a37915c8fb2ca 100644 --- a/cpp/src/arrow/flight/sql/types.h +++ b/cpp/src/arrow/flight/sql/types.h @@ -44,6 +44,15 @@ using SqlInfoResult = /// \brief Map SQL info identifier to its value. using SqlInfoResultMap = std::unordered_map; +/// \brief Variant supporting all possible types for SetSessionOption +using SessionOptionValue = + std::variant>; + +struct ARROW_FLIGHT_SQL_EXPORT SessionOption { + std::string option_key; + SessionOptionValue option_value; +} + /// \brief Options to be set in the SqlInfo. struct ARROW_FLIGHT_SQL_EXPORT SqlInfoOptions { /// \brief Predefined info values for GetSqlInfo. diff --git a/cpp/src/arrow/flight/types.cc b/cpp/src/arrow/flight/types.cc index a09f09ff9dbe1..cb77666f0a02d 100644 --- a/cpp/src/arrow/flight/types.cc +++ b/cpp/src/arrow/flight/types.cc @@ -389,6 +389,34 @@ std::string Location::scheme() const { return scheme; } +std::string Location::path() const { return uri_->path(); } +arrow::Result>> Location::query_items() + const { + return uri_->query_items(); +} + +arrow::Result>> Location::as_headers() + const { + std::string catalog = path(); + if (catalog.empty()) { + return query_items(); + } + + std::vector> headers; + + auto query_items_result = query_items(); + if (!query_items_result.ok()) { + return query_items_result; + } + std::vector> items(*query_items_result); + headers.reserve(items.size() + 1); + + headers.emplace_back(std::pair("catalog", catalog)); + headers.insert(headers.end(), items.begin(), items.end()); + + return headers; +} + bool Location::Equals(const Location& other) const { return ToString() == other.ToString(); } diff --git a/cpp/src/arrow/flight/types.h b/cpp/src/arrow/flight/types.h index 6957c5992a328..98279bf6eec7a 100644 --- a/cpp/src/arrow/flight/types.h +++ b/cpp/src/arrow/flight/types.h @@ -403,6 +403,15 @@ struct ARROW_FLIGHT_EXPORT Location { /// \brief Get the scheme of this URI. std::string scheme() const; + /// \brief Get the path of this URI. + std::string path() const; + + /// \brief Get the query parameters of this URI. + arrow::Result>> query_items() const; + + /// \brief Convert URI path and parameters to headers. + arrow::Result>> as_headers() const; + bool Equals(const Location& other) const; friend bool operator==(const Location& left, const Location& right) { diff --git a/format/FlightSql.proto b/format/FlightSql.proto index d8a6cb5bfdb07..76de93a6a5134 100644 --- a/format/FlightSql.proto +++ b/format/FlightSql.proto @@ -1842,6 +1842,72 @@ message ActionCancelQueryResult { CancelResult result = 1; } +/* + * Request message for the "Close Session" action. + */ +message ActionCloseSessionRequest { + option (experimental) = true; +} + +/* + * The result of closing a session. + * + * The result should be wrapped in a google.protobuf.Any message. + */ +message ActionCloseSessionResult { + option (experimental) = true; + + enum CloseSessionResult { + // The cancellation status is unknown. Servers should avoid using + // this value (send a NOT_FOUND error if the requested query is + // not known). Clients can retry the request. + CLOSE_RESULT_UNSPECIFIED = 0; + // The session close request is complete. Subsequent requests with + // a NOT_FOUND error. + CLOSE_RESULT_CLOSED = 1; + // The session close request is in progress. The client may retry + // the close request. + CLOSE_RESULT_CLOSING = 2; + // The session is not closeable. The client should not retry the + // close request. + CLOSE_RESULT_NOT_CLOSEABLE = 3; + } + + CloseSessionResult result = 1; +} + +message SessionOption { + option (experimental) = true; + + string option_name = 1; + bytes option_value = 2; +} + +message ActionSetSessionOptionRequest { + option (experimental) = true; + + repeated SessionOption session_options = 1; +} + +message ActionSetSessionOptionResult { + option (experimental) = true; + + enum SetSessionOptionResult { + // The status of setting the option is unknown. Servers should avoid using + // this value (send a NOT_FOUND error if the requested query is + // not known). Clients can retry the request. + SET_SESSION_OPTION_RESULT_UNSPECIFIED = 0; + // The session option setting completed successfully. + SET_SESSION_OPTION_RESULT_OK = 1; + // The session cannot be set to the given value. + SET_SESSION_OPTION_RESULT_INVALID_VALUE = 2; + // The session cannot be set. + SET_SESSION_OPTION_RESULT_ERROR = 2; + } + + SetSessionOptionResult result = 1; +} + extend google.protobuf.MessageOptions { bool experimental = 1000; }