refactor(query): try fix test failure
zhang2014 committed Dec 26, 2022
1 parent b8e8de7 commit 6a8faad
Showing 11 changed files with 86 additions and 632 deletions.
20 changes: 18 additions & 2 deletions src/query/datablocks/tests/it/meta_info.rs
@@ -17,7 +17,7 @@ use std::any::Any;
 use common_datablocks::BlockMetaInfo;
 use common_exception::Result;
 
-#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
 struct TestMetaInfoA {
     field_a: usize,
     field_b: String,
@@ -29,6 +29,14 @@ impl BlockMetaInfo for TestMetaInfoA {
         self
     }
 
+    fn as_mut_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
+        Box::new(self.clone())
+    }
+
     fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
         match info.as_any().downcast_ref::<TestMetaInfoA>() {
             None => false,
@@ -37,7 +45,7 @@ impl BlockMetaInfo for TestMetaInfoA {
     }
 }
 
-#[derive(serde::Serialize, serde::Deserialize, Debug, PartialEq)]
+#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)]
 struct TestPartInfoB {
     field_a: String,
     field_b: u64,
@@ -49,6 +57,14 @@ impl BlockMetaInfo for TestPartInfoB {
         self
     }
 
+    fn as_mut_any(&mut self) -> &mut dyn Any {
+        self
+    }
+
+    fn clone_self(&self) -> Box<dyn BlockMetaInfo> {
+        Box::new(self.clone())
+    }
+
     fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool {
         match info.as_any().downcast_ref::<TestPartInfoB>() {
             None => false,
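Note: taken together, the additions above imply that the BlockMetaInfo trait gained two new required methods in this commit, as_mut_any and clone_self. The trait definition itself is not part of this diff, so the following is only a sketch reconstructed from the test implementations; the supertrait bounds are an assumption:

use std::any::Any;
use std::fmt::Debug;

// Hypothetical reconstruction; the real trait lives in common_datablocks.
pub trait BlockMetaInfo: Debug + Send + Sync {
    // Downcast support for shared references.
    fn as_any(&self) -> &dyn Any;

    // New in this commit: downcast support for mutable references.
    fn as_mut_any(&mut self) -> &mut dyn Any;

    // New in this commit: object-safe cloning. `Clone` cannot be a
    // supertrait here without losing object safety, so each implementor
    // boxes a clone of itself instead.
    fn clone_self(&self) -> Box<dyn BlockMetaInfo>;

    // Type-erased equality, implemented by downcasting the other side.
    fn equals(&self, info: &Box<dyn BlockMetaInfo>) -> bool;
}

This is the standard pattern for making trait objects cloneable: each implementor's clone_self simply returns Box::new(self.clone()), which is why the derives above also add Clone.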
49 changes: 32 additions & 17 deletions src/query/service/src/pipelines/executor/executor_graph.rs
@@ -28,6 +28,7 @@ use petgraph::prelude::StableGraph;
 use petgraph::Direction;
 use tracing::debug;
 
+use crate::pipelines::executor::executor_condvar::WorkersCondvar;
 use crate::pipelines::executor::executor_tasks::ExecutorTasksQueue;
 use crate::pipelines::executor::executor_worker_context::ExecutorTask;
 use crate::pipelines::executor::executor_worker_context::ExecutorWorkerContext;
@@ -325,23 +326,14 @@ impl ScheduleQueue {
         debug_assert!(!context.has_task());
 
         while let Some(processor) = self.async_queue.pop_front() {
-            let worker_id = context.get_worker_num();
-            let workers_condvar = context.get_workers_condvar().clone();
-            let tasks_queue = global.clone();
-
-            unsafe {
-                workers_condvar.inc_active_async_worker();
-                executor
-                    .async_runtime
-                    .spawn(TrackedFuture::create(ProcessorAsyncTask::create(
-                        context.query_id.clone(),
-                        worker_id,
-                        processor.clone(),
-                        tasks_queue,
-                        workers_condvar,
-                        processor.async_process(),
-                    )));
-            }
+            Self::schedule_async_task(
+                processor,
+                context.query_id.clone(),
+                executor,
+                context.get_worker_num(),
+                context.get_workers_condvar().clone(),
+                global.clone(),
+            )
         }
 
         if !self.sync_queue.is_empty() {
@@ -353,6 +345,29 @@
         }
     }
 
+    pub fn schedule_async_task(
+        proc: ProcessorPtr,
+        query_id: Arc<String>,
+        executor: &PipelineExecutor,
+        wakeup_worker_num: usize,
+        workers_condvar: Arc<WorkersCondvar>,
+        global_queue: Arc<ExecutorTasksQueue>,
+    ) {
+        unsafe {
+            workers_condvar.inc_active_async_worker();
+            executor
+                .async_runtime
+                .spawn(TrackedFuture::create(ProcessorAsyncTask::create(
+                    query_id,
+                    wakeup_worker_num,
+                    proc.clone(),
+                    global_queue,
+                    workers_condvar,
+                    proc.async_process(),
+                )));
+        }
+    }
+
     fn schedule_sync(&mut self, _: &ExecutorTasksQueue, ctx: &mut ExecutorWorkerContext) {
         if let Some(processor) = self.sync_queue.pop_front() {
             ctx.set_task(ExecutorTask::Sync(processor));
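Note: the change above is a pure extraction: the body that the scheduling loop used to inline is now the free-standing helper schedule_async_task, so other call sites (notably the executor's init path further down) can spawn async processors the same way. Below is a simplified, runnable analogue of the shape, with std threads standing in for databend's async runtime and a bare atomic counter standing in for WorkersCondvar; all names here are illustrative, not the real API:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Stand-in for WorkersCondvar: only the accounting the helper touches.
struct WorkersCondvar {
    active_async_workers: AtomicUsize,
}

impl WorkersCondvar {
    fn inc_active_async_worker(&self) {
        self.active_async_workers.fetch_add(1, Ordering::SeqCst);
    }
}

// The extracted helper: bump the active-async count, then hand the work
// to the runtime, remembering which worker to wake on completion.
fn schedule_async_task(task: String, wakeup_worker_num: usize, condvar: Arc<WorkersCondvar>) {
    condvar.inc_active_async_worker();
    thread::spawn(move || {
        println!("task {task} done; waking worker {wakeup_worker_num}");
    });
}

fn main() {
    let condvar = Arc::new(WorkersCondvar {
        active_async_workers: AtomicUsize::new(0),
    });
    for (i, task) in ["scan", "join", "sink"].into_iter().enumerate() {
        schedule_async_task(task.to_string(), i, condvar.clone());
    }
    thread::sleep(Duration::from_millis(50)); // let the spawned tasks print
}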
20 changes: 1 addition & 19 deletions src/query/service/src/pipelines/executor/executor_tasks.rs
@@ -214,7 +214,6 @@ struct ExecutorTasks {
     tasks_size: usize,
     workers_waiting_status: WorkersWaitingStatus,
     workers_sync_tasks: Vec<VecDeque<ProcessorPtr>>,
-    workers_async_tasks: Vec<VecDeque<ProcessorPtr>>,
     workers_completed_async_tasks: Vec<VecDeque<CompletedAsyncTask>>,
 }
 
@@ -223,19 +222,16 @@ unsafe impl Send for ExecutorTasks {}
 impl ExecutorTasks {
     pub fn create(workers_size: usize) -> ExecutorTasks {
         let mut workers_sync_tasks = Vec::with_capacity(workers_size);
-        let mut workers_async_tasks = Vec::with_capacity(workers_size);
         let mut workers_completed_async_tasks = Vec::with_capacity(workers_size);
 
         for _index in 0..workers_size {
             workers_sync_tasks.push(VecDeque::new());
-            workers_async_tasks.push(VecDeque::new());
             workers_completed_async_tasks.push(VecDeque::new());
         }
 
         ExecutorTasks {
             tasks_size: 0,
             workers_sync_tasks,
-            workers_async_tasks,
             workers_completed_async_tasks,
             workers_waiting_status: WorkersWaitingStatus::create(workers_size),
         }
@@ -247,10 +243,6 @@ impl ExecutorTasks {
 
     #[inline]
     fn pop_worker_task(&mut self, worker_id: usize) -> ExecutorTask {
-        if let Some(processor) = self.workers_async_tasks[worker_id].pop_front() {
-            return ExecutorTask::Async(processor);
-        }
-
         if let Some(task) = self.workers_completed_async_tasks[worker_id].pop_front() {
             return ExecutorTask::AsyncCompleted(task);
         }
@@ -272,10 +264,6 @@ impl ExecutorTasks {
             return worker_id;
         }
 
-        if !self.workers_async_tasks[worker_id].is_empty() {
-            return worker_id;
-        }
-
         if !self.workers_completed_async_tasks[worker_id].is_empty() {
             return worker_id;
         }
@@ -314,18 +302,12 @@ impl ExecutorTasks {
             self.workers_sync_tasks.len()
         );
        let sync_queue = &mut self.workers_sync_tasks[worker_id];
-        debug_assert!(worker_id < self.workers_async_tasks.len(), "out of index");
-        let async_queue = &mut self.workers_async_tasks[worker_id];
         debug_assert!(
             worker_id < self.workers_completed_async_tasks.len(),
             "out of index"
         );
         let completed_queue = &mut self.workers_completed_async_tasks[worker_id];
 
         match task {
             ExecutorTask::None => unreachable!(),
+            ExecutorTask::Async(_) => unreachable!(),
             ExecutorTask::Sync(processor) => sync_queue.push_back(processor),
-            ExecutorTask::Async(processor) => async_queue.push_back(processor),
             ExecutorTask::AsyncCompleted(task) => completed_queue.push_back(task),
         }
     }
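Note: the net effect in this file is that pending async tasks no longer sit in per-worker queues at all; only their completions (ExecutorTask::AsyncCompleted) travel back through the queue, which is why the push path now treats ExecutorTask::Async as unreachable. For orientation, a sketch of the task enum as it can be inferred from this diff, with stub types standing in for the real ones (the actual definition lives in executor_worker_context.rs and may differ):

// Illustrative stubs for the real databend types.
struct ProcessorPtr;
struct CompletedAsyncTask;

// Hypothetical reconstruction from the variants visible in this diff.
enum ExecutorTask {
    None,
    Sync(ProcessorPtr),
    // Still a variant, but after this commit it never enters the global
    // queue: async processors are spawned straight onto the async runtime
    // by ScheduleQueue::schedule_async_task.
    Async(ProcessorPtr),
    AsyncCompleted(CompletedAsyncTask),
}

fn main() {
    // Mirrors the dispatch in the rewritten push path above.
    let task = ExecutorTask::Sync(ProcessorPtr);
    match task {
        ExecutorTask::None => unreachable!(),
        ExecutorTask::Async(_) => unreachable!("spawned directly, never queued"),
        ExecutorTask::Sync(_) => println!("queue for a worker thread"),
        ExecutorTask::AsyncCompleted(_) => println!("finalize a completed async task"),
    }
}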
21 changes: 21 additions & 0 deletions src/query/service/src/pipelines/executor/pipeline_executor.rs
@@ -31,7 +31,9 @@ use parking_lot::Mutex;
 
 use crate::pipelines::executor::executor_condvar::WorkersCondvar;
 use crate::pipelines::executor::executor_graph::RunningGraph;
+use crate::pipelines::executor::executor_graph::ScheduleQueue;
 use crate::pipelines::executor::executor_tasks::ExecutorTasksQueue;
+use crate::pipelines::executor::executor_worker_context::ExecutorTask;
 use crate::pipelines::executor::executor_worker_context::ExecutorWorkerContext;
 use crate::pipelines::executor::ExecutorSettings;
 use crate::pipelines::pipeline::Pipeline;
@@ -203,8 +205,27 @@ impl PipelineExecutor {
 
         let mut init_schedule_queue = self.graph.init_schedule_queue()?;
 
+        let mut wakeup_worker_id = 0;
         let mut tasks = VecDeque::new();
         while let Some(task) = init_schedule_queue.pop_task() {
+            if let ExecutorTask::Async(processor) = &task {
+                ScheduleQueue::schedule_async_task(
+                    processor.clone(),
+                    self.settings.query_id.clone(),
+                    self,
+                    wakeup_worker_id,
+                    self.workers_condvar.clone(),
+                    self.global_tasks_queue.clone(),
+                );
+                wakeup_worker_id += 1;
+
+                if wakeup_worker_id == self.threads_num {
+                    wakeup_worker_id = 0;
+                }
+
+                continue;
+            }
+
             tasks.push_back(task);
         }
 
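Note: the new init-path logic above distributes the initial async tasks across workers round-robin: each spawned task is tagged with wakeup_worker_id, which cycles through 0..threads_num so that completion wakeups do not all land on worker 0. A standalone illustration of just the wrap-around assignment (the pool size here is a made-up number):

fn main() {
    let threads_num = 4; // assumed pool size, for illustration only
    let mut wakeup_worker_id = 0;

    for task in 0..10 {
        println!("initial async task {task} -> wake worker {wakeup_worker_id}");

        // Same wrap-around as the init loop in the diff: advance, reset at threads_num.
        wakeup_worker_id += 1;
        if wakeup_worker_id == threads_num {
            wakeup_worker_id = 0;
        }
    }
}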
4 changes: 4 additions & 0 deletions src/query/service/tests/it/storages/fuse/operations/commit.rs
@@ -346,6 +346,10 @@ impl TableContext for CtxDelegation {
         todo!()
     }
 
+    fn try_get_parts(&self, _: usize) -> Vec<PartInfoPtr> {
+        todo!()
+    }
+
     fn try_set_partitions(&self, _partitions: Partitions) -> Result<()> {
         todo!()
     }
(The remaining 6 changed files are not shown.)
