diff --git a/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt b/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt index d69bec7..33d46a3 100644 --- a/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt +++ b/src/integrationTest/java/org/radarbase/output/RestructureS3IntegrationTest.kt @@ -16,10 +16,12 @@ import kotlinx.coroutines.test.runTest import kotlinx.coroutines.withContext import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test +import org.radarbase.output.config.PathConfig import org.radarbase.output.config.PathFormatterConfig import org.radarbase.output.config.ResourceConfig import org.radarbase.output.config.RestructureConfig import org.radarbase.output.config.S3Config +import org.radarbase.output.config.TargetFormatterConfig import org.radarbase.output.config.TopicConfig import org.radarbase.output.config.WorkerConfig import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended @@ -49,9 +51,15 @@ class RestructureS3IntegrationTest { ) val config = RestructureConfig( sources = listOf(ResourceConfig("s3", path = Paths.get("in"), s3 = sourceConfig)), - targets = mapOf("radar-output-storage" to ResourceConfig("s3", path = Paths.get("output"), s3 = targetConfig)), + targets = mapOf( + "radar-output-storage" to ResourceConfig("s3", path = Paths.get("output"), s3 = targetConfig), + "radar-test-root" to ResourceConfig("s3", path = Paths.get("otherOutput"), s3 = targetConfig), + ), worker = WorkerConfig(minimumFileAge = 0L), topics = topicConfig, + paths = PathConfig( + target = TargetFormatterConfig("\${projectId}"), + ), ) val application = Application(config) val sourceClient = sourceConfig.createS3Client() @@ -92,7 +100,7 @@ class RestructureS3IntegrationTest { val firstParticipantOutput = "output/STAGING_PROJECT/1543bc93-3c17-4381-89a5-c5d6272b827c/application_server_status/CONNECTED" val secondParticipantOutput = - "output/radar-test-root/4ab9b985-6eec-4e51-9a29-f4c571c89f99/android_phone_acceleration" + "otherOutput/radar-test-root/4ab9b985-6eec-4e51-9a29-f4c571c89f99/android_phone_acceleration" val targetBucket = requireNotNull(targetConfig.bucket) @@ -119,7 +127,6 @@ class RestructureS3IntegrationTest { return@coroutineScope withContext(Dispatchers.IO) { targetClient.listObjects( ListObjectsArgs.Builder().bucketBuild(targetBucket) { - prefix("output") recursive(true) useUrlEncodingType(false) }, diff --git a/src/main/java/org/radarbase/output/target/S3TargetStorage.kt b/src/main/java/org/radarbase/output/target/S3TargetStorage.kt index 2183298..8366dc7 100644 --- a/src/main/java/org/radarbase/output/target/S3TargetStorage.kt +++ b/src/main/java/org/radarbase/output/target/S3TargetStorage.kt @@ -25,6 +25,7 @@ import io.minio.MinioClient import io.minio.RemoveObjectArgs import io.minio.StatObjectArgs import io.minio.UploadObjectArgs +import io.minio.errors.ErrorResponseException import org.radarbase.output.config.S3Config import org.radarbase.output.source.S3SourceStorage.Companion.faultTolerant import org.radarbase.output.util.bucketBuild @@ -80,7 +81,17 @@ class S3TargetStorage( logger.info("Bucket $bucket already exists.") } else { val makeBucketRequest = MakeBucketArgs.builder().bucketBuild(bucket) - faultTolerant { s3Client.makeBucket(makeBucketRequest) } + faultTolerant { + try { + s3Client.makeBucket(makeBucketRequest) + } catch (ex: ErrorResponseException) { + if (ex.errorResponse().code() == "BucketAlreadyOwnedByYou") { + logger.warn("Bucket {} was already created while the request was busy", bucket) + } else { + throw ex + } + } + } logger.info("Bucket $bucket was created.") } } catch (ex: Exception) {