Skip to content

Commit

Permalink
[Feature][Connector-V2][Cassandra] Add Cassandra Source And Sink Conn…
Browse files Browse the repository at this point in the history
…ector (#3229)
  • Loading branch information
FWLamb authored Nov 8, 2022
1 parent 630e884 commit 12268a6
Show file tree
Hide file tree
Showing 19 changed files with 1,758 additions and 2 deletions.
4 changes: 3 additions & 1 deletion docs/en/Connector-v2-release-state.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ SeaTunnel uses a grading system for connectors to help you understand what to ex
| [Kafka](connector-v2/source/kafka.md) | Source | Alpha | 2.3.0-beta |
| [Kafka](connector-v2/sink/Kafka.md) | Sink | Alpha | 2.3.0-beta |
| [S3File](connector-v2/source/S3File.md) | Source | Alpha | 2.3.0-beta |
| [S3File](connector-v2/sink/S3File.md) | Sink | Alpha | 2.3.0-beta |
| [S3File](connector-v2/sink/S3File.md) | Sink | Alpha | 2.3.0-beta |
| [Cassandra](connector-v2/source/Cassandra.md) | Source | Alpha | 2.3.0-beta |
| [Cassandra](connector-v2/sink/Cassandra.md) | Sink | Alpha | 2.3.0-beta |
98 changes: 98 additions & 0 deletions docs/en/connector-v2/sink/Cassandra.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Cassandra

> Cassandra sink connector
## Description

Write data to Apache Cassandra.

## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [ ] [schema projection](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-------------------|--------|----------|---------------|
| host | String | Yes | - |
| keyspace | String | Yes | - |
| table | String | Yes | - |
| username | String | No | - |
| password | String | No | - |
| datacenter | String | No | datacenter1 |
| consistency_level | String | No | LOCAL_ONE |
| fields | String | No | LOCAL_ONE |
| batch_size | String | No | 5000 |
| batch_type | String | No | UNLOGGER |
| async_write | String | No | true |

### host [string]

`Cassandra` cluster address, the format is `host:port` , allowing multiple `hosts` to be specified. Such as
`"cassandra1:9042,cassandra2:9042"`.

### keyspace [string]

The `Cassandra` keyspace.

### table [String]

The `Cassandra` table name.

### username [string]

`Cassandra` user username.

### password [string]

`Cassandra` user password.

### datacenter [String]

The `Cassandra` datacenter, default is `datacenter1`.

### consistency_level [String]

The `Cassandra` write consistency level, default is `LOCAL_ONE`.

### fields [array]

The data field that needs to be output to `Cassandra` , if not configured, it will be automatically adapted
according to the sink table `schema`.

### batch_size [number]

The number of rows written through [Cassandra-Java-Driver](https://github.com/datastax/java-driver) each time,
default is `5000`.

### batch_type [String]

The `Cassandra` batch processing mode, default is `UNLOGGER`.

### async_write [boolean]

Whether `cassandra` writes in asynchronous mode, default is `true`.

## Examples

```hocon
sink {
Cassandra {
host = "localhost:9042"
username = "cassandra"
password = "cassandra"
datacenter = "datacenter1"
keyspace = "test"
}
}
```

## Changelog

### next version

- Add Cassandra Sink Connector



82 changes: 82 additions & 0 deletions docs/en/connector-v2/source/Cassandra.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
# Cassandra

> Cassandra source connector
## Description

Read data from Apache Cassandra.

## Key features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [schema projection](../../concept/connector-v2-features.md)
- [ ] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-------------------------|--------|----------|---------------|
| host | String | Yes | - |
| keyspace | String | Yes | - |
| cql | String | Yes | - |
| username | String | No | - |
| password | String | No | - |
| datacenter | String | No | datacenter1 |
| consistency_level | String | No | LOCAL_ONE |

### host [string]

`Cassandra` cluster address, the format is `host:port` , allowing multiple `hosts` to be specified. Such as
`"cassandra1:9042,cassandra2:9042"`.

### keyspace [string]

The `Cassandra` keyspace.

### cql [String]

The query cql used to search data though Cassandra session.

### username [string]

`Cassandra` user username.

### password [string]

`Cassandra` user password.

### datacenter [String]

The `Cassandra` datacenter, default is `datacenter1`.

### consistency_level [String]

The `Cassandra` write consistency level, default is `LOCAL_ONE`.

## Examples

```hocon
source {
Cassandra {
host = "localhost:9042"
username = "cassandra"
password = "cassandra"
datacenter = "datacenter1"
keyspace = "test"
cql = "select * from source_table"
result_table_name = "source_table"
}
}
```

## Changelog

### next version

- Add Cassandra Source Connector



4 changes: 3 additions & 1 deletion plugin-mapping.properties
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,7 @@ seatunnel.source.S3File = connector-file-s3
seatunnel.sink.S3File = connector-file-s3
seatunnel.source.Amazondynamodb = connector-amazondynamodb
seatunnel.sink.Amazondynamodb = connector-amazondynamodb
seatunnel.source.Cassandra = connector-cassandra
seatunnel.sink.Cassandra = connector-cassandra
seatunnel.sink.StarRocks = connector-starrocks
seatunnel.sink.InfluxDB = connector-influxdb
seatunnel.sink.InfluxDB = connector-influxdb
52 changes: 52 additions & 0 deletions seatunnel-connectors-v2/connector-cassandra/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one or more
~ contributor license agreements. See the NOTICE file distributed with
~ this work for additional information regarding copyright ownership.
~ The ASF licenses this file to You under the Apache License, Version 2.0
~ (the "License"); you may not use this file except in compliance with
~ the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>seatunnel-connectors-v2</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>connector-cassandra</artifactId>

<properties>
<cassandra.driver.version>4.14.0</cassandra.driver.version>
</properties>

<dependencies>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>${cassandra.driver.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>


</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.cassandra.client;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import org.apache.commons.lang3.StringUtils;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;

public class CassandraClient {
public static CqlSessionBuilder getCqlSessionBuilder(String nodeAddress, String keyspace, String username, String password, String dataCenter) {
List<CqlSessionBuilder> cqlSessionBuilderList = Arrays.stream(nodeAddress.split(",")).map(address -> {
String[] nodeAndPort = address.split(":", 2);
if (StringUtils.isEmpty(username) && StringUtils.isEmpty(password)) {
return CqlSession.builder()
.addContactPoint(new InetSocketAddress(nodeAndPort[0], Integer.parseInt(nodeAndPort[1])))
.withKeyspace(keyspace)
.withLocalDatacenter(dataCenter);
}
return CqlSession.builder()
.addContactPoint(new InetSocketAddress(nodeAndPort[0], Integer.parseInt(nodeAndPort[1])))
.withAuthCredentials(username, password)
.withKeyspace(keyspace)
.withLocalDatacenter(dataCenter);
}).collect(Collectors.toList());
return cqlSessionBuilderList.get(ThreadLocalRandom.current().nextInt(cqlSessionBuilderList.size()));
}

public static SimpleStatement createSimpleStatement(String cql, ConsistencyLevel consistencyLevel) {
return SimpleStatement.builder(cql).setConsistencyLevel(consistencyLevel).build();
}

public static ColumnDefinitions getTableSchema(CqlSession session, String table) {
try {
return session.execute(String.format("select * from %s limit 1", table))
.getColumnDefinitions();
} catch (Exception e) {
throw new RuntimeException("Cannot get table schema from cassandra", e);
}

}
}
Loading

0 comments on commit 12268a6

Please sign in to comment.