Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make sure that the feature flag is transfer to container #14314

Merged
merged 6 commits into from
Jun 30, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -79,15 +79,17 @@ public Optional<String> runJob() throws Exception {
Math.toIntExact(sourceLauncherConfig.getAttemptId()),
sourceLauncherConfig.getDockerImage(),
processFactory,
syncInput.getSourceResourceRequirements());
syncInput.getSourceResourceRequirements(),
featureFlags);

log.info("Setting up destination launcher...");
final IntegrationLauncher destinationLauncher = new AirbyteIntegrationLauncher(
destinationLauncherConfig.getJobId(),
Math.toIntExact(destinationLauncherConfig.getAttemptId()),
destinationLauncherConfig.getDockerImage(),
processFactory,
syncInput.getDestinationResourceRequirements());
syncInput.getDestinationResourceRequirements(),
featureFlags);

log.info("Setting up source...");
// reset jobs use an empty source to induce resetting all data in destination.
Expand Down
11 changes: 6 additions & 5 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ private void registerConnectionManager(final WorkerFactory factory) {
workerEnvironment,
logConfigs,
jobPersistence,
airbyteVersion),
airbyteVersion,
featureFlags),
new AutoDisableConnectionActivityImpl(configRepository, jobPersistence, featureFlags, configs, jobNotifier),
new StreamResetActivityImpl(streamResetPersistence, jobPersistence));
}
Expand Down Expand Up @@ -243,7 +244,7 @@ private void registerDiscover(final WorkerFactory factory) {
.registerActivitiesImplementations(
new DiscoverCatalogActivityImpl(discoverWorkerConfigs, discoverProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment,
logConfigs,
jobPersistence, airbyteVersion));
jobPersistence, airbyteVersion, featureFlags));
}

private void registerCheckConnection(final WorkerFactory factory) {
Expand All @@ -253,15 +254,15 @@ private void registerCheckConnection(final WorkerFactory factory) {
checkConnectionWorker
.registerActivitiesImplementations(
new CheckConnectionActivityImpl(checkWorkerConfigs, checkProcessFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs,
jobPersistence, airbyteVersion));
jobPersistence, airbyteVersion, featureFlags));
}

private void registerGetSpec(final WorkerFactory factory) {
final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name(), getWorkerOptions(maxWorkers.getMaxSpecWorkers()));
specWorker.registerWorkflowImplementationTypes(SpecWorkflowImpl.class);
specWorker.registerActivitiesImplementations(
new SpecActivityImpl(specWorkerConfigs, specProcessFactory, workspaceRoot, workerEnvironment, logConfigs, jobPersistence,
airbyteVersion));
airbyteVersion, featureFlags));
}

private ReplicationActivityImpl getReplicationActivityImpl(final WorkerConfigs workerConfigs,
Expand All @@ -277,7 +278,7 @@ private ReplicationActivityImpl getReplicationActivityImpl(final WorkerConfigs w
logConfigs,
jobPersistence,
airbyteVersion,
featureFlags.useStreamCapableState());
featureFlags);
}

