Skip to content

Commit

Permalink
Add Byzer-doris extension (#93)
Browse files Browse the repository at this point in the history
Add Byzer-doris extension
  • Loading branch information
chncaesar authored Dec 14, 2022
1 parent 64e6f05 commit 019e3ab
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 1 deletion.
9 changes: 9 additions & 0 deletions byzer-doris/.repo/desc.template.plugin
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
mainClass=-
version=0.1.0-SNAPSHOT
author=allwefantasy
mlsqlVersions="1.5.0,1.5.0-SNAPSHOT,1.6.0,1.6.0-SNAPSHOT"
githubUrl=https://github.com/byzer-org/byzer-doris/tree/master/byzer-doris
scala_version={{scala_binary_version}}
spark_version={{spark_binary_version}}
mlsqlPluginType=app
desc=byzer to read and write apache doris
14 changes: 14 additions & 0 deletions byzer-doris/.repo/pom.template.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mlsql-plugins-{{spark_binary_version}}_{{scala_binary_version}}</artifactId>
<groupId>tech.mlsql</groupId>
<version>0.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>byzer-doris-{{spark_binary_version}}_{{scala_binary_version}}</artifactId>

</project>
50 changes: 50 additions & 0 deletions byzer-doris/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# Byzer-Doris
This extension enables Byzer-lang to read and write [Apache Doris](https://github.com/apache/doris)

## Usage
To read Doris table, use `load` statement. The following statement reads **Doris db**: zjc_1
**table**: table_hash_1.

Please note: `doris.fenodes user password` are required
```sql
load doris.`zjc_1.table_hash_1`
and `doris.fenodes`="127.0.0.1:8030"
and `user`="user"
and `password`="xxx"
AS abc;
```

To insert into Doris table, use `save` statement:
```sql
SELECT 11 k1, 11.1 k2 , current_timestamp() dt AS data;

SAVE append data AS doris.`zjc_1.table_hash_1`
WHERE `doris.fenodes`="127.0.0.1:8030"
and `user`="user"
and `password`="xxx";
```
Please note that `overwrite` mode is not supported, `overwrite` is silently changed into
`append` by [spark-doris-connector](https://github.com/apache/doris-spark-connector)

To make your code clean, use `Connect` statement to setup common config.
The previous examples can be rewritten to:
```sql
CONNECT doris
WHERE `doris.fenodes`="127.0.0.1:8030"
and `user`="user"
and `password`="xxx"
AS zjc_1;

load doris.`zjc_1.table_hash_1` AS abc;

SELECT 11 k1, 11.1 k2 , current_timestamp() dt AS data;

SAVE append data AS doris.`zjc_1.table_hash_1`;
```

## Build
To build this extension, please follow [spark-doris-connector document](https://doris.apache.org/zh-CN/docs/ecosystem/spark-doris-connector)
to compile it first. Then run `mvn package install -P shade` to build and install the extension.

## Deploy
Please copy jar file into `${BYZER_HOME}/plugin` .
64 changes: 64 additions & 0 deletions byzer-doris/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>mlsql-plugins-3.3_2.12</artifactId>
<groupId>tech.mlsql</groupId>
<version>0.1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<artifactId>byzer-doris-${spark.bigversion}_${scala.binary.version}</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.doris</groupId>
<artifactId>spark-doris-connector-${spark.bigversion}_${scala.binary.version}</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
</dependencies>
<profiles>
<profile>
<id>shade</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<createDependencyReducedPom>false</createDependencyReducedPom>
<relocations>
</relocations>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>

</project>
123 changes: 123 additions & 0 deletions byzer-doris/src/main/scala/tech/mlsql/datasource/impl/MLSQLDoris.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package tech.mlsql.datasource.impl

import org.apache.spark.ml.param.Param
import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row}

import streaming.core.datasource.{DataAuthConfig, DataSinkConfig, DataSourceConfig, DataSourceRegistry, MLSQLDataSourceKey, MLSQLRegistry, MLSQLSink, MLSQLSource, MLSQLSourceConfig, MLSQLSourceInfo, MLSQLSparkDataSourceType, SourceInfo}
import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams}
import streaming.log.WowLog

import tech.mlsql.common.form.{Extra, FormParams, Text}
import tech.mlsql.common.utils.log.Logging
import tech.mlsql.datasource.impl.MLSQLDoris.{DORIS_FENODES, DORIS_PASSWORD, DORIS_USER, TABLE_IDENTIFIER}
import tech.mlsql.dsl.adaptor.DslTool

class MLSQLDoris(override val uid: String) extends MLSQLSource
with MLSQLSink
with MLSQLSourceInfo
with MLSQLSourceConfig
with MLSQLRegistry with DslTool with WowParams with Logging with WowLog {

def this() = this(BaseParams.randomUID())

override def load(reader: DataFrameReader, config: DataSourceConfig): DataFrame = {
parseRef(fullFormat, config.path, dbSplitter, configs => {
reader.options(configs)
})

reader.format(fullFormat)
.options(config.config)
.option(TABLE_IDENTIFIER, config.path)
.load()
}

override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Any = {
// Load previous defined config
parseRef(fullFormat, config.path, dbSplitter, configs => {
writer.options(configs)
})

writer.format(fullFormat)
.options(config.config)
.option(TABLE_IDENTIFIER, config.path)
.mode(config.mode)
.save()
}

override def sourceInfo(config: DataAuthConfig): SourceInfo = {
val Array(db, table) = parseRef(fullFormat, config.path, dbSplitter, (_: Map[String, String]) => { } )
SourceInfo(shortFormat, db, table)
}

override def register(): Unit = {
DataSourceRegistry.register(MLSQLDataSourceKey(fullFormat, MLSQLSparkDataSourceType), this)
DataSourceRegistry.register(MLSQLDataSourceKey(shortFormat, MLSQLSparkDataSourceType), this)
}

override def fullFormat: String = "doris"
override def shortFormat: String = fullFormat

final val dorisFenodes : Param[String] = new Param[String](parent = this
, name = DORIS_FENODES
, doc = FormParams.toJson(Text(
name = DORIS_FENODES
, value = ""
, extra = Extra(
doc = "Doris FE Nodes, example: localhost:8030"
, label = DORIS_FENODES
, options = Map(
"valueType" -> "string",
"required" -> "true",
"derivedType" -> "NONE"
)
)
)
)
)

final val dorisUser: Param[String] = new Param[String](parent = this
, name = DORIS_USER
, doc = FormParams.toJson(Text(
name = DORIS_USER
, value = ""
, extra = Extra(
doc = "Doris user name"
, label = DORIS_USER
, options = Map(
"valueType" -> "string",
"required" -> "true",
"derivedType" -> "NONE"
)
)
)
)
)

final val dorisPassword: Param[String] = new Param[String](parent = this
, name = DORIS_PASSWORD
, doc = FormParams.toJson(Text(
name = DORIS_PASSWORD
, value = ""
, extra = Extra(
doc = "Doris password"
, label = DORIS_PASSWORD
, options = Map(
"valueType" -> "string",
"required" -> "true",
"derivedType" -> "NONE"
)
)
)
)
)

}



object MLSQLDoris {
val TABLE_IDENTIFIER = "doris.table.identifier"
val DORIS_FENODES = "doris.fenodes"
val DORIS_USER = "user"
val DORIS_PASSWORD = "password"
}
8 changes: 7 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
<module>byzer-objectstore-blob</module>
<module>byzer-objectstore-s3</module>
<module>byzer-xgboost</module>
<module>byzer-doris</module>
</modules>
<name>MLSQL Plugins</name>
<url>https://github.com/allwefantasy/mlsql-plugins.git</url>
Expand Down Expand Up @@ -171,7 +172,12 @@
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.vladsch.flexmark</groupId>
<artifactId>flexmark-all</artifactId>
<version>0.62.2</version>
<scope>test</scope>
</dependency>

</dependencies>

Expand Down

0 comments on commit 019e3ab

Please sign in to comment.