Skip to content

Commit

Permalink
apacheGH-34865: [C++][Java][Flight RPC] Add Session management messag…
Browse files Browse the repository at this point in the history
…es (apache#34817)

### Rationale for this change

Flight presently contains no formal mechanism for managing connection/query configuration options; instead, request headers and/or non-query SQL statements are often used in lieu, with unnecessary overhead and poor failure handling.  A stateless (from Flight's perspective) Flight format extension is desirable to close this gap for server implementations that use/want connection state/context.

### What changes are included in this PR?

"Session" set/get/close Actions and server-side helper middleware.

### Are these changes tested?

Integration tests (C++ currently broken due to middleware-related framework issue) and some complex-case unit testing are included.

### Are there any user-facing changes?

Non-breaking extensions to wire format and corresponding client/server Flight RPC API extensions.

* Closes: apache#34865

Lead-authored-by: Paul Nienaber <[email protected]>
Co-authored-by: Paul Nienaber <[email protected]>
Co-authored-by: James Duong <[email protected]>
Co-authored-by: Sutou Kouhei <[email protected]>
Signed-off-by: David Li <[email protected]>
  • Loading branch information
3 people authored and zanmato1984 committed Feb 28, 2024
1 parent 0eca132 commit 11e790c
Show file tree
Hide file tree
Showing 44 changed files with 3,527 additions and 21 deletions.
41 changes: 41 additions & 0 deletions cpp/src/arrow/flight/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,47 @@ arrow::Result<FlightClient::DoExchangeResult> FlightClient::DoExchange(
return result;
}

::arrow::Result<SetSessionOptionsResult> FlightClient::SetSessionOptions(
const FlightCallOptions& options, const SetSessionOptionsRequest& request) {
RETURN_NOT_OK(CheckOpen());
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kSetSessionOptions.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(
auto set_session_options_result,
SetSessionOptionsResult::Deserialize(std::string_view(*result->body)));
ARROW_RETURN_NOT_OK(stream->Drain());
return set_session_options_result;
}

::arrow::Result<GetSessionOptionsResult> FlightClient::GetSessionOptions(
const FlightCallOptions& options, const GetSessionOptionsRequest& request) {
RETURN_NOT_OK(CheckOpen());
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kGetSessionOptions.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(
auto get_session_options_result,
GetSessionOptionsResult::Deserialize(std::string_view(*result->body)));
ARROW_RETURN_NOT_OK(stream->Drain());
return get_session_options_result;
}

::arrow::Result<CloseSessionResult> FlightClient::CloseSession(
const FlightCallOptions& options, const CloseSessionRequest& request) {
RETURN_NOT_OK(CheckOpen());
ARROW_ASSIGN_OR_RAISE(auto body, request.SerializeToString());
Action action{ActionType::kCloseSession.type, Buffer::FromString(body)};
ARROW_ASSIGN_OR_RAISE(auto stream, DoAction(options, action));
ARROW_ASSIGN_OR_RAISE(auto result, stream->Next());
ARROW_ASSIGN_OR_RAISE(auto close_session_result,
CloseSessionResult::Deserialize(std::string_view(*result->body)));
ARROW_RETURN_NOT_OK(stream->Drain());
return close_session_result;
}

Status FlightClient::Close() {
if (!closed_) {
closed_ = true;
Expand Down
21 changes: 21 additions & 0 deletions cpp/src/arrow/flight/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,27 @@ class ARROW_FLIGHT_EXPORT FlightClient {
return DoExchange({}, descriptor);
}

/// \brief Set server session option(s) by name/value. Sessions are generally
/// persisted via HTTP cookies.
/// \param[in] options Per-RPC options
/// \param[in] request The server session options to set
::arrow::Result<SetSessionOptionsResult> SetSessionOptions(
const FlightCallOptions& options, const SetSessionOptionsRequest& request);

/// \brief Get the current server session options. The session is generally
/// accessed via an HTTP cookie.
/// \param[in] options Per-RPC options
/// \param[in] request The (empty) GetSessionOptions request object.
::arrow::Result<GetSessionOptionsResult> GetSessionOptions(
const FlightCallOptions& options, const GetSessionOptionsRequest& request);

/// \brief Close/invalidate the current server session. The session is generally
/// accessed via an HTTP cookie.
/// \param[in] options Per-RPC options
/// \param[in] request The (empty) CloseSession request object.
::arrow::Result<CloseSessionResult> CloseSession(const FlightCallOptions& options,
const CloseSessionRequest& request);

/// \brief Explicitly shut down and clean up the client.
///
/// For backwards compatibility, this will be implicitly called by
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ TEST(FlightIntegration, ExpirationTimeRenewFlightEndpoint) {
ASSERT_OK(RunScenario("expiration_time:renew_flight_endpoint"));
}

TEST(FlightIntegration, SessionOptions) { ASSERT_OK(RunScenario("session_options")); }

TEST(FlightIntegration, PollFlightInfo) { ASSERT_OK(RunScenario("poll_flight_info")); }

TEST(FlightIntegration, AppMetadataFlightInfoEndpoint) {
Expand Down
154 changes: 154 additions & 0 deletions cpp/src/arrow/flight/integration_tests/test_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,13 @@
#include "arrow/array/array_nested.h"
#include "arrow/array/array_primitive.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/flight/client_cookie_middleware.h"
#include "arrow/flight/client_middleware.h"
#include "arrow/flight/server_middleware.h"
#include "arrow/flight/sql/client.h"
#include "arrow/flight/sql/column_metadata.h"
#include "arrow/flight/sql/server.h"
#include "arrow/flight/sql/server_session_middleware.h"
#include "arrow/flight/sql/types.h"
#include "arrow/flight/test_util.h"
#include "arrow/flight/types.h"
Expand Down Expand Up @@ -744,6 +746,155 @@ class ExpirationTimeRenewFlightEndpointScenario : public Scenario {
}
};

/// \brief The server used for testing Session Options.
///
/// SetSessionOptions has a blacklisted option name and string option value,
/// both "lol_invalid", which will result in errors attempting to set either.
class SessionOptionsServer : public sql::FlightSqlServerBase {
static inline const std::string invalid_option_name = "lol_invalid";
static inline const SessionOptionValue invalid_option_value = "lol_invalid";

const std::string session_middleware_key;
// These will never be threaded so using a plain map and no lock
std::map<std::string, SessionOptionValue> session_store_;

public:
explicit SessionOptionsServer(std::string session_middleware_key)
: FlightSqlServerBase(),
session_middleware_key(std::move(session_middleware_key)) {}

arrow::Result<SetSessionOptionsResult> SetSessionOptions(
const ServerCallContext& context,
const SetSessionOptionsRequest& request) override {
SetSessionOptionsResult res;

auto* middleware = static_cast<sql::ServerSessionMiddleware*>(
context.GetMiddleware(session_middleware_key));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<sql::FlightSession> session,
middleware->GetSession());

for (const auto& [name, value] : request.session_options) {
// Blacklisted value name
if (name == invalid_option_name) {
res.errors.emplace(name, SetSessionOptionsResult::Error{
SetSessionOptionErrorValue::kInvalidName});
continue;
}
// Blacklisted option value
if (value == invalid_option_value) {
res.errors.emplace(name, SetSessionOptionsResult::Error{
SetSessionOptionErrorValue::kInvalidValue});
continue;
}
if (std::holds_alternative<std::monostate>(value)) {
session->EraseSessionOption(name);
continue;
}
session->SetSessionOption(name, value);
}

return res;
}

arrow::Result<GetSessionOptionsResult> GetSessionOptions(
const ServerCallContext& context,
const GetSessionOptionsRequest& request) override {
auto* middleware = static_cast<sql::ServerSessionMiddleware*>(
context.GetMiddleware(session_middleware_key));
if (!middleware->HasSession()) {
return Status::Invalid("No existing session to get options from.");
}
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<sql::FlightSession> session,
middleware->GetSession());

return GetSessionOptionsResult{session->GetSessionOptions()};
}

arrow::Result<CloseSessionResult> CloseSession(
const ServerCallContext& context, const CloseSessionRequest& request) override {
// Broken (does not expire cookie) until C++ middleware handling (GH-39791) fixed:
auto* middleware = static_cast<sql::ServerSessionMiddleware*>(
context.GetMiddleware(session_middleware_key));
ARROW_RETURN_NOT_OK(middleware->CloseSession());
return CloseSessionResult{CloseSessionStatus::kClosed};
}
};

/// \brief The Session Options scenario.
///
/// This tests Session Options functionality as well as ServerSessionMiddleware.
class SessionOptionsScenario : public Scenario {
static inline const std::string server_middleware_key = "sessionmiddleware";

Status MakeServer(std::unique_ptr<FlightServerBase>* server,
FlightServerOptions* options) override {
*server = std::make_unique<SessionOptionsServer>(server_middleware_key);

auto id_gen_int = std::make_shared<std::atomic_int>(1000);
options->middleware.emplace_back(
server_middleware_key,
sql::MakeServerSessionMiddlewareFactory(
[=]() -> std::string { return std::to_string((*id_gen_int)++); }));

return Status::OK();
}

Status MakeClient(FlightClientOptions* options) override {
options->middleware.emplace_back(GetCookieFactory());
return Status::OK();
}

Status RunClient(std::unique_ptr<FlightClient> flight_client) override {
sql::FlightSqlClient client{std::move(flight_client)};

// Set
auto req1 = SetSessionOptionsRequest{
{{"foolong", 123L},
{"bardouble", 456.0},
{"lol_invalid", "this won't get set"},
{"key_with_invalid_value", "lol_invalid"},
{"big_ol_string_list", std::vector<std::string>{"a", "b", "sea", "dee", " ",
" ", "geee", "(づ。◕‿‿◕。)づ"}}}};
ARROW_ASSIGN_OR_RAISE(auto res1, client.SetSessionOptions({}, req1));
// Some errors
if (res1.errors !=
std::map<std::string, SetSessionOptionsResult::Error>{
{"lol_invalid",
SetSessionOptionsResult::Error{SetSessionOptionErrorValue::kInvalidName}},
{"key_with_invalid_value", SetSessionOptionsResult::Error{
SetSessionOptionErrorValue::kInvalidValue}}}) {
return Status::Invalid("res1 incorrect: " + res1.ToString());
}
// Some set, some omitted due to above errors
ARROW_ASSIGN_OR_RAISE(auto res2, client.GetSessionOptions({}, {}));
if (res2.session_options !=
std::map<std::string, SessionOptionValue>{
{"foolong", 123L},
{"bardouble", 456.0},
{"big_ol_string_list",
std::vector<std::string>{"a", "b", "sea", "dee", " ", " ", "geee",
"(づ。◕‿‿◕。)づ"}}}) {
return Status::Invalid("res2 incorrect: " + res2.ToString());
}
// Update
ARROW_ASSIGN_OR_RAISE(
auto res3,
client.SetSessionOptions(
{}, SetSessionOptionsRequest{
{{"foolong", std::monostate{}},
{"big_ol_string_list", "a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ"}}}));
ARROW_ASSIGN_OR_RAISE(auto res4, client.GetSessionOptions({}, {}));
if (res4.session_options !=
std::map<std::string, SessionOptionValue>{
{"bardouble", 456.0},
{"big_ol_string_list", "a,b,sea,dee, , ,geee,(づ。◕‿‿◕。)づ"}}) {
return Status::Invalid("res4 incorrect: " + res4.ToString());
}

return Status::OK();
}
};

/// \brief The server used for testing PollFlightInfo().
class PollFlightInfoServer : public FlightServerBase {
public:
Expand Down Expand Up @@ -1952,6 +2103,9 @@ Status GetScenario(const std::string& scenario_name, std::shared_ptr<Scenario>*
} else if (scenario_name == "expiration_time:renew_flight_endpoint") {
*out = std::make_shared<ExpirationTimeRenewFlightEndpointScenario>();
return Status::OK();
} else if (scenario_name == "session_options") {
*out = std::make_shared<SessionOptionsScenario>();
return Status::OK();
} else if (scenario_name == "poll_flight_info") {
*out = std::make_shared<PollFlightInfoScenario>();
return Status::OK();
Expand Down
Loading

0 comments on commit 11e790c

Please sign in to comment.