diff --git a/byzer-doris/.repo/desc.template.plugin b/byzer-doris/.repo/desc.template.plugin new file mode 100644 index 00000000..bc0f4a37 --- /dev/null +++ b/byzer-doris/.repo/desc.template.plugin @@ -0,0 +1,9 @@ +mainClass=- +version=0.1.0-SNAPSHOT +author=allwefantasy +mlsqlVersions="1.5.0,1.5.0-SNAPSHOT,1.6.0,1.6.0-SNAPSHOT" +githubUrl=https://github.com/byzer-org/byzer-doris/tree/master/byzer-doris +scala_version={{scala_binary_version}} +spark_version={{spark_binary_version}} +mlsqlPluginType=app +desc=byzer to read and write apache doris diff --git a/byzer-doris/.repo/pom.template.xml b/byzer-doris/.repo/pom.template.xml new file mode 100644 index 00000000..b4abb948 --- /dev/null +++ b/byzer-doris/.repo/pom.template.xml @@ -0,0 +1,14 @@ + + + + mlsql-plugins-{{spark_binary_version}}_{{scala_binary_version}} + tech.mlsql + 0.1.0-SNAPSHOT + + 4.0.0 + + byzer-doris-{{spark_binary_version}}_{{scala_binary_version}} + + \ No newline at end of file diff --git a/byzer-doris/README.md b/byzer-doris/README.md new file mode 100644 index 00000000..2570149d --- /dev/null +++ b/byzer-doris/README.md @@ -0,0 +1,50 @@ +# Byzer-Doris +This extension enables Byzer-lang to read and write [Apache Doris](https://github.com/apache/doris) + +## Usage +To read Doris table, use `load` statement. The following statement reads **Doris db**: zjc_1 +**table**: table_hash_1. 
+ +Please note: the `doris.fenodes`, `user` and `password` options are required. +```sql +load doris.`zjc_1.table_hash_1` +WHERE `doris.fenodes`="127.0.0.1:8030" +and `user`="user" +and `password`="xxx" +AS abc; +``` + +To insert into a Doris table, use the `save` statement: +```sql +SELECT 11 k1, 11.1 k2 , current_timestamp() dt AS data; + +SAVE append data AS doris.`zjc_1.table_hash_1` +WHERE `doris.fenodes`="127.0.0.1:8030" +and `user`="user" +and `password`="xxx"; +``` +Please note that `overwrite` mode is not supported; `overwrite` is silently changed into +`append` by [spark-doris-connector](https://github.com/apache/doris-spark-connector). + +To keep your code clean, use the `CONNECT` statement to set up common config. +The previous examples can be rewritten as: +```sql +CONNECT doris +WHERE `doris.fenodes`="127.0.0.1:8030" +and `user`="user" +and `password`="xxx" +AS zjc_1; + +load doris.`zjc_1.table_hash_1` AS abc; + +SELECT 11 k1, 11.1 k2 , current_timestamp() dt AS data; + +SAVE append data AS doris.`zjc_1.table_hash_1`; +``` + +## Build +To build this extension, please follow the [spark-doris-connector document](https://doris.apache.org/zh-CN/docs/ecosystem/spark-doris-connector) +to compile the connector first. Then run `mvn package install -P shade` to build and install the extension. + +## Deploy +Please copy the jar file into `${BYZER_HOME}/plugin`. 
diff --git a/byzer-doris/pom.xml b/byzer-doris/pom.xml new file mode 100644 index 00000000..9125f278 --- /dev/null +++ b/byzer-doris/pom.xml @@ -0,0 +1,64 @@ + + + + mlsql-plugins-3.3_2.12 + tech.mlsql + 0.1.0-SNAPSHOT + + 4.0.0 + + + 8 + 8 + UTF-8 + + + byzer-doris-${spark.bigversion}_${scala.binary.version} + + + org.apache.doris + spark-doris-connector-${spark.bigversion}_${scala.binary.version} + 1.0.0-SNAPSHOT + + + + + shade + + + + org.apache.maven.plugins + maven-shade-plugin + 3.2.0 + + + + *:* + + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + + false + + + + + + package + + shade + + + + + + + + + + \ No newline at end of file diff --git a/byzer-doris/src/main/scala/tech/mlsql/datasource/impl/MLSQLDoris.scala b/byzer-doris/src/main/scala/tech/mlsql/datasource/impl/MLSQLDoris.scala new file mode 100644 index 00000000..2d56c8d4 --- /dev/null +++ b/byzer-doris/src/main/scala/tech/mlsql/datasource/impl/MLSQLDoris.scala @@ -0,0 +1,123 @@ +package tech.mlsql.datasource.impl + +import org.apache.spark.ml.param.Param +import org.apache.spark.sql.{DataFrame, DataFrameReader, DataFrameWriter, Row} + +import streaming.core.datasource.{DataAuthConfig, DataSinkConfig, DataSourceConfig, DataSourceRegistry, MLSQLDataSourceKey, MLSQLRegistry, MLSQLSink, MLSQLSource, MLSQLSourceConfig, MLSQLSourceInfo, MLSQLSparkDataSourceType, SourceInfo} +import streaming.dsl.mmlib.algs.param.{BaseParams, WowParams} +import streaming.log.WowLog + +import tech.mlsql.common.form.{Extra, FormParams, Text} +import tech.mlsql.common.utils.log.Logging +import tech.mlsql.datasource.impl.MLSQLDoris.{DORIS_FENODES, DORIS_PASSWORD, DORIS_USER, TABLE_IDENTIFIER} +import tech.mlsql.dsl.adaptor.DslTool +
// MLSQLDoris: data source and sink implementation whose format name is "doris"
// (fullFormat and shortFormat both evaluate to that string).
//
// load:       passes a callback to parseRef that copies the received options onto the reader,
//             then applies config.config, then sets "doris.table.identifier" to the full,
//             un-split config.path, and finally calls load() with format "doris".
// save:       same option layering on the writer, plus mode(config.mode), then save().
// sourceInfo: calls parseRef with a no-op callback, destructures the result into (db, table)
//             and returns SourceInfo(shortFormat, db, table).
// register:   registers this instance with DataSourceRegistry under fullFormat and under
//             shortFormat; since both are "doris" the two keys are equal.
// dorisFenodes / dorisUser / dorisPassword: Param[String] definitions whose doc text is a JSON
//             form description (built by FormParams.toJson) marking each option as required.
//
// NOTE(review): parseRef and dbSplitter are defined outside this file; parseRef presumably
// resolves options stored earlier by a CONNECT statement and returns Array(db, table) - confirm.
// NOTE(review): options are applied in the order connect options, config.config, table
// identifier; presumably later values win for duplicate keys - confirm against Spark.
// NOTE(review): because the table identifier is the whole config.path, a CONNECT alias
// presumably has to equal the database part of the path - confirm.
+class MLSQLDoris(override val uid: String) extends MLSQLSource + with MLSQLSink + with MLSQLSourceInfo + with MLSQLSourceConfig + with MLSQLRegistry with DslTool with WowParams with Logging with WowLog { + + def this() = this(BaseParams.randomUID()) + + override def load(reader: 
DataFrameReader, config: DataSourceConfig): DataFrame = { + parseRef(fullFormat, config.path, dbSplitter, configs => { + reader.options(configs) + }) + + reader.format(fullFormat) + .options(config.config) + .option(TABLE_IDENTIFIER, config.path) + .load() + } + + override def save(writer: DataFrameWriter[Row], config: DataSinkConfig): Any = { + // Load previous defined config + parseRef(fullFormat, config.path, dbSplitter, configs => { + writer.options(configs) + }) + + writer.format(fullFormat) + .options(config.config) + .option(TABLE_IDENTIFIER, config.path) + .mode(config.mode) + .save() + } + + override def sourceInfo(config: DataAuthConfig): SourceInfo = { + val Array(db, table) = parseRef(fullFormat, config.path, dbSplitter, (_: Map[String, String]) => { } ) + SourceInfo(shortFormat, db, table) + } + + override def register(): Unit = { + DataSourceRegistry.register(MLSQLDataSourceKey(fullFormat, MLSQLSparkDataSourceType), this) + DataSourceRegistry.register(MLSQLDataSourceKey(shortFormat, MLSQLSparkDataSourceType), this) + } + + override def fullFormat: String = "doris" + override def shortFormat: String = fullFormat + + final val dorisFenodes : Param[String] = new Param[String](parent = this + , name = DORIS_FENODES + , doc = FormParams.toJson(Text( + name = DORIS_FENODES + , value = "" + , extra = Extra( + doc = "Doris FE Nodes, example: localhost:8030" + , label = DORIS_FENODES + , options = Map( + "valueType" -> "string", + "required" -> "true", + "derivedType" -> "NONE" + ) + ) + ) + ) + ) + + final val dorisUser: Param[String] = new Param[String](parent = this + , name = DORIS_USER + , doc = FormParams.toJson(Text( + name = DORIS_USER + , value = "" + , extra = Extra( + doc = "Doris user name" + , label = DORIS_USER + , options = Map( + "valueType" -> "string", + "required" -> "true", + "derivedType" -> "NONE" + ) + ) + ) + ) + ) + + final val dorisPassword: Param[String] = new Param[String](parent = this + , name = DORIS_PASSWORD + , doc = 
FormParams.toJson(Text( + name = DORIS_PASSWORD + , value = "" + , extra = Extra( + doc = "Doris password" + , label = DORIS_PASSWORD + , options = Map( + "valueType" -> "string", + "required" -> "true", + "derivedType" -> "NONE" + ) + ) + ) + ) + ) + +} + + + +
// Option-key constants for MLSQLDoris: TABLE_IDENTIFIER is the key set by load/save to
// config.path; the other three are used as the name and label of the required Params.
+object MLSQLDoris { + val TABLE_IDENTIFIER = "doris.table.identifier" + val DORIS_FENODES = "doris.fenodes" + val DORIS_USER = "user" + val DORIS_PASSWORD = "password" +} diff --git a/pom.xml b/pom.xml index 25d0a6c8..1de476c0 100644 --- a/pom.xml +++ b/pom.xml @@ -39,6 +39,7 @@ byzer-objectstore-blob byzer-objectstore-s3 byzer-xgboost + byzer-doris MLSQL Plugins https://github.com/allwefantasy/mlsql-plugins.git @@ -171,7 +172,12 @@ ${spark.version} provided - + + com.vladsch.flexmark + flexmark-all + 0.62.2 + test +