forked from data-integrations/database-plugins
-
Notifications
You must be signed in to change notification settings - Fork 3
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge pull request data-integrations#492 from cloudsufi/Redshift_for-6.9
[Cherrypicked] Redshift source and connector plugin added.
- Loading branch information
Showing
23 changed files
with
2,307 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
# Amazon Redshift Batch Source | ||
|
||
Description | ||
----------- | ||
Reads from an Amazon Redshift database using a configurable SQL query. | ||
Outputs one record for each row returned by the query. | ||
|
||
|
||
Use Case | ||
-------- | ||
The source is used whenever you need to read from an Amazon Redshift database. For example, you may want | ||
to create daily snapshots of a database table by using this source and writing to | ||
a TimePartitionedFileSet. | ||
|
||
|
||
Properties | ||
---------- | ||
**Reference Name:** Name used to uniquely identify this source for lineage, annotating metadata, etc. | ||
|
||
**JDBC Driver name:** Name of the JDBC driver to use. | ||
|
||
**Host:** Host URL of the current master instance of Redshift cluster. | ||
|
||
**Port:** Port that Redshift master instance is listening to. | ||
|
||
**Database:** Redshift database name. | ||
|
||
**Import Query:** The SELECT query to use to import data from the specified table. | ||
You can specify an arbitrary number of columns to import, or import all columns using \*. The Query should | ||
contain the '$CONDITIONS' string. For example, 'SELECT * FROM table WHERE $CONDITIONS'. | ||
The '$CONDITIONS' string will be replaced by 'splitBy' field limits specified by the bounding query. | ||
The '$CONDITIONS' string is not required if numSplits is set to one. | ||
|
||
**Bounding Query:** Bounding Query should return the min and max of the values of the 'splitBy' field. | ||
For example, 'SELECT MIN(id),MAX(id) FROM table'. Not required if numSplits is set to one. | ||
|
||
**Split-By Field Name:** Field Name which will be used to generate splits. Not required if numSplits is set to one. | ||
|
||
**Number of Splits to Generate:** Number of splits to generate. | ||
|
||
**Username:** User identity for connecting to the specified database. | ||
|
||
**Password:** Password to use to connect to the specified database. | ||
|
||
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments | ||
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. | ||
|
||
**Schema:** The schema of records output by the source. This will be used in place of whatever schema comes | ||
back from the query. However, it must match the schema that comes back from the query, | ||
except it can mark fields as nullable and can contain a subset of the fields. | ||
|
||
**Fetch Size:** The number of rows to fetch at a time per split. Larger fetch size can result in faster import, | ||
with the tradeoff of higher memory usage. | ||
|
||
Example | ||
------ | ||
Suppose you want to read data from an Amazon Redshift database named "prod" that is running on | ||
"redshift.xyz.eu-central-1.redshift.amazonaws.com", port 5439, as "sa" user with "Test11" password. | ||
Ensure that the driver for Redshift is installed (you can also provide driver name for some specific driver, | ||
otherwise "redshift" will be used), then configure the plugin with:then configure plugin with: | ||
|
||
``` | ||
Reference Name: "src1" | ||
Driver Name: "redshift" | ||
Host: "redshift.xyz.eu-central-1.redshift.amazonaws.com" | ||
Port: 5439 | ||
Database: "prod" | ||
Import Query: "select id, name, email, phone from users;" | ||
Number of Splits to Generate: 1 | ||
Username: "sa" | ||
Password: "Test11" | ||
``` | ||
|
||
Data Types Mapping | ||
------------------ | ||
|
||
Mapping of Redshift types to CDAP schema: | ||
|
||
| Redshift Data Type | CDAP Schema Data Type | Comment | | ||
|-----------------------------------------------------|-----------------------|----------------------------------| | ||
| bigint | long | | | ||
| boolean | boolean | | | ||
| character | string | | | ||
| character varying | string | | | ||
| double precision | double | | | ||
| integer | int | | | ||
| numeric(precision, scale)/decimal(precision, scale) | decimal | | | ||
| numeric(with 0 precision) | string | | | ||
| real | float | | | ||
| smallint | int | | | ||
| smallserial | int | | | ||
| text | string | | | ||
| date | date | | | ||
| time [ (p) ] [ without time zone ] | time | | | ||
| time [ (p) ] with time zone | string | | | ||
| timestamp [ (p) ] [ without time zone ] | timestamp | | | ||
| timestamp [ (p) ] with time zone | timestamp | stored in UTC format in database | | ||
| xml | string | | | ||
| json | string | | | ||
| super | string | | | ||
| geometry | bytes | | | ||
| hllsketch | string | | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
# Amazon Redshift Connection | ||
|
||
Description | ||
----------- | ||
Use this connection to access data in an Amazon Redshift database using JDBC. | ||
|
||
Properties | ||
---------- | ||
**Name:** Name of the connection. Connection names must be unique in a namespace. | ||
|
||
**Description:** Description of the connection. | ||
|
||
**JDBC Driver name:** Name of the JDBC driver to use. | ||
|
||
**Host:** Host of the current master instance of Redshift cluster. | ||
|
||
**Port:** Port that Redshift master instance is listening to. | ||
|
||
**Database:** Redshift database name. | ||
|
||
**Username:** User identity for connecting to the specified database. | ||
|
||
**Password:** Password to use to connect to the specified database. | ||
|
||
**Connection Arguments:** A list of arbitrary string key/value pairs as connection arguments. These arguments | ||
will be passed to the JDBC driver as connection arguments for JDBC drivers that may need additional configurations. |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
Copyright © 2023 CDAP | ||
Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
use this file except in compliance with the License. You may obtain a copy of | ||
the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
License for the specific language governing permissions and limitations under | ||
the License. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<artifactId>database-plugins-parent</artifactId> | ||
<groupId>io.cdap.plugin</groupId> | ||
<version>1.10.6-SNAPSHOT</version> | ||
</parent> | ||
|
||
<name>Amazon Redshift plugin</name> | ||
<artifactId>amazon-redshift-plugin</artifactId> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<properties> | ||
<redshift-jdbc.version>2.1.0.18</redshift-jdbc.version> | ||
</properties> | ||
|
||
<repositories> | ||
<repository> | ||
<id>redshift</id> | ||
<url>http://redshift-maven-repository.s3-website-us-east-1.amazonaws.com/release</url> | ||
</repository> | ||
</repositories> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>io.cdap.cdap</groupId> | ||
<artifactId>cdap-etl-api</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.cdap.plugin</groupId> | ||
<artifactId>database-commons</artifactId> | ||
<version>${project.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.cdap.plugin</groupId> | ||
<artifactId>hydrator-common</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.google.guava</groupId> | ||
<artifactId>guava</artifactId> | ||
</dependency> | ||
|
||
<!-- test dependencies --> | ||
<dependency> | ||
<groupId>com.amazon.redshift</groupId> | ||
<artifactId>redshift-jdbc42</artifactId> | ||
<version>${redshift-jdbc.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.cdap.plugin</groupId> | ||
<artifactId>database-commons</artifactId> | ||
<version>${project.version}</version> | ||
<type>test-jar</type> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.cdap.cdap</groupId> | ||
<artifactId>hydrator-test</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.cdap.cdap</groupId> | ||
<artifactId>cdap-data-pipeline3_2.12</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.mockito</groupId> | ||
<artifactId>mockito-core</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>io.cdap.cdap</groupId> | ||
<artifactId>cdap-api</artifactId> | ||
<scope>provided</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.jetbrains</groupId> | ||
<artifactId>annotations</artifactId> | ||
<version>RELEASE</version> | ||
<scope>compile</scope> | ||
</dependency> | ||
</dependencies> | ||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>io.cdap</groupId> | ||
<artifactId>cdap-maven-plugin</artifactId> | ||
</plugin> | ||
<plugin> | ||
<groupId>org.apache.felix</groupId> | ||
<artifactId>maven-bundle-plugin</artifactId> | ||
<version>5.1.2</version> | ||
<extensions>true</extensions> | ||
<configuration> | ||
<instructions> | ||
<_exportcontents> | ||
io.cdap.plugin.amazon.redshift.*; | ||
io.cdap.plugin.db.source.*; | ||
org.apache.commons.lang; | ||
org.apache.commons.logging.*; | ||
org.codehaus.jackson.* | ||
</_exportcontents> | ||
<Embed-Dependency>*;inline=false;scope=compile</Embed-Dependency> | ||
<Embed-Transitive>true</Embed-Transitive> | ||
<Embed-Directory>lib</Embed-Directory> | ||
</instructions> | ||
</configuration> | ||
<executions> | ||
<execution> | ||
<phase>package</phase> | ||
<goals> | ||
<goal>bundle</goal> | ||
</goals> | ||
</execution> | ||
</executions> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
</project> |
117 changes: 117 additions & 0 deletions
117
amazon-redshift-plugin/src/main/java/io/cdap/plugin/amazon/redshift/RedshiftConnector.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,117 @@ | ||
/* | ||
* Copyright © 2023 Cask Data, Inc. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); you may not | ||
* use this file except in compliance with the License. You may obtain a copy of | ||
* the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | ||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | ||
* License for the specific language governing permissions and limitations under | ||
* the License. | ||
*/ | ||
|
||
package io.cdap.plugin.amazon.redshift; | ||
|
||
import io.cdap.cdap.api.annotation.Category; | ||
import io.cdap.cdap.api.annotation.Description; | ||
import io.cdap.cdap.api.annotation.Name; | ||
import io.cdap.cdap.api.annotation.Plugin; | ||
import io.cdap.cdap.api.data.format.StructuredRecord; | ||
import io.cdap.cdap.etl.api.batch.BatchSource; | ||
import io.cdap.cdap.etl.api.connector.Connector; | ||
import io.cdap.cdap.etl.api.connector.ConnectorSpec; | ||
import io.cdap.cdap.etl.api.connector.ConnectorSpecRequest; | ||
import io.cdap.cdap.etl.api.connector.PluginSpec; | ||
import io.cdap.plugin.common.Constants; | ||
import io.cdap.plugin.common.ReferenceNames; | ||
import io.cdap.plugin.common.db.DBConnectorPath; | ||
import io.cdap.plugin.common.db.DBPath; | ||
import io.cdap.plugin.db.SchemaReader; | ||
import io.cdap.plugin.db.connector.AbstractDBSpecificConnector; | ||
import org.apache.hadoop.io.LongWritable; | ||
import org.apache.hadoop.mapreduce.lib.db.DBWritable; | ||
|
||
import java.io.IOException; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
|
||
/** | ||
* Amazon Redshift Database Connector that connects to Amazon Redshift database via JDBC. | ||
*/ | ||
@Plugin(type = Connector.PLUGIN_TYPE) | ||
@Name(RedshiftConnector.NAME) | ||
@Description("Connection to access data in Amazon Redshift using JDBC.") | ||
@Category("Database") | ||
public class RedshiftConnector extends AbstractDBSpecificConnector<io.cdap.plugin.amazon.redshift.RedshiftDBRecord> { | ||
public static final String NAME = RedshiftConstants.PLUGIN_NAME; | ||
private final RedshiftConnectorConfig config; | ||
|
||
public RedshiftConnector(RedshiftConnectorConfig config) { | ||
super(config); | ||
this.config = config; | ||
} | ||
|
||
@Override | ||
protected DBConnectorPath getDBConnectorPath(String path) throws IOException { | ||
return new DBPath(path, true); | ||
} | ||
|
||
@Override | ||
public boolean supportSchema() { | ||
return true; | ||
} | ||
|
||
@Override | ||
protected Class<? extends DBWritable> getDBRecordType() { | ||
return RedshiftDBRecord.class; | ||
} | ||
|
||
@Override | ||
public StructuredRecord transform(LongWritable longWritable, RedshiftDBRecord redshiftDBRecord) { | ||
return redshiftDBRecord.getRecord(); | ||
} | ||
|
||
@Override | ||
protected SchemaReader getSchemaReader(String sessionID) { | ||
return new RedshiftSchemaReader(sessionID); | ||
} | ||
|
||
@Override | ||
protected String getTableName(String database, String schema, String table) { | ||
return String.format("\"%s\".\"%s\"", schema, table); | ||
} | ||
|
||
@Override | ||
protected String getRandomQuery(String tableName, int limit) { | ||
return String.format("SELECT * FROM %s\n" + | ||
"TABLESAMPLE BERNOULLI (100.0 * %d / (SELECT COUNT(*) FROM %s))", | ||
tableName, limit, tableName); | ||
} | ||
|
||
@Override | ||
protected void setConnectorSpec(ConnectorSpecRequest request, DBConnectorPath path, | ||
ConnectorSpec.Builder builder) { | ||
Map<String, String> sourceProperties = new HashMap<>(); | ||
setConnectionProperties(sourceProperties, request); | ||
builder | ||
.addRelatedPlugin(new PluginSpec(RedshiftConstants.PLUGIN_NAME, | ||
BatchSource.PLUGIN_TYPE, sourceProperties)); | ||
|
||
String schema = path.getSchema(); | ||
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.NUM_SPLITS, "1"); | ||
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.FETCH_SIZE, | ||
RedshiftSource.RedshiftSourceConfig.DEFAULT_FETCH_SIZE); | ||
String table = path.getTable(); | ||
if (table == null) { | ||
return; | ||
} | ||
sourceProperties.put(RedshiftSource.RedshiftSourceConfig.IMPORT_QUERY, | ||
getTableQuery(path.getDatabase(), schema, table)); | ||
sourceProperties.put(Constants.Reference.REFERENCE_NAME, ReferenceNames.cleanseReferenceName(table)); | ||
} | ||
|
||
} |
Oops, something went wrong.