Skip to content

Commit

Permalink
Bulk Load CDK: GZip Compression and S3V2 Usage
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Oct 18, 2024
1 parent 58d01c1 commit ca54e41
Show file tree
Hide file tree
Showing 22 changed files with 641 additions and 222 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ package io.airbyte.cdk.load.data
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.cdk.load.message.DestinationRecord.Meta
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Singleton
import java.util.*

@Singleton
@Secondary
class DestinationRecordToAirbyteValueWithMeta(private val catalog: DestinationCatalog) {
fun decorate(record: DestinationRecord): ObjectValue {
val streamActual = catalog.getStream(record.stream.name, record.stream.namespace)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.file

import java.io.ByteArrayOutputStream
import java.util.zip.GZIPOutputStream

interface StreamProcessor<T> {
val wrapper: (ByteArrayOutputStream) -> T
val partFinisher: T.() -> Unit
val extension: String?
}

data object NoopProcessor : StreamProcessor<ByteArrayOutputStream> {
override val wrapper: (ByteArrayOutputStream) -> ByteArrayOutputStream = { it }
override val partFinisher: ByteArrayOutputStream.() -> Unit = {}
override val extension: String? = null
}

data object GZIPProcessor : StreamProcessor<GZIPOutputStream> {
override val wrapper: (ByteArrayOutputStream) -> GZIPOutputStream = { GZIPOutputStream(it) }
override val partFinisher: GZIPOutputStream.() -> Unit = { finish() }
override val extension: String = "gz"
}
2 changes: 2 additions & 0 deletions airbyte-cdk/bulk/toolkits/load-object-storage/build.gradle
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
dependencies {
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-base')
implementation project(':airbyte-cdk:bulk:core:bulk-cdk-core-load')

testFixturesImplementation testFixtures(project(":airbyte-cdk:bulk:core:bulk-cdk-core-load"))
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load.command.object_storage

import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonValue
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.load.file.GZIPProcessor
import io.airbyte.cdk.load.file.NoopProcessor
import io.airbyte.cdk.load.file.StreamProcessor
import java.io.ByteArrayOutputStream
import java.io.OutputStream

/**
* Mix-in to provide file format configuration.
*
* The specification is intended to be applied to file formats that are compatible with file-level
* compression (csv, jsonl) and does not need to be added to the destination spec directly. The
* [ObjectStorageCompressionConfigurationProvider] can be added to the top-level
* [io.airbyte.cdk.load.command.DestinationConfiguration] and initialized directly with
* [ObjectStorageFormatSpecificationProvider.toObjectStorageFormatConfiguration]. (See the comments
* on [io.airbyte.cdk.load.command.DestinationConfiguration] for more details.)
*/
interface ObjectStorageCompressionSpecificationProvider {
@get:JsonSchemaTitle("Compression")
@get:JsonPropertyDescription(
"Whether the output files should be compressed. If compression is selected, the output filename will have an extra extension (GZIP: \".jsonl.gz\").",
)
@get:JsonProperty("compression")
val compression: ObjectStorageCompressionSpecification

fun toCompressionConfiguration(): ObjectStorageCompressionConfiguration<*> {
return when (compression) {
is NoCompressionSpecification -> ObjectStorageCompressionConfiguration(NoopProcessor)
is GZIPCompressionSpecification -> ObjectStorageCompressionConfiguration(GZIPProcessor)
}
}

companion object {
fun getNoCompressionConfiguration():
ObjectStorageCompressionConfiguration<ByteArrayOutputStream> {
return ObjectStorageCompressionConfiguration(NoopProcessor)
}
}
}

@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.EXISTING_PROPERTY,
property = "compression_type",
)
@JsonSubTypes(
JsonSubTypes.Type(value = NoCompressionSpecification::class, name = "No Compression"),
JsonSubTypes.Type(value = GZIPCompressionSpecification::class, name = "GZIP"),
)
sealed class ObjectStorageCompressionSpecification(
@JsonProperty("compression_type") open val compressionType: Type
) {
enum class Type(@get:JsonValue val typeName: String) {
NoCompression("No Compression"),
GZIP("GZIP"),
}
}

@JsonSchemaTitle("No Compression")
class NoCompressionSpecification(
@JsonProperty("compression_type") override val compressionType: Type = Type.NoCompression
) : ObjectStorageCompressionSpecification(compressionType)

@JsonSchemaTitle("GZIP")
class GZIPCompressionSpecification(
@JsonProperty("compression_type") override val compressionType: Type = Type.GZIP
) : ObjectStorageCompressionSpecification(compressionType)

data class ObjectStorageCompressionConfiguration<T : OutputStream>(
val compressor: StreamProcessor<T>
)

interface ObjectStorageCompressionConfigurationProvider<T : OutputStream> {
val objectStorageCompressionConfiguration: ObjectStorageCompressionConfiguration<T>
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,24 @@ import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
import com.fasterxml.jackson.annotation.JsonSubTypes
import com.fasterxml.jackson.annotation.JsonTypeInfo
import com.fasterxml.jackson.annotation.JsonValue
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle

/**
* Mix-in to provide file format configuration.
*
* NOTE: This assumes a fixed set of file formats. If you need to support a different set, clone the
* [ObjectStorageFormatSpecification] class with a new set of enums.
*
* See [io.airbyte.cdk.load.command.DestinationConfiguration] for more details on how to use this
* interface.
*/
interface ObjectStorageFormatSpecificationProvider {
@get:JsonSchemaTitle("Output Format")
@get:JsonPropertyDescription(
"Format of the data output. See <a href=\"https://docs.airbyte.com/integrations/destinations/s3/#supported-output-schema\">here</a> for more details",
)
@get:JsonProperty("format")
val format: ObjectStorageFormatSpecification

fun toObjectStorageFormatConfiguration(): ObjectStorageFormatConfiguration {
Expand All @@ -25,6 +36,15 @@ interface ObjectStorageFormatSpecificationProvider {
is ParquetFormatSpecification -> ParquetFormatConfiguration()
}
}

fun toCompressionConfiguration(): ObjectStorageCompressionConfiguration<*> {
return when (format) {
is ObjectStorageCompressionSpecificationProvider ->
(format as ObjectStorageCompressionSpecificationProvider)
.toCompressionConfiguration()
else -> ObjectStorageCompressionSpecificationProvider.getNoCompressionConfiguration()
}
}
}

@JsonTypeInfo(
Expand All @@ -38,21 +58,40 @@ interface ObjectStorageFormatSpecificationProvider {
JsonSubTypes.Type(value = AvroFormatSpecification::class, name = "AVRO"),
JsonSubTypes.Type(value = ParquetFormatSpecification::class, name = "PARQUET")
)
sealed class ObjectStorageFormatSpecification {
@JsonSchemaTitle("Format Type") @JsonProperty("format_type") val formatType: String = "JSONL"
sealed class ObjectStorageFormatSpecification(
@JsonSchemaTitle("Format Type") @JsonProperty("format_type") open val formatType: Type
) {
enum class Type(@get:JsonValue val typeName: String) {
JSONL("JSONL"),
CSV("CSV"),
AVRO("AVRO"),
PARQUET("PARQUET")
}
}

@JsonSchemaTitle("JSON Lines: Newline-delimited JSON")
class JsonFormatSpecification : ObjectStorageFormatSpecification()
class JsonFormatSpecification(
@JsonProperty("format_type") override val formatType: Type = Type.JSONL
) : ObjectStorageFormatSpecification(formatType), ObjectStorageCompressionSpecificationProvider {
override val compression: ObjectStorageCompressionSpecification = NoCompressionSpecification()
}

@JsonSchemaTitle("CSV: Comma-Separated Values")
class CSVFormatSpecification : ObjectStorageFormatSpecification()
class CSVFormatSpecification(
@JsonProperty("format_type") override val formatType: Type = Type.CSV
) : ObjectStorageFormatSpecification(formatType), ObjectStorageCompressionSpecificationProvider {
override val compression: ObjectStorageCompressionSpecification = NoCompressionSpecification()
}

@JsonSchemaTitle("Avro: Apache Avro")
class AvroFormatSpecification : ObjectStorageFormatSpecification()
class AvroFormatSpecification(
@JsonProperty("format_type") override val formatType: Type = Type.AVRO
) : ObjectStorageFormatSpecification(formatType)

@JsonSchemaTitle("Parquet: Columnar Storage")
class ParquetFormatSpecification : ObjectStorageFormatSpecification()
class ParquetFormatSpecification(
@JsonProperty("format_type") override val formatType: Type = Type.PARQUET
) : ObjectStorageFormatSpecification(formatType)

interface OutputFormatConfigurationProvider {
val outputFormat: ObjectStorageFormatConfiguration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.airbyte.cdk.load.file.object_storage

import io.airbyte.cdk.load.file.NoopProcessor
import io.airbyte.cdk.load.file.StreamProcessor
import java.io.InputStream
import java.io.OutputStream
import kotlinx.coroutines.flow.Flow

interface ObjectStorageClient<T : RemoteObject<*>, U : ObjectStorageStreamingUploadWriter> {
Expand All @@ -13,7 +16,13 @@ interface ObjectStorageClient<T : RemoteObject<*>, U : ObjectStorageStreamingUpl
suspend fun <U> get(key: String, block: (InputStream) -> U): U
suspend fun put(key: String, bytes: ByteArray): T
suspend fun delete(remoteObject: T)
suspend fun streamingUpload(key: String, collector: suspend U.() -> Unit): T
suspend fun streamingUpload(key: String, block: suspend (U) -> Unit): T =
streamingUpload(key, NoopProcessor, block)
suspend fun <V : OutputStream> streamingUpload(
key: String,
streamProcessor: StreamProcessor<V>,
block: suspend (U) -> Unit
): T
}

interface ObjectStorageStreamingUploadWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.load.file.object_storage

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfigurationProvider
import io.airbyte.cdk.load.command.object_storage.ObjectStorageFormatConfigurationProvider
import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfigurationProvider
import io.airbyte.cdk.load.file.TimeProvider
Expand All @@ -23,11 +24,21 @@ import java.util.*
class ObjectStoragePathFactory(
pathConfigProvider: ObjectStoragePathConfigurationProvider,
formatConfigProvider: ObjectStorageFormatConfigurationProvider? = null,
compressionConfigProvider: ObjectStorageCompressionConfigurationProvider<*>? = null,
timeProvider: TimeProvider? = null,
) {
private val loadedAt = timeProvider?.let { Instant.ofEpochMilli(it.currentTimeMillis()) }
private val pathConfig = pathConfigProvider.objectStoragePathConfiguration
private val defaultExtension = formatConfigProvider?.objectStorageFormatConfiguration?.extension
private val fileFormatExtension =
formatConfigProvider?.objectStorageFormatConfiguration?.extension
private val compressionExtension =
compressionConfigProvider?.objectStorageCompressionConfiguration?.compressor?.extension
private val defaultExtension =
if (fileFormatExtension != null && compressionExtension != null) {
"$fileFormatExtension.$compressionExtension"
} else {
fileFormatExtension ?: compressionExtension
}

inner class VariableContext(
val stream: DestinationStream,
Expand Down Expand Up @@ -143,8 +154,9 @@ class ObjectStoragePathFactory(

fun <T> from(config: T, timeProvider: TimeProvider? = null): ObjectStoragePathFactory where
T : ObjectStoragePathConfigurationProvider,
T : ObjectStorageFormatConfigurationProvider {
return ObjectStoragePathFactory(config, config, timeProvider)
T : ObjectStorageFormatConfigurationProvider,
T : ObjectStorageCompressionConfigurationProvider<*> {
return ObjectStoragePathFactory(config, config, config, timeProvider)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.load

import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.object_storage.ObjectStorageCompressionConfiguration
import io.airbyte.cdk.load.data.toAirbyteValue
import io.airbyte.cdk.load.file.GZIPProcessor
import io.airbyte.cdk.load.file.NoopProcessor
import io.airbyte.cdk.load.file.object_storage.ObjectStorageClient
import io.airbyte.cdk.load.file.object_storage.ObjectStoragePathFactory
import io.airbyte.cdk.load.file.object_storage.RemoteObject
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.cdk.load.test.util.toOutputRecord
import io.airbyte.cdk.load.util.deserializeToNode
import java.io.InputStream
import java.util.zip.GZIPInputStream
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

class ObjectStorageDataDumper(
private val stream: DestinationStream,
private val client: ObjectStorageClient<*, *>,
private val pathFactory: ObjectStoragePathFactory,
private val compressionConfig: ObjectStorageCompressionConfiguration<*>? = null
) {
fun dump(): List<OutputRecord> {
val prefix = pathFactory.getFinalDirectory(stream).toString()
return runBlocking {
withContext(Dispatchers.IO) {
client
.list(prefix)
.map { listedObject: RemoteObject<*> ->
client.get(listedObject.key) { objectData: InputStream ->
when (compressionConfig?.compressor) {
is GZIPProcessor -> GZIPInputStream(objectData)
is NoopProcessor,
null -> objectData
else -> error("Unsupported compressor")
}
.bufferedReader()
.lineSequence()
.map { line ->
line
.deserializeToNode()
.toAirbyteValue(stream.schemaWithMeta)
.toOutputRecord()
}
.toList()
}
}
.toList()
.flatten()
}
}
}
}
1 change: 1 addition & 0 deletions airbyte-cdk/bulk/toolkits/load-s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@ dependencies {
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-aws')
api project(':airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage')

testFixturesApi(testFixtures(project(":airbyte-cdk:bulk:toolkits:bulk-cdk-toolkit-load-object-storage")))
implementation("aws.sdk.kotlin:s3:1.0.0")
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,15 @@ import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaInject
import com.kjetland.jackson.jsonSchema.annotations.JsonSchemaTitle
import io.airbyte.cdk.load.command.object_storage.ObjectStoragePathConfiguration

/**
* Mix-in to provide S3 path configuration fields as properties.
*
* NOTE: For legacy reasons, this is unnecessarily s3-specific. Future cloud storage solutions
* should create a single generic version of this in the `object-storage` toolkit and use that.
*
* See [io.airbyte.cdk.load.command.DestinationConfiguration] for more details on how to use this
* interface.
*/
interface S3PathSpecification {
@get:JsonSchemaTitle("S3 Path Format")
@get:JsonPropertyDescription(
Expand Down
Loading

0 comments on commit ca54e41

Please sign in to comment.