Skip to content

Commit

Permalink
Merge branch 'main' into scheduler-split
Browse files Browse the repository at this point in the history
  • Loading branch information
Little-Wallace authored Jun 19, 2023
2 parents 1059c15 + 02dfee5 commit 5b816a6
Show file tree
Hide file tree
Showing 38 changed files with 1,467 additions and 51 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/cherry-pick-to-release-branch.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
name: PR for release branch
on:
pull_request:
branches:
- main
types: ["closed"]

jobs:
release_pull_request:
if: "contains(github.event.pull_request.labels.*.name, 'need-cherry-pick-v0.19') && github.event.pull_request.merged == true"
runs-on: ubuntu-latest
name: release_pull_request
steps:
- name: checkout
uses: actions/checkout@v1
- name: Create PR to branch
uses: risingwavelabs/github-action-cherry-pick@master
with:
pr_branch: 'v0.19.0-rc'
labels: |
cherry-pick
body: 'Cherry picking #{old_pull_request_id} onto this branch'
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 18 additions & 9 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
FROM ubuntu:22.04 as base
FROM ubuntu:22.04 AS base

ENV LANG en_US.utf8

RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install make build-essential cmake protobuf-compiler curl bash lld maven unzip libsasl2-dev
RUN apt-get update \
&& apt-get -y install ca-certificates build-essential libsasl2-dev openjdk-11-jdk

FROM base as builder
FROM base AS builder

RUN apt-get update && apt-get -y install make cmake protobuf-compiler curl bash lld maven unzip

SHELL ["/bin/bash", "-c"]

Expand Down Expand Up @@ -32,28 +35,34 @@ RUN rustup self update \
&& rustup show \
&& rustup component add rustfmt

RUN cargo fetch

RUN cargo build -p risingwave_cmd_all --release --features "rw-static-link" && \
RUN cargo fetch && \
cargo build -p risingwave_cmd_all --release --features "rw-static-link" && \
mkdir -p /risingwave/bin && mv /risingwave/target/release/risingwave /risingwave/bin/ && \
cp ./target/release/build/tikv-jemalloc-sys-*/out/build/bin/jeprof /risingwave/bin/ && \
mkdir -p /risingwave/lib && cargo clean

RUN cd /risingwave/java && mvn -B package -Dmaven.test.skip=true -Djava.binding.release=true && \
mkdir -p /risingwave/bin/connector-node && \
tar -zxvf /risingwave/java/connector-node/assembly/target/risingwave-connector-1.0.0.tar.gz -C /risingwave/bin/connector-node

FROM ubuntu:22.04 as image-base
RUN apt-get update && DEBIAN_FRONTEND=noninteractive apt-get -y install ca-certificates openjdk-11-jdk libsasl2-dev && rm -rf /var/lib/{apt,dpkg,cache,log}/
FROM base AS risingwave

FROM image-base as risingwave
LABEL org.opencontainers.image.source https://github.com/risingwavelabs/risingwave

RUN apt-get -y install gdb \
&& rm -rf /var/lib/{apt,dpkg,cache,log}/

RUN mkdir -p /risingwave/bin/connector-node && mkdir -p /risingwave/lib

COPY --from=builder /risingwave/bin/risingwave /risingwave/bin/risingwave
COPY --from=builder /risingwave/bin/connector-node /risingwave/bin/connector-node
COPY --from=builder /risingwave/ui /risingwave/ui
COPY --from=builder /risingwave/bin/jeprof /usr/local/bin/jeprof

# Set default playground mode to docker-playground profile
ENV PLAYGROUND_PROFILE docker-playground
# Set default dashboard UI to local path instead of github proxy
ENV RW_DASHBOARD_UI_PATH /risingwave/ui

ENTRYPOINT [ "/risingwave/bin/risingwave" ]
CMD [ "playground" ]
7 changes: 6 additions & 1 deletion java/.gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
# Java
target/
test_db/
bin/
.settings/
*.log
*.class
*.class
.project
.factorypath
.classpath
5 changes: 5 additions & 0 deletions java/connector-node/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ If you meet problem, you can try the following to skip the unit test:
mvn clean package -DskipTests=true
```

To disable building the rust library, you can try the following:
```
mvn clean package -Dno-build-rust
```

This will create a `.tar.gz` file with the Connector Node and all its dependencies in the `risingwave/java/connector-node/assembly/target` directory. To run the Connector Node, execute the following command:

```
Expand Down
1 change: 1 addition & 0 deletions java/connector-node/assembly/assembly.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<include>*:risingwave-source-cdc</include>

<!-- Sink connectors -->
<include>*:risingwave-sink-es</include>
<include>*:risingwave-sink-jdbc</include>
<include>*:risingwave-sink-iceberg</include>
<include>*:risingwave-sink-deltalake</include>
Expand Down
4 changes: 4 additions & 0 deletions java/connector-node/assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-source-cdc</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-sink-es</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-sink-jdbc</artifactId>
Expand Down
8 changes: 8 additions & 0 deletions java/connector-node/python-client/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ def validate_jdbc_sink(input_file):
rows[i][j]))
exit(1)

