Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New Destination: Elasticsearch #7005

Merged
merged 34 commits into from
Nov 2, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
2d8b1b0
feat: adds destination-elasticsearch
jdbranham Oct 10, 2021
f4dd34f
feat: adds destination-elasticsearch es server container
jdbranham Oct 13, 2021
73a2e5d
Merge branch 'airbytehq:master' into master
jdbranham Oct 13, 2021
fc9c838
refactor: header configuration
jdbranham Oct 13, 2021
1be82bc
update: only call createIndex when preparing the writes
jdbranham Oct 13, 2021
f884d01
update: reuse container
jdbranham Oct 13, 2021
ad1eb1f
fix: make index names valid and use namespace
jdbranham Oct 14, 2021
94f4abe
refactor: use bulk process and buffered consumer
jdbranham Oct 15, 2021
8392b40
Merge branch 'airbytehq:master' into master
jdbranham Oct 15, 2021
6b55b95
refactor: fix bulk process and buffered consumer
jdbranham Oct 16, 2021
c5c02b6
Merge remote-tracking branch 'origin/master'
jdbranham Oct 16, 2021
b45a0dc
chore: update documentation
jdbranham Oct 16, 2021
f4b136d
update: remove ssl reference
jdbranham Oct 16, 2021
5ad50e5
fix: bulk indexing
jdbranham Oct 17, 2021
2de6d41
docs: update for authentication
jdbranham Oct 18, 2021
df1c552
refactor: simplify config
jdbranham Oct 18, 2021
6c118e0
refactor: cleanup indices, implement auth
jdbranham Oct 18, 2021
4798a29
update: cleanup equals/toString in Elasticsearch ConnectionConfiguration
jdbranham Oct 18, 2021
edc67f8
chore: use conventions and remove unused code
jdbranham Oct 20, 2021
73b1f56
update: close underlying rest connection
jdbranham Oct 22, 2021
4cd90c2
update: enable `supportsNormalization`
jdbranham Oct 24, 2021
66b36ca
refactor: better encapsulate index naming
jdbranham Oct 24, 2021
72445dc
update: allow upserting
jdbranham Oct 25, 2021
e4336d8
update: use oneOf for auth method
jdbranham Oct 25, 2021
1c2c499
refactor: use encapsulated auth object
jdbranham Oct 25, 2021
38ba360
Merge branch 'master' into master
jdbranham Oct 25, 2021
56a9b78
chore: pretty
jdbranham Oct 25, 2021
4316620
Merge remote-tracking branch 'origin/master'
jdbranham Oct 25, 2021
bae713b
update: simplify auth header creation
jdbranham Oct 25, 2021
e5f2605
chore: remove unused class
jdbranham Oct 25, 2021
1a4d4f4
update: use boolean as field type
jdbranham Oct 25, 2021
0b081a8
adds: elasticsearch example server
jdbranham Oct 25, 2021
6e234cc
fix: api secret test
jdbranham Oct 25, 2021
67529c2
Merge remote-tracking branch 'upstream/master'
jdbranham Nov 2, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,5 @@ resources/examples/airflow/logs/*

# Cloud Demo
!airbyte-webapp/src/packages/cloud/data
/bin/
/**/bin/
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"java.configuration.updateBuildConfiguration": "automatic"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"destinationDefinitionId": "68f351a7-2745-4bef-ad7f-996b8e51bb8c",
"name": "Elasticsearch",
"dockerRepository": "airbyte/destination-elasticsearch",
"dockerImageTag": "0.1.0",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/elasticsearch"
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,8 @@
dockerRepository: airbyte/destination-mongodb
dockerImageTag: 0.1.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/mongodb
- destinationDefinitionId: 68f351a7-2745-4bef-ad7f-996b8e51bb8c
name: Elasticsearch
dockerRepository: airbyte/destination-elasticsearch
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.io/integrations/destinations/elasticsearch
1 change: 1 addition & 0 deletions airbyte-integrations/builds.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
| Azure Blob Storage | [![destination-azure-blob-storage](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-azure-blob-storage%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-azure-blob-storage) |
| BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) |
| Databricks | (Temporarily Not Available) |
| Elasticsearch | (Temporarily Not Available) |
| Google Cloud Storage (GCS) | [![destination-gcs](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-gcs%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-gcs) |
| Google PubSub | [![destination-pubsub](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-pubsub%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-pubsub) |
| Kafka | [![destination-kafka](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-kafka%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-kafka) |
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
ENV APPLICATION destination-elasticsearch

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-elasticsearch
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Destination Elasticsearch

This is the repository for the Elasticsearch destination connector in Java.
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/elasticsearch).

## Local development

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-elasticsearch:build
```

#### Create credentials
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.

**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.

### Locally running the connector docker image

#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:destination-elasticsearch:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.

#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-elasticsearch:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-elasticsearch:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-elasticsearch:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-elasticsearch:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```

## Testing
We use `JUnit` for Java tests.

### Unit and Integration Tests
Place unit tests under `src/test/io/airbyte/integrations/destinations/elasticsearch`.

#### Acceptance Tests
Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in
`src/test-integration/java/io/airbyte/integrations/destinations/elasticsearchDestinationAcceptanceTest.java`.

### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:destination-elasticsearch:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:destination-elasticsearch:integrationTest
```

## Dependency Management

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Elasticsearch Destination

Elasticsearch is a Lucene based search engine that's a type of NoSql storage.
Documents are created in an `index`, similar to a `table`in a relation database.

The documents are structured with fields that may contain nested complex structures.
[Read more about Elastic](https://elasticsearch.org/)

This connector maps an incoming `stream` to an Elastic `index`.
When using destination sync mode `append` and `append_dedup`, an `upsert` operation is performed against the Elasticsearch index.
When using `overwrite`, the records/docs are place in a temp index, then cloned to the target index.
The target index is deleted first, if it exists before the sync.

The [ElasticsearchConnection.java](./src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchConnection.java)
handles the communication with the Elastic server.
This uses the `elasticsearch-java` rest client from the Elasticsearch team -
[https://github.com/elastic/elasticsearch-java/](https://github.com/elastic/elasticsearch-java/)

The [ElasticsearchAirbyteMessageConsumerFactory.java](./src/main/java/io/airbyte/integrations/destination/elasticsearch/ElasticsearchAirbyteMessageConsumerFactory.java)
contains the logic for organizing a batch of records and reporting progress.

The `namespace` and stream `name` are used to generate an index name.
The index is created if it doesn't exist, but no other index configuration is done at this time.

Elastic will determine the type of data by detection.
You can create an index ahead of time for field type customization.

Basic authentication and API key authentication are supported.

## Development
See the Elasticsearch client tests for examples on how to use the library.

[https://github.com/elastic/elasticsearch-java/blob/main/java-client/src/test/java/co/elastic/clients/elasticsearch/end_to_end/RequestTest.java](https://github.com/elastic/elasticsearch-java/blob/main/java-client/src/test/java/co/elastic/clients/elasticsearch/end_to_end/RequestTest.java)
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.elasticsearch.ElasticsearchDestination'
}

dependencies {
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-integrations:bases:base-java')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)

implementation 'co.elastic.clients:elasticsearch-java:7.15.0'
implementation 'com.fasterxml.jackson.core:jackson-databind:2.12.3'
implementation 'org.projectlombok:lombok:1.18.20'

// EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// https://eclipse-ee4j.github.io/jsonp/
implementation 'jakarta.json:jakarta.json-api:2.0.1'

// Needed even if using Jackson to have an implementation of the Jsonp object model
// EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
// https://github.com/eclipse-ee4j/jsonp
implementation 'org.glassfish:jakarta.json:2.0.1'

// MIT
// https://www.testcontainers.org/
//implementation "org.testcontainers:testcontainers:1.16.0"
testImplementation "org.testcontainers:elasticsearch:1.15.3"
integrationTestJavaImplementation "org.testcontainers:elasticsearch:1.15.3"

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-elasticsearch')
}

repositories {
maven {
name = "ESSnapshots"
url = "https://snapshots.elastic.co/maven/"
}
maven {
name = "ESJavaGithubPackages"
url = "https://maven.pkg.github.com/elastic/elasticsearch-java"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.airbyte.integrations.destination.elasticsearch;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Objects;

@JsonIgnoreProperties(ignoreUnknown = true)
public class ConnectorConfiguration {

private String endpoint;
private int port;
private String username;
private String password;
private String apiKeyId;
private String apiKeySecret;

public ConnectorConfiguration() {
}

public static ConnectorConfiguration FromJsonNode(JsonNode config) {
jdbranham marked this conversation as resolved.
Show resolved Hide resolved
return new ObjectMapper().convertValue(config, ConnectorConfiguration.class);
}

public String getEndpoint() {
return this.endpoint;
}

public String getApiKeyId() {
return this.apiKeyId;
}

public String getUsername() {
return this.username;
}

public String getPassword() {
return this.password;
}

public String getApiKeySecret() {
return this.apiKeySecret;
}


public void setEndpoint(String endpoint) {
this.endpoint = endpoint;
}

public void setUsername(String username) {
this.username = username;
}

public void setPassword(String password) {
this.password = password;
}

public void setApiKeyId(String apiKeyId) {
this.apiKeyId = apiKeyId;
}

public void setApiKeySecret(String apiKeySecret) {
this.apiKeySecret = apiKeySecret;
}

public boolean isUsingBasicAuth() {
return Objects.nonNull(this.username) &&
!this.username.isEmpty() &&
Objects.nonNull(this.password) &&
!this.password.isEmpty();
}

public boolean isUsingApiKey() {
return Objects.nonNull(this.apiKeyId) &&
!this.apiKeyId.isEmpty() &&
Objects.nonNull(this.apiKeySecret) &&
!this.apiKeySecret.isEmpty();
}


public boolean equals(final Object o) {
if (o == this) return true;
if (!(o instanceof ConnectorConfiguration)) return false;
final ConnectorConfiguration other = (ConnectorConfiguration) o;
if (!other.canEqual(this)) return false;
final Object this$host = this.getEndpoint();
final Object other$host = other.getEndpoint();
if (!Objects.equals(this$host, other$host)) return false;
final Object this$username = this.getApiKeyId();
alexandr-shegeda marked this conversation as resolved.
Show resolved Hide resolved
final Object other$username = other.getApiKeyId();
if (!Objects.equals(this$username, other$username)) return false;
final Object this$password = this.getApiKeySecret();
alexandr-shegeda marked this conversation as resolved.
Show resolved Hide resolved
final Object other$password = other.getApiKeySecret();
if (!Objects.equals(this$password, other$password)) return false;
return true;
}

protected boolean canEqual(final Object other) {
return other instanceof ConnectorConfiguration;
}

public int hashCode() {
final int PRIME = 59;
int result = 1;
final Object $host = this.getEndpoint();
result = result * PRIME + ($host == null ? 43 : $host.hashCode());
final Object $username = this.getApiKeyId();
result = result * PRIME + ($username == null ? 43 : $username.hashCode());
final Object $password = this.getApiKeySecret();
result = result * PRIME + ($password == null ? 43 : $password.hashCode());
return result;
}

public String toString() {
return "ConnectorConfiguration(endpoint=" + this.getEndpoint() + ", username=" + this.getApiKeyId() + ")";
}
}
Loading