From 4139ab9f0c786d24d6be12fb09e0afa37517efe7 Mon Sep 17 00:00:00 2001 From: Irach Ramos Date: Mon, 7 Oct 2024 10:37:12 +0200 Subject: [PATCH] move schema creation to kv storage --- .../src/storage/cassandra.rs | 64 ----------------- .../src/storage/keyvalue/cassandra.rs | 68 ++++++++++++++++++- .../tests/key_value_storage.rs | 5 +- 3 files changed, 70 insertions(+), 67 deletions(-) diff --git a/golem-worker-executor-base/src/storage/cassandra.rs b/golem-worker-executor-base/src/storage/cassandra.rs index af51d02753..7857f7e3ae 100644 --- a/golem-worker-executor-base/src/storage/cassandra.rs +++ b/golem-worker-executor-base/src/storage/cassandra.rs @@ -17,7 +17,6 @@ use golem_common::config::CassandraConfig; use golem_common::metrics::db::{record_db_failure, record_db_success}; use scylla::batch::{Batch, BatchType}; use scylla::prepared_statement::PreparedStatement; -use scylla::query::Query; use scylla::serialize::row::SerializeRow; use scylla::transport::errors::QueryError; use scylla::FromRow; @@ -65,69 +64,6 @@ impl CassandraSession { }) } - pub async fn create_schema(&self) -> Result<(), String> { - self.session.query_unpaged( - Query::new( - format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};", self.keyspace), - ), - &[], - ).await - .map_err(|e| e.to_string())?; - - self.session - .query_unpaged( - Query::new(format!( - r#" - CREATE TABLE IF NOT EXISTS {}.kv_store ( - namespace TEXT, - key TEXT, - value BLOB, - PRIMARY KEY (namespace, key) - );"#, - self.keyspace - )), - &[], - ) - .await - .map_err(|e| e.to_string())?; - - self.session - .query_unpaged( - Query::new(format!( - r#" - CREATE TABLE IF NOT EXISTS {}.kv_sets ( - namespace TEXT, - key TEXT, - value BLOB, - PRIMARY KEY ((namespace, key), value) - );"#, - self.keyspace - )), - &[], - ) - .await - .map_err(|e| e.to_string())?; - - self.session - .query_unpaged( - Query::new(format!( - r#" - CREATE TABLE IF NOT EXISTS {}.kv_sorted_sets ( - namespace TEXT, - key TEXT, - score DOUBLE, - value BLOB, - PRIMARY KEY ((namespace, key), score, value) - );"#, - self.keyspace - )), - &[], - ) - .await - .map_err(|e| e.to_string()) - .map(|_| ()) - } - pub fn with(&self, svc_name: &'static str, api_name: &'static str) -> CassandraLabelledApi { CassandraLabelledApi { svc_name, diff --git a/golem-worker-executor-base/src/storage/keyvalue/cassandra.rs b/golem-worker-executor-base/src/storage/keyvalue/cassandra.rs index 972c724fc7..8c63db9ef4 100644 --- a/golem-worker-executor-base/src/storage/keyvalue/cassandra.rs +++ b/golem-worker-executor-base/src/storage/keyvalue/cassandra.rs @@ -19,7 +19,7 @@ use crate::storage::{ use async_trait::async_trait; use bytes::Bytes; use scylla::{ - prepared_statement::PreparedStatement, serialize::row::SerializeRow, + prepared_statement::PreparedStatement, query::Query, serialize::row::SerializeRow, transport::errors::QueryError, FromRow, }; use serde::Deserialize; @@ -32,6 +32,72 @@ pub struct CassandraKeyValueStorage { } impl CassandraKeyValueStorage { + pub async fn create_schema(&self) -> Result<(), String> { + self.session.session.query_unpaged( + Query::new( + format!("CREATE KEYSPACE IF NOT EXISTS {} WITH REPLICATION = {{ 'class' : 'SimpleStrategy', 'replication_factor' : 1 }};", self.session.keyspace), + ), + &[], + ).await + .map_err(|e| e.to_string())?; + + self.session + .session + .query_unpaged( + Query::new(format!( + r#" + CREATE TABLE IF NOT EXISTS {}.kv_store ( + namespace TEXT, + key TEXT, + value BLOB, + PRIMARY KEY (namespace, key) + );"#, + self.session.keyspace + )), + &[], + ) + .await + .map_err(|e| e.to_string())?; + + self.session + .session + .query_unpaged( + Query::new(format!( + r#" + CREATE TABLE IF NOT EXISTS {}.kv_sets ( + namespace TEXT, + key TEXT, + value BLOB, + PRIMARY KEY ((namespace, key), value) + );"#, + self.session.keyspace + )), + &[], + ) + .await + .map_err(|e| e.to_string())?; + + self.session + .session + .query_unpaged( + Query::new(format!( + r#" + CREATE TABLE IF NOT EXISTS {}.kv_sorted_sets ( + namespace TEXT, + key TEXT, + score DOUBLE, + value BLOB, + PRIMARY KEY ((namespace, key), score, value) + );"#, + self.session.keyspace + )), + &[], + ) + .await + .map_err(|e| e.to_string()) + .map(|_| ()) + } + pub fn new(session: CassandraSession) -> Self { Self { session } } diff --git a/golem-worker-executor-base/tests/key_value_storage.rs b/golem-worker-executor-base/tests/key_value_storage.rs index c01837b860..080d7d7c86 100644 --- a/golem-worker-executor-base/tests/key_value_storage.rs +++ b/golem-worker-executor-base/tests/key_value_storage.rs @@ -129,12 +129,13 @@ pub(crate) async fn cassandra_storage() -> impl GetKeyValueStorage { let test_keyspace = format!("golem_test_{}", &Uuid::new_v4().to_string()[..8]); let session = cassandra.get_session(None).await; let cassandra_session = CassandraSession::new(session, true, &test_keyspace); - if let Err(err_msg) = cassandra_session.create_schema().await { + + let kvs = CassandraKeyValueStorage::new(cassandra_session); + if let Err(err_msg) = kvs.create_schema().await { cassandra.kill(); panic!("Cannot create schema : {}", err_msg); } - let kvs = CassandraKeyValueStorage::new(cassandra_session); CassandraKeyValueStorageWrapper { kvs } } else { panic!("Cassandra is not configured");