Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fetch specs from definitions directly #7293

Merged
merged 15 commits into from
Oct 25, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Function;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -83,16 +81,7 @@ public SynchronousResponse<ConnectorSpecification> createGetSpecJob(final String

if (cachedSpecOptional.isPresent()) {
LOGGER.debug("Spec bucket cache: Cache hit.");
final long now = Instant.now().toEpochMilli();
final SynchronousJobMetadata mockMetadata = new SynchronousJobMetadata(
UUID.randomUUID(),
ConfigType.GET_SPEC,
null,
now,
now,
true,
null);
return new SynchronousResponse<>(cachedSpecOptional.get(), mockMetadata);
return new SynchronousResponse<>(cachedSpecOptional.get(), SynchronousJobMetadata.mock(ConfigType.GET_SPEC));
} else {
LOGGER.debug("Spec bucket cache: Cache miss.");
return client.createGetSpecJob(dockerImage);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.workers.temporal.JobMetadata;
import java.nio.file.Path;
import java.time.Instant;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
Expand Down Expand Up @@ -114,4 +115,20 @@ public String toString() {
'}';
}

public static SynchronousJobMetadata mock(final ConfigType configType) {
final long now = Instant.now().toEpochMilli();
final UUID configId = null;
final boolean succeeded = true;
final Path logPath = null;

return new SynchronousJobMetadata(
UUID.randomUUID(),
configType,
configId,
now,
now,
succeeded,
logPath);
}

}
15 changes: 7 additions & 8 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,8 @@ public class ServerApp implements ServerRunnable {
private static final Logger LOGGER = LoggerFactory.getLogger(ServerApp.class);
private static final int PORT = 8001;
/**
* We can't support automatic migration for kube before this version because we had a bug in kube
* which would cause airbyte db to erase state upon termination, as a result the automatic migration
* wouldn't run
* We can't support automatic migration for kube before this version because we had a bug in kube which would cause airbyte db to erase state upon
* termination, as a result the automatic migration wouldn't run
*/
private static final AirbyteVersion KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION = new AirbyteVersion("0.26.5-alpha");
private final String airbyteVersion;
Expand Down Expand Up @@ -167,7 +166,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
configs.getConfigDatabaseUser(),
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl())
.getAndInitialize();
.getAndInitialize();
final DatabaseConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase);
configPersistence.migrateFileConfigs(configs);

Expand All @@ -183,7 +182,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
configs.getDatabaseUser(),
configs.getDatabasePassword(),
configs.getDatabaseUrl())
.getAndInitialize();
.getAndInitialize();
final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);

createDeploymentIfNoneExists(jobPersistence);
Expand Down Expand Up @@ -219,7 +218,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient);

