Skip to content

Commit

Permalink
add obhbase reader and writer plugin by cjyyz64
Browse files Browse the repository at this point in the history
  • Loading branch information
TrafalgarLuo committed Aug 20, 2024
1 parent f1c20ab commit 0ec7677
Show file tree
Hide file tree
Showing 93 changed files with 5,633 additions and 186 deletions.
17 changes: 8 additions & 9 deletions core/src/main/job/job.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,40 @@
"job": {
"setting": {
"speed": {
"channel":1
"channel": 2
},
"errorLimit": {
"record": 0,
"percentage": 0.02
"record": 0
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
"column": [
{
"value": "DataX",
"type": "string"
},
{
"value": 19890604,
"value": 1724154616370,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"value": "2024-01-01 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"value": "TestRawData",
"type": "bytes"
}
],
"sliceRecordCount": 100000
"sliceRecordCount": 100
}
},
"writer": {
Expand All @@ -49,4 +48,4 @@
}
]
}
}
}
4 changes: 1 addition & 3 deletions doriswriter/doc/doriswriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,6 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
"name": "doriswriter",
"parameter": {
"loadUrl": ["172.16.0.13:8030"],
"loadProps": {
},
"column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"],
"username": "root",
"password": "xxxxxx",
Expand Down Expand Up @@ -178,4 +176,4 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
}
```

更多信息请参照 Doris 官网:[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)
更多信息请参照 Doris 官网:[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)
77 changes: 1 addition & 76 deletions elasticsearchwriter/doc/elasticsearchwriter.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,79 +167,4 @@
* dynamic
* 描述: 不使用datax的mappings,使用es自己的自动mappings
* 必选: 否
* 默认值: false



## 4 性能报告

### 4.1 环境准备

* 总数据量 1kw条数据, 每条0.1kb
* 1个shard, 0个replica
* 不加id,这样默认是append_only模式,不检查版本,插入速度会有20%左右的提升

#### 4.1.1 输入数据类型(streamreader)

```
{"value": "1.1.1.1", "type": "string"},
{"value": 19890604.0, "type": "double"},
{"value": 19890604, "type": "long"},
{"value": 19890604, "type": "long"},
{"value": "hello world", "type": "string"},
{"value": "hello world", "type": "string"},
{"value": "41.12,-71.34", "type": "string"},
{"value": "2017-05-25", "type": "string"},
```

#### 4.1.2 输出数据类型(eswriter)

```
{ "name": "col_ip","type": "ip" },
{ "name": "col_double","type": "double" },
{ "name": "col_long","type": "long" },
{ "name": "col_integer","type": "integer" },
{ "name": "col_keyword", "type": "keyword" },
{ "name": "col_text", "type": "text"},
{ "name": "col_geo_point", "type": "geo_point" },
{ "name": "col_date", "type": "date"}
```

#### 4.1.2 机器参数

1. cpu: 32 Intel(R) Xeon(R) CPU E5-2650 v2 @ 2.60GHz
2. mem: 128G
3. net: 千兆双网卡

#### 4.1.3 DataX jvm 参数

-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError

### 4.2 测试报告

| 通道数| 批量提交行数| DataX速度(Rec/s)|DataX流量(MB/s)|
|--------|--------| --------|--------|
| 4| 256| 11013| 0.828|
| 4| 1024| 19417| 1.43|
| 4| 4096| 23923| 1.76|
| 4| 8172| 24449| 1.80|
| 8| 256| 21459| 1.58|
| 8| 1024| 37037| 2.72|
| 8| 4096| 45454| 3.34|
| 8| 8172| 45871| 3.37|
| 16| 1024| 67567| 4.96|
| 16| 4096| 78125| 5.74|
| 16| 8172| 77519| 5.69|
| 32| 1024| 94339| 6.93|
| 32| 4096| 96153| 7.06|
| 64| 1024| 91743| 6.74|

### 4.3 测试总结

* 最好的结果是32通道,每次传4096,如果单条数据很大, 请适当减少批量数,防止oom
* 当然这个很容易水平扩展,而且es也是分布式的,多设置几个shard也可以水平扩展

## 5 约束限制

* 如果导入id,这样数据导入失败也会重试,重新导入也仅仅是覆盖,保证数据一致性
* 如果不导入id,就是append_only模式,elasticsearch自动生成id,速度会提升20%左右,但数据无法修复,适合日志型数据(对数据精度要求不高的)
* 默认值: false
178 changes: 178 additions & 0 deletions obhbasereader/doc/obhbasereader.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
OceanBase的table api为应用提供了ObHBase的访问接口,因此,OceanBase的table api的reader与HBase Reader的结构和配置方法类似。
obhbasereader插件支持sql和hbase api两种读取方式,两种方式存在如下区别:

1. sql方式可以按照分区或者K值进行数据切片,而hbase api方式的数据切片需要用户手动设置。
2. sql方式会将从obhbase读取的kqtv形式的数据转换为单一横行,而hbase api则不做行列转换,直接以kqtv形式将数据传递给下游。
3. sql方式需要配置column属性,hbase api则不需要配置,数据均为固定的kqtv四列。
4. sql方式仅支持获取获得最新或者最旧版本的数据,而hbase api支持获得多版本数据。
#### 脚本配置
```json
{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 104857600
},
"errorLimit": {
"record": 10
}
},
"content": [
{
"reader": {
"name": "obhbasereader",
"parameter": {
"username": "username",
"password": "password",
"encoding": "utf8",
"column": [
{
"name": "f1:column1_1",
"type": "string"
},
{
"name": "f1:column2_2",
"type": "string"
},
{
"name": "f1:column1_1",
"type": "string"
},
{
"name": "f1:column2_2",
"type": "string"
}
],
"range": [
{
"startRowkey": "aaa",
"endRowkey": "ccc",
"isBinaryRowkey": false
},
{
"startRowkey": "eee",
"endRowkey": "zzz",
"isBinaryRowkey": false
}
],
"mode": "normal",
"readByPartition": "true",
"scanCacheSize": "",
"readerHint": "",
"readBatchSize": "1000",
"connection": [
{
"table": [
"htable1",
"htable2"
],
"jdbcUrl": [
"||_dsc_ob10_dsc_||集群:租户||_dsc_ob10_dsc_||jdbc:mysql://ip:port/dbName1"
],
"username": "username",
"password": "password"
},
{
"table": [
"htable1",
"htable2"
],
"jdbcUrl": [
"jdbc:mysql://ip:port/database"
]
}
]
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/Users/xujing/datax/txtfile",
"charset": "UTF-8",
"fieldDelimiter": ",",
"fileName": "hbase",
"nullFormat": "null",
"writeMode": "truncate"
}
}
}
]
}
}
```
##### 参数解释

- **connection**
- 描述:配置分库分表的jdbcUrl和分表名。如果一个分库中有多个分表可以用逗号隔开,也可以写成表名[起始序号-截止序号]
- 必须:是
- 默认值:无
- **jdbcUrl**
- 描述:连接ob使用的jdbc url,支持如下两种格式:
- jdbc:mysql://obproxyIp:obproxyPort/db
- 此格式下username需要写成三段式格式
- ||_dsc_ob10_dsc_||集群名:租户名||_dsc_ob10_dsc_||jdbc:mysql://obproxyIp:obproxyPort/db
- 此格式下username仅填写用户名本身,无需三段式写法

- 必选:是
- 默认值:无
- **table**
- 描述:所选取的需要同步的表。使用JSON的数组描述,因此支持多张表同时抽取。当配置为多张表时,用户自己需保证多张表是同一schema结构,obhbasereader不予检查表是否同一逻辑表。注意,table必须包含在connection配置单元中。
- 必选:是
- 默认值:无
- **readByPartition**
- 描述:使用sql方式读取时,配置****按照分区进行切片。
- 必须:否
- 默认值:false
- **partitionName**
- 描述:使用sql方式读取时,标识仅读取指定分区名的数据,用户需要保证配置的分区名在表结构中真实存在(要求严格大小写)。
- 必须:否
- 默认值:无
- **readBatchSize**
- 描述:使用sql方式读取时,分页大小。
- 必须:否
- 默认值:10w
- **fetchSize**
- 描述:使用sql方式读取时,控制每次读取数据时从结果集中获取的数据行数。
- 必须:否
- 默认值:-2147483648
- **scanCacheSize**
- 描述:使用hbase api读取时,每次rpc从服务器端读取的行数
- 必须:否
- 默认值:256
- **readerHint**
- 描述:obhbasereader使用sql方式读取时使用的hint
- 必须:否
- 默认值:/*+READ_CONSISTENCY(weak),QUERY_TIMEOUT(86400000000)*/
- **column**
- 描述:使用sql方式读取数据时,所配置的表中需要同步的列名集合,使用JSON的数组描述字段信息。
- 支持列裁剪,即列可以挑选部分列进行导出。
```
支持列换序,即列可以不按照表schema信息进行导出,同时支持通配符*,在使用之前需仔细核对列信息。
```

- 必选:sql方式读取时必选
- 默认值:无
- **range**
- 描述****指定hbasereader读取的rowkey范围
- 必须:否
- 默认值:无
- **username**
- 描述:访问OceanBase的用户名
- 必选:是
- 默认值:无
- **mode**
- 描述:读取obhbase的模式,normal 模式,即仅读取一个版本的数据。
- 必选:是
- 默认值:normal
- **version**
- 描述:读取obhbase的版本,当前支持oldest、latest模式,分别表示读取最旧和最新的数据。
- 必须:是
- 默认值:oldest

一些注意点:
注:如果配置了**partitionName**,则无需再配置readByPartition,即便配置了也会忽略readByPartition选项,而是仅会读取指定分区的数据。
注:如果配置了**readByPartition**,任务将仅按照分区切分任务,而不会再按照K值进行切分。如果是非分区表,则整张表会被当作一个任务而不会再切分。



Loading

0 comments on commit 0ec7677

Please sign in to comment.