diff --git a/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt b/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt index 60fcb7b..d366350 100644 --- a/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt +++ b/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt @@ -31,6 +31,39 @@ import java.nio.charset.StandardCharsets.UTF_8 import java.nio.file.Paths class RestructureS3IntegrationTest { + @Test + fun configuration() = runTest { + Timer.isEnabled = true + val sourceConfig = S3Config( + endpoint = "http://localhost:9000", + accessToken = "minioadmin", + secretKey = "minioadmin", + bucket = "source", + ) + val targetConfig = sourceConfig.copy(bucket = "target") + val topicConfig = mapOf( + "application_server_status" to TopicConfig( + pathProperties = PathFormatterConfig( + format = "\${projectId}/\${userId}/\${topic}/\${value:serverStatus}/\${filename}", + ), + ), + ) + val config = RestructureConfig( + source = ResourceConfig("s3", s3 = sourceConfig), + target = ResourceConfig("s3", s3 = targetConfig), + paths = PathConfig( + inputs = listOf(Paths.get("in")), + // These properties were added to verify that they are present later in PathConfig.createFactory() + properties = mapOf("one" to "1", "two" to "2", "three" to "3"), + ), + worker = WorkerConfig(minimumFileAge = 0L), + topics = topicConfig, + ) + val application = Application(config) + + assertEquals(4, application.pathFactory.pathConfig.path.properties.count()) + } + @Test fun integration() = runTest { Timer.isEnabled = true @@ -51,11 +84,18 @@ class RestructureS3IntegrationTest { val config = RestructureConfig( source = ResourceConfig("s3", s3 = sourceConfig), target = ResourceConfig("s3", s3 = targetConfig), - paths = PathConfig(inputs = listOf(Paths.get("in"))), + paths = PathConfig( + inputs = listOf(Paths.get("in")), + // These properties were added to verify that they are present later in PathConfig.createFactory() + properties = mapOf("one" to "1", "two" to "2", "three" to "3"), + ), worker = WorkerConfig(minimumFileAge = 0L), topics = topicConfig, ) val application = Application(config) + + assertEquals(4, application.pathFactory.pathConfig.path.properties.count()) + val sourceClient = sourceConfig.createS3Client() val sourceBucket = requireNotNull(sourceConfig.bucket) if (!sourceClient.bucketExists(BucketExistsArgs.builder().bucketBuild(sourceBucket))) { diff --git a/src/main/java/org/radarbase/output/config/PathConfig.kt b/src/main/java/org/radarbase/output/config/PathConfig.kt index 0755275..00078f3 100644 --- a/src/main/java/org/radarbase/output/config/PathConfig.kt +++ b/src/main/java/org/radarbase/output/config/PathConfig.kt @@ -45,9 +45,19 @@ data class PathConfig( else -> null } + // Pass any properties from the given PathConfig to the PathFormatterConfig for the factory. + // Properties passed in the PathConfig.path.properties take precedent + val pathProperties = buildMap { + putAll(path.properties) + putAll(properties) + } + + val pathFormatterConfig = path.copy(properties = pathProperties) + val pathConfig = copy(bucket = bucketConfig, path = pathFormatterConfig) + pathFactory.init( extension = extension, - config = copy(bucket = bucketConfig), + config = pathConfig, topics = topics, ) diff --git a/src/main/java/org/radarbase/output/path/RecordPathFactory.kt b/src/main/java/org/radarbase/output/path/RecordPathFactory.kt index 1aafbc4..8b6c2ed 100644 --- a/src/main/java/org/radarbase/output/path/RecordPathFactory.kt +++ b/src/main/java/org/radarbase/output/path/RecordPathFactory.kt @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecordBuilder import org.radarbase.output.config.PathConfig import org.radarbase.output.config.TopicConfig import org.radarbase.output.util.TimeUtil +import org.slf4j.LoggerFactory import java.nio.file.Path import java.nio.file.Paths import java.util.regex.Pattern @@ -44,12 +45,13 @@ abstract class RecordPathFactory { config.output }, path = config.path.copy( - properties = buildMap(config.path.properties.size + 1) { + properties = buildMap { putAll(config.path.properties) putIfAbsent("extension", extension) }, ), ) + this.addTopicConfiguration(topics) } @@ -67,7 +69,8 @@ abstract class RecordPathFactory { attempt: Int, ): Path { val keyField = requireNotNull(record.get("key")) { "Failed to process $record; no key present" } - val valueField = requireNotNull(record.get("value") as? GenericRecord) { "Failed to process $record; no value present" } + val valueField = + requireNotNull(record.get("value") as? GenericRecord) { "Failed to process $record; no value present" } val keyRecord: GenericRecord = if (keyField is GenericRecord) { keyField @@ -146,5 +149,7 @@ abstract class RecordPathFactory { ?.let { get(it.pos()) } } + private val logger = LoggerFactory.getLogger(RecordPathFactory::class.java) + protected open fun addTopicConfiguration(topicConfig: Map) = Unit } diff --git a/src/test/java/org/radarbase/output/path/RecordPathFactoryTest.kt b/src/test/java/org/radarbase/output/path/RecordPathFactoryTest.kt new file mode 100644 index 0000000..db8a332 --- /dev/null +++ b/src/test/java/org/radarbase/output/path/RecordPathFactoryTest.kt @@ -0,0 +1,46 @@ +package org.radarbase.output.path + +import org.junit.jupiter.api.Assertions.assertEquals +import org.junit.jupiter.api.Test +import org.radarbase.output.config.PathConfig +import org.radarbase.output.config.PathFormatterConfig +import org.radarbase.output.config.ResourceConfig +import org.radarbase.output.config.S3Config + +internal class RecordPathFactoryTest { + + @Test + fun testInit() { + var properties = mapOf("key1" to "value1", "key2" to "value2") + + val pathConfig = PathConfig( + factory = "org.radarbase.output.path.FormattedPathFactory", + properties = properties, + path = PathFormatterConfig( + format = "\${topic}/\${projectId}/\${userId}/\${sourceId}/\${filename}", + plugins = "fixed", + ), + ) + + val targetConfig = S3Config( + endpoint = "http://localhost:9000", + accessToken = "minioadmin", + secretKey = "minioadmin", + bucket = "target", + ) + + val factory = pathConfig.createFactory( + ResourceConfig("s3", s3 = targetConfig), + "test-extension", + topics = mapOf(), + ) + + properties = buildMap { + putAll(properties) + putIfAbsent("extension", "test-extension") + } + + assertEquals(properties, factory.pathConfig.path.properties) + assertEquals(properties, factory.pathConfig.path.properties) + } +}