feat: Integrate datafusion #93
Conversation
@@ -0,0 +1,47 @@
//! Transaction timestamp utilities.
This module was added to explore what client-generated timestamps would look like when interacting with ArrowStore. It isn't currently being used, since the datafusion code doesn't make use of any of the ArrowStore pieces.
match scalar {
    ScalarValue::Boolean(Some(v)) => write!(buf, "{}", if v { "t" } else { "f" }),
    scalar => write!(buf, "{}", scalar), // Note this won't write null, that's checked above.
}
There may be alternate formats we want to use in the future, but the default Display impls for most of the scalar values work for now.
Eventually we may want to determine a subset of the arrow types we want to support. E.g. we probably don't care about Struct scalars.
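As a hedged illustration of that point, here's roughly what the surrounding helper might look like (the write_scalar name and signature are hypothetical; as noted, the real code checks for null before reaching this match):

use std::fmt::Write;

use datafusion::scalar::ScalarValue;

// Hypothetical helper: format a non-null scalar into a text buffer, using
// Postgres-style "t"/"f" for booleans and the scalar's default Display
// impl for everything else.
fn write_scalar(buf: &mut String, scalar: &ScalarValue) -> std::fmt::Result {
    match scalar {
        // Postgres renders booleans as "t"/"f" rather than "true"/"false".
        ScalarValue::Boolean(Some(v)) => write!(buf, "{}", if *v { "t" } else { "f" }),
        // Display won't render null usefully; callers check for null first.
        scalar => write!(buf, "{}", scalar),
    }
}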
@@ -1,4 +1,4 @@
-use lemur::repr::value::{Value, ValueRef, ValueType};
+use lemur::repr::value::ValueType;
There's still a reference to lemur here. I didn't want to get carried away with tweaking this module to more accurately support arrow types, so I left it in for now.
#[derive(Clone, Default)]
pub struct SchemaCatalog {
    tables: Arc<RwLock<HashMap<String, Arc<dyn TableProvider>>>>,
We'll eventually want this to use a concretely typed enum for tables so that we can more easily distinguish between mutable and immutable (external) tables.
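For illustration, a rough sketch of what that enum could look like (the names are hypothetical, and datafusion module paths vary by version):

use std::sync::Arc;

use datafusion::datasource::{MemTable, TableProvider};

// Hypothetical table enum: lets the catalog distinguish mutable tables
// from immutable (external) ones without downcasting.
enum CatalogTable {
    // In-memory table that supports inserts.
    Mutable(Arc<MemTable>),
    // Read-only table backed by an external source.
    Immutable(Arc<dyn TableProvider>),
}

impl CatalogTable {
    // View either variant as a generic TableProvider for planning.
    fn as_provider(&self) -> Arc<dyn TableProvider> {
        match self {
            CatalogTable::Mutable(t) => {
                // Unsize the concrete Arc<MemTable> to a trait object.
                let provider: Arc<dyn TableProvider> = t.clone();
                provider
            }
            CatalogTable::Immutable(t) => t.clone(),
        }
    }
}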
        let stream = self.session.execute_physical(physical)?;
        Ok(ExecutionResult::Query { stream })
    }
    other => Err(internal!("unimplemented logical plan: {:?}", other)),
Left for future iterations.
    &self,
    plan: DfLogicalPlan,
) -> Result<Arc<dyn ExecutionPlan>> {
    let plan = self.state.create_physical_plan(&plan).await?;
Datafusion does optimization here. We can extend the query planner to add custom node types and optimizations in the future if we need to.
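If we do go that route, datafusion's QueryPlanner trait is the hook. A hedged sketch only; exact module paths and trait signatures vary across datafusion versions:

use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::context::{QueryPlanner, SessionState};
use datafusion::logical_plan::LogicalPlan;
use datafusion::physical_plan::planner::{DefaultPhysicalPlanner, PhysicalPlanner};
use datafusion::physical_plan::ExecutionPlan;

// Hypothetical planner wrapper; custom ExtensionPlanners for our own
// logical nodes could be registered here later.
struct CustomQueryPlanner;

#[async_trait]
impl QueryPlanner for CustomQueryPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Delegate to the default planner for now; swapping in
        // DefaultPhysicalPlanner::with_extension_planners would let us
        // handle custom logical nodes.
        let planner = DefaultPhysicalPlanner::default();
        planner.create_physical_plan(logical_plan, session_state).await
    }
}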
let table = table
    .as_any()
    .downcast_ref::<MemTable>()
    .ok_or_else(|| internal!("cannot downcast to mem table"))?;
See the comment above about using an enum for tables. Right now we're assuming the only mutable table is the memory table impl.
🚀
/// Every timestamp is a u64 serialized to its big-endian representation.
#[derive(Debug)]
pub struct SingleNodeTimestampGen {
    ts: AtomicU64,
Will these timestamps be tracking Unix epoch time? While these timestamps will always be increasing, we should keep in mind that most Unix systems use an i64 for time. Not a big deal, but just something to keep in mind in general about how we are storing time (and how to store times from the past in other places in the db).
I wasn't planning for this one to track unix time. I don't plan on this making it to production in any way.
Eventually we'll want something like this: https://github.com/GlareDB/glaredb/blob/b30db1e7ee18a0fdc321871a3d0f25a492ba0b4a/crates/diststore/src/accord/timestamp.rs. Each timestamp is totally ordered, and includes the node id, unix time, and a logical time. This was part of one of my previous iterations on replication.
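Roughly this shape (a hedged sketch, not the linked code; field widths and naming are illustrative):

use std::time::{SystemTime, UNIX_EPOCH};

// Illustrative hybrid timestamp. The derived Ord compares fields in
// declaration order, so timestamps sort by wall-clock time first, then the
// logical counter, with the node id as a final tiebreaker. That makes the
// order total across nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct HybridTimestamp {
    // Milliseconds since the unix epoch.
    unix_ms: u64,
    // Logical counter ordering events within the same millisecond.
    logical: u32,
    // Node id distinguishing timestamps generated by different nodes.
    node_id: u32,
}

impl HybridTimestamp {
    pub fn new(node_id: u32, logical: u32) -> Self {
        let unix_ms = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system clock set before the unix epoch")
            .as_millis() as u64;
        HybridTimestamp { unix_ms, logical, node_id }
    }
}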
Very nice. I see a good amount of overlap with some things I had to remove from #63. It'd be good for me to try to use those structs/enums elsewhere.
fn schema_names(&self) -> Vec<String> {
    let schemas = self.schemas.read();
    schemas.iter().map(|(name, _)| name).cloned().collect()
Suggested change:
-    schemas.iter().map(|(name, _)| name).cloned().collect()
+    schemas.keys().cloned().collect()
Opened a followup: #99
fn table_names(&self) -> Vec<String> {
    let tables = self.tables.read();
    tables.iter().map(|(name, _)| name).cloned().collect()
Suggested change:
-    tables.iter().map(|(name, _)| name).cloned().collect()
+    tables.keys().cloned().collect()
use datafusion::logical_plan::Expr;
use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
use dfutil::cast::cast_record_batch;
use parking_lot::RwLock;
Are there benefits/drawbacks to using this synchronous lock over tokio::sync::RwLock? I see it is being written to inside a synchronous function but read in an async one.
It really depends; it's correct to use either a tokio or parking_lot lock here.
The benefit/drawback comes down to how long the lock is held, and whether that ends up blocking other threads. If the inserts do take a long time (like on the order of 100s of ns) then it would make sense to use a tokio lock and change the insert to be async. I just always default to using parking_lot/std, then move to tokio if necessary depending on performance.
I would only default to tokio from the start if there's an intent on passing around guards in async code.
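To illustrate the guard-across-await point with a contrived sketch (the types and the fetch_metadata function are made up):

use std::sync::Arc;

use parking_lot::RwLock;

// A synchronous parking_lot guard is fine inside async code as long as
// it's dropped before any .await point.
async fn list_then_fetch(tables: Arc<RwLock<Vec<String>>>) {
    // Scope the read guard so it's released before we await anything.
    let names: Vec<String> = {
        let guard = tables.read();
        guard.clone()
    };
    // No lock is held across this await, so we can't block other threads
    // while the task is suspended.
    fetch_metadata(&names).await;
}

// Stand-in for some async work.
async fn fetch_metadata(_names: &[String]) {}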
Integrates datafusion as our query execution engine.
Existing sqllogictest tests pass (even though we don't really have a lot).
Notable changes:
- Removes lemur and sqlengine from the main path. These crates will be deleted. lemur is obviated by datafusion, and sqlexec replaces sqlengine. The structure of the session in sqlexec is also a bit more amenable to following implicit transaction semantics as it relates to the Postgres protocol.
- Changes pgsrv to stream back arrow record batches to the client instead of buffering everything in memory.

I've noted some peculiarities/future enhancements below.
to stream back arrow record batches to the client instead of buffering everything in memory.I've noted some peculiarities/future enhancements below.