Skip to content

Commit

Permalink
Fix the configuration bug #551
Browse files Browse the repository at this point in the history
  • Loading branch information
Bdegraaf1234 committed Apr 2, 2024
1 parent 54bd1cb commit cbf509b
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,39 @@ import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Paths

class RestructureS3IntegrationTest {
@Test
fun configuration() = runTest {
Timer.isEnabled = true
val sourceConfig = S3Config(
endpoint = "http://localhost:9000",
accessToken = "minioadmin",
secretKey = "minioadmin",
bucket = "source",
)
val targetConfig = sourceConfig.copy(bucket = "target")
val topicConfig = mapOf(
"application_server_status" to TopicConfig(
pathProperties = PathFormatterConfig(
format = "\${projectId}/\${userId}/\${topic}/\${value:serverStatus}/\${filename}",
),
),
)
val config = RestructureConfig(
source = ResourceConfig("s3", s3 = sourceConfig),
target = ResourceConfig("s3", s3 = targetConfig),
paths = PathConfig(
inputs = listOf(Paths.get("in")),
// These properties were added to verify that they are present later in PathConfig.createFactory()
properties = mapOf("one" to "1", "two" to "2", "three" to "3"),
),
worker = WorkerConfig(minimumFileAge = 0L),
topics = topicConfig,
)
val application = Application(config)

assertEquals(4, application.pathFactory.pathConfig.path.properties.count())
}

@Test
fun integration() = runTest {
Timer.isEnabled = true
Expand All @@ -51,11 +84,18 @@ class RestructureS3IntegrationTest {
val config = RestructureConfig(
source = ResourceConfig("s3", s3 = sourceConfig),
target = ResourceConfig("s3", s3 = targetConfig),
paths = PathConfig(inputs = listOf(Paths.get("in"))),
paths = PathConfig(
inputs = listOf(Paths.get("in")),
// These properties were added to verify that they are present later in PathConfig.createFactory()
properties = mapOf("one" to "1", "two" to "2", "three" to "3"),
),
worker = WorkerConfig(minimumFileAge = 0L),
topics = topicConfig,
)
val application = Application(config)

assertEquals(4, application.pathFactory.pathConfig.path.properties.count())

val sourceClient = sourceConfig.createS3Client()
val sourceBucket = requireNotNull(sourceConfig.bucket)
if (!sourceClient.bucketExists(BucketExistsArgs.builder().bucketBuild(sourceBucket))) {
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/org/radarbase/output/config/PathConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,19 @@ data class PathConfig(
else -> null
}

// Pass any properties from the given PathConfig to the PathFormatterConfig for the factory.
// Properties passed in the PathConfig.path.properties take precedent
val pathProperties = buildMap {
putAll(path.properties)
putAll(properties)
}

val pathFormatterConfig = path.copy(properties = pathProperties)
val pathConfig = copy(bucket = bucketConfig, path = pathFormatterConfig)

pathFactory.init(
extension = extension,
config = copy(bucket = bucketConfig),
config = pathConfig,
topics = topics,
)

Expand Down
9 changes: 7 additions & 2 deletions src/main/java/org/radarbase/output/path/RecordPathFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecordBuilder
import org.radarbase.output.config.PathConfig
import org.radarbase.output.config.TopicConfig
import org.radarbase.output.util.TimeUtil
import org.slf4j.LoggerFactory
import java.nio.file.Path
import java.nio.file.Paths
import java.util.regex.Pattern
Expand All @@ -44,12 +45,13 @@ abstract class RecordPathFactory {
config.output
},
path = config.path.copy(
properties = buildMap(config.path.properties.size + 1) {
properties = buildMap {
putAll(config.path.properties)
putIfAbsent("extension", extension)
},
),
)

this.addTopicConfiguration(topics)
}

Expand All @@ -67,7 +69,8 @@ abstract class RecordPathFactory {
attempt: Int,
): Path {
val keyField = requireNotNull(record.get("key")) { "Failed to process $record; no key present" }
val valueField = requireNotNull(record.get("value") as? GenericRecord) { "Failed to process $record; no value present" }
val valueField =
requireNotNull(record.get("value") as? GenericRecord) { "Failed to process $record; no value present" }

val keyRecord: GenericRecord = if (keyField is GenericRecord) {
keyField
Expand Down Expand Up @@ -146,5 +149,7 @@ abstract class RecordPathFactory {
?.let { get(it.pos()) }
}

private val logger = LoggerFactory.getLogger(RecordPathFactory::class.java)

protected open fun addTopicConfiguration(topicConfig: Map<String, TopicConfig>) = Unit
}
46 changes: 46 additions & 0 deletions src/test/java/org/radarbase/output/path/RecordPathFactoryTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.radarbase.output.path

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.radarbase.output.config.PathConfig
import org.radarbase.output.config.PathFormatterConfig
import org.radarbase.output.config.ResourceConfig
import org.radarbase.output.config.S3Config

internal class RecordPathFactoryTest {

@Test
fun testInit() {
var properties = mapOf("key1" to "value1", "key2" to "value2")

val pathConfig = PathConfig(
factory = "org.radarbase.output.path.FormattedPathFactory",
properties = properties,
path = PathFormatterConfig(
format = "\${topic}/\${projectId}/\${userId}/\${sourceId}/\${filename}",
plugins = "fixed",
),
)

val targetConfig = S3Config(
endpoint = "http://localhost:9000",
accessToken = "minioadmin",
secretKey = "minioadmin",
bucket = "target",
)

val factory = pathConfig.createFactory(
ResourceConfig("s3", s3 = targetConfig),
"test-extension",
topics = mapOf(),
)

properties = buildMap {
putAll(properties)
putIfAbsent("extension", "test-extension")
}

assertEquals(properties, factory.pathConfig.path.properties)
assertEquals(properties, factory.pathConfig.path.properties)
}
}

0 comments on commit cbf509b

Please sign in to comment.