Skip to content

Commit

Permalink
Merge branch 'master' into tope/tiktok-marketing/add-sparkAds-stream
Browse files Browse the repository at this point in the history
  • Loading branch information
ChristoGrab authored Oct 18, 2024
2 parents c978fd6 + 1cc7f2c commit 61ebcd0
Show file tree
Hide file tree
Showing 116 changed files with 9,594 additions and 1,259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
package io.airbyte.cdk

import io.airbyte.cdk.command.ConnectorCommandLinePropertySource
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.MetadataYamlPropertySource
import io.micronaut.configuration.picocli.MicronautFactory
import io.micronaut.context.ApplicationContext
import io.micronaut.context.RuntimeBeanDefinition
import io.micronaut.context.env.CommandLinePropertySource
import io.micronaut.context.env.Environment
import io.micronaut.context.env.MapPropertySource
import io.micronaut.core.cli.CommandLine as MicronautCommandLine
import java.nio.file.Path
import kotlin.system.exitProcess
Expand All @@ -21,9 +21,11 @@ import picocli.CommandLine.Model.UsageMessageSpec
class AirbyteSourceRunner(
/** CLI args. */
args: Array<out String>,
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("source", args, testBeanDefinitions) {
) : AirbyteConnectorRunner("source", args, systemEnv, testBeanDefinitions) {
companion object {
@JvmStatic
fun run(vararg args: String) {
Expand All @@ -36,10 +38,11 @@ class AirbyteSourceRunner(
class AirbyteDestinationRunner(
/** CLI args. */
args: Array<out String>,
testEnvironments: Map<String, String> = emptyMap(),
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("destination", args, testBeanDefinitions, testEnvironments) {
) : AirbyteConnectorRunner("destination", args, systemEnv, testBeanDefinitions) {
companion object {
@JvmStatic
fun run(vararg args: String) {
Expand All @@ -55,10 +58,18 @@ class AirbyteDestinationRunner(
sealed class AirbyteConnectorRunner(
val connectorType: String,
val args: Array<out String>,
systemEnv: Map<String, String>,
val testBeanDefinitions: Array<out RuntimeBeanDefinition<*>>,
val testProperties: Map<String, String> = emptyMap(),
) {
val envs: Array<String> = arrayOf(Environment.CLI, connectorType)
val envs: Array<String> =
arrayOf(Environment.CLI, connectorType) +
// Set feature flag environments.
FeatureFlag.active(systemEnv).map { it.micronautEnvironmentName } +
// Micronaut's TEST env detection relies on inspecting the stacktrace and checking for
// any junit calls. This doesn't work if we launch the connector from a different
// thread, e.g. `Dispatchers.IO`. Force the test env if needed. Some tests launch the
// connector from the IO context to avoid blocking themselves.
listOfNotNull(Environment.TEST.takeIf { testBeanDefinitions.isNotEmpty() })

inline fun <reified R : Runnable> run() {
val picocliCommandLineFactory = PicocliCommandLineFactory(this)
Expand All @@ -73,7 +84,6 @@ sealed class AirbyteConnectorRunner(
ApplicationContext.builder(R::class.java, *envs)
.propertySources(
*listOfNotNull(
MapPropertySource("additional_properties", testProperties),
airbytePropertySource,
commandLinePropertySource,
MetadataYamlPropertySource(),
Expand All @@ -91,6 +101,11 @@ sealed class AirbyteConnectorRunner(
// Required by the platform, otherwise syncs may hang.
exitProcess(exitCode)
}
// At this point, we're in a test.
if (exitCode != 0) {
// Propagate failure to test callers.
throw ConnectorUncleanExitException(exitCode)
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk

/** This is used only in tests. */
class ConnectorUncleanExitException(val exitCode: Int) :
Exception("Destination process exited uncleanly: $exitCode")
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.micronaut.context.annotation.Factory
import io.micronaut.context.env.Environment
import jakarta.inject.Singleton
import java.util.EnumSet

/**
* An enum of all feature flags, currently these are set via environment vars.
*
* Micronaut can inject a Set<FeatureFlag> singleton of all active feature flags.
*/
enum class FeatureFlag(
val micronautEnvironmentName: String,
val envVar: EnvVar,
val requiredEnvVarValue: String,
private val transformActualValue: (String) -> String = { it }
) {

/** [AIRBYTE_CLOUD_DEPLOYMENT] is active when the connector is running in Airbyte Cloud. */
AIRBYTE_CLOUD_DEPLOYMENT(
micronautEnvironmentName = AIRBYTE_CLOUD_ENV,
envVar = EnvVar.DEPLOYMENT_MODE,
requiredEnvVarValue = "CLOUD",
transformActualValue = { it.trim().uppercase() },
);

/** Environment variable binding shell declaration which activates the feature flag. */
val envVarBindingDeclaration: String
get() = "${envVar.name}=$requiredEnvVarValue"

enum class EnvVar(val defaultValue: String = "") {
DEPLOYMENT_MODE
}

companion object {
internal fun active(systemEnv: Map<String, String>): List<FeatureFlag> =
entries.filter { featureFlag: FeatureFlag ->
val envVar: EnvVar = featureFlag.envVar
val envVarValue: String = systemEnv[envVar.name] ?: envVar.defaultValue
featureFlag.transformActualValue(envVarValue) == featureFlag.requiredEnvVarValue
}
}

@Factory
private class MicronautFactory {

@Singleton
fun active(environment: Environment): Set<FeatureFlag> =
EnumSet.noneOf(FeatureFlag::class.java).apply {
addAll(
FeatureFlag.entries.filter {
environment.activeNames.contains(it.micronautEnvironmentName)
}
)
}
}
}

const val AIRBYTE_CLOUD_ENV = "airbyte-cloud"
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ data object CliRunner {
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
vararg featureFlags: FeatureFlag,
): CliRunnable {
val out = CliRunnerOutputStream()
val runnable: Runnable =
makeRunnable(op, config, catalog, state) { args: Array<String> ->
AirbyteSourceRunner(args, out.beanDefinition)
AirbyteSourceRunner(args, featureFlags.systemEnv, out.beanDefinition)
}
return CliRunnable(runnable, out.results)
}
Expand All @@ -52,7 +53,7 @@ data object CliRunner {
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
inputStream: InputStream,
testProperties: Map<String, String> = emptyMap(),
vararg featureFlags: FeatureFlag,
): CliRunnable {
val inputBeanDefinition: RuntimeBeanDefinition<InputStream> =
RuntimeBeanDefinition.builder(InputStream::class.java) { inputStream }
Expand All @@ -63,9 +64,9 @@ data object CliRunner {
makeRunnable(op, config, catalog, state) { args: Array<String> ->
AirbyteDestinationRunner(
args,
testProperties,
featureFlags.systemEnv,
inputBeanDefinition,
out.beanDefinition
out.beanDefinition,
)
}
return CliRunnable(runnable, out.results)
Expand All @@ -77,6 +78,7 @@ data object CliRunner {
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
featureFlags: Set<FeatureFlag> = setOf(),
vararg input: AirbyteMessage,
): CliRunnable {
val inputJsonBytes: ByteArray =
Expand All @@ -88,7 +90,7 @@ data object CliRunner {
baos.toByteArray()
}
val inputStream: InputStream = ByteArrayInputStream(inputJsonBytes)
return destination(op, config, catalog, state, inputStream)
return destination(op, config, catalog, state, inputStream, *featureFlags.toTypedArray())
}

private fun makeRunnable(
Expand Down Expand Up @@ -120,6 +122,9 @@ data object CliRunner {
}
}

private val Array<out FeatureFlag>.systemEnv: Map<String, String>
get() = toSet().map { it.envVar.name to it.requiredEnvVarValue }.toMap()

private fun inputFile(contents: Any?): Path? =
contents?.let {
Files.createTempFile(null, null).also { file ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,9 @@ class MockBasicFunctionalityIntegrationTest :
override fun testBasicWrite() {
super.testBasicWrite()
}

@Test
override fun testMidSyncCheckpointingStreamState() {
super.testMidSyncCheckpointingStreamState()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.airbyte.cdk.load.mock_integration_test

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.test.util.DestinationDataDumper
import io.airbyte.cdk.load.test.util.OutputRecord
import java.util.concurrent.ConcurrentHashMap
Expand All @@ -25,9 +27,12 @@ object MockDestinationBackend {
}

object MockDestinationDataDumper : DestinationDataDumper {
override fun dumpRecords(streamName: String, streamNamespace: String?): List<OutputRecord> {
override fun dumpRecords(
spec: ConfigurationSpecification,
stream: DestinationStream
): List<OutputRecord> {
return MockDestinationBackend.readFile(
MockStreamLoader.getFilename(streamNamespace, streamName)
MockStreamLoader.getFilename(stream.descriptor.namespace, stream.descriptor.name)
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ import io.airbyte.cdk.load.command.DestinationConfigurationFactory
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

class MockDestinationConfiguration : DestinationConfiguration()
class MockDestinationConfiguration : DestinationConfiguration() {
// override to 10KB instead of 200MB
override val recordBatchSizeBytes = 10 * 1024L
}

@Singleton class MockDestinationSpecification : ConfigurationSpecification()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ package io.airbyte.cdk.load.command

import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteTypeToJsonSchema
import io.airbyte.cdk.load.data.ArrayType
import io.airbyte.cdk.load.data.FieldType
import io.airbyte.cdk.load.data.IntegerType
import io.airbyte.cdk.load.data.JsonSchemaToAirbyteType
import io.airbyte.cdk.load.data.ObjectType
import io.airbyte.cdk.load.data.StringType
import io.airbyte.cdk.load.message.DestinationRecord
import io.airbyte.protocol.models.v0.AirbyteStream
import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream
import io.airbyte.protocol.models.v0.DestinationSyncMode
Expand Down Expand Up @@ -40,6 +46,65 @@ data class DestinationStream(
fun toPrettyString() = "$namespace.$name"
}

/**
* This is the schema of what we currently write to destinations, but this might not reflect
* what actually exists, as many destinations have legacy data from before this schema was
* adopted.
*/
val schemaWithMeta: AirbyteType
get() =
ObjectType(
linkedMapOf(
DestinationRecord.Meta.COLUMN_NAME_AB_RAW_ID to
FieldType(StringType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_EXTRACTED_AT to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_AB_META to
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"sync_id" to FieldType(IntegerType, nullable = false),
"changes" to
FieldType(
nullable = false,
type =
ArrayType(
FieldType(
nullable = false,
type =
ObjectType(
linkedMapOf(
"field" to
FieldType(
StringType,
nullable = false
),
"change" to
FieldType(
StringType,
nullable = false
),
"reason" to
FieldType(
StringType,
nullable = false
),
)
)
)
)
)
)
)
),
DestinationRecord.Meta.COLUMN_NAME_AB_GENERATION_ID to
FieldType(IntegerType, nullable = false),
DestinationRecord.Meta.COLUMN_NAME_DATA to FieldType(schema, nullable = false),
)
)

/**
* This is not fully round-trippable. Destinations don't care about most of the stuff in an
* AirbyteStream (e.g. we don't care about defaultCursorField, we only care about the _actual_
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,7 @@ class AirbyteValueToJson {
}
}
}

fun AirbyteValue.toJson(): JsonNode {
return AirbyteValueToJson().convert(this)
}
Loading

0 comments on commit 61ebcd0

Please sign in to comment.