Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(http_handler): the first request no longer wait for query to start #7410

Merged
merged 5 commits into from
Sep 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/query/functions/src/scalars/others/sleep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ impl Function for SleepFunction {
value.as_u64().map(Duration::from_secs).map_err(|_| err())?
};

if duration.ge(&Duration::from_secs(3)) {
if duration.gt(&Duration::from_secs(30)) {
return Err(ErrorCode::BadArguments(format!(
"The maximum sleep time is 3 seconds. Requested: {:?}",
"The maximum sleep time is 30 seconds. Requested: {:?}",
duration
)));
};
Expand Down
7 changes: 4 additions & 3 deletions src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,12 @@ async fn query_page_handler(
let http_query_manager = HttpQueryManager::instance();
match http_query_manager.get_query(&query_id).await {
Some(query) => {
query.clear_expire_time().await;
query.update_expire_time(true).await;
let resp = query
.get_response_page(page_no)
.await
.map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?;
query.update_expire_time().await;
query.update_expire_time(false).await;
Ok(Json(QueryResponse::from_internal(query_id, resp)))
}
None => Err(query_id_not_found(query_id)),
Expand All @@ -236,11 +236,11 @@ pub(crate) async fn query_handler(

match query {
Ok(query) => {
query.update_expire_time(true).await;
let resp = query
.get_response_page(0)
.await
.map_err(|err| poem::Error::from_string(err.message(), StatusCode::NOT_FOUND))?;
query.update_expire_time().await;
let (rows, next_page) = match &resp.data {
None => (0, None),
Some(p) => (p.page.data.num_rows(), p.next_page_no),
Expand All @@ -249,6 +249,7 @@ pub(crate) async fn query_handler(
"initial response to http query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'",
&query.id, &resp.state, rows, next_page, sql
);
query.update_expire_time(false).await;
Ok(Json(QueryResponse::from_internal(
query.id.to_string(),
resp,
Expand Down
148 changes: 91 additions & 57 deletions src/query/service/src/servers/http/v1/query/execute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use std::time::Duration;
use std::time::Instant;
use std::time::SystemTime;

use common_base::base::tokio::sync::mpsc;
use common_base::base::tokio::sync::RwLock;
Expand All @@ -37,7 +38,6 @@ use serde::Serialize;
use tracing::error;
use ExecuteState::*;

use super::http_query::HttpQueryRequest;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterFactory;
use crate::interpreters::InterpreterFactoryV2;
Expand Down Expand Up @@ -88,21 +88,25 @@ impl Progresses {
}

pub enum ExecuteState {
Starting(ExecuteStarting),
Running(ExecuteRunning),
Stopped(ExecuteStopped),
}

impl ExecuteState {
pub(crate) fn extract(&self) -> (ExecuteStateKind, Option<ErrorCode>) {
match self {
Running(_) => (ExecuteStateKind::Running, None),
Starting(_) | Running(_) => (ExecuteStateKind::Running, None),
Stopped(v) => match &v.reason {
Ok(_) => (ExecuteStateKind::Succeeded, None),
Err(e) => (ExecuteStateKind::Failed, Some(e.clone())),
},
}
}
}
pub struct ExecuteStarting {
pub(crate) ctx: Arc<QueryContext>,
}

pub struct ExecuteRunning {
// used to kill query
Expand All @@ -113,80 +117,123 @@ pub struct ExecuteRunning {
}

pub struct ExecuteStopped {
stats: Progresses,
affect: Option<QueryAffect>,
reason: Result<()>,
stop_time: Instant,
pub stats: Progresses,
pub affect: Option<QueryAffect>,
pub reason: Result<()>,
pub stop_time: Instant,
}

pub struct Executor {
start_time: Instant,
pub(crate) state: ExecuteState,
pub query_id: String,
pub start_time: Instant,
pub state: ExecuteState,
}

impl Executor {
pub(crate) fn get_progress(&self) -> Progresses {
pub fn get_progress(&self) -> Progresses {
match &self.state {
Starting(_) => Default::default(),
Running(r) => Progresses::from_context(&r.ctx),
Stopped(f) => f.stats.clone(),
}
}

pub(crate) fn get_affect(&self) -> Option<QueryAffect> {
pub fn get_affect(&self) -> Option<QueryAffect> {
match &self.state {
Starting(_) => None,
Running(r) => r.ctx.get_affect(),
Stopped(r) => r.affect.clone(),
}
}

pub(crate) fn elapsed(&self) -> Duration {
pub fn elapsed(&self) -> Duration {
match &self.state {
Running(_) => Instant::now() - self.start_time,
Starting(_) | Running(_) => Instant::now() - self.start_time,
Stopped(f) => f.stop_time - self.start_time,
}
}

pub(crate) async fn stop(this: &Arc<RwLock<Executor>>, reason: Result<()>, kill: bool) {
pub async fn start_to_running(this: &Arc<RwLock<Executor>>, state: ExecuteState) {
let mut guard = this.write().await;
if let Running(r) = &guard.state {
// release session
if kill {
r.session.force_kill_query();
}
// Write Finish to query log table.
let _ = r
.interpreter
.finish()
.await
.map_err(|e| error!("interpreter.finish error: {:?}", e));
guard.state = Stopped(ExecuteStopped {
stats: Progresses::from_context(&r.ctx),
reason: reason.clone(),
stop_time: Instant::now(),
affect: r.ctx.get_affect(),
});
if let Starting(_) = &guard.state {
guard.state = state
}
}

pub async fn start_to_stop(this: &Arc<RwLock<Executor>>, state: ExecuteState) {
let mut guard = this.write().await;
if let Starting(_) = &guard.state {
guard.state = state
}
}
pub async fn stop(this: &Arc<RwLock<Executor>>, reason: Result<()>, kill: bool) {
{
let guard = this.read().await;
tracing::info!(
"http query {}: change state to Stopped, reason {:?}",
&guard.query_id,
reason
);
}

if let Err(e) = reason {
if e.code() != ErrorCode::aborted_session_code()
&& e.code() != ErrorCode::aborted_query_code()
{
// query state can be pulled multi times, only log it once
error!("Query Error: {:?}", e);
let mut guard = this.write().await;
match &guard.state {
Starting(s) => {
if let Err(e) = &reason {
s.ctx.set_error(e.clone());
InterpreterQueryLog::create(s.ctx.clone(), "".to_string())
.log_finish(SystemTime::now(), Some(e.clone()))
.await
.unwrap_or_else(|e| error!("fail to write query_log {:?}", e));
}
guard.state = Stopped(ExecuteStopped {
stats: Default::default(),
reason,
stop_time: Instant::now(),
affect: Default::default(),
})
}
};
Running(r) => {
// release session
if kill {
r.session.force_kill_query();
}
if let Err(e) = &reason {
r.ctx.set_error(e.clone());
}
// Write Finish to query log table.
let _ = r
.interpreter
.finish()
.await
.map_err(|e| error!("interpreter.finish error: {:?}", e));
guard.state = Stopped(ExecuteStopped {
stats: Progresses::from_context(&r.ctx),
reason,
stop_time: Instant::now(),
affect: r.ctx.get_affect(),
})
}
Stopped(s) => {
tracing::info!(
"http query {}: already stopped, reason {:?}, new reason {:?}",
&guard.query_id,
s.reason,
reason
);
}
}
}
}

impl ExecuteState {
pub(crate) async fn try_create(
request: &HttpQueryRequest,
pub(crate) async fn try_start_query(
executor: Arc<RwLock<Executor>>,
sql: &str,
session: Arc<Session>,
ctx: Arc<QueryContext>,
block_buffer: Arc<BlockBuffer>,
) -> Result<Arc<RwLock<Executor>>> {
let sql = &request.sql;
let start_time = Instant::now();
) -> Result<ExecuteRunning> {
ctx.attach_query_str(sql);

let stmts = DfParser::parse_sql(sql, ctx.get_current_session().get_type());
Expand Down Expand Up @@ -223,16 +270,12 @@ impl ExecuteState {
ctx: ctx.clone(),
interpreter: interpreter.clone(),
};
let executor = Arc::new(RwLock::new(Executor {
start_time,
state: Running(running_state),
}));
ctx.attach_http_query(HttpQueryHandle {
executor: executor.clone(),
block_buffer,
});
interpreter.execute().await?;
Ok(executor)
Ok(running_state)
} else {
// Write Start to query log table.
let _ = interpreter
Expand All @@ -245,10 +288,6 @@ impl ExecuteState {
ctx: ctx.clone(),
interpreter: interpreter.clone(),
};
let executor = Arc::new(RwLock::new(Executor {
start_time,
state: Running(running_state),
}));

let executor_clone = executor.clone();
let ctx_clone = ctx.clone();
Expand All @@ -273,8 +312,7 @@ impl ExecuteState {
_ => {}
}
})?;

Ok(executor)
Ok(running_state)
}
}
}
Expand Down Expand Up @@ -346,9 +384,6 @@ impl HttpQueryHandle {
mut build_res: PipelineBuildResult,
result_columns: &[ColumnBinding],
) -> Result<SendableDataBlockStream> {
let id = ctx.get_id();
tracing::info!("http query {id} execute() begin");

let executor = self.executor.clone();
let block_buffer = self.block_buffer.clone();

Expand Down Expand Up @@ -430,7 +465,6 @@ impl HttpQueryHandle {
}
}
});
tracing::info!("http query {id} execute() end");
Ok(Box::pin(DataBlockStream::create(schema, None, vec![])))
}
}
Loading