From 76848753b268b7116e572ad6ab3e9f3d256f2009 Mon Sep 17 00:00:00 2001 From: JR Conlin Date: Tue, 30 Jan 2024 08:07:53 -0800 Subject: [PATCH] feat: Add integration test for `dual` mode (#582) This adds an integration test for "Dual" mode. Note that this uses a database settings file `dual_test.json` to contain the doubly escaped values. The values in this file should match those used in `test_integration_all_rust.py` for data store configuration and set-up. This also adds a few more `debug` messages to indicate successful operations. These were useful while diagnosing a few problems with the configuration, but they are not strictly required. I can remove them if requested. Closes: https://mozilla-hub.atlassian.net/browse/SYNC-4111 --------- Co-authored-by: Philip Jenvey --- .circleci/config.yml | 19 +++++++++------- Makefile | 2 -- .../src/db/bigtable/bigtable_client/mod.rs | 1 + autopush-common/src/db/bigtable/pool.rs | 11 +++++++--- autopush-common/src/db/dual/mod.rs | 22 ++++++++++++++----- autopush-common/src/db/dynamodb/mod.rs | 1 + tests/integration/dual_test.json | 10 +++++++++ .../integration/test_integration_all_rust.py | 12 +++++++--- 8 files changed, 56 insertions(+), 22 deletions(-) create mode 100644 tests/integration/dual_test.json diff --git a/.circleci/config.yml b/.circleci/config.yml index 94fb21bab..f4841d920 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -137,7 +137,7 @@ jobs: export PATH=$PATH:$HOME/.cargo/bin echo 'export PATH=$PATH:$HOME/.cargo/bin' >> $BASH_ENV rustc --version - cargo build --features=emulator + cargo build --features=emulator --features=dual - run: name: Check formatting command: | @@ -153,7 +153,7 @@ # when doing discovery, we found that the docker image `meminfo` and `cpuinfo` often report # the machine level memory and CPU which are far higher than the memory allocated to the docker # instance. This may be causing rust to be overly greedy triggering the VM to OOM the process.) 
- command: cargo test --features=emulator --jobs=2 + command: cargo test --features=emulator --features=dual --jobs=2 - run: name: Integration tests (Bigtable) command: make integration-test @@ -165,12 +165,15 @@ jobs: command: make integration-test environment: TEST_RESULTS_DIR: workspace/test-results -# - run: -# name: Integration tests (Dual Bigtable/DynamoDB) -# command: make integration-test -# environment: -# DB_DSN: dual -# TEST_RESULTS_DIR: workspace/test-results + - run: + name: Integration tests (Dual Bigtable/DynamoDB) + command: make integration-test + environment: + RUST_LOG: autopush=debug,autopush_common=debug,autoendpoint=debug,autoconnect=debug,slog_mozlog_json=info,warn + # PYTEST_ARGS: -sv + DB_DSN: dual + DB_SETTINGS: tests/integration/dual_test.json + TEST_RESULTS_DIR: workspace/test-results - store_test_results: path: workspace/test-results - save_cache: diff --git a/Makefile b/Makefile index f97178e7f..fbceb076c 100644 --- a/Makefile +++ b/Makefile @@ -33,8 +33,6 @@ integration-test-legacy: integration-test: $(POETRY) -V $(POETRY) install --without dev,load --no-root - CONNECTION_BINARY=autoconnect \ - CONNECTION_SETTINGS_PREFIX=autoconnect__ \ $(POETRY) run pytest $(INTEGRATION_TEST_FILE) \ --junit-xml=$(TEST_RESULTS_DIR)/integration_test_results.xml \ -v $(PYTEST_ARGS) diff --git a/autopush-common/src/db/bigtable/bigtable_client/mod.rs b/autopush-common/src/db/bigtable/bigtable_client/mod.rs index 8a56e12b2..301768cfe 100644 --- a/autopush-common/src/db/bigtable/bigtable_client/mod.rs +++ b/autopush-common/src/db/bigtable/bigtable_client/mod.rs @@ -255,6 +255,7 @@ impl BigTableClientImpl { // Do the actual commit. 
let bigtable = self.pool.get().await?; + debug!("πŸ‰‘ writing row..."); let _resp = bigtable .conn .mutate_row_async(&req) diff --git a/autopush-common/src/db/bigtable/pool.rs b/autopush-common/src/db/bigtable/pool.rs index c6f528f79..778ec949d 100644 --- a/autopush-common/src/db/bigtable/pool.rs +++ b/autopush-common/src/db/bigtable/pool.rs @@ -40,10 +40,13 @@ impl BigTablePool { pub async fn get( &self, ) -> Result, error::BigTableError> { - self.pool + let obj = self + .pool .get() .await - .map_err(|e| error::BigTableError::Pool(e.to_string())) + .map_err(|e| error::BigTableError::Pool(e.to_string()))?; + debug!("πŸ‰‘ Got db from pool"); + Ok(obj) } /// Get the pools manager, because we would like to talk to them. @@ -151,7 +154,9 @@ impl Manager for BigtableClientManager { /// `BigtableClient` is the most atomic we can go. async fn create(&self) -> Result { debug!("🏊 Create a new pool entry."); - Ok(BigtableDb::new(self.get_channel()?)) + let entry = BigtableDb::new(self.get_channel()?); + debug!("🏊 Bigtable connection acquired"); + Ok(entry) } /// Recycle if the connection has outlived it's lifespan. diff --git a/autopush-common/src/db/dual/mod.rs b/autopush-common/src/db/dual/mod.rs index a513274d8..ac4891f99 100644 --- a/autopush-common/src/db/dual/mod.rs +++ b/autopush-common/src/db/dual/mod.rs @@ -37,6 +37,7 @@ pub struct DualClientImpl { pub struct DualDbSettings { primary: DbSettings, secondary: DbSettings, + #[serde(default)] write_to_secondary: bool, #[serde(default)] median: Option, @@ -45,6 +46,7 @@ pub struct DualDbSettings { impl DualClientImpl { pub fn new(metrics: Arc, settings: &DbSettings) -> DbResult { // Not really sure we need the dsn here. 
+ info!("Trying: {:?}", settings.db_settings); let db_settings: DualDbSettings = from_str(&settings.db_settings).map_err(|e| { DbError::General(format!("Could not parse DualDBSettings string {:?}", e)) })?; @@ -79,6 +81,7 @@ impl DualClientImpl { }; let primary = BigTableClientImpl::new(metrics.clone(), &db_settings.primary)?; let secondary = DdbClientImpl::new(metrics.clone(), &db_settings.secondary)?; + debug!("βš– Got primary and secondary"); Ok(Self { primary, secondary: secondary.clone(), @@ -95,18 +98,20 @@ impl DualClientImpl { /// allowance /// Returns the dbclient to use and whether or not it's the primary database. async fn allot<'a>(&'a self, uaid: &Uuid) -> DbResult<(Box<&'a dyn DbClient>, bool)> { - if let Some(median) = self.median { + let target: (Box<&'a dyn DbClient>, bool) = if let Some(median) = self.median { if uaid.as_bytes()[0] <= median { debug!("βš– Routing user to Bigtable"); // These are migrations so the metrics should appear as // `auto[endpoint|connect].migrate`. 
- Ok((Box::new(&self.primary), true)) + (Box::new(&self.primary), true) } else { - Ok((Box::new(&self.secondary), false)) + (Box::new(&self.secondary), false) } } else { - Ok((Box::new(&self.primary), true)) - } + (Box::new(&self.primary), true) + }; + debug!("βš– alloting to {}", target.0.name()); + Ok(target) } } @@ -117,7 +122,10 @@ impl DbClient for DualClientImpl { if is_primary && self.write_to_secondary { let _ = self.secondary.add_user(user).await?; } - target.add_user(user).await + debug!("βš– adding user to {}...", target.name()); + let result = target.add_user(user).await?; + debug!("βš– User added..."); + Ok(result) } async fn update_user(&self, user: &User) -> DbResult { @@ -163,7 +171,9 @@ impl DbClient for DualClientImpl { } async fn add_channel(&self, uaid: &Uuid, channel_id: &Uuid) -> DbResult<()> { + debug!("βš– getting target"); let (target, _) = self.allot(uaid).await?; + debug!("βš– Adding channel to {}", target.name()); target.add_channel(uaid, channel_id).await } diff --git a/autopush-common/src/db/dynamodb/mod.rs b/autopush-common/src/db/dynamodb/mod.rs index 6f81f4248..ef92040b5 100644 --- a/autopush-common/src/db/dynamodb/mod.rs +++ b/autopush-common/src/db/dynamodb/mod.rs @@ -61,6 +61,7 @@ pub struct DdbClientImpl { impl DdbClientImpl { pub fn new(metrics: Arc, db_settings: &DbSettings) -> DbResult { + debug!("πŸ›’οΈDynamoDB Settings {:?}", db_settings); let db_client = if let Ok(endpoint) = env::var("AWS_LOCAL_DYNAMODB") { DynamoDbClient::new_with( HttpClient::new().expect("TLS initialization error"), diff --git a/tests/integration/dual_test.json b/tests/integration/dual_test.json new file mode 100644 index 000000000..d6c74ef34 --- /dev/null +++ b/tests/integration/dual_test.json @@ -0,0 +1,10 @@ +{ + "primary": { + "db_settings": "{\"message_family\": \"message\", \"router_family\": \"router\", \"table_name\": \"projects/test/instances/test/tables/autopush\"}", + "dsn": "grpc://localhost:8086" + }, + "secondary": { + "db_settings": 
"{\"message_table\": \"message_int_test\",\"router_table\": \"router_int_test\"}", + "dsn": "http://localhost:8000/" + } +} diff --git a/tests/integration/test_integration_all_rust.py b/tests/integration/test_integration_all_rust.py index 2bf35df58..f340ddda6 100644 --- a/tests/integration/test_integration_all_rust.py +++ b/tests/integration/test_integration_all_rust.py @@ -69,8 +69,8 @@ MP_CONNECTION_PORT = 9052 MP_ROUTER_PORT = 9072 -CONNECTION_BINARY = os.environ.get("CONNECTION_BINARY", "autopush_rs") -CONNECTION_SETTINGS_PREFIX = os.environ.get("CONNECTION_SETTINGS_PREFIX", "autopush__") +CONNECTION_BINARY = os.environ.get("CONNECTION_BINARY", "autoconnect") +CONNECTION_SETTINGS_PREFIX = os.environ.get("CONNECTION_SETTINGS_PREFIX", "autoconnect__") CN_SERVER: subprocess.Popen | None = None CN_MP_SERVER: subprocess.Popen | None = None @@ -465,7 +465,7 @@ def _get_vapid( def enqueue_output(out, queue): - for line in iter(out.readline, b""): + for line in iter(out.readline, ""): queue.put(line) out.close() @@ -667,10 +667,14 @@ def setup_connection_server(connection_binary): CONNECTION_CONFIG["port"] = parsed.port CONNECTION_CONFIG["endpoint_scheme"] = parsed.scheme write_config_to_env(CONNECTION_CONFIG, CONNECTION_SETTINGS_PREFIX) + log.debug("Using existing Connection server") return else: write_config_to_env(CONNECTION_CONFIG, CONNECTION_SETTINGS_PREFIX) cmd = [connection_binary] + run_args = os.getenv("RUN_ARGS") + if run_args is not None: + cmd.append(run_args) log.debug(f"🐍🟒 Starting Connection server: {' '.join(cmd)}") CN_SERVER = subprocess.Popen( cmd, @@ -701,6 +705,7 @@ def setup_megaphone_server(connection_binary): parsed = urlparse(url) MEGAPHONE_CONFIG["endpoint_port"] = parsed.port write_config_to_env(MEGAPHONE_CONFIG, CONNECTION_SETTINGS_PREFIX) + log.debug("Using existing Megaphone server") return else: write_config_to_env(MEGAPHONE_CONFIG, CONNECTION_SETTINGS_PREFIX) @@ -725,6 +730,7 @@ def setup_endpoint_server(): ENDPOINT_CONFIG["hostname"] = 
parsed.hostname ENDPOINT_CONFIG["port"] = parsed.port ENDPOINT_CONFIG["endpoint_scheme"] = parsed.scheme + log.debug("Using existing Endpoint server") return else: write_config_to_env(ENDPOINT_CONFIG, "autoend__")