Skip to content

Commit

Permalink
Merge branch 'master' into patch-2
Browse files Browse the repository at this point in the history
  • Loading branch information
LeoTheGriff authored Nov 10, 2023
2 parents 8f7b573 + 68706d7 commit cd4e985
Show file tree
Hide file tree
Showing 79 changed files with 1,454 additions and 364 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/airbyte-ci-release-experiment.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
name: Connector Ops CI - Experimental Airbyte CI Release

# Note: this workflow exists only to test whether GitHub Actions can build airbyte-ci on macOS.

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

on:
workflow_dispatch:
jobs:
  # Build a single-file airbyte-ci executable with PyInstaller on each OS in the matrix.
  build:
    runs-on: ${{ matrix.os }}
    # Every `run` step starts a fresh shell, so a standalone `cd` step does not
    # carry over to later steps; set the working directory for all `run` steps here.
    defaults:
      run:
        working-directory: airbyte-ci/pipelines/airbyte_ci
    strategy:
      fail-fast: false
      matrix:
        os: ["ubuntu-latest", "macos-latest"]

    steps:
      - uses: actions/checkout@v2
      - uses: actions/setup-python@v2
        with:
          # Quoted on purpose: an unquoted 3.10 is a YAML float and is read as 3.1.
          python-version: "3.10"

      # NOTE(review): get-poetry.py is the legacy Poetry installer script; confirm it is still served at this URL.
      - run: curl -sSL https://raw.githubusercontent.com/python-poetry/poetry/master/get-poetry.py | python -

      - run: poetry install --with dev
      - run: poetry run pyinstaller --collect-all pipelines --collect-all beartype --collect-all dagger --hidden-import strawberry --name airbyte-ci-${{ matrix.os }} --onefile pipelines/cli/airbyte_ci.py
      - uses: actions/upload-artifact@v2
        with:
          # `uses` steps ignore defaults.run, so this path is relative to the workspace root.
          path: airbyte-ci/pipelines/airbyte_ci/dist/*
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.7 | 2023-11-08 | [\#31856](https://github.com/airbytehq/airbyte/pull/31856) | source-postgres: support for infinity dates and timestamps |
| 0.4.5 | 2023-11-07 | [\#32112](https://github.com/airbytehq/airbyte/pull/32112) | Async destinations framework: Allow configuring the queue flush threshold |
| 0.4.4 | 2023-11-06 | [\#32119](https://github.com/airbytehq/airbyte/pull/32119) | Add STANDARD UUID codec to MongoDB debezium handler |
| 0.4.2 | 2023-11-06 | [\#32190](https://github.com/airbytehq/airbyte/pull/32190) | Improve error deinterpolation |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,39 @@
},
"Date": {
"type": "string",
"pattern": "^\\d{4}-\\d{2}-\\d{2}( BC)?$",
"description": "RFC 3339\u00a75.6's full-date format, extended with BC era support"
"oneOf": [
{
"pattern": "^\\d{4}-\\d{2}-\\d{2}( BC)?$"
},
{
"enum": ["Infinity", "-Infinity"]
}
],
"description": "RFC 3339\u00a75.6's full-date format, extended with BC era support and (-)Infinity"
},
"TimestampWithTimezone": {
"type": "string",
"pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?(Z|[+\\-]\\d{1,2}:\\d{2})( BC)?$",
"description": "An instant in time. Frequently simply referred to as just a timestamp, or timestamptz. Uses RFC 3339\u00a75.6's date-time format, requiring a \"T\" separator, and extended with BC era support. Note that we do _not_ accept Unix epochs here.\n"
"oneOf": [
{
"pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?(Z|[+\\-]\\d{1,2}:\\d{2})( BC)?$"
},
{
"enum": ["Infinity", "-Infinity"]
}
],
"description": "An instant in time. Frequently simply referred to as just a timestamp, or timestamptz. Uses RFC 3339\u00a75.6's date-time format, requiring a \"T\" separator, and extended with BC era support and (-)Infinity. Note that we do _not_ accept Unix epochs here.\n"
},
"TimestampWithoutTimezone": {
"type": "string",
"pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?( BC)?$",
"description": "Also known as a localdatetime, or just datetime. Under RFC 3339\u00a75.6, this would be represented as `full-date \"T\" partial-time`, extended with BC era support.\n"
"oneOf": [
{
"pattern": "^\\d{4}-\\d{2}-\\d{2}T\\d{2}:\\d{2}:\\d{2}(\\.\\d+)?( BC)?$"
},
{
"enum": ["Infinity", "-Infinity"]
}
],
"description": "Also known as a localdatetime, or just datetime. Under RFC 3339\u00a75.6, this would be represented as `full-date \"T\" partial-time`, extended with BC era support and (-)Infinity.\n"
},
"TimeWithTimezone": {
"type": "string",
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.4.6
version=0.4.8
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import io.airbyte.cdk.db.jdbc.DateTimeConverter;
import io.airbyte.cdk.integrations.debezium.internals.DebeziumConverterUtils;
import io.debezium.connector.postgresql.PostgresValueConverter;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import io.debezium.time.Conversions;
Expand Down Expand Up @@ -239,11 +240,13 @@ private int getTimePrecision(final RelationalColumn field) {
return field.scale().orElse(-1);
}

// String forms emitted for PostgreSQL's special infinite date/timestamp values.
// Declared static: these are class-wide constants, not per-instance state.
private static final String POSITIVE_INFINITY_VALUE = "Infinity";
private static final String NEGATIVE_INFINITY_VALUE = "-Infinity";

// Ref :
// https://debezium.io/documentation/reference/2.2/connectors/postgresql.html#postgresql-temporal-types
private void registerDate(final RelationalColumn field, final ConverterRegistration<SchemaBuilder> registration) {
final var fieldType = field.typeName();

registration.register(SchemaBuilder.string().optional(), x -> {
if (x == null) {
return DebeziumConverterUtils.convertDefaultValue(field);
Expand All @@ -252,8 +255,20 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat
case "TIMETZ":
return DateTimeConverter.convertToTimeWithTimezone(x);
case "TIMESTAMPTZ":
if (x.equals(PostgresValueConverter.NEGATIVE_INFINITY_OFFSET_DATE_TIME)) {
return NEGATIVE_INFINITY_VALUE;
}
if (x.equals(PostgresValueConverter.POSITIVE_INFINITY_OFFSET_DATE_TIME)) {
return POSITIVE_INFINITY_VALUE;
}
return DateTimeConverter.convertToTimestampWithTimezone(x);
case "TIMESTAMP":
if (x.equals(PostgresValueConverter.NEGATIVE_INFINITY_INSTANT)) {
return NEGATIVE_INFINITY_VALUE;
}
if (x.equals(PostgresValueConverter.POSITIVE_INFINITY_INSTANT)) {
return POSITIVE_INFINITY_VALUE;
}
if (x instanceof final Long l) {
if (getTimePrecision(field) <= 3) {
return convertToTimestamp(Conversions.toInstantFromMillis(l));
Expand All @@ -264,6 +279,12 @@ private void registerDate(final RelationalColumn field, final ConverterRegistrat
}
return convertToTimestamp(x);
case "DATE":
if (x.equals(PostgresValueConverter.NEGATIVE_INFINITY_LOCAL_DATE)) {
return NEGATIVE_INFINITY_VALUE;
}
if (x.equals(PostgresValueConverter.POSITIVE_INFINITY_LOCAL_DATE)) {
return POSITIVE_INFINITY_VALUE;
}
if (x instanceof Integer) {
return convertToDate(LocalDate.ofEpochDay((Integer) x));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ public static StateManager createStateManager(final AirbyteStateType supportedSt
switch (supportedStateType) {
case LEGACY:
LOGGER.info("Legacy state manager selected to manage state object with type {}.", airbyteStateMessage.getType());
return new LegacyStateManager(Jsons.object(airbyteStateMessage.getData(), DbState.class), catalog);
@SuppressWarnings("deprecation")
StateManager retVal = new LegacyStateManager(Jsons.object(airbyteStateMessage.getData(), DbState.class), catalog);
return retVal;
case GLOBAL:
LOGGER.info("Global state manager selected to manage state object with type {}.", airbyteStateMessage.getType());
return new GlobalStateManager(generateGlobalState(airbyteStateMessage), catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,16 @@
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.util.HostPortResolver;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.string.Strings;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.slf4j.Logger;
Expand All @@ -42,6 +38,8 @@
*/
public class PostgresTestDatabase implements AutoCloseable {

static private final Logger LOGGER = LoggerFactory.getLogger(PostgresTestDatabase.class);

/**
* Create a new {@link PostgresTestDatabase} instance.
*
Expand All @@ -61,32 +59,15 @@ static public PostgresTestDatabase make(String imageName, String... methods) {

private PostgresTestDatabase(PostgreSQLContainer<?> sharedContainer) {
this.container = sharedContainer;

this.suffix = Strings.addRandomSuffix("", "_", 10);
try {
this.tmpDir = Files.createTempDirectory("dir" + suffix);
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
final var dir = this.tmpDir.toFile();
Runtime.getRuntime().addShutdownHook(new Thread(() -> FileUtils.deleteQuietly(dir)));

this.dbName = "db" + suffix;
this.userName = "test_user" + suffix;
this.password = "test_password" + suffix;

final Path script = this.tmpDir.resolve("create" + suffix + ".sql");
IOs.writeFile(script, String.format("""
CREATE DATABASE %s;
CREATE USER %s PASSWORD '%s';
GRANT ALL PRIVILEGES ON DATABASE %s TO %s;
ALTER USER %s WITH SUPERUSER;
""",
dbName,
userName, password,
dbName, userName,
userName));
PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(script), sharedContainer);
execSQL(
String.format("CREATE DATABASE %s", dbName),
String.format("CREATE USER %s PASSWORD '%s'", userName, password),
String.format("GRANT ALL PRIVILEGES ON DATABASE %s TO %s", dbName, userName),
String.format("ALTER USER %s WITH SUPERUSER", userName));

this.jdbcUrl = String.format(
DatabaseDriver.POSTGRESQL.getUrlFormatString(),
Expand All @@ -103,13 +84,10 @@ private PostgresTestDatabase(PostgreSQLContainer<?> sharedContainer) {
}

public final PostgreSQLContainer<?> container;
public final String dbName, userName, password, jdbcUrl;
public final String suffix, dbName, userName, password, jdbcUrl;
public final DSLContext dslContext;
public final Database database;

private final Path tmpDir;
private final String suffix;

/**
* Convenience method for building identifiers which are unique to this instance.
*/
Expand Down Expand Up @@ -147,17 +125,38 @@ public PostgresUtils.Certificate getCertificate() {
return new PostgresUtils.Certificate(caCert, clientCert, clientKey);
}

/**
 * Runs the given SQL statements through {@code psql} inside the container, connecting to the
 * container's own database as the container's own user. Each statement is passed, in the given
 * order, as a separate {@code -c} argument. The exit code, stdout and stderr of the command are
 * only logged at debug level; a non-zero exit code does not raise an error.
 *
 * @param stmts SQL statements to execute
 * @throws UncheckedIOException if running the command in the container throws an IOException
 * @throws RuntimeException if the calling thread is interrupted while the command runs
 */
private void execSQL(String... stmts) {
  final List<String> cmd = Stream.concat(
      Stream.of("psql", "-a", "-d", container.getDatabaseName(), "-U", container.getUsername()),
      Stream.of(stmts).flatMap(stmt -> Stream.of("-c", stmt)))
      .toList();
  try {
    LOGGER.debug("executing {}", Strings.join(cmd, " "));
    final var exec = container.execInContainer(cmd.toArray(new String[0]));
    LOGGER.debug("exit code: {}\nstdout:\n{}\nstderr:\n{}", exec.getExitCode(), exec.getStdout(), exec.getStderr());
  } catch (IOException e) {
    throw new UncheckedIOException(e);
  } catch (InterruptedException e) {
    // Restore the interrupt flag so code further up the stack can still observe the interruption.
    Thread.currentThread().interrupt();
    throw new RuntimeException(e);
  }
}

/**
 * Issues a {@code DROP DATABASE} statement for the database named by {@code dbName}.
 */
public void dropDatabase() {
  final String dropStmt = String.format("DROP DATABASE %s", dbName);
  execSQL(dropStmt);
}

/**
* Close resources held by this instance. This deliberately avoids dropping the database, which is
* really expensive in Postgres. This is because a DROP DATABASE in Postgres triggers a CHECKPOINT.
* Call {@link #dropDatabase} to explicitly drop the database.
*/
@Override
public void close() {
dslContext.close();
final Path script = this.tmpDir.resolve("drop" + suffix + ".sql");
IOs.writeFile(script, String.format("""
DROP USER %s;
DROP DATABASE %s;
""",
userName,
dbName));
PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(script), container);
execSQL(String.format("DROP USER %s", userName));
}

