Skip to content

Commit

Permalink
Adding cassandra key value storage
Browse files Browse the repository at this point in the history
  • Loading branch information
irach-ramos committed Sep 27, 2024
1 parent 7dde731 commit 0b16ddf
Show file tree
Hide file tree
Showing 19 changed files with 1,342 additions and 1 deletion.
103 changes: 103 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@ wasmtime = { version = "=21.0.1", features = ["component-model"] }
wasmtime-wasi = { version = "=21.0.1" }
wasmtime-wasi-http = { version = "=21.0.1" }
webpki-roots = { version = "0.26.0" }
scylla = "0.14.0"

[patch.crates-io]
wasmtime = { git = "https://github.com/golemcloud/wasmtime.git", branch = "golem-wasmtime-v21.0.1" }
Expand Down
28 changes: 28 additions & 0 deletions golem-common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use figment::providers::{Env, Format, Serialized, Toml};
use figment::value::Value;
use figment::Figment;
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::time::Duration;
use url::Url;
Expand Down Expand Up @@ -467,3 +468,30 @@ pub struct DbPostgresConfig {
pub max_connections: u32,
pub schema: Option<String>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct CassandraConfig {
pub nodes: Vec<SocketAddr>,
#[serde(default = "default_cassandra_keyspace")]
pub keyspace: String,
pub tracing: bool,
pub pool_size_per_host: usize,
pub username: Option<String>,
pub password: Option<String>,
}
fn default_cassandra_keyspace() -> String {
String::from("__golem")
}

impl Default for CassandraConfig {
fn default() -> Self {
Self {
nodes: vec!["127.0.0.1:9042".parse().unwrap()],
keyspace: default_cassandra_keyspace(),
tracing: false,
pool_size_per_host: 3,
username: None,
password: None,
}
}
}
2 changes: 2 additions & 0 deletions golem-test-framework/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ cli-table = { workspace = true }
chrono = { workspace = true }
colored = "2.1.0"
console-subscriber = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
k8s-openapi = { workspace = true }
kill_tree = { version = "0.2.4", features = ["tokio"] }
Expand All @@ -35,6 +36,7 @@ kube-derive = { workspace = true }
once_cell = { workspace = true }
postgres = { workspace = true }
redis = { workspace = true }
scylla = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
Expand Down
82 changes: 82 additions & 0 deletions golem-test-framework/src/components/cassandra/docker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2024 Golem Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::Cassandra;
use crate::components::{DOCKER, NETWORK};
use std::sync::atomic::{AtomicBool, Ordering};
use testcontainers::{Container, GenericImage, RunnableImage};
use tracing::info;

pub struct DockerCassandra {
container: Container<'static, GenericImage>,
keep_container: bool,
valid: AtomicBool,
public_port: u16,
}

impl DockerCassandra {
const NAME: &'static str = "golem_cassandra";

pub fn new(keep_container: bool) -> Self {
let image = GenericImage::new("cassandra", "latest")
.with_exposed_port(super::DEFAULT_PORT)
.with_wait_for(testcontainers::core::WaitFor::message_on_stdout(
"Starting listening for CQL clients on",
));
let cassandra_image: RunnableImage<_> = RunnableImage::from(image)
.with_container_name(Self::NAME)
.with_network(NETWORK);

let container = DOCKER.run(cassandra_image);
let public_port: u16 = container.get_host_port_ipv4(super::DEFAULT_PORT);

DockerCassandra {
container,
keep_container,
valid: AtomicBool::new(true),
public_port,
}
}
}

impl Cassandra for DockerCassandra {
fn assert_valid(&self) {
if !self.valid.load(Ordering::Acquire) {
std::panic!("Cassandra has been closed")
}
}

fn private_known_nodes(&self) -> Vec<String> {
vec![format!("{}:{}", Self::NAME, super::DEFAULT_PORT)]
}

fn kill(&self) {
info!("Stopping Cassandra container");
if self.keep_container {
self.container.stop()
} else {
self.container.rm()
}
}

fn public_known_nodes(&self) -> Vec<String> {
vec![format!("localhost:{}", self.public_port)]
}
}

impl Drop for DockerCassandra {
fn drop(&mut self) {
self.kill()
}
}
71 changes: 71 additions & 0 deletions golem-test-framework/src/components/cassandra/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright 2024 Golem Cloud
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use scylla::{transport::session::PoolSize, Session, SessionBuilder};
use std::{num::NonZeroUsize, sync::Arc};
use tonic::async_trait;

pub mod docker;

#[async_trait]
pub trait Cassandra {
fn assert_valid(&self);

fn private_known_nodes(&self) -> Vec<String>;

fn public_known_nodes(&self) -> Vec<String>;

fn kill(&self);

async fn try_get_session(&self, keyspace: Option<&str>) -> Result<Arc<Session>, String> {
let mut session_builder = SessionBuilder::new()
.known_nodes(self.public_known_nodes())
.pool_size(PoolSize::PerHost(NonZeroUsize::new(10).unwrap()));

if let Some(keyspace) = keyspace {
session_builder = session_builder.use_keyspace(keyspace, false)
};

let session = session_builder
.build()
.await
.map_err(|e| e.to_string())
.unwrap();

Ok(Arc::new(session))
}

async fn get_session(&self, keyspace: Option<&str>) -> Arc<Session> {
self.assert_valid();
self.try_get_session(keyspace).await.unwrap()
}

async fn flush_keyspace(&self, keyspace: &str) {
let session = self.get_session(Some(keyspace)).await;
session
.query_unpaged(format!("TRUNCATE {}.{};", keyspace, "kv_store"), &[])
.await
.unwrap();
session
.query_unpaged(format!("TRUNCATE {}.{};", keyspace, "kv_sets"), &[])
.await
.unwrap();
session
.query_unpaged(format!("TRUNCATE {}.{};", keyspace, "kv_sorted_sets"), &[])
.await
.unwrap();
}
}

const DEFAULT_PORT: u16 = 9042;
1 change: 1 addition & 0 deletions golem-test-framework/src/components/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use tracing::{error, warn, Level};
use golem_api_grpc::proto::grpc::health::v1::health_check_response::ServingStatus;
use golem_api_grpc::proto::grpc::health::v1::HealthCheckRequest;

pub mod cassandra;
pub mod component_compilation_service;
pub mod component_service;
mod docker;
Expand Down
Loading

0 comments on commit 0b16ddf

Please sign in to comment.