Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Organization bucket #549

Draft
wants to merge 10 commits into
base: dev
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,18 @@ import io.minio.RemoveBucketArgs
import io.minio.RemoveObjectArgs
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import org.radarbase.kotlin.coroutines.launchJoin
import org.radarbase.output.config.PathConfig
import org.radarbase.output.config.PathFormatterConfig
import org.radarbase.output.config.ResourceConfig
import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.config.S3Config
import org.radarbase.output.config.TargetFormatterConfig
import org.radarbase.output.config.TopicConfig
import org.radarbase.output.config.WorkerConfig
import org.radarbase.output.util.SuspendedCloseable.Companion.useSuspended
Expand Down Expand Up @@ -49,11 +50,16 @@ class RestructureS3IntegrationTest {
),
)
val config = RestructureConfig(
source = ResourceConfig("s3", s3 = sourceConfig),
target = ResourceConfig("s3", s3 = targetConfig),
paths = PathConfig(inputs = listOf(Paths.get("in"))),
sources = listOf(ResourceConfig("s3", path = Paths.get("in"), s3 = sourceConfig)),
targets = mapOf(
"radar-output-storage" to ResourceConfig("s3", path = Paths.get("output"), s3 = targetConfig),
"radar-test-root" to ResourceConfig("s3", path = Paths.get("otherOutput"), s3 = targetConfig),
),
worker = WorkerConfig(minimumFileAge = 0L),
topics = topicConfig,
paths = PathConfig(
target = TargetFormatterConfig("\${projectId}"),
),
)
val application = Application(config)
val sourceClient = sourceConfig.createS3Client()
Expand All @@ -67,20 +73,18 @@ class RestructureS3IntegrationTest {
"application_server_status/partition=1/application_server_status+1+0000000021.avro",
"android_phone_acceleration/partition=0/android_phone_acceleration+0+0003018784.avro",
)
val targetFiles = resourceFiles.map { Paths.get("in/$it") }
resourceFiles.mapIndexed { i, resourceFile ->
launch(Dispatchers.IO) {
[email protected]("/$resourceFile")
.useSuspended { statusFile ->
sourceClient.putObject(
PutObjectArgs.Builder()
.objectBuild(sourceBucket, targetFiles[i]) {
stream(statusFile, -1, MAX_PART_SIZE)
},
)
}
}
}.joinAll()
val targetFiles = resourceFiles.associateWith { Paths.get("in/$it") }
targetFiles.entries.launchJoin(Dispatchers.IO) { (resourceFile, targetFile) ->
[email protected]("/$resourceFile")
.useSuspended { statusFile ->
sourceClient.putObject(
PutObjectArgs.Builder()
.objectBuild(sourceBucket, targetFile) {
stream(statusFile, -1, MAX_PART_SIZE)
},
)
}
}

application.start()

