Skip to content

Commit

Permalink
Merge branch 'master' into asanaPortMem
Browse files Browse the repository at this point in the history
  • Loading branch information
btkcodedev committed Oct 25, 2023
2 parents d056a61 + 7e4bf43 commit 24327e7
Show file tree
Hide file tree
Showing 501 changed files with 8,194 additions and 6,173 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.50.32
current_version = 0.50.33
commit = False
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(\-[a-z]+)?
Expand Down
2 changes: 1 addition & 1 deletion .devcontainer/destination-duckdb/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
"name": "DuckDB Destination Connector DevContainer (Python)",

// Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile
"image": "mcr.microsoft.com/devcontainers/python:0-3.9",
"image": "mcr.microsoft.com/devcontainers/python:0-3.10",

// Features to add to the dev container. More info: https://containers.dev/features.
"features": {
Expand Down
1 change: 1 addition & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.1.12 | 2023-10-24 | [\#31674](https://github.com/airbytehq/airbyte/pull/31674) | Fail sync when Debezium does not shut down properly. |
| 0.1.11 | 2023-10-18 | [\#31486](https://github.com/airbytehq/airbyte/pull/31486) | Update constants in AdaptiveSourceRunner. |
| 0.1.9 | 2023-10-12 | [\#31309](https://github.com/airbytehq/airbyte/pull/31309) | Use toPlainString() when handling BigDecimals in PostgresConverter |
| 0.1.8 | 2023-10-11 | [\#31322](https://github.com/airbytehq/airbyte/pull/31322) | Cap log line length to 32KB to prevent loss of records |
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/java/airbyte-cdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ subprojects { subproject ->
repositories {
maven {
name 'airbyte-public-jars'
url 'https://airbyte.mycloudrepo.io/public/repositories/airbyte-public-jars/'
url 'https://airbyte.mycloudrepo.io/repositories/airbyte-public-jars/'
credentials {
username System.getenv('CLOUDREPO_USER')
password System.getenv('CLOUDREPO_PASSWORD')
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.1.11
version=0.1.12
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,22 @@ public void start(final BlockingQueue<ChangeEvent<String, String>> queue) {
if (e.value() != null) {
try {
queue.put(e);
} catch (InterruptedException ex) {
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
}
})
.using((success, message, error) -> {
LOGGER.info("Debezium engine shutdown.");
LOGGER.info("Debezium engine shutdown. Engine terminated successfully : {}", success);
LOGGER.info(message);
thrownError.set(error);
// If debezium has not shutdown correctly, it can indicate an error with the connector configuration
// or a partial sync success.
// In situations like these, the preference is to fail loud and clear.
if (thrownError.get() != null && !success) {
thrownError.set(new RuntimeException(message));
}
engineLatch.countDown();
})
.build();
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/.bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 0.51.43
current_version = 0.52.0
commit = False

[bumpversion:file:setup.py]
Expand Down
6 changes: 6 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Changelog

## 0.52.0
File CDK: Add CustomFileBasedException for custom errors

## 0.51.44
low-code: Allow connector developers to specify the type of an added field

## 0.51.43
concurrent cdk: fail fast if a partition raises an exception

Expand Down
4 changes: 2 additions & 2 deletions airbyte-cdk/python/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN apk --no-cache upgrade \
&& apk --no-cache add tzdata build-base

# install airbyte-cdk
RUN pip install --prefix=/install airbyte-cdk==0.51.43
RUN pip install --prefix=/install airbyte-cdk==0.52.0

# build a clean environment
FROM base
Expand All @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

# needs to be the same as CDK
LABEL io.airbyte.version=0.51.43
LABEL io.airbyte.version=0.52.0
LABEL io.airbyte.name=airbyte/source-declarative-manifest
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ definitions:
- "{{ record['updates'] }}"
- "{{ record['MetaData']['LastUpdatedTime'] }}"
- "{{ stream_partition['segment_id'] }}"
value_type:
title: Value Type
description: Type of the value. If not specified, the type will be inferred from the value.
"$ref": "#/definitions/ValueType"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -1987,6 +1991,15 @@ definitions:
$parameters:
type: object
additionalProperties: true
ValueType:
title: Value Type
description: A schema type.
type: string
enum:
- string
- number
- integer
- boolean
WaitTimeFromHeader:
title: Wait Time Extracted From Response Header
description: Extract wait time from a HTTP header in the response.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,6 @@
from typing_extensions import Literal


class AddedFieldDefinition(BaseModel):
type: Literal['AddedFieldDefinition']
path: List[str] = Field(
...,
description='List of strings defining the path where to add the value on the record.',
examples=[['segment_id'], ['metadata', 'segment_id']],
title='Path',
)
value: str = Field(
...,
description="Value of the new field. Use {{ record['existing_field'] }} syntax to refer to other fields in the record.",
examples=[
"{{ record['updates'] }}",
"{{ record['MetaData']['LastUpdatedTime'] }}",
"{{ stream_partition['segment_id'] }}",
],
title='Value',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AddFields(BaseModel):
type: Literal['AddFields']
fields: List[AddedFieldDefinition] = Field(
...,
description='List of transformations (path and corresponding value) that will be added to the record.',
title='Fields',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AuthFlowType(Enum):
oauth2_0 = 'oauth2.0'
oauth1_0 = 'oauth1.0'
Expand Down Expand Up @@ -694,6 +663,13 @@ class LegacySessionTokenAuthenticator(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class ValueType(Enum):
string = 'string'
number = 'number'
integer = 'integer'
boolean = 'boolean'


class WaitTimeFromHeader(BaseModel):
type: Literal['WaitTimeFromHeader']
header: str = Field(
Expand Down Expand Up @@ -734,6 +710,42 @@ class WaitUntilTimeFromHeader(BaseModel):
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AddedFieldDefinition(BaseModel):
type: Literal['AddedFieldDefinition']
path: List[str] = Field(
...,
description='List of strings defining the path where to add the value on the record.',
examples=[['segment_id'], ['metadata', 'segment_id']],
title='Path',
)
value: str = Field(
...,
description="Value of the new field. Use {{ record['existing_field'] }} syntax to refer to other fields in the record.",
examples=[
"{{ record['updates'] }}",
"{{ record['MetaData']['LastUpdatedTime'] }}",
"{{ stream_partition['segment_id'] }}",
],
title='Value',
)
value_type: Optional[ValueType] = Field(
None,
description='Type of the value. If not specified, the type will be inferred from the value.',
title='Value Type',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class AddFields(BaseModel):
type: Literal['AddFields']
fields: List[AddedFieldDefinition] = Field(
...,
description='List of transformations (path and corresponding value) that will be added to the record.',
title='Fields',
)
parameters: Optional[Dict[str, Any]] = Field(None, alias='$parameters')


class ApiKeyAuthenticator(BaseModel):
type: Literal['ApiKeyAuthenticator']
api_token: Optional[str] = Field(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SimpleRetriever as SimpleRetrieverModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import SubstreamPartitionRouter as SubstreamPartitionRouterModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitTimeFromHeader as WaitTimeFromHeaderModel
from airbyte_cdk.sources.declarative.models.declarative_component_schema import WaitUntilTimeFromHeader as WaitUntilTimeFromHeaderModel
from airbyte_cdk.sources.declarative.partition_routers import ListPartitionRouter, SinglePartitionRouter, SubstreamPartitionRouter
Expand Down Expand Up @@ -232,15 +233,36 @@ def _create_component_from_model(self, model: BaseModel, config: Config, **kwarg
@staticmethod
def create_added_field_definition(model: AddedFieldDefinitionModel, config: Config, **kwargs: Any) -> AddedFieldDefinition:
interpolated_value = InterpolatedString.create(model.value, parameters=model.parameters or {})
return AddedFieldDefinition(path=model.path, value=interpolated_value, parameters=model.parameters or {})
return AddedFieldDefinition(
path=model.path,
value=interpolated_value,
value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type),
parameters=model.parameters or {},
)

def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any) -> AddFields:
added_field_definitions = [
self._create_component_from_model(model=added_field_definition_model, config=config)
self._create_component_from_model(
model=added_field_definition_model,
value_type=ModelToComponentFactory._json_schema_type_name_to_type(added_field_definition_model.value_type),
config=config,
)
for added_field_definition_model in model.fields
]
return AddFields(fields=added_field_definitions, parameters=model.parameters or {})

@staticmethod
def _json_schema_type_name_to_type(value_type: Optional[ValueType]) -> Optional[Type[Any]]:
if not value_type:
return None
names_to_types = {
ValueType.string: str,
ValueType.number: float,
ValueType.integer: int,
ValueType.boolean: bool,
}
return names_to_types[value_type]

@staticmethod
def create_api_key_authenticator(
model: ApiKeyAuthenticatorModel, config: Config, token_provider: Optional[TokenProvider] = None, **kwargs: Any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@

from abc import abstractmethod
from dataclasses import dataclass
from typing import Any, List, Mapping, Optional
from typing import Any, List, Optional

import requests
from airbyte_cdk.sources.declarative.types import Record


@dataclass
Expand All @@ -23,7 +24,7 @@ def initial_token(self) -> Optional[Any]:
"""

@abstractmethod
def next_page_token(self, response: requests.Response, last_records: List[Mapping[str, Any]]) -> Optional[Any]:
def next_page_token(self, response: requests.Response, last_records: List[Record]) -> Optional[Any]:
"""
:param response: response to process
:param last_records: records extracted from the response
Expand All @@ -32,7 +33,7 @@ def next_page_token(self, response: requests.Response, last_records: List[Mappin
pass

@abstractmethod
def reset(self):
def reset(self) -> None:
"""
Reset the pagination's inner state
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,12 @@ def next_page_token(self, response: requests.Response, last_records: List[Record
return None
return self._delegate.next_page_token(response, last_records)

def reset(self):
def reset(self) -> None:
self._delegate.reset()

def get_page_size(self) -> Optional[int]:
return self._delegate.get_page_size()

@property
def initial_token(self) -> Optional[Any]:
return self._delegate.initial_token
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional, Union
from typing import Any, List, Mapping, Optional, Type, Union

import dpath.util
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
Expand All @@ -17,6 +17,7 @@ class AddedFieldDefinition:

path: FieldPointer
value: Union[InterpolatedString, str]
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]


Expand All @@ -26,6 +27,7 @@ class ParsedAddFieldDefinition:

path: FieldPointer
value: InterpolatedString
value_type: Optional[Type[Any]]
parameters: InitVar[Mapping[str, Any]]


Expand Down Expand Up @@ -85,22 +87,27 @@ class AddFields(RecordTransformation):
parameters: InitVar[Mapping[str, Any]]
_parsed_fields: List[ParsedAddFieldDefinition] = field(init=False, repr=False, default_factory=list)

def __post_init__(self, parameters: Mapping[str, Any]):
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
for add_field in self.fields:
if len(add_field.path) < 1:
raise f"Expected a non-zero-length path for the AddFields transformation {add_field}"
raise ValueError(f"Expected a non-zero-length path for the AddFields transformation {add_field}")

if not isinstance(add_field.value, InterpolatedString):
if not isinstance(add_field.value, str):
raise f"Expected a string value for the AddFields transformation: {add_field}"
else:
self._parsed_fields.append(
ParsedAddFieldDefinition(
add_field.path, InterpolatedString.create(add_field.value, parameters=parameters), parameters=parameters
add_field.path,
InterpolatedString.create(add_field.value, parameters=parameters),
value_type=add_field.value_type,
parameters=parameters,
)
)
else:
self._parsed_fields.append(ParsedAddFieldDefinition(add_field.path, add_field.value, parameters={}))
self._parsed_fields.append(
ParsedAddFieldDefinition(add_field.path, add_field.value, value_type=add_field.value_type, parameters={})
)

def transform(
self,
Expand All @@ -109,12 +116,15 @@ def transform(
stream_state: Optional[StreamState] = None,
stream_slice: Optional[StreamSlice] = None,
) -> Record:
if config is None:
config = {}
kwargs = {"record": record, "stream_state": stream_state, "stream_slice": stream_slice}
for parsed_field in self._parsed_fields:
value = parsed_field.value.eval(config, **kwargs)
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
dpath.util.new(record, parsed_field.path, value)

return record

def __eq__(self, other):
return self.__dict__ == other.__dict__
def __eq__(self, other: Any) -> bool:
return bool(self.__dict__ == other.__dict__)
Loading

0 comments on commit 24327e7

Please sign in to comment.