Skip to content

Commit

Permalink
move schema creation to kv storage
Browse files Browse the repository at this point in the history
  • Loading branch information
irach-ramos committed Oct 7, 2024
1 parent 8e8609f commit 4139ab9
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 67 deletions.
64 changes: 0 additions & 64 deletions golem-worker-executor-base/src/storage/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use golem_common::config::CassandraConfig;
use golem_common::metrics::db::{record_db_failure, record_db_success};
use scylla::batch::{Batch, BatchType};
use scylla::prepared_statement::PreparedStatement;
use scylla::query::Query;
use scylla::serialize::row::SerializeRow;
use scylla::transport::errors::QueryError;
use scylla::FromRow;
Expand Down Expand Up @@ -65,69 +64,6 @@ impl CassandraSession {
})
}

pub async fn create_schema(&self) -> Result<(), String> {
self.session.query_unpaged(
Query::new(
format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};", self.keyspace),
),
&[],
).await
.map_err(|e| e.to_string())?;

self.session
.query_unpaged(
Query::new(format!(
r#"
CREATE TABLE IF NOT EXISTS {}.kv_store (
namespace TEXT,
key TEXT,
value BLOB,
PRIMARY KEY (namespace, key)
);"#,
self.keyspace
)),
&[],
)
.await
.map_err(|e| e.to_string())?;

self.session
.query_unpaged(
Query::new(format!(
r#"
CREATE TABLE IF NOT EXISTS {}.kv_sets (
namespace TEXT,
key TEXT,
value BLOB,
PRIMARY KEY ((namespace, key), value)
);"#,
self.keyspace
)),
&[],
)
.await
.map_err(|e| e.to_string())?;

self.session
.query_unpaged(
Query::new(format!(
r#"
CREATE TABLE IF NOT EXISTS {}.kv_sorted_sets (
namespace TEXT,
key TEXT,
score DOUBLE,
value BLOB,
PRIMARY KEY ((namespace, key), score, value)
);"#,
self.keyspace
)),
&[],
)
.await
.map_err(|e| e.to_string())
.map(|_| ())
}

pub fn with(&self, svc_name: &'static str, api_name: &'static str) -> CassandraLabelledApi {
CassandraLabelledApi {
svc_name,
Expand Down
68 changes: 67 additions & 1 deletion golem-worker-executor-base/src/storage/keyvalue/cassandra.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::storage::{
use async_trait::async_trait;
use bytes::Bytes;
use scylla::{
prepared_statement::PreparedStatement, serialize::row::SerializeRow,
prepared_statement::PreparedStatement, query::Query, serialize::row::SerializeRow,
transport::errors::QueryError, FromRow,
};
use serde::Deserialize;
Expand All @@ -32,6 +32,72 @@ pub struct CassandraKeyValueStorage {
}

impl CassandraKeyValueStorage {
pub async fn create_schema(&self) -> Result<(), String> {
self.session.session.query_unpaged(
Query::new(
format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};", self.session.keyspace),
),
&[],
).await
.map_err(|e| e.to_string())?;

self.session
.session
.query_unpaged(
Query::new(format!(
r#"
CREATE TABLE IF NOT EXISTS {}.kv_store (
namespace TEXT,
key TEXT,
value BLOB,
PRIMARY KEY (namespace, key)
);"#,
self.session.keyspace
)),
&[],
)
.await
.map_err(|e| e.to_string())?;

self.session
.session
.query_unpaged(
Query::new(format!(
r#"
CREATE TABLE IF NOT EXISTS {}.kv_sets (
namespace TEXT,
key TEXT,
value BLOB,
PRIMARY KEY ((namespace, key), value)
);"#,
self.session.keyspace
)),
&[],
)
.await
.map_err(|e| e.to_string())?;

self.session
.session
.query_unpaged(
Query::new(format!(
r#"
CREATE TABLE IF NOT EXISTS {}.kv_sorted_sets (
namespace TEXT,
key TEXT,
score DOUBLE,
value BLOB,
PRIMARY KEY ((namespace, key), score, value)
);"#,
self.session.keyspace
)),
&[],
)
.await
.map_err(|e| e.to_string())
.map(|_| ())
}

pub fn new(session: CassandraSession) -> Self {
Self { session }
}
Expand Down
5 changes: 3 additions & 2 deletions golem-worker-executor-base/tests/key_value_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,12 +129,13 @@ pub(crate) async fn cassandra_storage() -> impl GetKeyValueStorage {
let test_keyspace = format!("golem_test_{}", &Uuid::new_v4().to_string()[..8]);
let session = cassandra.get_session(None).await;
let cassandra_session = CassandraSession::new(session, true, &test_keyspace);
if let Err(err_msg) = cassandra_session.create_schema().await {

let kvs = CassandraKeyValueStorage::new(cassandra_session);
if let Err(err_msg) = kvs.create_schema().await {
cassandra.kill();
panic!("Cannot create schema : {}", err_msg);
}

let kvs = CassandraKeyValueStorage::new(cassandra_session);
CassandraKeyValueStorageWrapper { kvs }
} else {
panic!("Cassandra is not configured");
Expand Down

0 comments on commit 4139ab9

Please sign in to comment.