// required before migration
configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.execute(dockerImage)));
// TODO: remove this specFetcherFn logic once file migrations are deprecated
configRepository.setSpecFetcher(dockerImage -> Exceptions.toRuntime(() -> specFetcher.getSpec(dockerImage)));
cgardens marked this conversation as resolved.
Show resolved Hide resolved

Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {
Expand Down Expand Up @@ -263,8 +263,7 @@ public static void main(final String[] args) throws Exception {
}

/**
* Ideally when automatic migration runs, we should make sure that we acquire a lock on database and
* no other operation is allowed
* Ideally when automatic migration runs, we should make sure that we acquire a lock on database and no other operation is allowed
*/
private static void runAutomaticMigration(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.airbyte.server.converters;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
Expand Down Expand Up @@ -42,8 +41,7 @@ public SourceConnection source(final UUID sourceId, final String sourceName, fin
persistedSource.setName(sourceName);
// get spec
final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(persistedSource.getSourceDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag());
final ConnectorSpecification spec = specFetcher.execute(imageName);
final ConnectorSpecification spec = specFetcher.getSpec(sourceDefinition);
// copy any necessary secrets from the current source to the incoming updated source
final JsonNode updatedConfiguration = secretsProcessor.copySecrets(
persistedSource.getConfiguration(),
Expand All @@ -61,8 +59,7 @@ public DestinationConnection destination(final UUID destinationId, final String
// get spec
final StandardDestinationDefinition destinationDefinition = configRepository
.getStandardDestinationDefinition(persistedDestination.getDestinationDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(destinationDefinition.getDockerRepository(), destinationDefinition.getDockerImageTag());
final ConnectorSpecification spec = specFetcher.execute(imageName);
final ConnectorSpecification spec = specFetcher.getSpec(destinationDefinition);
// copy any necessary secrets from the current destination to the incoming updated destination
final JsonNode updatedConfiguration = secretsProcessor.copySecrets(
persistedDestination.getConfiguration(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,77 @@
package io.airbyte.server.converters;

import com.google.common.base.Preconditions;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SynchronousJobMetadata;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpecFetcher {

private static final Logger LOGGER = LoggerFactory.getLogger(SpecFetcher.class);

private final SynchronousSchedulerClient schedulerJobClient;

public SpecFetcher(final SynchronousSchedulerClient schedulerJobClient) {
this.schedulerJobClient = schedulerJobClient;
}

public ConnectorSpecification execute(final String dockerImage) throws IOException {
// TODO: remove this once file migrations are deprecated, as that is the only time this function is used
@Deprecated
public ConnectorSpecification getSpec(final String dockerImage) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like the only reason we need to keep this method are 2 usages.

  1. ServerApp.java line 222, which we know we are going to get rid of when we get rid of the file migrations
  2. DockerImageValidator - left a comment below for how we might be able to remove this usage.

can we add a todo, reminding us that we want to kill this method?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the reason i'm so focused on this method is because it has a different behavior that is non-obvious from the caller's point of view. the other methods all will try to find the spec in the definition itself. this one will never do that and will always start calling scheduler clients. so i want to advertise loudly that it is different and shouldn't be used. in fact, we should mark it as deprecated.

return getSpecFromJob(schedulerJobClient.createGetSpecJob(dockerImage));
}

private static ConnectorSpecification getSpecFromJob(final SynchronousResponse<ConnectorSpecification> response) {
public ConnectorSpecification getSpec(final StandardSourceDefinition sourceDefinition) throws IOException {
return getSpecFromJob(getSpecJobResponse(sourceDefinition));
}

public ConnectorSpecification getSpec(final StandardDestinationDefinition destinationDefinition) throws IOException {
return getSpecFromJob(getSpecJobResponse(destinationDefinition));
}

// TODO: remove this method once the spec is a required field on the StandardSourceDefinition struct
public SynchronousResponse<ConnectorSpecification> getSpecJobResponse(final StandardSourceDefinition sourceDefinition) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how annoying! I see that usage in SchedulerHandler that is causing this. can you add a todo that says when we have moved the spec into the db as a required field we should get rid of the need for this?

in case it's not clear, all jobs the require spinning up a docker container we use this SynchronousResponse struct to help pass through metadata around what happened in the job and logs. Once we can guarantee that for spec that we will not be spinning up a docker container, we can just remove this part from the code path.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As you pointed out, the main reason I added this was because the SourceDefinitionSpecificationRead struct returned by the SchedulerHandler method (which is in turned returned by the ConfigurationApi) contains a required jobInfo field, so the method I added mocked out that job info when just getting the spec from the db.

Therefore it seems to me that as part of the change to make the spec field required/guaranteed on the db struct, we should also remove the jobInfo field from the SourceDefinitionSpecificationRead struct. Does this align with what you are suggesting here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i actually think the code you've written here is good. i'm not suggesting we change anything. just want to add a comment saying that we want to get rid of this part of the public interface in the future.

you're right, we will want to remove jobinfo from SourceDefinitionSpecificationRead, but we should wait to do it until we have the guarantee that we will only ever fetch the spec from the db. the reality is that while it is still possible to pull specs from the docker image, it is still helpful to keep the jobinfo because it includes logs to give clues as to what the problems are!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Completely agree! I meant that we should only remove jobInfo once we have that guarantee, and I agree that it makes sense to keep it for now 👍

LOGGER.debug("Spec Fetcher: Getting spec for Source Definition.");
final ConnectorSpecification spec = sourceDefinition.getSpec();

if (spec != null) {
LOGGER.debug("Spec Fetcher: Spec found in Source Definition.");
return new SynchronousResponse<>(spec, SynchronousJobMetadata.mock(ConfigType.GET_SPEC));
}

LOGGER.debug("Spec Fetcher: Spec not found in Source Definition, fetching with scheduler job instead.");
final String dockerImageName = DockerUtils.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag());
return schedulerJobClient.createGetSpecJob(dockerImageName);
}

// TODO: remove this method once the spec is a required field on the StandardDestinationDefinition
// struct
public SynchronousResponse<ConnectorSpecification> getSpecJobResponse(final StandardDestinationDefinition destinationDefinition)
throws IOException {
LOGGER.debug("Spec Fetcher: Getting spec for Destination Definition.");
final ConnectorSpecification spec = destinationDefinition.getSpec();

if (spec != null) {
LOGGER.debug("Spec Fetcher: Spec found in Destination Definition.");
return new SynchronousResponse<>(spec, SynchronousJobMetadata.mock(ConfigType.GET_SPEC));
}

LOGGER.debug("Spec Fetcher: Spec not found in Destination Definition, fetching with scheduler job instead.");
final String dockerImageName = DockerUtils.getTaggedImageName(
destinationDefinition.getDockerRepository(),
destinationDefinition.getDockerImageTag());
return schedulerJobClient.createGetSpecJob(dockerImageName);
}

public static ConnectorSpecification getSpecFromJob(final SynchronousResponse<ConnectorSpecification> response) {
Preconditions.checkState(response.isSuccess(), "Get Spec job failed.");
Preconditions.checkNotNull(response.getOutput(), "Get Spec job return null spec");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.airbyte.api.model.DestinationSearch;
import io.airbyte.api.model.DestinationUpdate;
import io.airbyte.api.model.WorkspaceIdRequestBody;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
Expand Down Expand Up @@ -213,7 +212,7 @@ public ConnectorSpecification getSpec(final UUID destinationDefinitionId)

public static ConnectorSpecification getSpec(final SpecFetcher specFetcher, final StandardDestinationDefinition destinationDef)
throws JsonValidationException, IOException, ConfigNotFoundException {
return specFetcher.execute(DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag()));
return specFetcher.getSpec(destinationDef);
}

private void persistDestinationConnection(final String name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,18 +136,17 @@ public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdReque
public CheckConnectionRead checkSourceConnectionFromSourceCreate(final SourceCoreConfig sourceConfig)
throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(sourceConfig.getSourceDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());

final var partialConfig = configRepository.statefulSplitEphemeralSecrets(
sourceConfig.getConnectionConfiguration(),
specFetcher.execute(imageName));
specFetcher.getSpec(sourceDef));

// todo (cgardens) - narrow the struct passed to the client. we are not setting fields that are
// technically declared as required.
final SourceConnection source = new SourceConnection()
.withSourceDefinitionId(sourceConfig.getSourceDefinitionId())
.withConfiguration(partialConfig);

final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());
return reportConnectionStatus(synchronousSchedulerClient.createSourceCheckConnectionJob(source, imageName));
}

Expand Down Expand Up @@ -177,18 +176,17 @@ public CheckConnectionRead checkDestinationConnectionFromDestinationId(final Des
public CheckConnectionRead checkDestinationConnectionFromDestinationCreate(final DestinationCoreConfig destinationConfig)
throws ConfigNotFoundException, IOException, JsonValidationException {
final StandardDestinationDefinition destDef = configRepository.getStandardDestinationDefinition(destinationConfig.getDestinationDefinitionId());
final String imageName = DockerUtils.getTaggedImageName(destDef.getDockerRepository(), destDef.getDockerImageTag());

final var partialConfig = configRepository.statefulSplitEphemeralSecrets(
destinationConfig.getConnectionConfiguration(),
specFetcher.execute(imageName));
specFetcher.getSpec(destDef));

// todo (cgardens) - narrow the struct passed to the client. we are not setting fields that are
// technically declared as required.
final DestinationConnection destination = new DestinationConnection()
.withDestinationDefinitionId(destinationConfig.getDestinationDefinitionId())
.withConfiguration(partialConfig);

final String imageName = DockerUtils.getTaggedImageName(destDef.getDockerRepository(), destDef.getDockerImageTag());
return reportConnectionStatus(synchronousSchedulerClient.createDestinationCheckConnectionJob(destination, imageName));
}

Expand Down Expand Up @@ -244,8 +242,7 @@ public SourceDefinitionSpecificationRead getSourceDefinitionSpecification(final
throws ConfigNotFoundException, IOException, JsonValidationException {
final UUID sourceDefinitionId = sourceDefinitionIdRequestBody.getSourceDefinitionId();
final StandardSourceDefinition source = configRepository.getStandardSourceDefinition(sourceDefinitionId);
final String imageName = DockerUtils.getTaggedImageName(source.getDockerRepository(), source.getDockerImageTag());
final SynchronousResponse<ConnectorSpecification> response = getConnectorSpecification(imageName);
final SynchronousResponse<ConnectorSpecification> response = specFetcher.getSpecJobResponse(source);
final ConnectorSpecification spec = response.getOutput();
final SourceDefinitionSpecificationRead specRead = new SourceDefinitionSpecificationRead()
.jobInfo(JobConverter.getSynchronousJobRead(response))
Expand All @@ -265,8 +262,7 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(final
throws ConfigNotFoundException, IOException, JsonValidationException {
final UUID destinationDefinitionId = destinationDefinitionIdRequestBody.getDestinationDefinitionId();
final StandardDestinationDefinition destination = configRepository.getStandardDestinationDefinition(destinationDefinitionId);
final String imageName = DockerUtils.getTaggedImageName(destination.getDockerRepository(), destination.getDockerImageTag());
final SynchronousResponse<ConnectorSpecification> response = getConnectorSpecification(imageName);
final SynchronousResponse<ConnectorSpecification> response = specFetcher.getSpecJobResponse(destination);
final ConnectorSpecification spec = response.getOutput();

final DestinationDefinitionSpecificationRead specRead = new DestinationDefinitionSpecificationRead()
Expand All @@ -286,10 +282,6 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(final
return specRead;
}

public SynchronousResponse<ConnectorSpecification> getConnectorSpecification(final String dockerImage) throws IOException {
return synchronousSchedulerClient.createGetSpecJob(dockerImage);
}

public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
final UUID connectionId = connectionIdRequestBody.getConnectionId();
Expand Down Expand Up @@ -378,9 +370,9 @@ public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOE
}

private void cancelTemporalWorkflowIfPresent(final long jobId) throws IOException {
final var latestAttemptId = jobPersistence.getJob(jobId).getAttempts().size() - 1; // attempts ids are monotonically increasing starting from 0
// and
// specific to a job id, allowing us to do this.
// attempts ids are monotonically increasing starting from 0 and specific to a job id, allowing us
// to do this.
final var latestAttemptId = jobPersistence.getJob(jobId).getAttempts().size() - 1;
final var workflowId = jobPersistence.getAttemptTemporalWorkflowId(jobId, latestAttemptId);

if (workflowId.isPresent()) {
Expand Down Expand Up @@ -416,15 +408,13 @@ private CheckConnectionRead reportConnectionStatus(final SynchronousResponse<Sta
private ConnectorSpecification getSpecFromSourceDefinitionId(final UUID sourceDefId)
throws IOException, JsonValidationException, ConfigNotFoundException {
final StandardSourceDefinition sourceDef = configRepository.getStandardSourceDefinition(sourceDefId);
final String imageName = DockerUtils.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());
return specFetcher.execute(imageName);
return specFetcher.getSpec(sourceDef);
}

private ConnectorSpecification getSpecFromDestinationDefinitionId(final UUID destDefId)
throws IOException, JsonValidationException, ConfigNotFoundException {
final StandardDestinationDefinition destinationDef = configRepository.getStandardDestinationDefinition(destDefId);
final String imageName = DockerUtils.getTaggedImageName(destinationDef.getDockerRepository(), destinationDef.getDockerImageTag());
return specFetcher.execute(imageName);
return specFetcher.getSpec(destinationDef);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import io.airbyte.api.model.SourceSearch;
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.api.model.WorkspaceIdRequestBody;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSourceDefinition;
Expand Down Expand Up @@ -206,9 +205,7 @@ private SourceRead buildSourceRead(final UUID sourceId)
// read configuration from db
final StandardSourceDefinition sourceDef = configRepository
.getSourceDefinitionFromSource(sourceId);
final String imageName = DockerUtils
.getTaggedImageName(sourceDef.getDockerRepository(), sourceDef.getDockerImageTag());
final ConnectorSpecification spec = specFetcher.execute(imageName);
final ConnectorSpecification spec = specFetcher.getSpec(sourceDef);
return buildSourceRead(sourceId, spec);
}

Expand Down Expand Up @@ -243,9 +240,7 @@ private ConnectorSpecification getSpecFromSourceDefinitionId(final UUID sourceDe

public static ConnectorSpecification getSpecFromSourceDefinitionId(final SpecFetcher specFetcher, final StandardSourceDefinition sourceDefinition)
throws IOException, ConfigNotFoundException {
final String imageName = DockerUtils
.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag());
return specFetcher.execute(imageName);
return specFetcher.getSpec(sourceDefinition);
}

private void persistSourceConnection(final String name,
Expand Down
Loading