Skip to content

Commit

Permalink
bulk-cdk: add feature flag environments (#46692)
Browse files Browse the repository at this point in the history
  • Loading branch information
postamar authored Oct 18, 2024
1 parent 7b12647 commit a4847df
Show file tree
Hide file tree
Showing 19 changed files with 221 additions and 112 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
package io.airbyte.cdk

import io.airbyte.cdk.command.ConnectorCommandLinePropertySource
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.MetadataYamlPropertySource
import io.micronaut.configuration.picocli.MicronautFactory
import io.micronaut.context.ApplicationContext
import io.micronaut.context.RuntimeBeanDefinition
import io.micronaut.context.env.CommandLinePropertySource
import io.micronaut.context.env.Environment
import io.micronaut.context.env.MapPropertySource
import io.micronaut.core.cli.CommandLine as MicronautCommandLine
import java.nio.file.Path
import kotlin.system.exitProcess
Expand All @@ -21,9 +21,11 @@ import picocli.CommandLine.Model.UsageMessageSpec
class AirbyteSourceRunner(
/** CLI args. */
args: Array<out String>,
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("source", args, testBeanDefinitions) {
) : AirbyteConnectorRunner("source", args, systemEnv, testBeanDefinitions) {
companion object {
@JvmStatic
fun run(vararg args: String) {
Expand All @@ -36,10 +38,11 @@ class AirbyteSourceRunner(
class AirbyteDestinationRunner(
/** CLI args. */
args: Array<out String>,
testEnvironments: Map<String, String> = emptyMap(),
/** Environment variables. */
systemEnv: Map<String, String> = System.getenv(),
/** Micronaut bean definition overrides, used only for tests. */
vararg testBeanDefinitions: RuntimeBeanDefinition<*>,
) : AirbyteConnectorRunner("destination", args, testBeanDefinitions, testEnvironments) {
) : AirbyteConnectorRunner("destination", args, systemEnv, testBeanDefinitions) {
companion object {
@JvmStatic
fun run(vararg args: String) {
Expand All @@ -55,21 +58,18 @@ class AirbyteDestinationRunner(
sealed class AirbyteConnectorRunner(
val connectorType: String,
val args: Array<out String>,
systemEnv: Map<String, String>,
val testBeanDefinitions: Array<out RuntimeBeanDefinition<*>>,
val testProperties: Map<String, String> = emptyMap(),
) {
// Micronaut's TEST env detection relies on inspecting the stacktrace and checking for
// any junit calls. This doesn't work if we launch the connector from a different thread, e.g.
// `Dispatchers.IO`. Force the test env if needed. (Some tests launch the connector from the IO
// context to avoid blocking themselves.)
private val isTest = testBeanDefinitions.isNotEmpty()
val envs: Array<String> =
arrayOf(Environment.CLI, connectorType) +
if (isTest) {
arrayOf(Environment.TEST)
} else {
emptyArray()
}
// Set feature flag environments.
FeatureFlag.active(systemEnv).map { it.micronautEnvironmentName } +
// Micronaut's TEST env detection relies on inspecting the stacktrace and checking for
// any junit calls. This doesn't work if we launch the connector from a different
// thread, e.g. `Dispatchers.IO`. Force the test env if needed. Some tests launch the
// connector from the IO context to avoid blocking themselves.
listOfNotNull(Environment.TEST.takeIf { testBeanDefinitions.isNotEmpty() })

inline fun <reified R : Runnable> run() {
val picocliCommandLineFactory = PicocliCommandLineFactory(this)
Expand All @@ -84,7 +84,6 @@ sealed class AirbyteConnectorRunner(
ApplicationContext.builder(R::class.java, *envs)
.propertySources(
*listOfNotNull(
MapPropertySource("additional_properties", testProperties),
airbytePropertySource,
commandLinePropertySource,
MetadataYamlPropertySource(),
Expand All @@ -93,10 +92,6 @@ sealed class AirbyteConnectorRunner(
)
.beanDefinitions(*testBeanDefinitions)
.start()
// We can't rely on the isTest value from our constructor,
// because that won't autodetect junit in our stacktrace.
// So instead we ask micronaut (which will include if we explicitly added
// the TEST env).
val isTest: Boolean = ctx.environment.activeNames.contains(Environment.TEST)
val picocliFactory: CommandLine.IFactory = MicronautFactory(ctx)
val picocliCommandLine: CommandLine =
Expand All @@ -105,8 +100,10 @@ sealed class AirbyteConnectorRunner(
if (!isTest) {
// Required by the platform, otherwise syncs may hang.
exitProcess(exitCode)
} else if (exitCode != 0) {
// Otherwise, propagate failure to test callers.
}
// At this point, we're in a test.
if (exitCode != 0) {
// Propagate failure to test callers.
throw ConnectorUncleanExitException(exitCode)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@

package io.airbyte.cdk

/** This is used only in tests. */
class ConnectorUncleanExitException(val exitCode: Int) :
Exception("Destination process exited uncleanly: $exitCode")
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2024 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.cdk.command

import io.micronaut.context.annotation.Factory
import io.micronaut.context.env.Environment
import jakarta.inject.Singleton
import java.util.EnumSet

/**
* An enum of all feature flags, currently these are set via environment vars.
*
* Micronaut can inject a Set<FeatureFlag> singleton of all active feature flags.
*/
enum class FeatureFlag(
val micronautEnvironmentName: String,
val envVar: EnvVar,
val requiredEnvVarValue: String,
private val transformActualValue: (String) -> String = { it }
) {

/** [AIRBYTE_CLOUD_DEPLOYMENT] is active when the connector is running in Airbyte Cloud. */
AIRBYTE_CLOUD_DEPLOYMENT(
micronautEnvironmentName = AIRBYTE_CLOUD_ENV,
envVar = EnvVar.DEPLOYMENT_MODE,
requiredEnvVarValue = "CLOUD",
transformActualValue = { it.trim().uppercase() },
);

/** Environment variable binding shell declaration which activates the feature flag. */
val envVarBindingDeclaration: String
get() = "${envVar.name}=$requiredEnvVarValue"

enum class EnvVar(val defaultValue: String = "") {
DEPLOYMENT_MODE
}

companion object {
internal fun active(systemEnv: Map<String, String>): List<FeatureFlag> =
entries.filter { featureFlag: FeatureFlag ->
val envVar: EnvVar = featureFlag.envVar
val envVarValue: String = systemEnv[envVar.name] ?: envVar.defaultValue
featureFlag.transformActualValue(envVarValue) == featureFlag.requiredEnvVarValue
}
}

@Factory
private class MicronautFactory {

@Singleton
fun active(environment: Environment): Set<FeatureFlag> =
EnumSet.noneOf(FeatureFlag::class.java).apply {
addAll(
FeatureFlag.entries.filter {
environment.activeNames.contains(it.micronautEnvironmentName)
}
)
}
}
}

const val AIRBYTE_CLOUD_ENV = "airbyte-cloud"
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ data object CliRunner {
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
vararg featureFlags: FeatureFlag,
): CliRunnable {
val out = CliRunnerOutputStream()
val runnable: Runnable =
makeRunnable(op, config, catalog, state) { args: Array<String> ->
AirbyteSourceRunner(args, out.beanDefinition)
AirbyteSourceRunner(args, featureFlags.systemEnv, out.beanDefinition)
}
return CliRunnable(runnable, out.results)
}
Expand All @@ -52,7 +53,7 @@ data object CliRunner {
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
inputStream: InputStream,
testProperties: Map<String, String> = emptyMap(),
vararg featureFlags: FeatureFlag,
): CliRunnable {
val inputBeanDefinition: RuntimeBeanDefinition<InputStream> =
RuntimeBeanDefinition.builder(InputStream::class.java) { inputStream }
Expand All @@ -63,7 +64,7 @@ data object CliRunner {
makeRunnable(op, config, catalog, state) { args: Array<String> ->
AirbyteDestinationRunner(
args,
testProperties,
featureFlags.systemEnv,
inputBeanDefinition,
out.beanDefinition,
)
Expand All @@ -77,6 +78,7 @@ data object CliRunner {
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
state: List<AirbyteStateMessage>? = null,
featureFlags: Set<FeatureFlag> = setOf(),
vararg input: AirbyteMessage,
): CliRunnable {
val inputJsonBytes: ByteArray =
Expand All @@ -88,7 +90,7 @@ data object CliRunner {
baos.toByteArray()
}
val inputStream: InputStream = ByteArrayInputStream(inputJsonBytes)
return destination(op, config, catalog, state, inputStream)
return destination(op, config, catalog, state, inputStream, *featureFlags.toTypedArray())
}

private fun makeRunnable(
Expand Down Expand Up @@ -120,6 +122,9 @@ data object CliRunner {
}
}

private val Array<out FeatureFlag>.systemEnv: Map<String, String>
get() = toSet().map { it.envVar.name to it.requiredEnvVarValue }.toMap()

private fun inputFile(contents: Any?): Path? =
contents?.let {
Files.createTempFile(null, null).also { file ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
package io.airbyte.cdk.load.check

import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.command.ValidatedJsonUtils
import io.airbyte.cdk.load.test.util.FakeDataDumper
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.destination_process.TestDeploymentMode
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.charset.StandardCharsets
Expand All @@ -23,7 +23,7 @@ import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.assertAll

data class CheckTestConfig(val configPath: String, val deploymentMode: TestDeploymentMode)
data class CheckTestConfig(val configPath: String, val featureFlags: Set<FeatureFlag> = emptySet())

open class CheckIntegrationTest<T : ConfigurationSpecification>(
val configurationClass: Class<T>,
Expand All @@ -37,14 +37,14 @@ open class CheckIntegrationTest<T : ConfigurationSpecification>(
) {
@Test
open fun testSuccessConfigs() {
for ((path, deploymentMode) in successConfigFilenames) {
for ((path, featureFlags) in successConfigFilenames) {
val fileContents = Files.readString(Path.of(path), StandardCharsets.UTF_8)
val config = ValidatedJsonUtils.parseOne(configurationClass, fileContents)
val process =
destinationProcessFactory.createDestinationProcess(
"check",
config = config,
deploymentMode = deploymentMode,
featureFlags = featureFlags.toTypedArray(),
)
runBlocking { process.run() }
val messages = process.readMessages()
Expand All @@ -66,14 +66,14 @@ open class CheckIntegrationTest<T : ConfigurationSpecification>(
@Test
open fun testFailConfigs() {
for ((checkTestConfig, failurePattern) in failConfigFilenamesAndFailureReasons) {
val (path, deploymentMode) = checkTestConfig
val (path, featureFlags) = checkTestConfig
val fileContents = Files.readString(Path.of(path))
val config = ValidatedJsonUtils.parseOne(configurationClass, fileContents)
val process =
destinationProcessFactory.createDestinationProcess(
"check",
config = config,
deploymentMode = deploymentMode,
featureFlags = featureFlags.toTypedArray(),
)
runBlocking { process.run() }
val messages = process.readMessages()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@ import com.deblock.jsondiff.matcher.LenientJsonObjectPartialMatcher
import com.deblock.jsondiff.matcher.StrictJsonArrayPartialMatcher
import com.deblock.jsondiff.matcher.StrictPrimitivePartialMatcher
import com.deblock.jsondiff.viewer.OnlyErrorDiffViewer
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.load.test.util.FakeDataDumper
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.cdk.load.test.util.NoopDestinationCleaner
import io.airbyte.cdk.load.test.util.NoopExpectedRecordMapper
import io.airbyte.cdk.load.test.util.destination_process.TestDeploymentMode
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import java.nio.file.Files
Expand All @@ -42,27 +42,26 @@ abstract class SpecTest :
) {
@Test
fun testSpecOss() {
testSpec(TestDeploymentMode.OSS)
testSpec("expected-spec-oss.json")
}

@Test
fun testSpecCloud() {
testSpec(TestDeploymentMode.CLOUD)
testSpec("expected-spec-cloud.json", FeatureFlag.AIRBYTE_CLOUD_DEPLOYMENT)
}

private fun testSpec(deploymentMode: TestDeploymentMode) {
val expectedSpecFilename = "expected-spec-${deploymentMode.name.lowercase()}.json"
private fun testSpec(
expectedSpecFilename: String,
vararg featureFlags: FeatureFlag,
) {
val expectedSpecPath = Path.of("src/test-integration/resources", expectedSpecFilename)

if (!Files.exists(expectedSpecPath)) {
Files.createFile(expectedSpecPath)
}
val expectedSpec = Files.readString(expectedSpecPath)
val process =
destinationProcessFactory.createDestinationProcess(
"spec",
deploymentMode = deploymentMode,
)
destinationProcessFactory.createDestinationProcess("spec", featureFlags = featureFlags)
runBlocking { process.run() }
val messages = process.readMessages()
val specMessages = messages.filter { it.type == AirbyteMessage.Type.SPEC }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package io.airbyte.cdk.load.test.util.destination_process

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.command.ConfigurationSpecification
import io.airbyte.cdk.command.FeatureFlag
import io.airbyte.cdk.load.test.util.IntegrationTest
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog
Expand Down Expand Up @@ -40,11 +41,6 @@ interface DestinationProcess {
suspend fun shutdown()
}

enum class TestDeploymentMode {
CLOUD,
OSS
}

@SuppressFBWarnings("NP_NONNULL_RETURN_VIOLATION", "good old lateinit")
abstract class DestinationProcessFactory {
/**
Expand All @@ -58,6 +54,6 @@ abstract class DestinationProcessFactory {
command: String,
config: ConfigurationSpecification? = null,
catalog: ConfiguredAirbyteCatalog? = null,
deploymentMode: TestDeploymentMode = TestDeploymentMode.OSS,
vararg featureFlags: FeatureFlag,
): DestinationProcess
}
Loading

0 comments on commit a4847df

Please sign in to comment.