-
Notifications
You must be signed in to change notification settings - Fork 124
/
lib.rs
121 lines (112 loc) · 3.79 KB
/
lib.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
pub mod cli;
pub mod errors;
pub mod pipeline;
pub mod simple;
use dozer_core::{app::AppPipeline, errors::ExecutionError};
use dozer_sql::pipeline::{builder::statement_to_pipeline, errors::PipelineError};
use dozer_types::{
crossbeam::channel::Sender,
log::debug,
types::{Operation, SourceSchema},
};
use errors::OrchestrationError;
use std::{
backtrace::{Backtrace, BacktraceStatus},
collections::HashMap,
panic, process,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::current,
};
use tokio::task::JoinHandle;
mod console_helper;
#[cfg(test)]
mod test_utils;
mod utils;
/// Operations that an orchestrator implementation must provide.
///
/// Only signatures are declared here; the concrete behavior of each method is
/// defined by the implementing type. Every method reports failure as an
/// [`OrchestrationError`].
///
/// NOTE(review): several methods take `running: Arc<AtomicBool>`. This looks
/// like the shared run/stop flag that `set_ctrl_handler` sets to `false` on
/// Ctrl-C — confirm against the implementors.
pub trait Orchestrator {
/// NOTE(review): presumably performs a migration step, with `force`
/// overriding some safety check — semantics are not visible here; confirm.
fn migrate(&mut self, force: bool) -> Result<(), OrchestrationError>;
/// NOTE(review): presumably removes state produced by earlier runs — confirm.
fn clean(&mut self) -> Result<(), OrchestrationError>;
/// NOTE(review): by name, runs both the API and the apps side — confirm.
fn run_all(&mut self, running: Arc<AtomicBool>) -> Result<(), OrchestrationError>;
/// NOTE(review): by name, runs only the API side — confirm.
fn run_api(&mut self, running: Arc<AtomicBool>) -> Result<(), OrchestrationError>;
/// NOTE(review): by name, runs only the apps side. `api_notifier` is an
/// optional channel carrying a `bool`; what the value signals is not visible
/// here — confirm against the caller.
fn run_apps(
&mut self,
running: Arc<AtomicBool>,
api_notifier: Option<Sender<bool>>,
) -> Result<(), OrchestrationError>;
/// Returns a map from a `String` key to a list of [`SourceSchema`] values.
///
/// NOTE(review): the key is presumably a connection/connector name — confirm.
fn list_connectors(&self) -> Result<HashMap<String, Vec<SourceSchema>>, OrchestrationError>;
/// Returns a token as a `String`.
///
/// NOTE(review): presumably an API access token — format and scope are not
/// visible here; confirm.
fn generate_token(&self) -> Result<String, OrchestrationError>;
/// Takes SQL text, a channel for [`Operation`] values and a `running` flag,
/// and returns a [`Schema`] on success.
///
/// NOTE(review): presumably the resulting operations are pushed through
/// `sender` and the returned schema describes them — confirm.
fn query(
&self,
sql: String,
sender: Sender<Operation>,
running: Arc<AtomicBool>,
) -> Result<Schema, OrchestrationError>;
}
// Re-exports
pub use dozer_ingestion::{
connectors::{get_connector, ColumnInfo, TableInfo},
errors::ConnectorError,
};
pub use dozer_sql::pipeline::builder::QueryContext;
/// Builds a [`QueryContext`] for `sql` without requiring the caller to own a pipeline.
///
/// A fresh [`AppPipeline`] is created solely to satisfy
/// [`statement_to_pipeline`] and is discarded when this function returns; the
/// third argument of [`statement_to_pipeline`] is passed as `None`.
///
/// # Errors
///
/// Returns whatever [`PipelineError`] [`statement_to_pipeline`] reports.
pub fn wrapped_statement_to_pipeline(sql: &str) -> Result<QueryContext, PipelineError> {
    // The pipeline is a throwaway: only the returned context is of interest.
    statement_to_pipeline(sql, &mut AppPipeline::new(), None)
}
pub use dozer_types::models::connection::Connection;
use dozer_types::tracing::error;
use dozer_types::types::Schema;
/// Awaits a spawned task and collapses its nested result into a single `Result`.
///
/// * The task finished with `Ok(_)`  -> `Ok(())`.
/// * The task finished with `Err(e)` -> `Err(e)`, unchanged.
/// * Awaiting the handle itself failed -> the join error is boxed into
///   [`OrchestrationError::InternalError`].
async fn flatten_join_handle(
    handle: JoinHandle<Result<(), OrchestrationError>>,
) -> Result<(), OrchestrationError> {
    // Outer layer: failure to join the task at all.
    let task_result = handle
        .await
        .map_err(|join_err| OrchestrationError::InternalError(Box::new(join_err)))?;
    // Inner layer: the task's own outcome.
    task_result.map(|_| ())
}
pub fn set_panic_hook() {
panic::set_hook(Box::new(move |panic_info| {
// All the orchestrator errors are captured here
if let Some(e) = panic_info.payload().downcast_ref::<OrchestrationError>() {
error!("{}", e);
debug!("{:?}", e);
// All the connector errors are captured here
} else if let Some(e) = panic_info.payload().downcast_ref::<ConnectorError>() {
error!("{}", e);
debug!("{:?}", e);
// All the pipeline errors are captured here
} else if let Some(e) = panic_info.payload().downcast_ref::<ExecutionError>() {
error!("{}", e);
debug!("{:?}", e);
// If any errors are sent as strings.
} else if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
error!("{s:?}");
} else {
error!("{}", panic_info);
}
let backtrace = Backtrace::capture();
if backtrace.status() == BacktraceStatus::Captured {
error!(
"thread '{}' panicked at '{}'\n stack backtrace:\n{}",
current()
.name()
.map(ToString::to_string)
.unwrap_or_default(),
panic_info
.location()
.map(ToString::to_string)
.unwrap_or_default(),
backtrace
);
}
process::exit(1);
}));
}
pub fn set_ctrl_handler(r: Arc<AtomicBool>) {
ctrlc::set_handler(move || {
r.store(false, Ordering::SeqCst);
})
.expect("Error setting Ctrl-C handler");
}