Add Byzer-doris extension #93

Merged 1 commit on Dec 14, 2022
9 changes: 9 additions & 0 deletions byzer-doris/.repo/desc.template.plugin
@@ -0,0 +1,9 @@
mainClass=-
version=0.1.0-SNAPSHOT
author=allwefantasy
mlsqlVersions="1.5.0,1.5.0-SNAPSHOT,1.6.0,1.6.0-SNAPSHOT"
githubUrl=https://github.com/byzer-org/byzer-doris/tree/master/byzer-doris
scala_version={{scala_binary_version}}
spark_version={{spark_binary_version}}
mlsqlPluginType=app
desc=byzer to read and write apache doris
14 changes: 14 additions & 0 deletions byzer-doris/.repo/pom.template.xml
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mlsql-plugins-{{spark_binary_version}}_{{scala_binary_version}}</artifactId>
<groupId>tech.mlsql</groupId>
<version>0.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>byzer-doris-{{spark_binary_version}}_{{scala_binary_version}}</artifactId>

</project>
50 changes: 50 additions & 0 deletions byzer-doris/README.md
@@ -0,0 +1,50 @@
# Byzer-Doris
This extension enables Byzer-lang to read from and write to [Apache Doris](https://github.com/apache/doris).

## Usage
To read a Doris table, use the `load` statement. The following statement reads the **table** `table_hash_1`
from the **Doris db** `zjc_1`.

Note that `doris.fenodes`, `user`, and `password` are required:
```sql
load doris.`zjc_1.table_hash_1`
WHERE `doris.fenodes`="127.0.0.1:8030"
and `user`="user"
and `password`="xxx"
AS abc;
```
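
Because all three options are required, it can help to check the merged option map before issuing a `load` or `save`. The following is a minimal sketch, not part of this extension: `DorisRequiredOptions` and `missing` are hypothetical names, and the only thing taken from this README is the list of required keys.

```scala
// Hypothetical helper, not part of byzer-doris: reports which required
// connection options are absent or blank in a given option map.
object DorisRequiredOptions {
  // Option keys that this README lists as required.
  val Required: Seq[String] = Seq("doris.fenodes", "user", "password")

  // Returns the required keys that are missing or have a blank value,
  // in the same order as `Required`.
  def missing(options: Map[String, String]): Seq[String] =
    Required.filterNot(key => options.get(key).exists(_.trim.nonEmpty))
}
```

For example, `DorisRequiredOptions.missing(Map("doris.fenodes" -> "127.0.0.1:8030", "user" -> "user"))` reports that only `password` is missing. When a `CONNECT` statement supplies the options, the check would apply to the options after they are merged, not to the `load` statement alone.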

To insert into a Doris table, use the `save` statement:
```sql
SELECT 11 k1, 11.1 k2 , current_timestamp() dt AS data;

SAVE append data AS doris.`zjc_1.table_hash_1`
WHERE `doris.fenodes`="127.0.0.1:8030"
and `user`="user"
and `password`="xxx";
```
Note that `overwrite` mode is not supported:
[spark-doris-connector](https://github.com/apache/doris-spark-connector) silently changes `overwrite` to `append`.

To keep your code clean, use the `connect` statement to set up common configuration once.
The previous examples can be rewritten as:
```sql
CONNECT doris
WHERE `doris.fenodes`="127.0.0.1:8030"
and `user`="user"
and `password`="xxx"
AS zjc_1;

load doris.`zjc_1.table_hash_1` AS abc;

SELECT 11 k1, 11.1 k2 , current_timestamp() dt AS data;

SAVE append data AS doris.`zjc_1.table_hash_1`;
```
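
When a statement carries its own `WHERE` options in addition to a `CONNECT` block, the data source in this change applies the `CONNECT` options first, then the statement options, then the table identifier. The sketch below models that order with plain maps, assuming that Spark keeps the last value set for a given key. `DorisOptionPrecedence` and `resolve` are hypothetical names used only for illustration, and the model ignores the fact that Spark treats option keys case-insensitively.

```scala
// Illustrative model only: mirrors the order in which options are applied
// (CONNECT options, then statement options, then the table identifier).
object DorisOptionPrecedence {
  // Same key the extension uses to pass the table path to the connector.
  val TableIdentifier = "doris.table.identifier"

  // Later entries win: statement options override CONNECT options,
  // and the table identifier is always taken from the statement path.
  def resolve(connectOptions: Map[String, String],
              statementOptions: Map[String, String],
              path: String): Map[String, String] =
    connectOptions ++ statementOptions + (TableIdentifier -> path)
}
```

Under these assumptions, a `user` given in the statement's `WHERE` clause would replace the `user` from `CONNECT`, while `doris.fenodes` would still come from `CONNECT` if the statement does not set it.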

## Build
To build this extension, first compile spark-doris-connector by following the
[spark-doris-connector documentation](https://doris.apache.org/zh-CN/docs/ecosystem/spark-doris-connector).
Then run `mvn package install -P shade` to build and install the extension.

## Deploy
Copy the built jar file into `${BYZER_HOME}/plugin`.
64 changes: 64 additions & 0 deletions byzer-doris/pom.xml
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mlsql-plugins-3.3_2.12</artifactId>
<groupId>tech.mlsql</groupId>
<version>0.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<artifactId>byzer-doris-${spark.bigversion}_${scala.binary.version}</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-${spark.bigversion}_${scala.binary.version}</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>shade</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<createDependencyReducedPom>false</createDependencyReducedPom>
<relocations>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
123 changes: 123 additions & 0 deletions byzer-doris/src/main/scala/tech/mlsql/datasource/impl/MLSQLDoris.scala
@@ -0,0 +1,123 @@
package tech.mlsql.datasource.impl

import org.apache.spark.ml.param.Param
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}

import streaming.core.datasource.{DataAuthConfig, DataSinkConfig, DataSourceConfig, DataSourceRegistry, MLSQLDataSourceKey, MLSQLRegistry, MLSQLSink, MLSQLSource, MLSQLSourceConfig, MLSQLSourceInfo, MLSQLSparkDataSourceType, SourceInfo}
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}
import streaming.log.WowLog

import tech.mlsql.common.form.{Extra, FormParams, Text}
import tech.mlsql.common.utils.log.Logging
import tech.mlsql.datasource.impl.MLSQLDoris.{DORIS_FENODES, DORIS_PASSWORD, DORIS_USER, TABLE_IDENTIFIER}
import tech.mlsql.dsl.adaptor.DslTool

class MLSQLDoris(override val uid: String) extends MLSQLSource
with MLSQLSink
with MLSQLSourceInfo
with MLSQLSourceConfig
with MLSQLRegistry with DslTool with WowParams with Logging with WowLog {

def this() = this(BaseParams.randomUID())

override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
parseRef(fullFormat, config.path, dbSplitter, configs => {
reader.options(configs)
})

reader.format(fullFormat)
.options(config.config)
.option(TABLE_IDENTIFIER, config.path)
.load()
}

override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Any = {
// Load previously defined config
parseRef(fullFormat, config.path, dbSplitter, configs => {
writer.options(configs)
})

writer.format(fullFormat)
.options(config.config)
.option(TABLE_IDENTIFIER, config.path)
.mode(config.mode)
.save()
}

override def sourceInfo(config: DataAuthConfig): SourceInfo = {
val Array(db, table) = parseRef(fullFormat, config.path, dbSplitter, (_: Map[String, String]) => { } )
SourceInfo(shortFormat, db, table)
}

override def register(): Unit = {
DataSourceRegistry.register(MLSQLDataSourceKey(fullFormat, MLSQLSparkDataSourceType), this)
DataSourceRegistry.register(MLSQLDataSourceKey(shortFormat, MLSQLSparkDataSourceType), this)
}

override def fullFormat: String = "doris"
override def shortFormat: String = fullFormat

final val dorisFenodes : Param[String] = new Param[String](parent = this
, name = DORIS_FENODES
, doc = FormParams.toJson(Text(
name = DORIS_FENODES
, value = ""
, extra = Extra(
doc = "Doris FE Nodes, example: localhost:8030"
, label = DORIS_FENODES
, options = Map(
"valueType" -> "string",
"required" -> "true",
"derivedType" -> "NONE"
)
)
)
)
)

final val dorisUser: Param[String] = new Param[String](parent = this
, name = DORIS_USER
, doc = FormParams.toJson(Text(
name = DORIS_USER
, value = ""
, extra = Extra(
doc = "Doris user name"
, label = DORIS_USER
, options = Map(
"valueType" -> "string",
"required" -> "true",
"derivedType" -> "NONE"
)
)
)
)
)

final val dorisPassword: Param[String] = new Param[String](parent = this
, name = DORIS_PASSWORD
, doc = FormParams.toJson(Text(
name = DORIS_PASSWORD
, value = ""
, extra = Extra(
doc = "Doris password"
, label = DORIS_PASSWORD
, options = Map(
"valueType" -> "string",
"required" -> "true",
"derivedType" -> "NONE"
)
)
)
)
)

}



object MLSQLDoris {
val TABLE_IDENTIFIER = "doris.table.identifier"
val DORIS_FENODES = "doris.fenodes"
val DORIS_USER = "user"
val DORIS_PASSWORD = "password"
}
8 changes: 7 additions & 1 deletion pom.xml
@@ -39,6 +39,7 @@
<module>byzer-objectstore-blob</module>
<module>byzer-objectstore-s3</module>
<module>byzer-xgboost</module>
<module>byzer-doris</module>
</modules>
<name>MLSQL Plugins</name>
<url>https://github.com/allwefantasy/mlsql-plugins.git</url>
@@ -171,7 +172,12 @@
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.vladsch.flexmark</groupId>
<artifactId>flexmark-all</artifactId>
<version>0.62.2</version>
<scope>test</scope>
</dependency>

</dependencies>
