Bulk Load CDK: CSV Support, S3V2Usage
johnny-schmidt committed Oct 18, 2024
1 parent 5f740f3 commit 83c6de4
Showing 16 changed files with 276 additions and 57 deletions.
2 changes: 2 additions & 0 deletions airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle
@@ -2,5 +2,7 @@ dependencies {
     implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
     implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')
 
+    api("org.apache.commons:commons-csv:1.10.0")
+
     testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load"))
 }
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import java.io.Writer
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVPrinter

class AirbyteTypeToCsvHeader {
    fun convert(schema: AirbyteType): Array<String> {
        if (schema !is ObjectType) {
            throw IllegalArgumentException("Only object types are supported")
        }
        return schema.properties.map { it.key }.toTypedArray()
    }
}

fun AirbyteType.toCsvHeader(): Array<String> {
    return AirbyteTypeToCsvHeader().convert(this)
}

fun AirbyteType.toCsvPrinterWithHeader(writer: Writer): CSVPrinter =
    CSVFormat.Builder.create().setHeader(*toCsvHeader()).build().print(writer)
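
A minimal usage sketch for the header printer (hypothetical schema; assumes the CDK's ObjectType/FieldType constructors and the IntegerType/StringType singletons):

import java.io.StringWriter

fun main() {
    // Hypothetical two-field schema; FieldType(type, nullable) is assumed from the CDK data model.
    val schema: AirbyteType =
        ObjectType(
            linkedMapOf(
                "id" to FieldType(IntegerType, nullable = false),
                "name" to FieldType(StringType, nullable = true),
            )
        )
    val out = StringWriter()
    val printer = schema.toCsvPrinterWithHeader(out) // header row is written immediately
    printer.printRecord("1", "alice")
    println(out) // id,name ... 1,alice
}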
@@ -0,0 +1,31 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.util.serializeToString

class AirbyteValueToCsvRow {
    fun convert(value: AirbyteValue): Array<String> {
        if (value !is ObjectValue) {
            throw IllegalArgumentException("Only object values are supported")
        }
        return value.values.map { convertInner(it.value) }.toTypedArray()
    }

    private fun convertInner(value: AirbyteValue): String {
        return when (value) {
            is ObjectValue -> value.toJson().serializeToString()
            is ArrayValue -> value.toJson().serializeToString()
            is StringValue -> value.value
            is IntegerValue -> value.value.toString()
            is NumberValue -> value.value.toString()
            else -> value.toString()
        }
    }
}

fun AirbyteValue.toCsvRecord(): Array<String> {
    return AirbyteValueToCsvRow().convert(this)
}
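
The row side, sketched under the same assumptions. Note that the conversion is positional: values are emitted in map-iteration order, which must line up with the schema's header order.

fun main() {
    // Hypothetical record whose key order matches the schema above.
    val record =
        ObjectValue(
            linkedMapOf(
                "id" to StringValue("1"),
                "name" to StringValue("alice"),
            )
        )
    println(record.toCsvRecord().joinToString(",")) // 1,alice
}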
@@ -0,0 +1,58 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.data

import io.airbyte.cdk.load.util.deserializeToNode
import org.apache.commons.csv.CSVRecord

class CsvRowToAirbyteValue {
    fun convert(row: CSVRecord, schema: AirbyteType): AirbyteValue {
        if (schema !is ObjectType) {
            throw IllegalArgumentException("Only object types are supported")
        }
        val asList = row.toList()
        if (asList.size != schema.properties.size) {
            throw IllegalArgumentException("Row size does not match schema size")
        }
        val properties = linkedMapOf<String, AirbyteValue>()
        schema.properties
            .toList()
            .zip(asList)
            .map { (property, value) ->
                property.first to convertInner(value, property.second.type)
            }
            .toMap(properties)
        return ObjectValue(properties)
    }

    private fun convertInner(value: String, field: AirbyteType): AirbyteValue {
        return when (field) {
            is ArrayType -> ArrayValue(value.split(",").map { convertInner(it, field.items.type) })
            is BooleanType -> BooleanValue(value.toBoolean())
            is IntegerType -> IntegerValue(value.toLong())
            is NumberType -> NumberValue(value.toBigDecimal())
            is ObjectType -> {
                val properties = linkedMapOf<String, AirbyteValue>()
                value
                    .deserializeToNode()
                    .fields()
                    .asSequence()
                    .map { entry ->
                        entry.key to entry.value.toAirbyteValue(field.properties[entry.key]!!.type)
                    }
                    .toMap(properties)
                ObjectValue(properties)
            }
            is ObjectTypeWithoutSchema ->
                value.deserializeToNode().toAirbyteValue(ObjectTypeWithoutSchema)
            is StringType -> StringValue(value)
            else -> throw IllegalArgumentException("Unsupported field type: $field")
        }
    }
}

fun CSVRecord.toAirbyteValue(schema: AirbyteType): AirbyteValue {
    return CsvRowToAirbyteValue().convert(this, schema)
}
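
A round-trip sketch using commons-csv directly (same hypothetical schema; CSVFormat.DEFAULT.withHeader() is deprecated in commons-csv 1.10 but matches what the data dumper below uses). Two caveats are visible in the converter itself: array fields round-trip through a naive comma split, and object fields assume every incoming JSON key exists in the schema (the !! lookup).

import java.io.StringReader
import org.apache.commons.csv.CSVFormat
import org.apache.commons.csv.CSVParser

fun main() {
    val schema =
        ObjectType(
            linkedMapOf(
                "id" to FieldType(IntegerType, nullable = false),
                "name" to FieldType(StringType, nullable = true),
            )
        )
    @Suppress("DEPRECATION")
    CSVParser(StringReader("id,name\r\n1,alice\r\n"), CSVFormat.DEFAULT.withHeader()).use { parser ->
        for (record in parser) {
            // Expected: ObjectValue(id=IntegerValue(1), name=StringValue("alice"))
            println(record.toAirbyteValue(schema))
        }
    }
}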
@@ -8,24 +8,20 @@ import io.airbyte.cdk.load.file.NoopProcessor
 import io.airbyte.cdk.load.file.StreamProcessor
 import java.io.InputStream
 import java.io.OutputStream
+import java.io.Writer
 import kotlinx.coroutines.flow.Flow
 
-interface ObjectStorageClient<T : RemoteObject<*>, U : ObjectStorageStreamingUploadWriter> {
+interface ObjectStorageClient<T : RemoteObject<*>> {
     suspend fun list(prefix: String): Flow<T>
     suspend fun move(remoteObject: T, toKey: String): T
     suspend fun <U> get(key: String, block: (InputStream) -> U): U
     suspend fun put(key: String, bytes: ByteArray): T
     suspend fun delete(remoteObject: T)
-    suspend fun streamingUpload(key: String, block: suspend (U) -> Unit): T =
+    suspend fun streamingUpload(key: String, block: suspend (Writer) -> Unit): T =
         streamingUpload(key, NoopProcessor, block)
     suspend fun <V : OutputStream> streamingUpload(
         key: String,
         streamProcessor: StreamProcessor<V>,
-        block: suspend (U) -> Unit
+        block: suspend (Writer) -> Unit
     ): T
 }
-
-interface ObjectStorageStreamingUploadWriter {
-    suspend fun write(bytes: ByteArray)
-    suspend fun write(string: String) = write(string.toByteArray(Charsets.UTF_8))
-}
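
The point of replacing the bespoke upload-writer interface with java.io.Writer is that callers can hand the stream directly to writer-based serializers such as commons-csv's CSVPrinter. A hypothetical call site (uploadCsv and the key are illustrative, not CDK API):

suspend fun uploadCsv(client: ObjectStorageClient<*>, schema: AirbyteType) {
    client.streamingUpload("records.csv") { writer ->
        val printer = schema.toCsvPrinterWithHeader(writer)
        printer.printRecord("1", "alice")
        // No flush()/close(): the S3 writer below treats close() as a no-op and
        // rejects flush(); completion is handled by the upload lifecycle.
    }
}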
@@ -5,7 +5,10 @@
 package io.airbyte.cdk.load
 
 import io.airbyte.cdk.load.command.DestinationStream
+import io.airbyte.cdk.load.command.object_storage.CSVFormatConfiguration
+import io.airbyte.cdk.load.command.object_storage.JsonFormatConfiguration
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration
+import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfiguration
 import io.airbyte.cdk.load.data.toAirbyteValue
 import io.airbyte.cdk.load.file.GZIPProcessor
 import io.airbyte.cdk.load.file.NoopProcessor
@@ -15,18 +18,22 @@ import io.airbyte.cdk.load.file.object_storage.RemoteObject
 import io.airbyte.cdk.load.test.util.OutputRecord
 import io.airbyte.cdk.load.test.util.toOutputRecord
 import io.airbyte.cdk.load.util.deserializeToNode
+import java.io.BufferedReader
 import java.io.InputStream
 import java.util.zip.GZIPInputStream
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.flow.map
 import kotlinx.coroutines.flow.toList
 import kotlinx.coroutines.runBlocking
 import kotlinx.coroutines.withContext
+import org.apache.commons.csv.CSVFormat
+import org.apache.commons.csv.CSVParser
 
 class ObjectStorageDataDumper(
     private val stream: DestinationStream,
-    private val client: ObjectStorageClient<*, *>,
+    private val client: ObjectStorageClient<*>,
     private val pathFactory: ObjectStoragePathFactory,
+    private val formatConfig: ObjectStorageFormatConfiguration,
     private val compressionConfig: ObjectStorageCompressionConfiguration<*>? = null
 ) {
     fun dump(): List<OutputRecord> {
@@ -37,26 +44,43 @@ class ObjectStorageDataDumper
                 .list(prefix)
                 .map { listedObject: RemoteObject<*> ->
                     client.get(listedObject.key) { objectData: InputStream ->
-                        when (compressionConfig?.compressor) {
-                            is GZIPProcessor -> GZIPInputStream(objectData)
-                            is NoopProcessor,
-                            null -> objectData
-                            else -> error("Unsupported compressor")
-                        }
-                            .bufferedReader()
-                            .lineSequence()
-                            .map { line ->
-                                line
-                                    .deserializeToNode()
-                                    .toAirbyteValue(stream.schemaWithMeta)
-                                    .toOutputRecord()
-                            }
-                            .toList()
+                        val reader =
+                            when (compressionConfig?.compressor) {
+                                is GZIPProcessor -> GZIPInputStream(objectData)
+                                is NoopProcessor,
+                                null -> objectData
+                                else -> error("Unsupported compressor")
+                            }.bufferedReader()
+                        readLines(reader)
                     }
                 }
                 .toList()
                 .flatten()
             }
         }
     }
+
+    @Suppress("DEPRECATION")
+    private fun readLines(reader: BufferedReader): List<OutputRecord> =
+        when (formatConfig) {
+            is JsonFormatConfiguration -> {
+                reader
+                    .lineSequence()
+                    .map { line ->
+                        line
+                            .deserializeToNode()
+                            .toAirbyteValue(stream.schemaWithMeta)
+                            .toOutputRecord()
+                    }
+                    .toList()
+            }
+            is CSVFormatConfiguration -> {
+                CSVParser(reader, CSVFormat.DEFAULT.withHeader()).use {
+                    it.records.map { record ->
+                        record.toAirbyteValue(stream.schemaWithMeta).toOutputRecord()
+                    }
+                }
+            }
+            else -> error("Unsupported format")
+        }
 }
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.object_storage.RemoteObject
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

class ObjectStorageDestinationCleaner<T : RemoteObject<*>> {
    fun cleanup(
        stream: DestinationStream,
        client: ObjectStorageClient<T>,
        pathFactory: ObjectStoragePathFactory,
    ) {
        val prefix = pathFactory.getFinalDirectory(stream).toString()
        runBlocking {
            withContext(Dispatchers.IO) { client.list(prefix).collect { client.delete(it) } }
        }
    }
}
@@ -32,6 +32,7 @@ import jakarta.inject.Singleton
 import java.io.ByteArrayOutputStream
 import java.io.InputStream
 import java.io.OutputStream
+import java.io.Writer
 import kotlinx.coroutines.flow.flow
 
 data class S3Object(override val key: String, override val storageConfig: S3BucketConfiguration) :
@@ -46,7 +47,7 @@ class S3Client(
     val bucketConfig: S3BucketConfiguration,
     private val uploadConfig: ObjectStorageUploadConfiguration?,
     private val compressionConfig: ObjectStorageCompressionConfiguration<*>? = null,
-) : ObjectStorageClient<S3Object, S3MultipartUpload<*>.Writer> {
+) : ObjectStorageClient<S3Object> {
     private val log = KotlinLogging.logger {}
 
     override suspend fun list(prefix: String) = flow {
@@ -112,17 +113,14 @@ class S3Client(
         client.deleteObject(request)
     }
 
-    override suspend fun streamingUpload(
-        key: String,
-        block: suspend (S3MultipartUpload<*>.Writer) -> Unit
-    ): S3Object {
+    override suspend fun streamingUpload(key: String, block: suspend (Writer) -> Unit): S3Object {
         return streamingUpload(key, compressionConfig?.compressor ?: NoopProcessor, block)
     }
 
     override suspend fun <U : OutputStream> streamingUpload(
         key: String,
         streamProcessor: StreamProcessor<U>,
-        block: suspend (S3MultipartUpload<*>.Writer) -> Unit
+        block: suspend (Writer) -> Unit
     ): S3Object {
         val request = CreateMultipartUploadRequest {
             this.bucket = bucketConfig.s3BucketName
@@ -140,8 +138,10 @@
         log.info {
             "Starting multipart upload to ${response.bucket}/${response.key} (${response.uploadId}"
         }
-        block(upload.Writer())
+        val uploadJob = upload.start()
+        block(upload.UploadWriter())
         upload.complete()
+        uploadJob.join()
         return S3Object(key, bucketConfig)
     }
 }
@@ -12,10 +12,16 @@ import aws.sdk.kotlin.services.s3.model.UploadPartRequest
 import aws.smithy.kotlin.runtime.content.ByteStream
 import io.airbyte.cdk.load.command.object_storage.ObjectStorageUploadConfiguration
 import io.airbyte.cdk.load.file.StreamProcessor
-import io.airbyte.cdk.load.file.object_storage.ObjectStorageStreamingUploadWriter
 import io.github.oshai.kotlinlogging.KotlinLogging
 import java.io.ByteArrayOutputStream
 import java.io.OutputStream
+import java.io.Writer
+import kotlinx.coroutines.CoroutineScope
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.Job
+import kotlinx.coroutines.channels.Channel
+import kotlinx.coroutines.launch
+import kotlinx.coroutines.runBlocking
 
 class S3MultipartUpload<T : OutputStream>(
     private val client: aws.sdk.kotlin.services.s3.S3Client,
@@ -32,16 +38,34 @@
             ?: throw IllegalStateException("Streaming upload part size is not configured")
     private val wrappingBuffer = streamProcessor.wrapper(underlyingBuffer)
 
-    inner class Writer : ObjectStorageStreamingUploadWriter {
-        override suspend fun write(bytes: ByteArray) {
-            wrappingBuffer.write(bytes)
-            if (underlyingBuffer.size() >= partSize) {
-                uploadPart()
-            }
-        }
-
-        override suspend fun write(string: String) {
-            write(string.toByteArray(Charsets.UTF_8))
-        }
-    }
+    private val work = Channel<suspend () -> Unit>(Channel.UNLIMITED)
+
+    suspend fun start(): Job =
+        CoroutineScope(Dispatchers.IO).launch {
+            for (unit in work) {
+                uploadPart()
+            }
+            completeInner()
+        }
+
+    inner class UploadWriter : Writer() {
+        override fun close() {
+            log.warn { "Close called on UploadWriter, ignoring." }
+        }
+
+        override fun flush() {
+            throw NotImplementedError("flush() is not supported on S3MultipartUpload.UploadWriter")
+        }
+
+        override fun write(str: String) {
+            wrappingBuffer.write(str.toByteArray(Charsets.UTF_8))
+            if (underlyingBuffer.size() >= partSize) {
+                runBlocking { work.send { uploadPart() } }
+            }
+        }
+
+        override fun write(cbuf: CharArray, off: Int, len: Int) {
+            write(String(cbuf, off, len))
+        }
+    }

@@ -66,6 +90,10 @@
     }
 
     suspend fun complete() {
+        work.close()
+    }
+
+    private suspend fun completeInner() {
         if (underlyingBuffer.size() > 0) {
             uploadPart()
         }
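
The shape above is a single-consumer work queue: Writer.write is synchronous (java.io.Writer), so a full buffer enqueues work via runBlocking { work.send { ... } }, the coroutine started by start() drains the channel on Dispatchers.IO, and complete() closes the channel, which ends the consumer loop and triggers completeInner(). In the committed code the loop body calls uploadPart() directly, so the queued closures effectively act as signals, and the single consumer preserves part ordering. A standalone sketch of the same pattern (illustrative only, not the CDK API):

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

class WorkQueue {
    private val work = Channel<suspend () -> Unit>(Channel.UNLIMITED)

    // Consumer: runs queued tasks in order until the channel is closed.
    fun start(scope: CoroutineScope): Job =
        scope.launch {
            for (task in work) {
                task()
            }
            println("drained; running completion step")
        }

    suspend fun enqueue(task: suspend () -> Unit) = work.send(task)

    fun complete() = work.close() // lets the consumer loop end
}

fun main() = runBlocking {
    val queue = WorkQueue()
    val job = queue.start(CoroutineScope(Dispatchers.IO))
    repeat(3) { i -> queue.enqueue { println("uploading part $i") } }
    queue.complete()
    job.join()
}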
@@ -12,7 +12,7 @@ airbyteBulkConnector {
 application {
     mainClass = 'io.airbyte.integrations.destination.s3_v2.S3V2Destination'
 
-    applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
+    //applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
 
     // Uncomment and replace to run locally
     //applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0', '--add-opens', 'java.base/sun.nio.ch=ALL-UNNAMED', '--add-opens', 'java.base/sun.security.action=ALL-UNNAMED', '--add-opens', 'java.base/java.lang=ALL-UNNAMED']
