Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update FlightSqlService trait to proxy handshake #2211

Merged
merged 5 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions arrow-flight/examples/flight_sql_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
// under the License.

use arrow_flight::sql::{ActionCreatePreparedStatementResult, SqlInfo};
use arrow_flight::FlightData;
use arrow_flight::{FlightData, HandshakeRequest, HandshakeResponse};
use futures::Stream;
use std::pin::Pin;
use tonic::transport::Server;
use tonic::{Response, Status, Streaming};
use tonic::{Request, Response, Status, Streaming};

use arrow_flight::{
flight_service_server::FlightService,
Expand All @@ -41,6 +43,52 @@ pub struct FlightSqlServiceImpl {}
#[tonic::async_trait]
impl FlightSqlService for FlightSqlServiceImpl {
type FlightService = FlightSqlServiceImpl;

async fn do_handshake(
&self,
request: Request<Streaming<HandshakeRequest>>,
) -> Result<
Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
Status,
> {
let basic = "Basic ";
let authorization = request
.metadata()
.get("authorization")
.ok_or(Status::invalid_argument("authorization field not present"))?
.to_str()
.map_err(|_| Status::invalid_argument("authorization not parsable"))?;
if !authorization.starts_with(basic) {
Err(Status::invalid_argument(format!(
"Auth type not implemented: {}",
authorization
)))?;
}
let base64 = &authorization[basic.len()..];
let bytes = base64::decode(base64)
.map_err(|_| Status::invalid_argument("authorization not parsable"))?;
let str = String::from_utf8(bytes)
.map_err(|_| Status::invalid_argument("authorization not parsable"))?;
let parts: Vec<_> = str.split(":").collect();
if parts.len() != 2 {
Err(Status::invalid_argument(format!(
"Invalid authorization header"
)))?;
}
let user = parts[0];
let pass = parts[1];
if user != "admin" || pass != "password" {
Err(Status::unauthenticated("Invalid credentials!"))?
}
let result = HandshakeResponse {
protocol_version: 0,
payload: "random_uuid_token".as_bytes().to_vec(),
};
let result = Ok(result);
let output = futures::stream::iter(vec![result]);
return Ok(Response::new(Box::pin(output)));
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good 👍

It would be awesome to get some test coverage of this code eventually 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like to follow acceptance test driven development when possible. Do you think the rust FlightSqlClient is far enough along to integration test with?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed that we eventually should add some test coverage for flight sql server. Currently we don't have any test coverage for it, not just this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When we have TPC-H up and running, I'll come back around to this and look at the main issues: cloning the server state & testing. Thanks for your feedback and helping get it merged!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like to follow acceptance test driven development when possible. Do you think the rust FlightSqlClient is far enough along to integration test with?

I think there is also a place for targeted for regression tests (so that, for example, if someone breaks the code accidentally in some future refactoring, they also get a test failure). However, the right balance and where to draw the line between the two is always a matter of judgement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I didn't mean to imply integration tests were the only way. I think they become exponentially impossible to maintain as code coverage increases - so unit tests are also required. I was just (and still am) hoping to start with a FlighSql-rs test then add unit tests in addition to that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we are in total agreement

// get_flight_info
async fn get_flight_info_statement(
&self,
Expand Down
19 changes: 17 additions & 2 deletions arrow-flight/src/sql/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,20 @@ pub trait FlightSqlService:
/// When impl FlightSqlService, you can always set FlightService to Self
type FlightService: FlightService;

/// Accept authentication and return a token
/// <https://arrow.apache.org/docs/format/Flight.html#authentication>
async fn do_handshake(
&self,
_request: Request<Streaming<HandshakeRequest>>,
) -> Result<
Response<Pin<Box<dyn Stream<Item = Result<HandshakeResponse, Status>> + Send>>>,
Status,
> {
Err(Status::unimplemented(
"Handshake has no default implementation",
))
}

/// Get a FlightInfo for executing a SQL query.
async fn get_flight_info_statement(
&self,
Expand Down Expand Up @@ -256,9 +270,10 @@ where

async fn handshake(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, this isn't cool that the impl doesn't call the underlying implementation

&self,
_request: Request<Streaming<HandshakeRequest>>,
request: Request<Streaming<HandshakeRequest>>,
) -> Result<Response<Self::HandshakeStream>, Status> {
Err(Status::unimplemented("Not yet implemented"))
let res = self.do_handshake(request).await?;
Ok(res)
}

async fn list_flights(
Expand Down