Skip to content

Commit

Permalink
Get further with extended query protocol (#127)
Browse files Browse the repository at this point in the history
* chore: Update to datafusion 12 (#114)

* First draft of key layout using RocksDB (#116)

* First draft of key layout using RocksDB

* Additional future considerations from Sean

* feat: Add information_schema (#115)

* feat: Add information_schema

Fixes #98

Cloud will be making requests directly to the database to get info about the
contents of the database, including schemas, tables, and columns.

* fix: Remove datafusion-proto crate (#119)

We're not using if for anything yet, and this release seems to break building
container images.

```
error: builder for '/nix/store/75lddm4kg8mzn2x5nz8lg36gdj16p7ka-glaredb-cli-0.1.0.drv' failed with exit code 101;
       last 10 log lines:
       > Caused by:
       >   process didn't exit successfully: `/build/source/target/release/build/datafusion-proto-497b9ae0fe438eda/build-script-build` (exit status: 1)
       >   --- stdout
       >   cargo:rerun-if-env-changed=FORCE_REBUILD
       >   cargo:rerun-if-changed=proto/datafusion.proto
       >   Running: "/nix/store/2qg94y58v1jr4dw360bmpxlrs30m31ca-protobuf-3.19.4/bin/protoc" "--include_imports" "--include_source_info" "-o" "/build/prost-buildFXFfZG/prost-descriptor-set" "-I" "proto" "-I" "/nix/store/2qg94y58v1jr4dw360bmpxlrs30m31ca-protobuf-3.19.4/include" "proto/datafusion.proto"
       >
       >   --- stderr
       >   Error: "protobuf compilation failed: Permission denied (os error 13)"
       > warning: build failed, waiting for other jobs to finish...
       For full logs, run 'nix log /nix/store/75lddm4kg8mzn2x5nz8lg36gdj16p7ka-glaredb-cli-0.1.0.drv'.
```

Possibly related: apache/datafusion#3538

* feat: Implement raft via gRPC (#63)

* Replace toy-rpc with tonic gRPC

* implement glaredb cli for raft nodes and client

* current progress

* implement begin, allocate_table, and get_schema

* implement scan

* implement insert

* cleanup

* comment out old tests

* clean up ConsensusClient

* Implement change membership command

* rewrite cluster tests to use RaftClientSource

* add protoc to CI

* switch raft to in-memory implementation

* Remove application logic from raft cluster tests

* cargo fmt

* add tracing to RPC impls

* Remove lemur from raft crate

* remove raft_client example

* Apply suggestions from code review

Co-authored-by: Sean Smith <[email protected]>

* remove protoc from ci

* Remove lemur_impl from raft crate

* Store tonic clients instead of endpoint in ConsensusClient

* use shared n_retries

* Add default num_retries

* Apply suggestions from code review

Co-authored-by: Rustom Shareef <[email protected]>

* moved some mod.rs modules into their parent directories

* implement ConsensusClient retry to find leader using macro

* Fix missing delimiter

* fix clippy issues

* rewrite retry_rpc_on_leader macro to evaluate to an expression

* remove panics in rpc server impls

Co-authored-by: Sean Smith <[email protected]>
Co-authored-by: Rustom Shareef <[email protected]>

* build(nix): Use crane to cache cargo dependencies (#121)

* Add crane

* switch rust toolchain to come from fenix

* touch buildscript before executing cargo build

* add clippy and build checks

* rename arrowstore build script

* rename raft build script

* Send back BindComplete intead of ParseComplete

Also moved sending results into its own function since we need to send results
back after Execute commands complete.

* Add logical plan stub for SETting runtime vars

Also fixes logic for checking pg message length.

Co-authored-by: Rustom Shareef <[email protected]>
Co-authored-by: Justin Rubek <[email protected]>
  • Loading branch information
3 people authored Sep 27, 2022
1 parent 9bcd40a commit 3a2154c
Show file tree
Hide file tree
Showing 56 changed files with 2,162 additions and 1,428 deletions.
331 changes: 43 additions & 288 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions crates/arrowstore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
name = "arrowstore"
version = "0.1.0"
edition = "2021"
build = "build_proto.rs"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -16,8 +17,7 @@ async-trait = "0.1.56"
tonic = { version = "0.8.0", features = ["transport", "codegen"] }
prost = "0.11.0"
futures = "0.3"
datafusion = "11.0.0"
datafusion-proto = "11.0.0"
datafusion = "12.0.0"
serde = { version = "1.0", features = ["derive", "rc"] }
serde_json = "1.0"

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion crates/arrowstore/src/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl Index {
for row_idx in 0..batch.num_rows() {
let key = pk_cols
.iter()
.map(|col| ScalarValue::try_from_array(*col, row_idx))
.map(|col| ScalarValue::try_from_array(col, row_idx))
.collect::<Result<Vec<_>, _>>()?;
if pk.insert(key, row_idx).is_some() {
return Err(MemoryError::DuplicatePrimaryKey);
Expand Down
2 changes: 1 addition & 1 deletion crates/dfutil/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
datafusion = "11.0"
datafusion = "12.0"
1 change: 1 addition & 0 deletions crates/glaredb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ path = "src/bin/main.rs"
[dependencies]
logutil = {path = "../logutil"}
lemur = {path = "../lemur"}
raft = {path = "../raft"}
storageengine = {path = "../storageengine"}
sqlexec = {path = "../sqlexec"}
pgsrv = {path = "../pgsrv"}
Expand Down
94 changes: 89 additions & 5 deletions crates/glaredb/src/bin/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
use anyhow::Result;
use clap::{Parser, Subcommand};
use glaredb::server::{Server, ServerConfig};
use raft::client::ConsensusClient;
use raft::repr::NodeId;
use raft::rpc::pb::AddLearnerRequest;
use raft::server::start_raft_node;
use std::collections::BTreeSet;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

use std::sync::atomic::{AtomicU64, Ordering};
use tokio::net::TcpListener;
use tokio::runtime::{Builder, Runtime};
use tracing::error;

#[derive(Parser)]
#[clap(name = "GlareDB")]
Expand Down Expand Up @@ -36,21 +42,98 @@ enum Commands {
/// Address of server to connect to.
#[clap(value_parser)]
addr: String,

#[clap(subcommand)]
command: ClientCommands,
},

/// Starts the sql server portion of GlareDB, using a cluster of raft nodes.
RaftNode {
/// TCP port to bind to.
#[clap(long, value_parser, default_value_t = 6000)]
port: u16,

/// leader node address.
#[clap(long, value_parser)]
leader: Option<String>,

/// node id.
#[clap(long, value_parser)]
node_id: u64,
},
}


#[derive(Subcommand)]
enum ClientCommands {
Init,
AddLearner {
#[clap(short, long)]
address: String,

#[clap(short, long)]
node_id: NodeId,
},
ChangeMembership {
// TODO: add a command to change membership
membership: Vec<NodeId>,
},
Metrics,
}

fn main() -> Result<()> {
let cli = Cli::parse();
logutil::init(cli.verbose);

match cli.command {
Commands::RaftNode {
leader: _,
port,
node_id,
} => {
let rt = tokio::runtime::Runtime::new()?;

let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), port);
let url = format!("http://127.0.0.1:{}", port);

rt.block_on(async {
start_raft_node(node_id, url, addr)
.await
.expect("raft node");
});
}
Commands::Server { bind, db_name } => {
begin_server(db_name, &bind)?;
}
Commands::Client { .. } => {
// TODO: Eventually there will be some "management" client. E.g.
// adding nodes to the cluster, graceful shutdowns, etc.
error!("client not implemented");
Commands::Client { addr, command } => {
let rt = tokio::runtime::Runtime::new()?;

rt.block_on(async {
let client = ConsensusClient::new(1, addr).await.expect("client");

match command {
ClientCommands::Init => {
client.init().await.expect("failed to init cluster");
}
ClientCommands::AddLearner { address, node_id } => {
client
.add_learner(AddLearnerRequest { address, node_id })
.await
.expect("failed to add learner");
}
ClientCommands::ChangeMembership { membership } => {
let new_membership = BTreeSet::from_iter(membership);
client
.change_membership(&new_membership)
.await
.expect("failed to change membership");
}
ClientCommands::Metrics => {
let metrics = client.metrics().await.expect("failed to get metrics");
println!("{:?}", metrics);
}
}
});
}
}

Expand All @@ -76,5 +159,6 @@ fn build_runtime() -> Result<Runtime> {
})
.enable_all()
.build()?;

Ok(runtime)
}
2 changes: 1 addition & 1 deletion crates/pgsrv/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ thiserror = "1.0"
tracing = "0.1"
async-trait = "0.1.56"
futures = "0.3.21"
datafusion = "11.0"
datafusion = "12.0"
tokio-util = { version = "0.7.3", features = ["codec"] }
bytes = "1.2.1"
tokio = { version = "1", features = ["full"] }
Expand Down
9 changes: 3 additions & 6 deletions crates/pgsrv/src/codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,7 @@ impl PgCodec {
fn decode_execute(buf: &mut Cursor<'_>) -> Result<FrontendMessage> {
let portal = buf.read_cstring()?.to_string();
let max_rows = buf.get_i32();
Ok(FrontendMessage::Execute {
portal,
max_rows,
})
Ok(FrontendMessage::Execute { portal, max_rows })
}

fn decode_sync(_buf: &mut Cursor<'_>) -> Result<FrontendMessage> {
Expand Down Expand Up @@ -367,8 +364,8 @@ impl Decoder for PgCodec {
let msg_len = i32::from_be_bytes(src[1..5].try_into().unwrap()) as usize;

// Not enough bytes to read the full message yet.
if src.len() < msg_len {
src.reserve(msg_len - src.len());
if src.len() < msg_len + 1 {
src.reserve(msg_len + 1 - src.len());
return Ok(None);
}

Expand Down
Loading

0 comments on commit 3a2154c

Please sign in to comment.