diff --git a/src/main/kotlin/org/wfanet/measurement/integration/common/BUILD.bazel b/src/main/kotlin/org/wfanet/measurement/integration/common/BUILD.bazel index 4964db133e8..bbe86fe92fb 100644 --- a/src/main/kotlin/org/wfanet/measurement/integration/common/BUILD.bazel +++ b/src/main/kotlin/org/wfanet/measurement/integration/common/BUILD.bazel @@ -240,3 +240,22 @@ kt_jvm_library( "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/testing", ], ) + +kt_jvm_library( + name = "in_process_reach_measurement_accuracy_test", + srcs = [ + "InProcessReachMeasurementAccuracyTest.kt", + ], + deps = [ + ":all_kingdom_services", + ":in_process_cmms_components", + "//src/main/kotlin/org/wfanet/measurement/duchy/deploy/common/server:duchy_data_server", + "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/common/service:data_services", + "//src/main/kotlin/org/wfanet/measurement/loadtest/measurementconsumer:simulator", + "//src/main/kotlin/org/wfanet/measurement/loadtest/measurementconsumer:synthetic_generator_event_query", + "//src/main/kotlin/org/wfanet/measurement/measurementconsumer/stats:variances", + "@wfa_common_jvm//imports/java/org/junit", + "@wfa_common_jvm//imports/kotlin/kotlinx/coroutines:core", + "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/testing", + ], +) diff --git a/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessCmmsComponents.kt b/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessCmmsComponents.kt index 690f536c8b6..6220b52258b 100644 --- a/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessCmmsComponents.kt +++ b/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessCmmsComponents.kt @@ -26,6 +26,7 @@ import org.wfanet.measurement.api.v2alpha.AccountsGrpcKt import org.wfanet.measurement.api.v2alpha.ApiKeysGrpcKt import org.wfanet.measurement.api.v2alpha.EventGroup import org.wfanet.measurement.api.v2alpha.MeasurementConsumersGrpcKt +import org.wfanet.measurement.api.v2alpha.event_group_metadata.testing.SyntheticEventGroupSpec import org.wfanet.measurement.common.crypto.subjectKeyIdentifier import org.wfanet.measurement.common.crypto.tink.TinkPrivateKeyHandle import org.wfanet.measurement.common.identity.DuchyInfo @@ -46,6 +47,8 @@ class InProcessCmmsComponents( private val kingdomDataServicesRule: ProviderRule, private val duchyDependenciesRule: ProviderRule<(String, ComputationLogEntriesCoroutineStub) -> InProcessDuchy.DuchyDependencies>, + private val syntheticEventGroupSpecs: List = + SyntheticGenerationSpecs.SYNTHETIC_DATA_SPECS ) : TestRule { private val kingdomDataServices: DataServices get() = kingdomDataServicesRule.value @@ -71,7 +74,7 @@ class InProcessCmmsComponents( private val edpSimulators: List by lazy { edpDisplayNameToResourceNameMap.entries.mapIndexed { index, (displayName, resourceName) -> - val specIndex = index % SyntheticGenerationSpecs.SYNTHETIC_DATA_SPECS.size + val specIndex = index % syntheticEventGroupSpecs.size InProcessEdpSimulator( displayName = displayName, resourceName = resourceName, @@ -79,7 +82,7 @@ class InProcessCmmsComponents( kingdomPublicApiChannel = kingdom.publicApiChannel, duchyPublicApiChannel = duchies[1].publicApiChannel, trustedCertificates = TRUSTED_CERTIFICATES, - SyntheticGenerationSpecs.SYNTHETIC_DATA_SPECS[specIndex], + syntheticEventGroupSpecs[specIndex], ) } } diff --git a/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessLifeOfAMeasurementIntegrationTest.kt b/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessLifeOfAMeasurementIntegrationTest.kt index bed6fd9544e..89947e9c86d 100644 --- a/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessLifeOfAMeasurementIntegrationTest.kt +++ b/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessLifeOfAMeasurementIntegrationTest.kt @@ -121,7 +121,7 @@ abstract class InProcessLifeOfAMeasurementIntegrationTest( fun `create a RF measurement and check the result is equal to the expected result`() = runBlocking { // Use frontend simulator to create a reach and frequency measurement and verify its result. - mcSimulator.executeReachAndFrequency("1234") + mcSimulator.testReachAndFrequency("1234") } @Test @@ -129,28 +129,28 @@ abstract class InProcessLifeOfAMeasurementIntegrationTest( runBlocking { // Use frontend simulator to create a direct reach and frequency measurement and verify its // result. - mcSimulator.executeDirectReachAndFrequency("1234") + mcSimulator.testDirectReachAndFrequency("1234") } @Test fun `create a reach-only measurement and check the result is equal to the expected result`() = runBlocking { // Use frontend simulator to create a reach and frequency measurement and verify its result. - mcSimulator.executeReachOnly("1234") + mcSimulator.testReachOnly("1234") } @Test fun `create an impression measurement and check the result is equal to the expected result`() = runBlocking { // Use frontend simulator to create an impression measurement and verify its result. - mcSimulator.executeImpression("1234") + mcSimulator.testImpression("1234") } @Test fun `create a duration measurement and check the result is equal to the expected result`() = runBlocking { // Use frontend simulator to create a duration measurement and verify its result. - mcSimulator.executeDuration("1234") + mcSimulator.testDuration("1234") } @Test @@ -158,7 +158,7 @@ abstract class InProcessLifeOfAMeasurementIntegrationTest( runBlocking { // Use frontend simulator to create an invalid reach and frequency measurement and verify // its error info. - mcSimulator.executeInvalidReachAndFrequency("1234") + mcSimulator.testInvalidReachAndFrequency("1234") } // TODO(@renjiez): Add Multi-round test given the same input to verify correctness. diff --git a/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessReachMeasurementAccuracyTest.kt b/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessReachMeasurementAccuracyTest.kt new file mode 100644 index 00000000000..427d487df4c --- /dev/null +++ b/src/main/kotlin/org/wfanet/measurement/integration/common/InProcessReachMeasurementAccuracyTest.kt @@ -0,0 +1,283 @@ +// Copyright 2023 The Cross-Media Measurement Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.wfanet.measurement.integration.common + +import com.google.common.truth.Truth.assertThat +import java.time.Duration +import java.util.logging.Level +import java.util.logging.Logger +import kotlin.math.abs +import kotlin.math.pow +import kotlin.math.sqrt +import kotlinx.coroutines.runBlocking +import org.junit.After +import org.junit.Before +import org.junit.BeforeClass +import org.junit.Rule +import org.junit.Test +import org.wfanet.measurement.api.v2alpha.CertificatesGrpcKt +import org.wfanet.measurement.api.v2alpha.DataProvidersGrpcKt +import org.wfanet.measurement.api.v2alpha.EventGroupsGrpcKt +import org.wfanet.measurement.api.v2alpha.MeasurementConsumersGrpcKt +import org.wfanet.measurement.api.v2alpha.MeasurementsGrpcKt +import org.wfanet.measurement.api.v2alpha.ProtocolConfig.NoiseMechanism +import org.wfanet.measurement.api.v2alpha.RequisitionsGrpcKt +import org.wfanet.measurement.api.v2alpha.differentialPrivacyParams +import org.wfanet.measurement.common.testing.ProviderRule +import org.wfanet.measurement.eventdataprovider.noiser.DpParams +import org.wfanet.measurement.kingdom.deploy.common.RoLlv2ProtocolConfig +import org.wfanet.measurement.kingdom.deploy.common.service.DataServices +import org.wfanet.measurement.loadtest.measurementconsumer.MeasurementConsumerData +import org.wfanet.measurement.loadtest.measurementconsumer.MeasurementConsumerSimulator +import org.wfanet.measurement.loadtest.measurementconsumer.MeasurementConsumerSimulator.MeasurementInfo +import org.wfanet.measurement.loadtest.measurementconsumer.MetadataSyntheticGeneratorEventQuery +import org.wfanet.measurement.measurementconsumer.stats.LiquidLegionsV2Methodology +import org.wfanet.measurement.measurementconsumer.stats.NoiseMechanism as StatsNoiseMechanism +import org.wfanet.measurement.measurementconsumer.stats.ReachMeasurementParams +import org.wfanet.measurement.measurementconsumer.stats.ReachMeasurementVarianceParams +import org.wfanet.measurement.measurementconsumer.stats.VariancesImpl.computeMeasurementVariance +import org.wfanet.measurement.measurementconsumer.stats.VidSamplingInterval as StatsVidSamplingInterval +import org.wfanet.measurement.system.v1alpha.ComputationLogEntriesGrpcKt + +/** + * Test the Measurement results are accurate w.r.t to the variance. + * + * This is abstract so that different implementations of dependencies can all run the same tests + * easily. + */ +abstract class InProcessReachMeasurementAccuracyTest( + kingdomDataServicesRule: ProviderRule, + duchyDependenciesRule: + ProviderRule< + ( + String, + ComputationLogEntriesGrpcKt.ComputationLogEntriesCoroutineStub, + ) -> InProcessDuchy.DuchyDependencies + >, +) { + + @get:Rule + val inProcessCmmsComponents = + InProcessCmmsComponents( + kingdomDataServicesRule, + duchyDependenciesRule, + SYNTHETIC_EVENT_GROUP_SPECS + ) + + private lateinit var mcSimulator: MeasurementConsumerSimulator + + private val publicMeasurementsClient by lazy { + MeasurementsGrpcKt.MeasurementsCoroutineStub(inProcessCmmsComponents.kingdom.publicApiChannel) + } + private val publicMeasurementConsumersClient by lazy { + MeasurementConsumersGrpcKt.MeasurementConsumersCoroutineStub( + inProcessCmmsComponents.kingdom.publicApiChannel + ) + } + private val publicCertificatesClient by lazy { + CertificatesGrpcKt.CertificatesCoroutineStub(inProcessCmmsComponents.kingdom.publicApiChannel) + } + private val publicEventGroupsClient by lazy { + EventGroupsGrpcKt.EventGroupsCoroutineStub(inProcessCmmsComponents.kingdom.publicApiChannel) + } + private val publicDataProvidersClient by lazy { + DataProvidersGrpcKt.DataProvidersCoroutineStub(inProcessCmmsComponents.kingdom.publicApiChannel) + } + private val publicRequisitionsClient by lazy { + RequisitionsGrpcKt.RequisitionsCoroutineStub(inProcessCmmsComponents.kingdom.publicApiChannel) + } + + @Before + fun startDaemons() { + inProcessCmmsComponents.startDaemons() + initMcSimulator() + } + + private fun initMcSimulator() { + val measurementConsumerData = inProcessCmmsComponents.getMeasurementConsumerData() + val eventQuery = + MetadataSyntheticGeneratorEventQuery( + SyntheticGenerationSpecs.POPULATION_SPEC, + InProcessCmmsComponents.MC_ENCRYPTION_PRIVATE_KEY + ) + mcSimulator = + MeasurementConsumerSimulator( + MeasurementConsumerData( + measurementConsumerData.name, + InProcessCmmsComponents.MC_ENTITY_CONTENT.signingKey, + InProcessCmmsComponents.MC_ENCRYPTION_PRIVATE_KEY, + measurementConsumerData.apiAuthenticationKey + ), + OUTPUT_DP_PARAMS, + publicDataProvidersClient, + publicEventGroupsClient, + publicMeasurementsClient, + publicMeasurementConsumersClient, + publicCertificatesClient, + RESULT_POLLING_DELAY, + InProcessCmmsComponents.TRUSTED_CERTIFICATES, + eventQuery, + NoiseMechanism.CONTINUOUS_GAUSSIAN + ) + } + + @After + fun stopEdpSimulators() { + inProcessCmmsComponents.stopEdpSimulators() + } + + @After + fun stopDuchyDaemons() { + inProcessCmmsComponents.stopDuchyDaemons() + } + + private fun getReachVariance(measurementInfo: MeasurementInfo, reach: Long): Double { + val liquidLegionsMethodology = + LiquidLegionsV2Methodology( + RoLlv2ProtocolConfig.protocolConfig.sketchParams.decayRate, + RoLlv2ProtocolConfig.protocolConfig.sketchParams.maxSize, + RoLlv2ProtocolConfig.protocolConfig.sketchParams.samplingIndicatorSize + ) + val reachMeasurementParams = + ReachMeasurementParams( + StatsVidSamplingInterval( + measurementInfo.measurementSpec.vidSamplingInterval.start.toDouble(), + measurementInfo.measurementSpec.vidSamplingInterval.width.toDouble() + ), + DpParams(OUTPUT_DP_PARAMS.epsilon, OUTPUT_DP_PARAMS.delta), + StatsNoiseMechanism.GAUSSIAN + ) + val reachMeasurementVarianceParams = + ReachMeasurementVarianceParams(reach, reachMeasurementParams) + return computeMeasurementVariance(liquidLegionsMethodology, reachMeasurementVarianceParams) + } + + private fun getStandardDeviation(nums: List): Double { + val mean = nums.average() + val standardDeviation = nums.fold(0.0) { acc, num -> acc + (num - mean).pow(2.0) } + + return sqrt(standardDeviation / nums.size) + } + + data class ReachResult( + val actualReach: Long, + val expectedReach: Long, + val lowerBound: Double, + val upperBound: Double, + val withinInterval: Boolean, + ) + + @Test + fun `reach-only llv2 results should be accurate with respect to the variance`() = runBlocking { + val reachResults = mutableListOf() + var expectedReach = -1L + var expectedStandardDeviation = 0.0 + + var summary = "" + for (round in 1..DEFAULT_TEST_ROUND_NUMBER) { + val executionResult = mcSimulator.executeReachOnly(round.toString()) + + if (expectedReach == -1L) { + expectedReach = executionResult.expectedResult.reach.value + val expectedVariance = getReachVariance(executionResult.measurementInfo, expectedReach) + expectedStandardDeviation = sqrt(expectedVariance) + } else if (expectedReach != executionResult.expectedResult.reach.value) { + logger.log( + Level.WARNING, + "expected result not consistent. round=$round, prev_expected_result=$expectedReach, " + + "current_expected_result=${executionResult.expectedResult.reach.value}" + ) + } + + // The general formula for confidence interval is result +/- multiplier * sqrt(variance). + // The multiplier for 95% confidence interval is 1.96. + val reach = executionResult.actualResult.reach.value + val reachVariance = getReachVariance(executionResult.measurementInfo, reach) + val intervalLowerBound = reach - sqrt(reachVariance) * MULTIPLIER + val intervalUpperBound = reach + sqrt(reachVariance) * MULTIPLIER + val withinInterval = reach >= intervalLowerBound && reach <= intervalUpperBound + + val reachResult = + ReachResult(reach, expectedReach, intervalLowerBound, intervalUpperBound, withinInterval) + reachResults += reachResult + + val message = + "round=$round, actual_result=${reachResult.actualReach}, " + + "expected_result=${reachResult.expectedReach}, " + + "interval=(${"%.2f".format(reachResult.lowerBound)}, " + + "${"%.2f".format(reachResult.upperBound)}), accurate=${reachResult.withinInterval}" + summary += message + "\n" + logger.log(Level.INFO, message) + } + + logger.log(Level.INFO, "Accuracy Test Complete.\n$summary") + + val averageReach = reachResults.map { it.actualReach }.average() + val withinIntervalNumber = reachResults.map { if (it.withinInterval) 1 else 0 }.sum() + val withinIntervalPercentage = withinIntervalNumber.toDouble() / reachResults.size * 100 + val offsetPercentage = (averageReach - expectedReach) / expectedReach * 100 + val averageDispersionRatio = + abs(averageReach - expectedReach) * sqrt(DEFAULT_TEST_ROUND_NUMBER.toDouble()) / + expectedStandardDeviation + + logger.log( + Level.INFO, + "average_reach=$averageReach, offset_percentage=${"%.2f".format(offsetPercentage)}%, " + + "number_of_rounds_within_interval=$withinIntervalNumber out of $DEFAULT_TEST_ROUND_NUMBER " + + "(${"%.2f".format(withinIntervalPercentage)}%) " + ) + + val standardDeviation = getStandardDeviation(reachResults.map { it.actualReach.toDouble() }) + logger.log( + Level.INFO, + "std=${"%.2f".format(standardDeviation)}, " + + "expected_std=${"%.2f".format(expectedStandardDeviation)}, " + + "ratio=${"%.2f".format(standardDeviation / expectedStandardDeviation)}" + ) + + assertThat(withinIntervalPercentage).isAtLeast(COVERAGE_TEST_THRESHOLD) + assertThat(averageDispersionRatio).isLessThan(AVERAGE_TEST_THRESHOLD) + assertThat(standardDeviation) + .isGreaterThan(expectedStandardDeviation * STANDARD_DEVIATION_TEST_LOWER_THRESHOLD) + assertThat(standardDeviation) + .isLessThan(expectedStandardDeviation * STANDARD_DEVIATION_TEST_UPPER_THRESHOLD) + } + + companion object { + private val logger: Logger = Logger.getLogger(this::class.java.name) + + private val SYNTHETIC_EVENT_GROUP_SPECS = SyntheticGenerationSpecs.SYNTHETIC_DATA_SPECS_2M + + private const val DEFAULT_TEST_ROUND_NUMBER = 30 + // Multiplier for 95% confidence interval + private const val MULTIPLIER = 1.96 + + private const val COVERAGE_TEST_THRESHOLD = 80 + private const val AVERAGE_TEST_THRESHOLD = 2.58 + private const val STANDARD_DEVIATION_TEST_LOWER_THRESHOLD = 0.67 + private const val STANDARD_DEVIATION_TEST_UPPER_THRESHOLD = 1.35 + private val OUTPUT_DP_PARAMS = differentialPrivacyParams { + epsilon = 0.0033 + delta = 0.00001 + } + private val RESULT_POLLING_DELAY = Duration.ofSeconds(10) + + @BeforeClass + @JvmStatic + fun initConfig() { + InProcessCmmsComponents.initConfig() + } + } +} diff --git a/src/main/kotlin/org/wfanet/measurement/integration/common/SyntheticGenerationSpecs.kt b/src/main/kotlin/org/wfanet/measurement/integration/common/SyntheticGenerationSpecs.kt index e34bdac9ba1..6adbe2fb58c 100644 --- a/src/main/kotlin/org/wfanet/measurement/integration/common/SyntheticGenerationSpecs.kt +++ b/src/main/kotlin/org/wfanet/measurement/integration/common/SyntheticGenerationSpecs.kt @@ -51,6 +51,28 @@ object SyntheticGenerationSpecs { ) } + /** + * EventGroup specs for synthetic generation based on [POPULATION_SPEC]. + * + * The total reach is ~2,000,000. + */ + val SYNTHETIC_DATA_SPECS_2M: List by lazy { + listOf( + loadTestData( + "synthetic_event_group_spec_1.textproto", + SyntheticEventGroupSpec.getDefaultInstance() + ), + loadTestData( + "synthetic_event_group_spec_2.textproto", + SyntheticEventGroupSpec.getDefaultInstance() + ), + loadTestData( + "synthetic_event_group_spec_3.textproto", + SyntheticEventGroupSpec.getDefaultInstance() + ) + ) + } + private fun loadTestData(fileName: String, defaultInstance: T): T { return parseTextProto(TEST_DATA_RUNTIME_PATH.resolve(fileName).toFile(), defaultInstance) } diff --git a/src/main/kotlin/org/wfanet/measurement/loadtest/measurementconsumer/MeasurementConsumerSimulator.kt b/src/main/kotlin/org/wfanet/measurement/loadtest/measurementconsumer/MeasurementConsumerSimulator.kt index 7c73c2387de..4a29f51ce76 100644 --- a/src/main/kotlin/org/wfanet/measurement/loadtest/measurementconsumer/MeasurementConsumerSimulator.kt +++ b/src/main/kotlin/org/wfanet/measurement/loadtest/measurementconsumer/MeasurementConsumerSimulator.kt @@ -141,13 +141,13 @@ class MeasurementConsumerSimulator( /** Cache of resource name to [Certificate]. */ private val certificateCache = mutableMapOf() - private data class RequisitionInfo( + data class RequisitionInfo( val dataProviderEntry: DataProviderEntry, val requisitionSpec: RequisitionSpec, val eventGroups: List, ) - private data class MeasurementInfo( + data class MeasurementInfo( val measurement: Measurement, val measurementSpec: MeasurementSpec, val requisitions: List, @@ -181,8 +181,14 @@ class MeasurementConsumerSimulator( } } + data class ExecutionResult( + val actualResult: Result, + val expectedResult: Result, + val measurementInfo: MeasurementInfo, + ) + /** A sequence of operations done in the simulator involving a reach and frequency measurement. */ - suspend fun executeReachAndFrequency(runId: String) { + suspend fun testReachAndFrequency(runId: String) { logger.info { "Creating reach and frequency Measurement..." } // Create a new measurement on behalf of the measurement consumer. val measurementConsumer = getMeasurementConsumer(measurementConsumerData.name) @@ -234,7 +240,7 @@ class MeasurementConsumerSimulator( * A sequence of operations done in the simulator involving a reach and frequency measurement with * invalid params. */ - suspend fun executeInvalidReachAndFrequency(runId: String) { + suspend fun testInvalidReachAndFrequency(runId: String) { // Create a new measurement on behalf of the measurement consumer. val measurementConsumer = getMeasurementConsumer(measurementConsumerData.name) @@ -262,7 +268,7 @@ class MeasurementConsumerSimulator( * A sequence of operations done in the simulator involving a direct reach and frequency * measurement. */ - suspend fun executeDirectReachAndFrequency(runId: String) { + suspend fun testDirectReachAndFrequency(runId: String) { // Create a new measurement on behalf of the measurement consumer. val measurementConsumer = getMeasurementConsumer(measurementConsumerData.name) val measurementInfo = @@ -315,7 +321,7 @@ class MeasurementConsumerSimulator( } /** A sequence of operations done in the simulator involving a direct reach measurement. */ - suspend fun executeDirectReach(runId: String) { + suspend fun testDirectReach(runId: String) { // Create a new measurement on behalf of the measurement consumer. val measurementConsumer = getMeasurementConsumer(measurementConsumerData.name) val measurementInfo = @@ -349,8 +355,7 @@ class MeasurementConsumerSimulator( logger.info("Direct reach result is equal to the expected result") } - /** A sequence of operations done in the simulator involving a reach-only measurement. */ - suspend fun executeReachOnly(runId: String) { + suspend fun executeReachOnly(runId: String): ExecutionResult { // Create a new measurement on behalf of the measurement consumer. val measurementConsumer = getMeasurementConsumer(measurementConsumerData.name) val measurementInfo = @@ -368,29 +373,43 @@ class MeasurementConsumerSimulator( reachOnlyResult = getReachResult(measurementName) } checkNotNull(reachOnlyResult) { "Timed out waiting for response to reach-only request" } - logger.info("Actual result: $reachOnlyResult") val expectedResult: Result = getExpectedResult(measurementInfo) - logger.info("Expected result: $expectedResult") + return ExecutionResult(reachOnlyResult, expectedResult, measurementInfo) + } - val protocol = measurementInfo.measurement.protocolConfig.protocolsList.first() + /** A sequence of operations done in the simulator involving a reach-only measurement. */ + suspend fun testReachOnly(runId: String) { + val result = executeReachOnly(runId) + + val protocol = result.measurementInfo.measurement.protocolConfig.protocolsList.first() val reachVariance: Double = computeReachVariance( - reachOnlyResult, - measurementInfo.measurementSpec.vidSamplingInterval, - measurementInfo.measurementSpec.reach.privacyParams, + result.actualResult, + result.measurementInfo.measurementSpec.vidSamplingInterval, + result.measurementInfo.measurementSpec.reach.privacyParams, protocol ) val reachTolerance = computeErrorMargin(reachVariance) - assertThat(reachOnlyResult).reachValue().isWithin(reachTolerance).of(expectedResult.reach.value) + assertThat(result.actualResult) + .reachValue() + .isWithin(reachTolerance) + .of(result.expectedResult.reach.value) + logger.info("Actual result: ${result.actualResult}") + logger.info("Expected result: ${result.expectedResult}") + + assertThat(result.actualResult) + .reachValue() + .isWithin(reachTolerance) + .of(result.expectedResult.reach.value) logger.info("Reach-only result is equal to the expected result. Correctness Test passes.") } /** A sequence of operations done in the simulator involving an impression measurement. */ - suspend fun executeImpression(runId: String) { + suspend fun testImpression(runId: String) { logger.info { "Creating impression Measurement..." } // Create a new measurement on behalf of the measurement consumer. val measurementConsumer = getMeasurementConsumer(measurementConsumerData.name) @@ -416,7 +435,7 @@ class MeasurementConsumerSimulator( } /** A sequence of operations done in the simulator involving a duration measurement. */ - suspend fun executeDuration(runId: String) { + suspend fun testDuration(runId: String) { logger.info { "Creating duration Measurement..." } // Create a new measurement on behalf of the measurement consumer. val measurementConsumer = getMeasurementConsumer(measurementConsumerData.name) diff --git a/src/test/kotlin/org/wfanet/measurement/duchy/daemon/herald/ContinuationTokenManagerTest.kt b/src/test/kotlin/org/wfanet/measurement/duchy/daemon/herald/ContinuationTokenManagerTest.kt index 1e7f33c839a..0eb755ed7e7 100644 --- a/src/test/kotlin/org/wfanet/measurement/duchy/daemon/herald/ContinuationTokenManagerTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/duchy/daemon/herald/ContinuationTokenManagerTest.kt @@ -23,8 +23,10 @@ import org.junit.Before import org.junit.Test import org.junit.runner.RunWith import org.junit.runners.JUnit4 +import org.wfanet.measurement.common.base64UrlDecode import org.wfanet.measurement.duchy.service.internal.testing.InMemoryContinuationTokensService import org.wfanet.measurement.internal.duchy.ContinuationTokensGrpcKt.ContinuationTokensCoroutineStub +import org.wfanet.measurement.system.v1alpha.StreamActiveComputationsContinuationToken @RunWith(JUnit4::class) class ContinuationTokenManagerTest { @@ -110,4 +112,11 @@ class ContinuationTokenManagerTest { assertThat(continuationTokenManager.getLatestContinuationToken()).isEqualTo("6") assertThat(inMemoryContinuationTokensService.latestContinuationToken).isEqualTo("6") } + + @Test + fun `tmp`() = runBlocking { + val token = "CgsIgIDeqQYQmOiZBxED1UugjRsAZw" + val parsed = StreamActiveComputationsContinuationToken.parseFrom(token.base64UrlDecode()) + print(parsed) + } } diff --git a/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/BUILD.bazel b/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/BUILD.bazel index d7a835d2756..ecd862c1ef0 100644 --- a/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/BUILD.bazel +++ b/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/BUILD.bazel @@ -61,3 +61,19 @@ spanner_emulator_test( "@wfa_common_jvm//src/main/kotlin/org/wfanet/measurement/common/identity", ], ) + +spanner_emulator_test( + name = "GCloudSpannerInProcessReachMeasurementAccuracyTest", + size = "enormous", + srcs = ["GCloudSpannerInProcessReachMeasurementAccuracyTest.kt"], + tags = [ + "cpu:2", + "manual", + ], + test_class = "org.wfanet.measurement.integration.deploy.gcloud.GCloudSpannerInProcessReachMeasurementAccuracyTest", + deps = [ + "//src/main/kotlin/org/wfanet/measurement/integration/common:in_process_reach_measurement_accuracy_test", + "//src/main/kotlin/org/wfanet/measurement/integration/deploy/gcloud:spanner_duchy_dependency_provider_rule", + "//src/main/kotlin/org/wfanet/measurement/kingdom/deploy/gcloud/spanner/testing", + ], +) diff --git a/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/GCloudSpannerInProcessLifeOfAMeasurementIntegrationTest.kt b/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/GCloudSpannerInProcessLifeOfAMeasurementIntegrationTest.kt index d33563addc3..d7c0c5cd7eb 100644 --- a/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/GCloudSpannerInProcessLifeOfAMeasurementIntegrationTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/GCloudSpannerInProcessLifeOfAMeasurementIntegrationTest.kt @@ -35,5 +35,5 @@ class GCloudSpannerInProcessLifeOfAMeasurementIntegrationTest : * * TODO(Kotlin/kotlinx.coroutines#3865): Switch back to CoroutinesTimeout when fixed. */ - @get:Rule val timeout: Timeout = Timeout.seconds(90) + @get:Rule val timeout: Timeout = Timeout.seconds(180) } diff --git a/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/GCloudSpannerInProcessReachMeasurementAccuracyTest.kt b/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/GCloudSpannerInProcessReachMeasurementAccuracyTest.kt new file mode 100644 index 00000000000..eac6d402566 --- /dev/null +++ b/src/test/kotlin/org/wfanet/measurement/integration/deploy/gcloud/GCloudSpannerInProcessReachMeasurementAccuracyTest.kt @@ -0,0 +1,39 @@ +// Copyright 2023 The Cross-Media Measurement Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package org.wfanet.measurement.integration.deploy.gcloud + +import org.junit.Rule +import org.junit.rules.Timeout +import org.wfanet.measurement.integration.common.ALL_DUCHY_NAMES +import org.wfanet.measurement.integration.common.InProcessReachMeasurementAccuracyTest +import org.wfanet.measurement.kingdom.deploy.gcloud.spanner.testing.KingdomDataServicesProviderRule + +/** + * Implementation of [InProcessReachMeasurementAccuracyTest] for GCloud backends with Spanner + * database. + */ +class GCloudSpannerInProcessReachMeasurementAccuracyTest : + InProcessReachMeasurementAccuracyTest( + KingdomDataServicesProviderRule(), + SpannerDuchyDependencyProviderRule(ALL_DUCHY_NAMES) + ) { + + /** + * Rule to enforce test method timeout. + * + * TODO(Kotlin/kotlinx.coroutines#3865): Switch back to CoroutinesTimeout when fixed. + */ + @get:Rule val timeout: Timeout = Timeout.seconds(3600) +} diff --git a/src/test/kotlin/org/wfanet/measurement/integration/k8s/AbstractCorrectnessTest.kt b/src/test/kotlin/org/wfanet/measurement/integration/k8s/AbstractCorrectnessTest.kt index 051559891ed..2b58668d49a 100644 --- a/src/test/kotlin/org/wfanet/measurement/integration/k8s/AbstractCorrectnessTest.kt +++ b/src/test/kotlin/org/wfanet/measurement/integration/k8s/AbstractCorrectnessTest.kt @@ -38,17 +38,17 @@ abstract class AbstractCorrectnessTest(private val measurementSystem: Measuremen @Test(timeout = 1 * 60 * 1000) fun `impression measurement completes with expected result`() = runBlocking { - testHarness.executeImpression("$runId-impression") + testHarness.testImpression("$runId-impression") } @Test(timeout = 1 * 60 * 1000) fun `duration measurement completes with expected result`() = runBlocking { - testHarness.executeDuration("$runId-duration") + testHarness.testDuration("$runId-duration") } @Test(timeout = 10 * 60 * 1000) fun `reach and frequency measurement completes with expected result`() = runBlocking { - testHarness.executeReachAndFrequency("$runId-reach-and-freq") + testHarness.testReachAndFrequency("$runId-reach-and-freq") } interface MeasurementSystem {