feat: Add integration test for dual mode (#582)
This adds an integration test for "Dual" mode. Note that this uses a
database settings file, `dual_test.json`, to hold the doubly escaped
values. The values in this file should match those used in
`test_integration_all_rust.py` for data store configuration and setup.
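
For reference, "doubly escaped" means that each backend's
`db_settings` value is itself a JSON document serialized into a
string, so its inner quotes are escaped inside the outer JSON file.
A minimal sketch of the shape (the full, matching values live in
`tests/integration/dual_test.json` below):

    {
      "primary": {
        "db_settings": "{\"table_name\": \"projects/test/instances/test/tables/autopush\"}",
        "dsn": "grpc://localhost:8086"
      },
      "secondary": {
        "db_settings": "{\"message_table\": \"message_int_test\"}",
        "dsn": "http://localhost:8000/"
      }
    }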

This also adds a few more `debug` messages to indicate successful
operations. These were useful while tracking down a few configuration
problems, but they are not strictly required. I can remove them if
requested.
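
For anyone reproducing this locally, a rough sketch of running the
dual-mode pass with the same debug logging the new CircleCI step uses
(this assumes the Bigtable emulator and local DynamoDB endpoints named
in `dual_test.json` are already up):

    RUST_LOG=autopush=debug,autopush_common=debug,autoendpoint=debug,autoconnect=debug,slog_mozlog_json=info,warn \
    DB_DSN=dual \
    DB_SETTINGS=tests/integration/dual_test.json \
    TEST_RESULTS_DIR=workspace/test-results \
    make integration-test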

Closes: https://mozilla-hub.atlassian.net/browse/SYNC-4111

---------

Co-authored-by: Philip Jenvey <[email protected]>
jrconlin and pjenvey authored Jan 30, 2024
1 parent db2e02e commit 7684875
Showing 8 changed files with 56 additions and 22 deletions.
19 changes: 11 additions & 8 deletions .circleci/config.yml
@@ -137,7 +137,7 @@ jobs:
export PATH=$PATH:$HOME/.cargo/bin
echo 'export PATH=$PATH:$HOME/.cargo/bin' >> $BASH_ENV
rustc --version
cargo build --features=emulator
cargo build --features=emulator --features=dual
- run:
name: Check formatting
command: |
@@ -153,7 +153,7 @@
# when doing discovery, we found that the docker image `meminfo` and `cpuinfo` often report
# the machine level memory and CPU which are far higher than the memory allocated to the docker
# instance. This may be causing rust to be overly greedy triggering the VM to OOM the process.)
command: cargo test --features=emulator --jobs=2
command: cargo test --features=emulator --features=dual --jobs=2
- run:
name: Integration tests (Bigtable)
command: make integration-test
@@ -165,12 +165,15 @@
command: make integration-test
environment:
TEST_RESULTS_DIR: workspace/test-results
# - run:
# name: Integration tests (Dual Bigtable/DynamoDB)
# command: make integration-test
# environment:
# DB_DSN: dual
# TEST_RESULTS_DIR: workspace/test-results
- run:
name: Integration tests (Dual Bigtable/DynamoDB)
command: make integration-test
environment:
RUST_LOG: autopush=debug,autopush_common=debug,autoendpoint=debug,autoconnect=debug,slog_mozlog_json=info,warn
# PYTEST_ARGS: -sv
DB_DSN: dual
DB_SETTINGS: tests/integration/dual_test.json
TEST_RESULTS_DIR: workspace/test-results
- store_test_results:
path: workspace/test-results
- save_cache:
2 changes: 0 additions & 2 deletions Makefile
@@ -33,8 +33,6 @@ integration-test-legacy:
integration-test:
$(POETRY) -V
$(POETRY) install --without dev,load --no-root
CONNECTION_BINARY=autoconnect \
CONNECTION_SETTINGS_PREFIX=autoconnect__ \
$(POETRY) run pytest $(INTEGRATION_TEST_FILE) \
--junit-xml=$(TEST_RESULTS_DIR)/integration_test_results.xml \
-v $(PYTEST_ARGS)
1 change: 1 addition & 0 deletions autopush-common/src/db/bigtable/bigtable_client/mod.rs
@@ -255,6 +255,7 @@ impl BigTableClientImpl {

// Do the actual commit.
let bigtable = self.pool.get().await?;
debug!("🉑 writing row...");
let _resp = bigtable
.conn
.mutate_row_async(&req)
11 changes: 8 additions & 3 deletions autopush-common/src/db/bigtable/pool.rs
@@ -40,10 +40,13 @@ impl BigTablePool {
pub async fn get(
&self,
) -> Result<deadpool::managed::Object<BigtableClientManager>, error::BigTableError> {
self.pool
let obj = self
.pool
.get()
.await
.map_err(|e| error::BigTableError::Pool(e.to_string()))
.map_err(|e| error::BigTableError::Pool(e.to_string()))?;
debug!("🉑 Got db from pool");
Ok(obj)
}

/// Get the pools manager, because we would like to talk to them.
@@ -151,7 +154,9 @@ impl Manager for BigtableClientManager {
/// `BigtableClient` is the most atomic we can go.
async fn create(&self) -> Result<BigtableDb, DbError> {
debug!("🏊 Create a new pool entry.");
Ok(BigtableDb::new(self.get_channel()?))
let entry = BigtableDb::new(self.get_channel()?);
debug!("🏊 Bigtable connection acquired");
Ok(entry)
}

/// Recycle if the connection has outlived it's lifespan.
22 changes: 16 additions & 6 deletions autopush-common/src/db/dual/mod.rs
@@ -37,6 +37,7 @@ pub struct DualClientImpl {
pub struct DualDbSettings {
primary: DbSettings,
secondary: DbSettings,
#[serde(default)]
write_to_secondary: bool,
#[serde(default)]
median: Option<String>,
@@ -45,6 +46,7 @@ pub struct DualDbSettings {
impl DualClientImpl {
pub fn new(metrics: Arc<StatsdClient>, settings: &DbSettings) -> DbResult<Self> {
// Not really sure we need the dsn here.
info!("Trying: {:?}", settings.db_settings);
let db_settings: DualDbSettings = from_str(&settings.db_settings).map_err(|e| {
DbError::General(format!("Could not parse DualDBSettings string {:?}", e))
})?;
@@ -79,6 +81,7 @@ impl DualClientImpl {
};
let primary = BigTableClientImpl::new(metrics.clone(), &db_settings.primary)?;
let secondary = DdbClientImpl::new(metrics.clone(), &db_settings.secondary)?;
debug!("⚖ Got primary and secondary");
Ok(Self {
primary,
secondary: secondary.clone(),
@@ -95,18 +98,20 @@ impl DualClientImpl {
/// allowance
/// Returns the dbclient to use and whether or not it's the primary database.
async fn allot<'a>(&'a self, uaid: &Uuid) -> DbResult<(Box<&'a dyn DbClient>, bool)> {
if let Some(median) = self.median {
let target: (Box<&'a dyn DbClient>, bool) = if let Some(median) = self.median {
if uaid.as_bytes()[0] <= median {
debug!("⚖ Routing user to Bigtable");
// These are migrations so the metrics should appear as
// `auto[endpoint|connect].migrate`.
Ok((Box::new(&self.primary), true))
(Box::new(&self.primary), true)
} else {
Ok((Box::new(&self.secondary), false))
(Box::new(&self.secondary), false)
}
} else {
Ok((Box::new(&self.primary), true))
}
(Box::new(&self.primary), true)
};
debug!("⚖ alloting to {}", target.0.name());
Ok(target)
}
}

@@ -117,7 +122,10 @@ impl DbClient for DualClientImpl {
if is_primary && self.write_to_secondary {
let _ = self.secondary.add_user(user).await?;
}
target.add_user(user).await
debug!("⚖ adding user to {}...", target.name());
let result = target.add_user(user).await?;
debug!("⚖ User added...");
Ok(result)
}

async fn update_user(&self, user: &User) -> DbResult<bool> {
@@ -163,7 +171,9 @@ impl DbClient for DualClientImpl {
}

async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> {
debug!("⚖ getting target");
let (target, _) = self.allot(uaid).await?;
debug!("⚖ Adding channel to {}", target.name());
target.add_channel(uaid, channel_id).await
}

1 change: 1 addition & 0 deletions autopush-common/src/db/dynamodb/mod.rs
@@ -61,6 +61,7 @@ pub struct DdbClientImpl {

impl DdbClientImpl {
pub fn new(metrics: Arc<StatsdClient>, db_settings: &DbSettings) -> DbResult<Self> {
debug!("🛢️DynamoDB Settings {:?}", db_settings);
let db_client = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") {
DynamoDbClient::new_with(
HttpClient::new().expect("TLS initialization error"),
10 changes: 10 additions & 0 deletions tests/integration/dual_test.json
@@ -0,0 +1,10 @@
{
"primary": {
"db_settings": "{\"message_family\": \"message\", \"router_family\": \"router\", \"table_name\": \"projects/test/instances/test/tables/autopush\"}",
"dsn": "grpc://localhost:8086"
},
"secondary": {
"db_settings": "{\"message_table\": \"message_int_test\",\"router_table\": \"router_int_test\"}",
"dsn": "http://localhost:8000/"
}
}
12 changes: 9 additions & 3 deletions tests/integration/test_integration_all_rust.py
@@ -69,8 +69,8 @@
MP_CONNECTION_PORT = 9052
MP_ROUTER_PORT = 9072

CONNECTION_BINARY = os.environ.get("CONNECTION_BINARY", "autopush_rs")
CONNECTION_SETTINGS_PREFIX = os.environ.get("CONNECTION_SETTINGS_PREFIX", "autopush__")
CONNECTION_BINARY = os.environ.get("CONNECTION_BINARY", "autoconnect")
CONNECTION_SETTINGS_PREFIX = os.environ.get("CONNECTION_SETTINGS_PREFIX", "autoconnect__")

CN_SERVER: subprocess.Popen | None = None
CN_MP_SERVER: subprocess.Popen | None = None
@@ -465,7 +465,7 @@ def _get_vapid(


def enqueue_output(out, queue):
for line in iter(out.readline, b""):
for line in iter(out.readline, ""):
queue.put(line)
out.close()

@@ -667,10 +667,14 @@ def setup_connection_server(connection_binary):
CONNECTION_CONFIG["port"] = parsed.port
CONNECTION_CONFIG["endpoint_scheme"] = parsed.scheme
write_config_to_env(CONNECTION_CONFIG, CONNECTION_SETTINGS_PREFIX)
log.debug("Using existing Connection server")
return
else:
write_config_to_env(CONNECTION_CONFIG, CONNECTION_SETTINGS_PREFIX)
cmd = [connection_binary]
run_args = os.getenv("RUN_ARGS")
if run_args is not None:
cmd.append(run_args)
log.debug(f"🐍🟢 Starting Connection server: {' '.join(cmd)}")
CN_SERVER = subprocess.Popen(
cmd,
@@ -701,6 +705,7 @@ def setup_megaphone_server(connection_binary):
parsed = urlparse(url)
MEGAPHONE_CONFIG["endpoint_port"] = parsed.port
write_config_to_env(MEGAPHONE_CONFIG, CONNECTION_SETTINGS_PREFIX)
log.debug("Using existing Megaphone server")
return
else:
write_config_to_env(MEGAPHONE_CONFIG, CONNECTION_SETTINGS_PREFIX)
@@ -725,6 +730,7 @@ def setup_endpoint_server():
ENDPOINT_CONFIG["hostname"] = parsed.hostname
ENDPOINT_CONFIG["port"] = parsed.port
ENDPOINT_CONFIG["endpoint_scheme"] = parsed.scheme
log.debug("Using existing Endpoint server")
return
else:
write_config_to_env(ENDPOINT_CONFIG, "autoend__")
