Skip to content

Commit

Permalink
Merge branch 'master' into asanaPortMem
Browse files Browse the repository at this point in the history
  • Loading branch information
sajarin authored Oct 25, 2023
2 parents 24327e7 + fb2286e commit 2e96706
Show file tree
Hide file tree
Showing 172 changed files with 2,500 additions and 2,268 deletions.
7 changes: 5 additions & 2 deletions airbyte-cdk/java/airbyte-cdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ subprojects { subproject ->
artifact subproject.tasks.testFixturesJar
}
}
// This repository is only defined and used in the context of an artifact publishing
// It's different from the 'airbyte-public-jars' defined in settings.graddle only in its omission
// of the 'public' directory. Any artifacts publish here will be available in the 'airbyte-public-jars' repo
repositories {
maven {
name 'airbyte-public-jars'
url 'https://airbyte.mycloudrepo.io/repositories/airbyte-public-jars/'
name 'airbyte-repo'
url 'https://airbyte.mycloudrepo.io/repositories/airbyte-public-jars/'
credentials {
username System.getenv('CLOUDREPO_USER')
password System.getenv('CLOUDREPO_PASSWORD')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.function.Predicate;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -30,25 +29,23 @@ public class ConnectorExceptionUtil {
public static final String COMMON_EXCEPTION_MESSAGE_TEMPLATE = "Could not connect with provided configuration. Error: %s";
static final String RECOVERY_CONNECTION_ERROR_MESSAGE =
"We're having issues syncing from a Postgres replica that is configured as a hot standby server. " +
"Please see https://docs.airbyte.com/integrations/sources/postgres/#sync-data-from-postgres-hot-standby-server for options and workarounds";
"Please see https://go.airbyte.com/pg-hot-standby-error-message for options and workarounds";

public static final List<Integer> HTTP_AUTHENTICATION_ERROR_CODES = ImmutableList.of(401, 403);
private static final List<Predicate<Throwable>> configErrorPredicates =
List.of(getConfigErrorPredicate(), getConnectionErrorPredicate(),
isRecoveryConnectionExceptionPredicate(), isUnknownColumnInFieldListException());

public static boolean isConfigError(final Throwable e) {
return configErrorPredicates.stream().anyMatch(predicate -> predicate.test(e));
return isConfigErrorException(e) || isConnectionError(e) ||
isRecoveryConnectionException(e) || isUnknownColumnInFieldListException(e);
}

public static String getDisplayMessage(final Throwable e) {
if (e instanceof ConfigErrorException) {
return ((ConfigErrorException) e).getDisplayMessage();
} else if (e instanceof final ConnectionErrorException connEx) {
return ErrorMessage.getErrorMessage(connEx.getStateCode(), connEx.getErrorCode(), connEx.getExceptionMessage(), connEx);
} else if (isRecoveryConnectionExceptionPredicate().test(e)) {
} else if (isRecoveryConnectionException(e)) {
return RECOVERY_CONNECTION_ERROR_MESSAGE;
} else if (isUnknownColumnInFieldListException().test(e)) {
} else if (isUnknownColumnInFieldListException(e)) {
return e.getMessage();
} else {
return String.format(COMMON_EXCEPTION_MESSAGE_TEMPLATE, e.getMessage() != null ? e.getMessage() : "");
Expand Down Expand Up @@ -88,22 +85,22 @@ public static <T extends Throwable> void logAllAndThrowFirst(final String initia
}
}

private static Predicate<Throwable> getConfigErrorPredicate() {
return e -> e instanceof ConfigErrorException;
private static boolean isConfigErrorException(Throwable e) {
return e instanceof ConfigErrorException;
}

private static Predicate<Throwable> getConnectionErrorPredicate() {
return e -> e instanceof ConnectionErrorException;
private static boolean isConnectionError(Throwable e) {
return e instanceof ConnectionErrorException;
}

private static Predicate<Throwable> isRecoveryConnectionExceptionPredicate() {
return e -> e instanceof SQLException && e.getMessage()
private static boolean isRecoveryConnectionException(Throwable e) {
return e instanceof SQLException && e.getMessage()
.toLowerCase(Locale.ROOT)
.contains("due to conflict with recovery");
}

private static Predicate<Throwable> isUnknownColumnInFieldListException() {
return e -> e instanceof SQLSyntaxErrorException
private static boolean isUnknownColumnInFieldListException(Throwable e) {
return e instanceof SQLSyntaxErrorException
&& e.getMessage()
.toLowerCase(Locale.ROOT)
.contains("unknown column")
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.1.12
version=0.1.13
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.52.0
current_version = 0.52.2
commit = False

[bumpversion:file:setup.py]
Expand Down
6 changes: 6 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 0.52.2


## 0.52.1
Add max time for backoff handler

## 0.52.0
File CDK: Add CustomFileBasedException for custom errors

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
RUN pip install --prefix=/install airbyte-cdk==0.52.0
RUN pip install --prefix=/install airbyte-cdk==0.52.2

# build a clean environment
FROM base
Expand All @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.52.0
LABEL io.airbyte.version=0.52.2
LABEL io.airbyte.name=airbyte/source-declarative-manifest
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,15 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

from .config import CohereEmbeddingConfigModel, FakeEmbeddingConfigModel, OpenAIEmbeddingConfigModel, FromFieldEmbeddingConfigModel, AzureOpenAIEmbeddingConfigModel, OpenAICompatibleEmbeddingConfigModel, ProcessingConfigModel
from .config import (
AzureOpenAIEmbeddingConfigModel,
CohereEmbeddingConfigModel,
FakeEmbeddingConfigModel,
FromFieldEmbeddingConfigModel,
OpenAICompatibleEmbeddingConfigModel,
OpenAIEmbeddingConfigModel,
ProcessingConfigModel,
)
from .document_processor import Chunk, DocumentProcessor
from .embedder import CohereEmbedder, Embedder, FakeEmbedder, OpenAIEmbedder
from .indexer import Indexer
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@
# of airbyte-cdk rather than a standalone package.
from .airbyte_protocol import (
AdvancedAuth,
AirbyteAnalyticsTraceMessage,
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteControlConnectorConfigMessage,
AirbyteControlMessage,
AirbyteAnalyticsTraceMessage,
AirbyteErrorTraceMessage,
AirbyteEstimateTraceMessage,
AirbyteGlobalState,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,14 @@ def max_retries(self) -> Union[int, None]:
"""
pass

@property
@abstractmethod
def max_time(self) -> Union[int, None]:
"""
Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
"""
pass

@abstractmethod
def interpret_response(self, response: requests.Response) -> ResponseStatus:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class HttpRequester(Requester):

_DEFAULT_MAX_RETRY = 5
_DEFAULT_RETRY_FACTOR = 5
_DELAULT_MAX_TIME = 60 * 10

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._url_base = InterpolatedString.create(self.url_base, parameters=parameters)
Expand Down Expand Up @@ -170,6 +171,15 @@ def max_retries(self) -> Union[int, None]:
return self._DEFAULT_MAX_RETRY
return self.error_handler.max_retries

@property
def max_time(self) -> Union[int, None]:
"""
Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
"""
if self.error_handler is None:
return self._DELAULT_MAX_TIME
return self.error_handler.max_time

@property
def logger(self) -> logging.Logger:
return logging.getLogger(f"airbyte.HttpRequester.{self.name}")
Expand Down Expand Up @@ -427,11 +437,19 @@ def _send_with_retry(
Add this condition to avoid an endless loop if it hasn't been set
explicitly (i.e. max_retries is not None).
"""
max_time = self.max_time
"""
According to backoff max_time docstring:
max_time: The maximum total amount of time to try for before
giving up. Once expired, the exception will be allowed to
escape. If a callable is passed, it will be
evaluated at runtime and its return value used.
"""
if max_tries is not None:
max_tries = max(0, max_tries) + 1

user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send) # type: ignore # we don't pass in kwargs to the backoff handler
backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self._DEFAULT_RETRY_FACTOR)
user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(self._send) # type: ignore # we don't pass in kwargs to the backoff handler
backoff_handler = default_backoff_handler(max_tries=max_tries, max_time=max_time, factor=self._DEFAULT_RETRY_FACTOR)
# backoff handlers wrap _send, so it will always return a response
return backoff_handler(user_backoff_handler)(request, log_formatter=log_formatter) # type: ignore

Expand Down
19 changes: 17 additions & 2 deletions airbyte-cdk/python/airbyte_cdk/sources/streams/http/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,13 @@ def max_retries(self) -> Union[int, None]:
"""
return 5

@property
def max_time(self) -> Union[int, None]:
"""
Override if needed. Specifies maximum total waiting time (in seconds) for backoff policy. Return None for no limit.
"""
return 60 * 10

@property
def retry_factor(self) -> float:
"""
Expand Down Expand Up @@ -380,11 +387,19 @@ def _send_request(self, request: requests.PreparedRequest, request_kwargs: Mappi
Add this condition to avoid an endless loop if it hasn't been set
explicitly (i.e. max_retries is not None).
"""
max_time = self.max_time
"""
According to backoff max_time docstring:
max_time: The maximum total amount of time to try for before
giving up. Once expired, the exception will be allowed to
escape. If a callable is passed, it will be
evaluated at runtime and its return value used.
"""
if max_tries is not None:
max_tries = max(0, max_tries) + 1

user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries)(self._send)
backoff_handler = default_backoff_handler(max_tries=max_tries, factor=self.retry_factor)
user_backoff_handler = user_defined_backoff_handler(max_tries=max_tries, max_time=max_time)(self._send)
backoff_handler = default_backoff_handler(max_tries=max_tries, max_time=max_time, factor=self.retry_factor)
return backoff_handler(user_backoff_handler)(request, request_kwargs)

@classmethod
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@


def default_backoff_handler(
max_tries: Optional[int], factor: float, **kwargs: Any
max_tries: Optional[int], factor: float, max_time: Optional[int], **kwargs: Any
) -> Callable[[SendRequestCallableType], SendRequestCallableType]:
def log_retry_attempt(details: Mapping[str, Any]) -> None:
_, exc, _ = sys.exc_info()
Expand Down Expand Up @@ -56,12 +56,15 @@ def should_give_up(exc: Exception) -> bool:
on_backoff=log_retry_attempt,
giveup=should_give_up,
max_tries=max_tries,
max_time=max_time,
factor=factor,
**kwargs,
)


def user_defined_backoff_handler(max_tries: Optional[int], **kwargs: Any) -> Callable[[SendRequestCallableType], SendRequestCallableType]:
def user_defined_backoff_handler(
max_tries: Optional[int], max_time: Optional[int], **kwargs: Any
) -> Callable[[SendRequestCallableType], SendRequestCallableType]:
def sleep_on_ratelimit(details: Mapping[str, Any]) -> None:
_, exc, _ = sys.exc_info()
if isinstance(exc, UserDefinedBackoffException):
Expand All @@ -86,5 +89,6 @@ def log_give_up(details: Mapping[str, Any]) -> None:
on_giveup=log_give_up,
jitter=None,
max_tries=max_tries,
max_time=max_time,
**kwargs,
)
2 changes: 1 addition & 1 deletion airbyte-cdk/python/airbyte_cdk/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

from .is_cloud_environment import is_cloud_environment
from .schema_inferrer import SchemaInferrer
from .traced_exception import AirbyteTracedException
from .is_cloud_environment import is_cloud_environment

__all__ = ["AirbyteTracedException", "SchemaInferrer", "is_cloud_environment"]
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
name="airbyte-cdk",
# The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be
# updated if our semver format changes such as using release candidate versions.
version="0.52.0",
version="0.52.2",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
1 change: 1 addition & 0 deletions airbyte-ci/connectors/pipelines/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,7 @@ This command runs the Python tests for a airbyte-ci poetry package.
## Changelog
| Version | PR | Description |
| ------- | ---------------------------------------------------------- | --------------------------------------------------------------------------------------------------------- |
| 2.5.1 | [#31774](https://github.com/airbytehq/airbyte/pull/31774) | Add a docker configuration check on `airbyte-ci` startup. |
| 2.5.0 | [#31766](https://github.com/airbytehq/airbyte/pull/31766) | Support local connectors secrets. |
| 2.4.0 | [#31716](https://github.com/airbytehq/airbyte/pull/31716) | Enable pre-release publish with local CDK.
| 2.3.1 | [#31748](https://github.com/airbytehq/airbyte/pull/31748) | Use AsyncClick library instead of base Click. |
Expand Down
20 changes: 20 additions & 0 deletions airbyte-ci/connectors/pipelines/pipelines/cli/airbyte_ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@

import importlib
import logging
import multiprocessing
import os
from pathlib import Path
from typing import List

import asyncclick as click
import docker
import git
from github import PullRequest
from pipelines import main_logger
Expand Down Expand Up @@ -136,6 +138,20 @@ async def get_modified_files(
return await get_modified_files_in_branch(git_branch, git_revision, diffed_branch, is_local)


def check_local_docker_configuration():
try:
docker_client = docker.from_env()
except Exception as e:
raise click.UsageError(f"Could not connect to docker daemon: {e}")
daemon_info = docker_client.info()
docker_cpus_count = daemon_info["NCPU"]
local_cpus_count = multiprocessing.cpu_count()
if docker_cpus_count < local_cpus_count:
logging.warning(
f"Your docker daemon is configured with less CPUs than your local machine ({docker_cpus_count} vs. {local_cpus_count}). This may slow down the airbyte-ci execution. Please consider increasing the number of CPUs allocated to your docker daemon in the Resource Allocation settings of Docker."
)


# COMMANDS


Expand Down Expand Up @@ -198,6 +214,10 @@ async def airbyte_ci(
show_dagger_logs: bool,
): # noqa D103
ctx.ensure_object(dict)
if is_local:
# This check is meaningful only when running locally
# In our CI the docker host used by the Dagger Engine is different from the one used by the runner.
check_local_docker_configuration()
check_up_to_date()
ctx.obj["is_local"] = is_local
ctx.obj["is_ci"] = not is_local
Expand Down
2 changes: 1 addition & 1 deletion airbyte-ci/connectors/pipelines/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api"

[tool.poetry]
name = "pipelines"
version = "2.5.0"
version = "2.5.1"
description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines"
authors = ["Airbyte <[email protected]>"]

Expand Down
Loading

0 comments on commit 2e96706

Please sign in to comment.