Skip to content
This repository has been archived by the owner on Sep 7, 2023. It is now read-only.

Commit

Permalink
Clientless runtime-agnostic architecture (typedb#32)
Browse files Browse the repository at this point in the history
## What is the goal of this PR?

We remove the requirement that typedb-client be used within a tokio
runtime, making the library runtime-agnostic.
We also remove the distinction between core and cluster, and replace the
`Client` entry point with more fundamental `Connection`.

## What is the motivation behind the changes?

### Encapsulate tokio runtime

This change serves three main purposes:

1. **Remove the requirement that the typedb-client library is run from a
tokio runtime.** The gRPC crate we use, `tonic`, and its networking
dependencies heavily rely on being used within a `tokio` runtime. That's
a fairly big restriction to place on all user code. Now that all RPC
interaction is hidden away in a background thread, all the API exposed
to the user can be fully runtime-agnostic, and, potentially down the
line, available in synchronous contexts as well.

2. **Eliminate the session close deadlocks in single-threaded
runtimes.** We create our own _system_ thread, which spawns a tokio
runtime and handles the RPC communication. This runs independently from
the user-facing runtime and as such is not affected by it.
Previously, if the user code happened to run in a single-threaded
runtime, dropping the last handle for a session would send a
`session_close()` request and block the only executor thread tokio had
available, deadlocking the entire application. This happened because, to
avoid sessions timing out on the server side, the session's drop method
would block until it received a signal that the request had been sent.
Because RPC communication is done asynchronously, it must be done in a
different task from the one that's performing the drop, which is always
blocking and cannot yield to said task.

3. **Encapsulate away RPC implementation details.** Should we ever make
a decision to move away from gRPC+protobuf, the change would only
require us to change a small set of files, namely the contents of
`connection::network`. The communication under this new model uses
Rust-native `Request` and `Response` enums, abstracting away the
protocol structures.

### Dissolve Client into underlying Connection

Requiring that a `session` may not outlive its `client`, and that a
`transaction` may not outlive its `session`, meant that to preserve
consistency we had to either extend the lifetimes of both `client` and
`session`, or require the user to share the `client` handle between
threads explicitly, even if they don't intend to use it beyond opening a
`session`.

Removing the top-of-the-hierarchy `Client` type and replacing it with a
primitive clonable `Connection` allows us to partially invert the
hierarchy, such that `DatabaseManager` and `Session` can explicitly own
the resources they rely upon. This is in line with how established Rust
crates (e.g. `tonic`, `mysql`) treat connections.

For multithreading or concurrency, the ownership of a `session` needs to
be explicitly shared between the threads or tasks, whether it be via
using shared pointers (read: `Arc<_>`) or explicit scope bounds (such as
`std::thread::scope()` in a synchronous version).

### Remove distinction between core and cluster

The shift from TypeDB Core + TypeDB Cluster to just TypeDB (Cluster /
Cloud) by default is reflected in the architecture. We now treat a core
server as effectively a single-node cluster instance that lacks
enterprise facilities (viz. user management).

This change greatly improves user experience: all code written to
interact with an open-source TypeDB instance is automatically valid for
the production instance with a simple change in initialization. As a
side-effect, this also helps us ensure that all integration tests
implemented for core are also automatically implemented for cluster.

Merging core and cluster also vastly simplifies the internal structure
of the library, as only a few places have to know about which backend
they are running against, specifically the portions that deal with
authentication and, some day, user management.

## What are the changes implemented in this PR?

Major changes:
- New `connection` module:
- `Connection` and `ServerConnection` conceptually roughly correspond to
`ClusterRPC` and `ClusterServerRPC`: `Connection`'s only job is to
manage the set of `ServerConnection`s, i.e. connections to individual
nodes of the server;
- `Connection`, created by user, spawns a background single-threaded
tokio runtime which will houses all request handlers;
- `ServerConnection` performs the actual message-passing between user
code and its dedicated request dispatcher.
- Move `common::rpc` module under `connection::network`:
- move all protobuf serialization/deserialization into
`connection::network::proto`, fully isolated from the rest of the crate;
- provide native message enums intended for inter-thread communication
(cf. `common::info` for crate-wide data structures);
- merge `CoreRPC`, `ServerRPC`, and `ClusterServerRPC` into single
`RPCStub`;
- add `RPCTransmitter`: a dispatcher agent meant to run in the
background tokio runtime, which handles the communication with the
server; its job is to:
    - listen for user requests over an inter-thread mpsc channel,
    - serialize the requests for tonic;
    - deserialize the responses;
    - send the responses back over the provided callback channel;
- overhaul `TransactionRPC` into `TransactionTransmitter`, analogous to
the `RPCTransmitter` above;
- its listener loop buffers the requests into batches before dispatching
them to the server;
- because, unlike `RPCTransmitter`, `TransactionTransmitter` wraps a
bidirectional stream, it also has an associated listener agent that
handles the user callbacks and auto-requests stream continuation.
- Remove `connection::core` and merge `connection::{cluster, server}`,
and `query` into a single top-level `database` module:
- `ServerDatabase` and `ServerSession` are now hidden implementation
details that handle communication with an individual node;
    - `Client`, as mentioned, has been removed entirely.

Minor changes:
- remove the no longer needed `async_dispatch` helper macro;
- restructure the `queries_...` tests into a single `queries` test
module that handles both core and cluster connections using a helper
permutation test macro;
- add a `compatibility` test module that ensures the API is async
runtime-agnostic.

Closes typedb#7, typedb#16, typedb#17, typedb#20, typedb#22, typedb#30.
  • Loading branch information
dmitrii-ubskii authored Mar 1, 2023
1 parent edee6f2 commit 993aa8f
Show file tree
Hide file tree
Showing 64 changed files with 3,841 additions and 3,387 deletions.
17 changes: 15 additions & 2 deletions .factory/automation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ config:
version-candidate: VERSION
dependencies:
dependencies: [build]
typedb-common: [build]
typedb-protocol: [build, release]
typeql: [build, release]

build:
quality:
Expand Down Expand Up @@ -53,7 +55,7 @@ build:
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
bazel build //...
tools/start-core-server.sh
bazel test //tests:queries_core --test_arg=-- --test_arg=--test-threads=1 --test_output=streamed && export TEST_SUCCESS=0 || export TEST_SUCCESS=1
bazel test //tests:queries --test_arg=-- --test_arg=core --test_arg=--test-threads=1 --test_output=streamed && export TEST_SUCCESS=0 || export TEST_SUCCESS=1
tools/stop-core-server.sh
exit $TEST_SUCCESS
test-integration-cluster:
Expand All @@ -65,9 +67,20 @@ build:
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
bazel build //...
source tools/start-cluster-servers.sh # use source to receive export vars
bazel test //tests:queries_cluster --test_env=ROOT_CA=$ROOT_CA --test_arg=-- --test_arg=--test-threads=1 --test_output=streamed && export TEST_SUCCESS=0 || export TEST_SUCCESS=1
bazel test //tests:queries --test_env=ROOT_CA=$ROOT_CA --test_arg=-- --test_arg=cluster --test_arg=--test-threads=1 --test_output=streamed && export TEST_SUCCESS=0 || export TEST_SUCCESS=1
tools/stop-cluster-servers.sh
exit $TEST_SUCCESS
test-integration-runtimes:
image: vaticle-ubuntu-22.04
command: |
export ARTIFACT_USERNAME=$REPO_VATICLE_USERNAME
export ARTIFACT_PASSWORD=$REPO_VATICLE_PASSWORD
bazel run @vaticle_dependencies//distribution/artifact:create-netrc
bazel build //...
tools/start-core-server.sh
bazel test //tests:runtimes --test_arg=-- --test_arg=--test-threads=1 --test_output=streamed && export TEST_SUCCESS=0 || export TEST_SUCCESS=1
tools/stop-core-server.sh
exit $TEST_SUCCESS
deploy-crate-snapshot:
filter:
owner: vaticle
Expand Down
36 changes: 19 additions & 17 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -31,45 +31,47 @@ load("//:deployment.bzl", deployment_github = "deployment")
rust_library(
name = "typedb_client",
srcs = glob(["src/**/*.rs"]),
tags = ["crate-name=typedb-client"],
deps = [
"@crates//:chrono",
"@crates//:crossbeam",
"@crates//:futures",
"@crates//:http",
"@crates//:itertools",
"@crates//:log",
"@crates//:prost",
"@crates//:tokio",
"@crates//:tokio-stream",
"@crates//:tonic",
"@crates//:uuid",
"@vaticle_typedb_protocol//grpc/rust:typedb_protocol",
"@vaticle_typeql//rust:typeql_lang",

"@vaticle_dependencies//library/crates:chrono",
"@vaticle_dependencies//library/crates:crossbeam",
"@vaticle_dependencies//library/crates:futures",
"@vaticle_dependencies//library/crates:log",
"@vaticle_dependencies//library/crates:prost",
"@vaticle_dependencies//library/crates:tokio",
"@vaticle_dependencies//library/crates:tonic",
"@vaticle_dependencies//library/crates:uuid",
],
tags = ["crate-name=typedb-client"],
)

assemble_crate(
name = "assemble_crate",
target = "typedb_client",
description = "TypeDB Client API for Rust",
homepage = "https://github.com/vaticle/typedb-client-rust",
license = "Apache-2.0",
repository = "https://github.com/vaticle/typedb-client-rust",
target = "typedb_client",
)

deploy_crate(
name = "deploy_crate",
target = ":assemble_crate",
release = deployment["crate.release"],
snapshot = deployment["crate.snapshot"],
release = deployment["crate.release"]
target = ":assemble_crate",
)

deploy_github(
name = "deploy_github",
draft = True,
title = "TypeDB Client Rust",
release_description = "//:RELEASE_TEMPLATE.md",
organisation = deployment_github["github.organisation"],
release_description = "//:RELEASE_TEMPLATE.md",
repository = deployment_github["github.repository"],
title = "TypeDB Client Rust",
title_append_version = True,
)

Expand Down Expand Up @@ -105,13 +107,13 @@ filegroup(

rustfmt_test(
name = "client_rustfmt_test",
targets = ["typedb_client"]
targets = ["typedb_client"],
)

# CI targets that are not declared in any BUILD file, but are called externally
filegroup(
name = "ci",
data = [
"@vaticle_dependencies//ide/rust:sync"
"@vaticle_dependencies//tool/cargo:sync",
],
)
6 changes: 4 additions & 2 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ load("@rules_rust//rust:repositories.bzl", "rules_rust_dependencies", "rust_regi
rules_rust_dependencies()
rust_register_toolchains(edition = "2021", include_rustc_srcs = True)

load("@vaticle_dependencies//library/crates:crates.bzl", "raze_fetch_remote_crates")
raze_fetch_remote_crates()
load("@vaticle_dependencies//library/crates:crates.bzl", "fetch_crates")
fetch_crates()
load("@crates//:defs.bzl", "crate_repositories")
crate_repositories()

# Load //builder/python
load("@vaticle_dependencies//builder/python:deps.bzl", python_deps = "deps")
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion dependencies/ide/sync.sh → dependencies/ide/rust/sync.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@
# under the License.
#

bazel run @vaticle_dependencies//ide/rust:sync
bazel run @vaticle_dependencies//tool/cargo:sync
8 changes: 4 additions & 4 deletions dependencies/vaticle/repositories.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,26 @@ def vaticle_dependencies():
git_repository(
name = "vaticle_dependencies",
remote = "https://github.com/vaticle/dependencies",
commit = "d76a7b935cd6452615f78772539fbc2e1228f503", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_dependencies
commit = "76636b1672b04e9880439395b8913231724ae459", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_dependencies
)

def vaticle_typedb_common():
git_repository(
name = "vaticle_typedb_common",
remote = "https://github.com/vaticle/typedb-common",
tag = "2.12.0" # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_common
commit = "aa03cb5f6a57ec2a51291b7a0510734ca1f41479" # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_typedb_common
)

def vaticle_typedb_protocol():
git_repository(
name = "vaticle_typedb_protocol",
remote = "https://github.com/vaticle/typedb-protocol",
commit = "16d1fb6749c0fee85843ca67f470015dda9fc497", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_dependencies
commit = "b1c19e02054c1a1d354b42875e6ccd67602a546f", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_dependencies
)

def vaticle_typeql():
git_repository(
name = "vaticle_typeql",
remote = "https://github.com/vaticle/typeql",
commit = "776643fb6f0c754730e55230733fd2326f32cd39", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_dependencies
commit = "7a63699b3879296ae3039577ba3f5220bbf6d33d", # sync-marker: do not remove this comment, this is used for sync-dependencies by @vaticle_dependencies
)
1 change: 1 addition & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@
imports_granularity = "Crate"
group_imports = "StdExternalCrate"
use_small_heuristics = "Max"
max_width = 120
10 changes: 1 addition & 9 deletions src/answer/concept_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,14 @@ use std::{
ops::Index,
};

use crate::{common::Result, concept::Concept};
use crate::concept::Concept;

#[derive(Debug)]
pub struct ConceptMap {
pub map: HashMap<String, Concept>,
}

impl ConceptMap {
pub(crate) fn from_proto(proto: typedb_protocol::ConceptMap) -> Result<Self> {
let mut map = HashMap::with_capacity(proto.map.len());
for (k, v) in proto.map {
map.insert(k, Concept::from_proto(v)?);
}
Ok(Self { map })
}

pub fn get(&self, var_name: &str) -> Option<&Concept> {
self.map.get(var_name)
}
Expand Down
16 changes: 0 additions & 16 deletions src/answer/numeric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,6 @@
* under the License.
*/

use typedb_protocol::numeric::Value;

use crate::common::{Error, Result};

#[derive(Clone, Debug)]
pub enum Numeric {
Long(i64),
Expand All @@ -48,18 +44,6 @@ impl Numeric {
}
}

impl TryFrom<typedb_protocol::Numeric> for Numeric {
type Error = Error;

fn try_from(value: typedb_protocol::Numeric) -> Result<Self> {
match value.value.unwrap() {
Value::LongValue(long) => Ok(Numeric::Long(long)),
Value::DoubleValue(double) => Ok(Numeric::Double(double)),
Value::Nan(_) => Ok(Numeric::NaN),
}
}
}

impl From<Numeric> for i64 {
fn from(n: Numeric) -> Self {
n.into_i64()
Expand Down
6 changes: 3 additions & 3 deletions src/common/address.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@

use std::{fmt, str::FromStr};

use tonic::transport::Uri;
use http::Uri;

use crate::common::{Error, Result};

#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct Address {
pub(crate) struct Address {
uri: Uri,
}

Expand All @@ -43,7 +43,7 @@ impl FromStr for Address {
let uri = if address.contains("://") {
address.parse::<Uri>()?
} else {
format!("http://{}", address).parse::<Uri>()?
format!("http://{address}").parse::<Uri>()?
};
Ok(Self { uri })
}
Expand Down
83 changes: 24 additions & 59 deletions src/common/credential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,42 @@
* under the License.
*/

use std::{
fs,
path::{Path, PathBuf},
sync::RwLock,
};
use std::{fmt, fs, path::Path};

use tonic::{
transport::{Certificate, ClientTlsConfig},
Request,
};
use tonic::transport::{Certificate, ClientTlsConfig};

use crate::Result;

#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct Credential {
username: String,
password: String,
is_tls_enabled: bool,
tls_root_ca: Option<PathBuf>,
tls_config: Option<ClientTlsConfig>,
}

impl Credential {
pub fn with_tls(username: &str, password: &str, tls_root_ca: Option<&Path>) -> Self {
Credential {
pub fn with_tls(username: &str, password: &str, tls_root_ca: Option<&Path>) -> Result<Self> {
let tls_config = Some(if let Some(tls_root_ca) = tls_root_ca {
ClientTlsConfig::new().ca_certificate(Certificate::from_pem(fs::read_to_string(tls_root_ca)?))
} else {
ClientTlsConfig::new()
});

Ok(Credential {
username: username.to_owned(),
password: password.to_owned(),
is_tls_enabled: true,
tls_root_ca: tls_root_ca.map(Path::to_owned),
}
tls_config,
})
}

pub fn without_tls(username: &str, password: &str) -> Self {
Credential {
username: username.to_owned(),
password: password.to_owned(),
is_tls_enabled: false,
tls_root_ca: None,
tls_config: None,
}
}

Expand All @@ -71,51 +70,17 @@ impl Credential {
self.is_tls_enabled
}

pub fn tls_config(&self) -> Result<ClientTlsConfig> {
if let Some(ref tls_root_ca) = self.tls_root_ca {
Ok(ClientTlsConfig::new()
.ca_certificate(Certificate::from_pem(fs::read_to_string(tls_root_ca)?)))
} else {
Ok(ClientTlsConfig::new())
}
pub fn tls_config(&self) -> &Option<ClientTlsConfig> {
&self.tls_config
}
}

#[derive(Debug)]
pub(crate) struct CallCredentials {
credential: Credential,
token: RwLock<Option<String>>,
}

impl CallCredentials {
pub(super) fn new(credential: Credential) -> Self {
Self { credential, token: RwLock::new(None) }
}

pub(super) fn username(&self) -> &str {
self.credential.username()
}

pub(super) fn password(&self) -> &str {
self.credential.password()
}

pub(super) fn set_token(&self, token: String) {
*self.token.write().unwrap() = Some(token);
}

pub(super) fn reset_token(&self) {
*self.token.write().unwrap() = None;
}

pub(super) fn inject(&self, mut request: Request<()>) -> Request<()> {
request.metadata_mut().insert("username", self.credential.username().try_into().unwrap());
match &*self.token.read().unwrap() {
Some(token) => request.metadata_mut().insert("token", token.try_into().unwrap()),
None => request
.metadata_mut()
.insert("password", self.credential.password().try_into().unwrap()),
};
request
impl fmt::Debug for Credential {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Credential")
.field("username", &self.username)
.field("is_tls_enabled", &self.is_tls_enabled)
.field("tls_config", &self.tls_config)
.finish()
}
}
Loading

0 comments on commit 993aa8f

Please sign in to comment.