static private class ContainerFactory {
Expand All @@ -168,6 +167,7 @@ static private class ContainerFactory {
final private String imageName;
final private List<Method> methods;
private PostgreSQLContainer<?> sharedContainer;
private RuntimeException containerCreationError;

private ContainerFactory(String imageNamePlusMethods) {
final String[] parts = imageNamePlusMethods.split("\\+");
Expand All @@ -183,18 +183,30 @@ private ContainerFactory(String imageNamePlusMethods) {

private synchronized PostgreSQLContainer<?> getOrCreateSharedContainer() {
if (sharedContainer == null) {
if (containerCreationError != null) {
throw new RuntimeException(
"Error during container creation for imageName=" + imageName + ", methods=" + methods.stream().map(Method::getName).toList(),
containerCreationError);
}
LOGGER.info("Creating new shared container based on {} with {}.", imageName, methods.stream().map(Method::getName).toList());
final var parsed = DockerImageName.parse(imageName).asCompatibleSubstituteFor("postgres");
sharedContainer = new PostgreSQLContainer<>(parsed);
for (Method method : methods) {
LOGGER.info("Calling {} on new shared container based on {}.", method.getName(), imageName);
try {
try {
final var parsed = DockerImageName.parse(imageName).asCompatibleSubstituteFor("postgres");
sharedContainer = new PostgreSQLContainer<>(parsed);
for (Method method : methods) {
LOGGER.info("Calling {} on new shared container based on {}.", method.getName(),
imageName);
method.invoke(this);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
sharedContainer.start();
} catch (IllegalAccessException | InvocationTargetException e) {
containerCreationError = new RuntimeException(e);
this.sharedContainer = null;
throw containerCreationError;
} catch (RuntimeException e) {
this.sharedContainer = null;
containerCreationError = e;
throw e;
}
sharedContainer.start();
}
return sharedContainer;
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.53.2
current_version = 0.53.4
commit = False

[bumpversion:file:setup.py]
Expand Down
6 changes: 6 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 0.53.4
Concurrent CDK: fix futures pruning

## 0.53.3
Fix spec schema generation for File CDK and Vector DB CDK and allow skipping invalid files in document file parser

## 0.53.2
Concurrent CDK: Increase connection pool size to allow for 20 max workers

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
RUN pip install --prefix=/install airbyte-cdk==0.53.2
RUN pip install --prefix=/install airbyte-cdk==0.53.4

# build a clean environment
FROM base
Expand All @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.53.2
LABEL io.airbyte.version=0.53.4
LABEL io.airbyte.name=airbyte/source-declarative-manifest
Loading

0 comments on commit cd4e985

Please sign in to comment.