def test_elasticsearch_sink(input_file):
test_sink("elasticsearch",
{"url": "http://127.0.0.1:9200",
"index": "test"},
input_file)

def test_iceberg_sink(input_file):
test_sink("iceberg",
Expand Down Expand Up @@ -196,6 +201,7 @@ def test_deltalake_sink(input_file):
parser.add_argument('--iceberg_sink', action='store_true', help="run iceberg sink test")
parser.add_argument('--upsert_iceberg_sink', action='store_true', help="run upsert iceberg sink test")
parser.add_argument('--deltalake_sink', action='store_true', help="run deltalake sink test")
parser.add_argument('--es_sink', action='store_true', help='run elasticsearch sink test')
parser.add_argument('--input_file', default="./data/sink_input.json", help="input data to run tests")
args = parser.parse_args()
if args.file_sink:
Expand All @@ -208,3 +214,5 @@ def test_deltalake_sink(input_file):
test_upsert_iceberg_sink(args.input_file)
if args.deltalake_sink:
test_deltalake_sink(args.input_file)
if args.es_sink:
test_elasticsearch_sink(args.input_file)
5 changes: 5 additions & 0 deletions java/connector-node/risingwave-connector-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -94,5 +94,10 @@
<artifactId>risingwave-sink-deltalake</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-sink-es</artifactId>
<scope>provided</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public static SinkFactory getSinkFactory(String sinkType) {
return new IcebergSinkFactory();
case "deltalake":
return new DeltaLakeSinkFactory();
case "elasticsearch":
return new EsSinkFactory();
default:
throw UNIMPLEMENTED
.withDescription("unknown sink type: " + sinkType)
Expand Down
11 changes: 11 additions & 0 deletions java/connector-node/risingwave-connector-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,12 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down Expand Up @@ -163,5 +169,10 @@
<artifactId>risingwave-sink-jdbc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>risingwave-sink-es</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.risingwave.connector.sink.elasticsearch;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;

import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.risingwave.connector.EsSink;
import com.risingwave.connector.EsSinkConfig;
import com.risingwave.connector.api.TableSchema;
import com.risingwave.connector.api.sink.ArraySinkRow;
import com.risingwave.proto.Data;
import com.risingwave.proto.Data.DataType.TypeName;
import com.risingwave.proto.Data.Op;
import java.io.IOException;
import java.util.Map;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.Test;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public class EsSinkTest {

static TableSchema getTestTableSchema() {
return new TableSchema(
Lists.newArrayList("id", "name"),
Lists.newArrayList(
Data.DataType.newBuilder().setTypeName(TypeName.INT32).build(),
Data.DataType.newBuilder().setTypeName(TypeName.VARCHAR).build()),
Lists.newArrayList("id", "name"));
}

public void testEsSink(ElasticsearchContainer container) throws IOException {
EsSink sink =
new EsSink(
new EsSinkConfig(container.getHttpHostAddress(), "test", "$"),
getTestTableSchema());
sink.write(
Iterators.forArray(
new ArraySinkRow(Op.INSERT, 1, "Alice"),
new ArraySinkRow(Op.INSERT, 2, "Bob")));
sink.sync();
// container is slow here, but our default flush time is 5s,
// so 2s is enough for sync test
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
fail(e.getMessage());
}

RestHighLevelClient client = sink.getClient();
SearchRequest searchRequest = new SearchRequest("test");
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT);

SearchHits hits = searchResponse.getHits();
assertEquals(2, hits.getHits().length);

SearchHit hit = hits.getAt(0);
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
assertEquals(1, sourceAsMap.get("id"));
assertEquals("Alice", sourceAsMap.get("name"));
assertEquals("1$Alice", hit.getId());

hit = hits.getAt(1);
sourceAsMap = hit.getSourceAsMap();
assertEquals(2, sourceAsMap.get("id"));
assertEquals("Bob", sourceAsMap.get("name"));
assertEquals("2$Bob", hit.getId());

sink.drop();
}

@Test
public void testElasticSearch() throws IOException {
ElasticsearchContainer container =
new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:7.11.0");
container.start();
testEsSink(container);
container.stop();
}
}
64 changes: 64 additions & 0 deletions java/connector-node/risingwave-sink-es/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>java-parent</artifactId>
<groupId>com.risingwave.java</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>


<artifactId>risingwave-sink-es</artifactId>
<version>1.0-SNAPSHOT</version>
<name>risingwave-sink-es</name>

<dependencies>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>proto</artifactId>
</dependency>
<dependency>
<groupId>com.risingwave.java</groupId>
<artifactId>connector-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-text</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>

<!-- ElasticSearch drivers -->
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
</dependencies>

</project>
Loading

0 comments on commit 5b816a6

Please sign in to comment.