private NormalizationActivityImpl getNormalizationActivityImpl(final WorkerConfigs workerConfigs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.WorkerEnvConstants;
import io.airbyte.workers.exception.WorkerException;
Expand Down Expand Up @@ -44,17 +46,20 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
private final String imageName;
private final ProcessFactory processFactory;
private final ResourceRequirements resourceRequirement;
private final FeatureFlags featureFlags;

public AirbyteIntegrationLauncher(final String jobId,
final int attempt,
final String imageName,
final ProcessFactory processFactory,
final ResourceRequirements resourceRequirement) {
final ResourceRequirements resourceRequirement,
final FeatureFlags featureFlags) {
this.jobId = jobId;
this.attempt = attempt;
this.imageName = imageName;
this.processFactory = processFactory;
this.resourceRequirement = resourceRequirement;
this.featureFlags = featureFlags;
}

@Override
Expand Down Expand Up @@ -188,7 +193,8 @@ private Map<String, String> getWorkerMetadata() {
return Map.of(
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, imageName,
WorkerEnvConstants.WORKER_JOB_ID, jobId,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt));
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(attempt),
EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(featureFlags.useStreamCapableState()));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.check.connection;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.StandardCheckConnectionInput;
Expand Down Expand Up @@ -35,6 +36,7 @@ public class CheckConnectionActivityImpl implements CheckConnectionActivity {
private final LogConfigs logConfigs;
private final JobPersistence jobPersistence;
private final String airbyteVersion;
private final FeatureFlags featureFlags;

public CheckConnectionActivityImpl(final WorkerConfigs workerConfigs,
final ProcessFactory processFactory,
Expand All @@ -43,7 +45,8 @@ public CheckConnectionActivityImpl(final WorkerConfigs workerConfigs,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final JobPersistence jobPersistence,
final String airbyteVersion) {
final String airbyteVersion,
final FeatureFlags featureFlags) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to pass the FeatureFlags into each of these non-sync activities? From what I can tell, only the replication activity actually cares about the USE_STREAM_CAPABLE_STATE being set correctly.

I think passing this into all of these activities is fine if we are not planning to remove this featureFlags object once we stop needing the USE_STREAM_CAPABLE_STATE flag. But if we are planning to remove this object completely when we stop using that flag, then it seems a bit overkill to pass it into every activity. What do you think?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was to avoid initializing it multiple times (only in workerApp). The env will be set in the container because of the way the integrationRunner is done but I can remove it from the code.

this.workerConfigs = workerConfigs;
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
Expand All @@ -52,9 +55,10 @@ public CheckConnectionActivityImpl(final WorkerConfigs workerConfigs,
this.logConfigs = logConfigs;
this.jobPersistence = jobPersistence;
this.airbyteVersion = airbyteVersion;
this.featureFlags = featureFlags;
}

public StandardCheckConnectionOutput run(CheckConnectionInput args) {
public StandardCheckConnectionOutput run(final CheckConnectionInput args) {
final JsonNode fullConfig = secretsHydrator.hydrate(args.getConnectionConfiguration().getConnectionConfiguration());

final StandardCheckConnectionInput input = new StandardCheckConnectionInput()
Expand Down Expand Up @@ -84,7 +88,8 @@ private CheckedSupplier<Worker<StandardCheckConnectionInput, StandardCheckConnec
Math.toIntExact(launcherConfig.getAttemptId()),
launcherConfig.getDockerImage(),
processFactory,
workerConfigs.getResourceRequirements());
workerConfigs.getResourceRequirements(),
featureFlags);

return new DefaultCheckConnectionWorker(workerConfigs, integrationLauncher);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.discover.catalog;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.StandardDiscoverCatalogInput;
Expand Down Expand Up @@ -38,6 +39,7 @@ public class DiscoverCatalogActivityImpl implements DiscoverCatalogActivity {
private final LogConfigs logConfigs;
private final JobPersistence jobPersistence;
private final String airbyteVersion;
private final FeatureFlags featureFlags;

public DiscoverCatalogActivityImpl(final WorkerConfigs workerConfigs,
final ProcessFactory processFactory,
Expand All @@ -46,7 +48,8 @@ public DiscoverCatalogActivityImpl(final WorkerConfigs workerConfigs,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final JobPersistence jobPersistence,
final String airbyteVersion) {
final String airbyteVersion,
final FeatureFlags featureFlags) {
this.workerConfigs = workerConfigs;
this.processFactory = processFactory;
this.secretsHydrator = secretsHydrator;
Expand All @@ -55,7 +58,7 @@ public DiscoverCatalogActivityImpl(final WorkerConfigs workerConfigs,
this.logConfigs = logConfigs;
this.jobPersistence = jobPersistence;
this.airbyteVersion = airbyteVersion;

this.featureFlags = featureFlags;
}

public AirbyteCatalog run(final JobRunConfig jobRunConfig,
Expand Down Expand Up @@ -88,7 +91,7 @@ private CheckedSupplier<Worker<StandardDiscoverCatalogInput, AirbyteCatalog>, Ex
return () -> {
final IntegrationLauncher integrationLauncher =
new AirbyteIntegrationLauncher(launcherConfig.getJobId(), launcherConfig.getAttemptId().intValue(), launcherConfig.getDockerImage(),
processFactory, workerConfigs.getResourceRequirements());
processFactory, workerConfigs.getResourceRequirements(), featureFlags);
final AirbyteStreamFactory streamFactory = new DefaultAirbyteStreamFactory();
return new DefaultDiscoverCatalogWorker(workerConfigs, integrationLauncher, streamFactory);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.spec;

import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.JobGetSpecConfig;
Expand Down Expand Up @@ -34,21 +35,24 @@ public class SpecActivityImpl implements SpecActivity {
private final LogConfigs logConfigs;
private final JobPersistence jobPersistence;
private final String airbyteVersion;
private final FeatureFlags featureFlags;

public SpecActivityImpl(final WorkerConfigs workerConfigs,
final ProcessFactory processFactory,
final Path workspaceRoot,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final JobPersistence jobPersistence,
final String airbyteVersion) {
final String airbyteVersion,
final FeatureFlags featureFlags) {
this.workerConfigs = workerConfigs;
this.processFactory = processFactory;
this.workspaceRoot = workspaceRoot;
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.jobPersistence = jobPersistence;
this.airbyteVersion = airbyteVersion;
this.featureFlags = featureFlags;
}

public ConnectorSpecification run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig launcherConfig) {
Expand Down Expand Up @@ -79,7 +83,8 @@ private CheckedSupplier<Worker<JobGetSpecConfig, ConnectorSpecification>, Except
launcherConfig.getAttemptId().intValue(),
launcherConfig.getDockerImage(),
processFactory,
workerConfigs.getResourceRequirements());
workerConfigs.getResourceRequirements(),
featureFlags);

return new DefaultGetSpecWorker(workerConfigs, integrationLauncher);
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.workers.temporal.sync;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.functional.CheckedSupplier;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.AirbyteConfigValidator;
Expand Down Expand Up @@ -65,7 +66,7 @@ public class ReplicationActivityImpl implements ReplicationActivity {

private final JobPersistence jobPersistence;
private final String airbyteVersion;
private final boolean useStreamCapableState;
private final FeatureFlags featureFlags;

public ReplicationActivityImpl(final Optional<WorkerApp.ContainerOrchestratorConfig> containerOrchestratorConfig,
final WorkerConfigs workerConfigs,
Expand All @@ -76,9 +77,9 @@ public ReplicationActivityImpl(final Optional<WorkerApp.ContainerOrchestratorCon
final LogConfigs logConfigs,
final JobPersistence jobPersistence,
final String airbyteVersion,
final boolean useStreamCapableState) {
final FeatureFlags featureFlags) {
this(containerOrchestratorConfig, workerConfigs, processFactory, secretsHydrator, workspaceRoot, workerEnvironment, logConfigs,
new AirbyteConfigValidator(), jobPersistence, airbyteVersion, useStreamCapableState);
new AirbyteConfigValidator(), jobPersistence, airbyteVersion, featureFlags);
}

@VisibleForTesting
Expand All @@ -92,7 +93,7 @@ public ReplicationActivityImpl(final Optional<WorkerApp.ContainerOrchestratorCon
final AirbyteConfigValidator validator,
final JobPersistence jobPersistence,
final String airbyteVersion,
final boolean useStreamCapableState) {
final FeatureFlags featureFlags) {
this.containerOrchestratorConfig = containerOrchestratorConfig;
this.workerConfigs = workerConfigs;
this.processFactory = processFactory;
Expand All @@ -103,7 +104,7 @@ public ReplicationActivityImpl(final Optional<WorkerApp.ContainerOrchestratorCon
this.logConfigs = logConfigs;
this.jobPersistence = jobPersistence;
this.airbyteVersion = airbyteVersion;
this.useStreamCapableState = useStreamCapableState;
this.featureFlags = featureFlags;
}

@Override
Expand Down Expand Up @@ -197,18 +198,20 @@ private CheckedSupplier<Worker<StandardSyncInput, ReplicationOutput>, Exception>
Math.toIntExact(sourceLauncherConfig.getAttemptId()),
sourceLauncherConfig.getDockerImage(),
processFactory,
syncInput.getSourceResourceRequirements());
syncInput.getSourceResourceRequirements(),
featureFlags);
final IntegrationLauncher destinationLauncher = new AirbyteIntegrationLauncher(
destinationLauncherConfig.getJobId(),
Math.toIntExact(destinationLauncherConfig.getAttemptId()),
destinationLauncherConfig.getDockerImage(),
processFactory,
syncInput.getDestinationResourceRequirements());
syncInput.getDestinationResourceRequirements(),
featureFlags);

// reset jobs use an empty source to induce resetting all data in destination.
final AirbyteSource airbyteSource =
sourceLauncherConfig.getDockerImage().equals(WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB)
? new EmptyAirbyteSource(useStreamCapableState)
? new EmptyAirbyteSource(featureFlags.useStreamCapableState())
: new DefaultAirbyteSource(workerConfigs, sourceLauncher);

return new DefaultReplicationWorker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@

package io.airbyte.workers.process;

import static io.airbyte.workers.process.AirbyteIntegrationLauncher.*;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.CHECK_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.DISCOVER_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.JOB_TYPE;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.READ_STEP;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SPEC_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_JOB;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_STEP;
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.WRITE_STEP;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.WorkerEnvConstants;
import io.airbyte.workers.WorkerConfigs;
Expand All @@ -17,8 +25,12 @@
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;

@ExtendWith(MockitoExtension.class)
class AirbyteIntegrationLauncherTest {

private static final String JOB_ID = "0";
Expand All @@ -40,14 +52,16 @@ class AirbyteIntegrationLauncherTest {
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT));

private WorkerConfigs workerConfigs;
@Mock
private ProcessFactory processFactory;
private AirbyteIntegrationLauncher launcher;
@Mock
private FeatureFlags featureFlags;

@BeforeEach
void setUp() {
workerConfigs = new WorkerConfigs(new EnvConfigs());
processFactory = Mockito.mock(ProcessFactory.class);
launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements());
launcher = new AirbyteIntegrationLauncher(JOB_ID, JOB_ATTEMPT, FAKE_IMAGE, processFactory, workerConfigs.getResourceRequirements(), featureFlags);
}

@Test
Expand Down