Expand All @@ -94,7 +98,7 @@ class RestructureS3IntegrationTest {
val firstParticipantOutput =
"output/STAGING_PROJECT/1543bc93-3c17-4381-89a5-c5d6272b827c/application_server_status/CONNECTED"
val secondParticipantOutput =
"output/radar-test-root/4ab9b985-6eec-4e51-9a29-f4c571c89f99/android_phone_acceleration"
"otherOutput/radar-test-root/4ab9b985-6eec-4e51-9a29-f4c571c89f99/android_phone_acceleration"

val targetBucket = requireNotNull(targetConfig.bucket)

Expand All @@ -121,7 +125,6 @@ class RestructureS3IntegrationTest {
return@coroutineScope withContext(Dispatchers.IO) {
targetClient.listObjects(
ListObjectsArgs.Builder().bucketBuild(targetBucket) {
prefix("output")
recursive(true)
useUrlEncodingType(false)
},
Expand All @@ -144,13 +147,11 @@ class RestructureS3IntegrationTest {
coroutineScope {
// delete source files
launch {
targetFiles.map {
launch(Dispatchers.IO) {
sourceClient.removeObject(
RemoveObjectArgs.Builder().objectBuild(sourceBucket, it),
)
}
}.joinAll()
targetFiles.values.launchJoin(Dispatchers.IO) {
sourceClient.removeObject(
RemoveObjectArgs.Builder().objectBuild(sourceBucket, it),
)
}

launch(Dispatchers.IO) {
sourceClient.removeBucket(
Expand All @@ -161,15 +162,13 @@ class RestructureS3IntegrationTest {

// delete target files
launch {
files.map {
launch(Dispatchers.IO) {
targetClient.removeObject(
RemoveObjectArgs.Builder().bucketBuild(targetBucket) {
`object`(it)
},
)
}
}.joinAll()
files.launchJoin(Dispatchers.IO) { file ->
targetClient.removeObject(
RemoveObjectArgs.Builder().bucketBuild(targetBucket) {
`object`(file)
},
)
}
launch(Dispatchers.IO) {
targetClient.removeBucket(
RemoveBucketArgs.Builder().bucketBuild(targetBucket),
Expand Down
52 changes: 18 additions & 34 deletions src/main/java/org/radarbase/output/Application.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,9 @@ import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.format.RecordConverterFactory
import org.radarbase.output.path.RecordPathFactory
import org.radarbase.output.source.InMemoryStorageIndex
import org.radarbase.output.source.SourceStorage
import org.radarbase.output.source.SourceStorageFactory
import org.radarbase.output.source.StorageIndexManager
import org.radarbase.output.target.TargetStorage
import org.radarbase.output.source.SourceStorageManager
import org.radarbase.output.target.TargetManager
import org.radarbase.output.target.TargetStorageFactory
import org.radarbase.output.util.Timer
import org.radarbase.output.worker.FileCacheStore
Expand All @@ -47,9 +46,7 @@ import org.radarbase.output.worker.RadarKafkaRestructure
import org.slf4j.LoggerFactory
import redis.clients.jedis.JedisPool
import java.io.IOException
import java.nio.file.Path
import java.text.NumberFormat
import java.time.Duration
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.atomic.LongAdder
Expand All @@ -64,18 +61,23 @@ class Application(
override val config = config.apply { validate() }
override val recordConverter: RecordConverterFactory = config.format.createConverter()
override val compression: Compression = config.compression.createCompression()
override val pathFactory: RecordPathFactory = config.paths.createFactory(
config.target,
recordConverter.extension + compression.extension,
config.topics,
)

private val sourceStorageFactory = SourceStorageFactory(config.source, config.paths.temp)
override val sourceStorage: SourceStorage
get() = sourceStorageFactory.createSourceStorage()
private val sourceStorageFactory = SourceStorageFactory(config.paths.temp)
override val sourceStorage: List<SourceStorageManager> = config.consolidatedSources
.map { sourceConfig ->
val storage = sourceStorageFactory.createSourceStorage(sourceConfig)
SourceStorageManager(storage, InMemoryStorageIndex(), sourceConfig.index)
}

override val targetManager: TargetManager = TargetStorageFactory()
.createTargetStorage(config.paths.target.default, config.consolidatedTargets)

override val targetStorage: TargetStorage =
TargetStorageFactory(config.target).createTargetStorage()
override val pathFactory: RecordPathFactory =
config.paths.createFactory(
targetManager,
recordConverter.extension + compression.extension,
config.topics,
)

override val redisHolder: RedisHolder = RedisHolder(JedisPool(config.redis.uri))
override val remoteLockManager: RemoteLockManager = RedisRemoteLockManager(
Expand All @@ -88,27 +90,9 @@ class Application(

override val workerSemaphore = Semaphore(config.worker.numThreads * 2)

override val storageIndexManagers: Map<Path, StorageIndexManager>

private val jobs: List<Job>

init {
val indexConfig = config.source.index
val (fullScan, emptyScan) = if (indexConfig == null) {
listOf(3600L, 900L)
} else {
listOf(indexConfig.fullSyncInterval, indexConfig.emptyDirectorySyncInterval)
}.map { Duration.ofSeconds(it) }

storageIndexManagers = config.paths.inputs.associateWith { input ->
StorageIndexManager(
InMemoryStorageIndex(),
sourceStorage,
input,
fullScan,
emptyScan,
)
}
val serviceMutex = Mutex()
jobs = listOfNotNull(
RadarKafkaRestructure.job(config, serviceMutex),
Expand Down Expand Up @@ -137,7 +121,7 @@ class Application(
}

runBlocking {
launch { targetStorage.initialize() }
targetManager.initialize()
}

if (config.service.enable) {
Expand Down
11 changes: 4 additions & 7 deletions src/main/java/org/radarbase/output/FileStoreFactory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,15 @@ import org.radarbase.output.compression.Compression
import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.format.RecordConverterFactory
import org.radarbase.output.path.RecordPathFactory
import org.radarbase.output.source.SourceStorage
import org.radarbase.output.source.StorageIndexManager
import org.radarbase.output.target.TargetStorage
import org.radarbase.output.source.SourceStorageManager
import org.radarbase.output.target.TargetManager
import org.radarbase.output.worker.FileCacheStore
import java.io.IOException
import java.nio.file.Path

/** Factory for all factory classes and settings. */
interface FileStoreFactory {
val sourceStorage: SourceStorage
val targetStorage: TargetStorage
val sourceStorage: List<SourceStorageManager>
val targetManager: TargetManager
val pathFactory: RecordPathFactory
val compression: Compression
val recordConverter: RecordConverterFactory
Expand All @@ -44,7 +42,6 @@ interface FileStoreFactory {
val redisHolder: RedisHolder
val offsetPersistenceFactory: OffsetPersistenceFactory
val workerSemaphore: Semaphore
val storageIndexManagers: Map<Path, StorageIndexManager>

@Throws(IOException::class)
fun newFileCacheStore(accountant: Accountant): FileCacheStore
Expand Down
27 changes: 0 additions & 27 deletions src/main/java/org/radarbase/output/accounting/AccountantImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,10 @@ package org.radarbase.output.accounting

import kotlinx.coroutines.CoroutineScope
import org.radarbase.output.FileStoreFactory
import org.radarbase.output.config.RestructureConfig
import org.radarbase.output.target.TargetStorage
import org.radarbase.output.util.Timer
import org.slf4j.LoggerFactory
import java.io.IOException
import java.nio.file.Paths
import kotlin.io.path.deleteExisting
import kotlin.io.path.exists

open class AccountantImpl(
private val factory: FileStoreFactory,
Expand All @@ -27,29 +23,6 @@ open class AccountantImpl(

val offsets = offsetPersistence.read(offsetsKey)
offsetFile = offsetPersistence.writer(scope, offsetsKey, offsets)
readDeprecatedOffsets(factory.config, factory.targetStorage, topic)
?.takeUnless { it.isEmpty }
?.let {
offsetFile.addAll(it)
offsetFile.triggerWrite()
}
}

private suspend fun readDeprecatedOffsets(
config: RestructureConfig,
targetStorage: TargetStorage,
topic: String,
): OffsetRangeSet? {
val offsetsPath = config.paths.output
.resolve(OFFSETS_FILE_NAME)
.resolve("$topic.csv")

return if (offsetsPath.exists()) {
OffsetFilePersistence(targetStorage).read(offsetsPath)
.also { offsetsPath.deleteExisting() }
} else {
null
}
}

override suspend fun remove(range: TopicPartitionOffsetRange) =
Expand Down
Loading