Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Configuration bug #552

Merged
merged 1 commit into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,39 @@ import java.nio.charset.StandardCharsets.UTF_8
import java.nio.file.Paths

class RestructureS3IntegrationTest {
@Test
fun configuration() = runTest {
Timer.isEnabled = true
val sourceConfig = S3Config(
endpoint = "http://localhost:9000",
accessToken = "minioadmin",
secretKey = "minioadmin",
bucket = "source",
)
val targetConfig = sourceConfig.copy(bucket = "target")
val topicConfig = mapOf(
"application_server_status" to TopicConfig(
pathProperties = PathFormatterConfig(
format = "\${projectId}/\${userId}/\${topic}/\${value:serverStatus}/\${filename}",
),
),
)
val config = RestructureConfig(
source = ResourceConfig("s3", s3 = sourceConfig),
target = ResourceConfig("s3", s3 = targetConfig),
paths = PathConfig(
inputs = listOf(Paths.get("in")),
// These properties were added to verify that they are present later in PathConfig.createFactory()
properties = mapOf("one" to "1", "two" to "2", "three" to "3"),
),
worker = WorkerConfig(minimumFileAge = 0L),
topics = topicConfig,
)
val application = Application(config)

assertEquals(4, application.pathFactory.pathConfig.path.properties.count())
}

@Test
fun integration() = runTest {
Timer.isEnabled = true
Expand All @@ -51,11 +84,18 @@ class RestructureS3IntegrationTest {
val config = RestructureConfig(
source = ResourceConfig("s3", s3 = sourceConfig),
target = ResourceConfig("s3", s3 = targetConfig),
paths = PathConfig(inputs = listOf(Paths.get("in"))),
paths = PathConfig(
inputs = listOf(Paths.get("in")),
// These properties were added to verify that they are present later in PathConfig.createFactory()
properties = mapOf("one" to "1", "two" to "2", "three" to "3"),
),
worker = WorkerConfig(minimumFileAge = 0L),
topics = topicConfig,
)
val application = Application(config)

assertEquals(4, application.pathFactory.pathConfig.path.properties.count())

val sourceClient = sourceConfig.createS3Client()
val sourceBucket = requireNotNull(sourceConfig.bucket)
if (!sourceClient.bucketExists(BucketExistsArgs.builder().bucketBuild(sourceBucket))) {
Expand Down
12 changes: 11 additions & 1 deletion src/main/java/org/radarbase/output/config/PathConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,19 @@ data class PathConfig(
else -> null
}

// Pass any properties from the given PathConfig to the PathFormatterConfig for the factory.
// Properties passed in the PathConfig.path.properties take precedent
val pathProperties = buildMap {
putAll(path.properties)
putAll(properties)
}

val pathFormatterConfig = path.copy(properties = pathProperties)
val pathConfig = copy(bucket = bucketConfig, path = pathFormatterConfig)

pathFactory.init(
extension = extension,
config = copy(bucket = bucketConfig),
config = pathConfig,
topics = topics,
)

Expand Down
9 changes: 7 additions & 2 deletions src/main/java/org/radarbase/output/path/RecordPathFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericRecordBuilder
import org.radarbase.output.config.PathConfig
import org.radarbase.output.config.TopicConfig
import org.radarbase.output.util.TimeUtil
import org.slf4j.LoggerFactory
import java.nio.file.Path
import java.nio.file.Paths
import java.util.regex.Pattern
Expand All @@ -44,12 +45,13 @@ abstract class RecordPathFactory {
config.output
},
path = config.path.copy(
properties = buildMap(config.path.properties.size + 1) {
properties = buildMap {
putAll(config.path.properties)
putIfAbsent("extension", extension)
},
),
)

this.addTopicConfiguration(topics)
}

Expand All @@ -67,7 +69,8 @@ abstract class RecordPathFactory {
attempt: Int,
): Path {
val keyField = requireNotNull(record.get("key")) { "Failed to process $record; no key present" }
val valueField = requireNotNull(record.get("value") as? GenericRecord) { "Failed to process $record; no value present" }
val valueField =
requireNotNull(record.get("value") as? GenericRecord) { "Failed to process $record; no value present" }

val keyRecord: GenericRecord = if (keyField is GenericRecord) {
keyField
Expand Down Expand Up @@ -146,5 +149,7 @@ abstract class RecordPathFactory {
?.let { get(it.pos()) }
}

private val logger = LoggerFactory.getLogger(RecordPathFactory::class.java)

protected open fun addTopicConfiguration(topicConfig: Map<String, TopicConfig>) = Unit
}
46 changes: 46 additions & 0 deletions src/test/java/org/radarbase/output/path/RecordPathFactoryTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package org.radarbase.output.path

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.radarbase.output.config.PathConfig
import org.radarbase.output.config.PathFormatterConfig
import org.radarbase.output.config.ResourceConfig
import org.radarbase.output.config.S3Config

internal class RecordPathFactoryTest {

@Test
fun testInit() {
var properties = mapOf("key1" to "value1", "key2" to "value2")

val pathConfig = PathConfig(
factory = "org.radarbase.output.path.FormattedPathFactory",
properties = properties,
path = PathFormatterConfig(
format = "\${topic}/\${projectId}/\${userId}/\${sourceId}/\${filename}",
plugins = "fixed",
),
)

val targetConfig = S3Config(
endpoint = "http://localhost:9000",
accessToken = "minioadmin",
secretKey = "minioadmin",
bucket = "target",
)

val factory = pathConfig.createFactory(
ResourceConfig("s3", s3 = targetConfig),
"test-extension",
topics = mapOf(),
)

properties = buildMap {
putAll(properties)
putIfAbsent("extension", "test-extension")
}

assertEquals(properties, factory.pathConfig.path.properties)
assertEquals(properties, factory.pathConfig.path.properties)
}
}
Loading