diff --git a/home/docs/start/greptime-init.md b/home/docs/start/greptime-init.md index d2b73d39068..f4a0ccbdde9 100644 --- a/home/docs/start/greptime-init.md +++ b/home/docs/start/greptime-init.md @@ -8,7 +8,7 @@ Apache HertzBeat (incubating)'s historical data storage relies on the time serie > It is recommended to use VictoriaMetrics as metrics storage. -GreptimeDB is an open-source time-series database with a special focus on scalability, analytical capabilities and efficiency. +[GreptimeDB](https://github.com/GreptimeTeam/greptimedb) is an open-source time-series database with a special focus on scalability, analytical capabilities and efficiency. It's designed to work on infrastructure of the cloud era, and users benefit from its elasticity and commodity storage. @@ -26,21 +26,24 @@ It's designed to work on infrastructure of the cloud era, and users benefit from 2. Install GreptimeDB with Docker ```shell -$ docker run -p 4000-4004:4000-4004 \ - -p 4242:4242 -v /opt/greptimedb:/tmp/greptimedb \ - --name greptime \ - greptime/greptimedb standalone start \ - --http-addr 0.0.0.0:4000 \ - --rpc-addr 0.0.0.0:4001 \ +$ docker run -p 127.0.0.1:4000-4003:4000-4003 \ + -v "$(pwd)/greptimedb:/tmp/greptimedb" \ + --name greptime --rm \ + greptime/greptimedb:latest standalone start \ + --http-addr 0.0.0.0:4000 \ + --rpc-addr 0.0.0.0:4001 \ + --mysql-addr 0.0.0.0:4002 \ + --postgres-addr 0.0.0.0:4003 ``` - `-v /opt/greptimedb:/tmp/greptimedb` is local persistent mount of greptimedb data directory. `/opt/greptimedb` should be replaced with the actual local directory. + + `-v "$(pwd)/greptimedb:/tmp/greptimedb"` is local persistent mount of greptimedb data directory. `$(pwd)/greptimedb` should be replaced with the actual local directory, default is the `greptimedb` directory under the current directory. use```$ docker ps``` to check if the database started successfully ### Configure the database connection in hertzbeat `application.yml` configuration file 1. Configure HertzBeat's configuration file - Modify `hertzbeat/config/application.yml` configuration file - Note⚠️The docker container way need to mount application.yml file locally, while you can use installation package way to unzip and modify `hertzbeat/config/application.yml` + Modify `hertzbeat/config/application.yml` configuration file [/script/application.yml](https://github.com/apache/hertzbeat/raw/master/script/application.yml) + Note⚠️The docker container way need to mount application.yml file locally, while you can use installation package way to unzip and modify `hertzbeat/config/application.yml` Replace `warehouse.store.greptime` data source parameters, URL account and password. ```yaml @@ -52,9 +55,15 @@ warehouse: # enable greptime greptime: enabled: true - endpoint: localhost:4001 + grpc-endpoints: localhost:4001 + url: jdbc:mysql://localhost:4002/hertzbeat?connectionTimeZone=Asia/Shanghai&forceConnectionTimeZoneToSession=true + driver-class-name: com.mysql.cj.jdbc.Driver + username: greptime + password: greptime ``` +The default database is `hertzbeat` in the `url`. + 2. Restart HertzBeat ### FAQ diff --git a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/greptime-init.md b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/greptime-init.md index 5b3279b1f40..ab99977867e 100644 --- a/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/greptime-init.md +++ b/home/i18n/zh-cn/docusaurus-plugin-content-docs/current/start/greptime-init.md @@ -1,14 +1,14 @@ --- id: greptime-init -title: 依赖时序数据库服务GreptimeDB安装初始化(可选) -sidebar_label: 指标数据存储GreptimeDB +title: 依赖时序数据库服务 GreptimeDB 安装初始化(可选) +sidebar_label: 指标数据存储 GreptimeDB --- Apache HertzBeat (incubating) 的历史数据存储依赖时序数据库,任选其一安装初始化即可,也可不安装(注意⚠️但强烈建议生产环境配置) > 我们推荐使用并长期支持 VictoriaMetrics 作为存储。 -GreptimeDB is an open-source time-series database with a special focus on scalability, analytical capabilities and efficiency. +[GreptimeDB](https://github.com/GreptimeTeam/greptimedb) is an open-source time-series database with a special focus on scalability, analytical capabilities and efficiency. It's designed to work on infrastructure of the cloud era, and users benefit from its elasticity and commodity storage. **⚠️ 若不配置时序数据库,则只会留最近一小时历史数据** @@ -25,21 +25,24 @@ It's designed to work on infrastructure of the cloud era, and users benefit from 2. Docker安装GreptimeDB ```shell -$ docker run -p 4000-4004:4000-4004 \ - -p 4242:4242 -v /opt/greptimedb:/tmp/greptimedb \ - --name greptime \ - greptime/greptimedb standalone start \ - --http-addr 0.0.0.0:4000 \ - --rpc-addr 0.0.0.0:4001 +$ docker run -p 127.0.0.1:4000-4003:4000-4003 \ + -v "$(pwd)/greptimedb:/tmp/greptimedb" \ + --name greptime --rm \ + greptime/greptimedb:latest standalone start \ + --http-addr 0.0.0.0:4000 \ + --rpc-addr 0.0.0.0:4001 \ + --mysql-addr 0.0.0.0:4002 \ + --postgres-addr 0.0.0.0:4003 ``` - `-v /opt/greptimedb:/tmp/greptimedb` 为greptimedb数据目录本地持久化挂载,需将`/opt/greptimedb`替换为实际本地存在的目录 + `-v "$(pwd)/greptimedb:/tmp/greptimedb` 为 greptimedb 数据目录本地持久化挂载,需将 `$(pwd)/greptimedb` 替换为实际本地存在的目录,默认使用执行命令的当前目录下的 `greptimedb` 目录作为数据目录。 + 使用```$ docker ps```查看数据库是否启动成功 ### 在hertzbeat的`application.yml`配置文件配置此数据库连接 1. 配置HertzBeat的配置文件 - 修改位于 `hertzbeat/config/application.yml` 的配置文件 + 修改位于 `hertzbeat/config/application.yml` 的配置文件 [/script/application.yml](https://github.com/apache/hertzbeat/raw/master/script/application.yml) 注意⚠️docker容器方式需要将application.yml文件挂载到主机本地,安装包方式解压修改位于 `hertzbeat/config/application.yml` 即可 **修改里面的`warehouse.store.jpa.enabled`参数为`false`, 配置里面的`warehouse.store.greptime`数据源参数,URL账户密码,并启用`enabled`为`true`** @@ -52,9 +55,15 @@ warehouse: enabled: false greptime: enabled: true - endpoint: localhost:4001 + grpc-endpoints: localhost:4001 + url: jdbc:mysql://localhost:4002/hertzbeat?connectionTimeZone=Asia/Shanghai&forceConnectionTimeZoneToSession=true + driver-class-name: com.mysql.cj.jdbc.Driver + username: greptime + password: greptime ``` +默认数据库是 URL 中配置的 `hertzbeat` 。 + 2. 重启 HertzBeat ### 常见问题 diff --git a/manager/src/main/resources/application.yml b/manager/src/main/resources/application.yml index e086e4952de..053008a8154 100644 --- a/manager/src/main/resources/application.yml +++ b/manager/src/main/resources/application.yml @@ -141,7 +141,11 @@ warehouse: password: taosdata greptime: enabled: false - endpoint: localhost:4001 + grpc-endpoints: localhost:4001 + url: jdbc:mysql://localhost:4002/hertzbeat?connectionTimeZone=Asia/Shanghai&forceConnectionTimeZoneToSession=true + driver-class-name: com.mysql.cj.jdbc.Driver + username: greptime + password: greptime iot-db: enabled: false host: 127.0.0.1 diff --git a/warehouse/pom.xml b/warehouse/pom.xml index c89770e3755..0db9386496d 100644 --- a/warehouse/pom.xml +++ b/warehouse/pom.xml @@ -29,7 +29,8 @@ 2.23 3.0.5 3.0.0 - 0.4.0 + 0.7.3 + 8.0.33 4.0.0 @@ -81,10 +82,10 @@ influxdb-java ${influxdb.version} - + io.greptime - greptimedb-protocol + ingester-all ${greptimedb.version} @@ -105,28 +106,11 @@ + - io.greptime - greptimedb-grpc - ${greptimedb.version} - - - io.grpc - grpc-all - - - com.google.code.gson - gson - - - com.google.guava - guava - - - com.google.protobuf - protobuf-java - - + mysql + mysql-connector-java + ${mysql-jdbcdriver.version} diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GrepTimeDbDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GrepTimeDbDataStorage.java deleted file mode 100644 index 4109cf025e9..00000000000 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GrepTimeDbDataStorage.java +++ /dev/null @@ -1,427 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hertzbeat.warehouse.store.history.greptime; - -import io.greptime.GreptimeDB; -import io.greptime.models.ColumnDataType; -import io.greptime.models.Err; -import io.greptime.models.QueryOk; -import io.greptime.models.QueryRequest; -import io.greptime.models.Result; -import io.greptime.models.Row; -import io.greptime.models.SelectExprType; -import io.greptime.models.SelectRows; -import io.greptime.models.SemanticType; -import io.greptime.models.TableName; -import io.greptime.models.TableSchema; -import io.greptime.models.WriteOk; -import io.greptime.models.WriteRows; -import io.greptime.options.GreptimeOptions; -import java.math.BigDecimal; -import java.math.RoundingMode; -import java.time.Duration; -import java.time.ZonedDateTime; -import java.time.temporal.TemporalAmount; -import java.util.Arrays; -import java.util.Calendar; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import lombok.extern.slf4j.Slf4j; -import org.apache.arrow.flight.FlightRuntimeException; -import org.apache.hertzbeat.common.constants.CommonConstants; -import org.apache.hertzbeat.common.entity.dto.Value; -import org.apache.hertzbeat.common.entity.message.CollectRep; -import org.apache.hertzbeat.common.util.JsonUtil; -import org.apache.hertzbeat.common.util.TimePeriodUtil; -import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.stereotype.Component; - -/** - * greptimeDB data storage - */ -@Component -@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled", havingValue = "true") -@Slf4j -public class GrepTimeDbDataStorage extends AbstractHistoryDataStorage { - - /** - * storage database - */ - private static final String STORAGE_DATABASE = "hertzbeat"; - private static final String QUERY_HISTORY_SQL = - "SELECT CAST (ts AS Int64) ts, instance, \"%s\" FROM %s WHERE ts >= %s and monitor_id = %s order by ts desc;"; - private static final String QUERY_HISTORY_WITH_INSTANCE_SQL = - "SELECT CAST (ts AS Int64) ts, instance, \"%s\" FROM %s WHERE ts >= %s and monitor_id = %s and instance = %s order by ts desc;"; - private static final String QUERY_INSTANCE_SQL = - "SELECT DISTINCT instance FROM %s WHERE ts >= now() - interval '1' WEEK"; - private static final String QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL = - "SELECT first, avg ,max, min FROM (SELECT \"%s\" as first FROM %s WHERE monitor_id = %s and ts >= %s" - + " and ts < %s ORDER BY ts LIMIT 1) LEFT JOIN (SELECT avg(\"%s\") as avg, min(\"%s\") as min, max(\"%s\") as max FROM %s WHERE ts >= %s and ts < %s) ON 1=1"; - private static final String TABLE_NOT_EXIST = "not exist"; - private static final String DATABASE_NOT_EXIST = "not exist"; - private GreptimeDB greptimeDb; - - public GrepTimeDbDataStorage(GreptimeProperties greptimeProperties) { - this.serverAvailable = this.initDbSession(greptimeProperties); - } - - private boolean initDbSession(GreptimeProperties properties) { - String endpoint = properties.endpoint(); - GreptimeOptions opts = GreptimeOptions.newBuilder(endpoint) - .writeMaxRetries(1) - .readMaxRetries(2) - .routeTableRefreshPeriodSeconds(-1) - .build(); - greptimeDb = new GreptimeDB(); - if (!greptimeDb.init(opts)) { - log.error("Fail to start Greptime client"); - return false; - } - return createDatabase(); - } - - /** - * Checks if the database exists; if not, creates the Database. - */ - private boolean createDatabase() { - // 查询现有数据库 - QueryRequest showDatabases = QueryRequest.newBuilder() - .exprType(SelectExprType.Sql) - .ql("SHOW DATABASES;") - .build(); - Result result = null; - try { - CompletableFuture> future = greptimeDb.query(showDatabases); - result = future.get(); - } catch (Exception e) { - log.info("TABLE_NOT_EXIST: {}", e.getMessage()); - String msg = e.getMessage(); - if (msg != null && !msg.contains(DATABASE_NOT_EXIST)) { - log.warn(msg); - } - - } - // Check if the existing database includes“Hertzbeat” - boolean isDatabaseExist = false; - if (result != null && result.isOk()) { - QueryOk queryOk = result.getOk(); - SelectRows rows = queryOk.getRows(); - List rowsList = rows.collect(); - for (Row row : rowsList) { - for (io.greptime.models.Value value : row.values()) { - if (STORAGE_DATABASE.equals(value.value().toString())) { - log.info("Exist Database {}", STORAGE_DATABASE); - isDatabaseExist = true; - break; - } - } - } - } - // If it does not exist, create database - if (!isDatabaseExist) { - QueryRequest createDatabase = QueryRequest.newBuilder() - .exprType(SelectExprType.Sql) - .ql("CREATE DATABASE %s;", STORAGE_DATABASE) - .build(); - try { - CompletableFuture> createFuture = greptimeDb.query(createDatabase); - isDatabaseExist = createFuture.get().isOk(); - log.info("Database {} does not exist,and has been created", STORAGE_DATABASE); - } catch (InterruptedException | ExecutionException e) { - log.error("Error creating database"); - } - } - return isDatabaseExist; - } - - @Override - public void saveData(CollectRep.MetricsData metricsData) { - if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) { - return; - } - if (metricsData.getValuesList().isEmpty()) { - log.info("[warehouse greptime] flush metrics data {} is null, ignore.", metricsData.getId()); - return; - } - String monitorId = String.valueOf(metricsData.getId()); - String table = metricsData.getApp() + "_" + metricsData.getMetrics(); - TableSchema.Builder tableSchemaBuilder = TableSchema.newBuilder(TableName.with(STORAGE_DATABASE, table)); - - List semanticTypes = new LinkedList<>(Arrays.asList(SemanticType.Tag, SemanticType.Tag, SemanticType.Timestamp)); - List dataTypes = new LinkedList<>(Arrays.asList(ColumnDataType.String, ColumnDataType.String, ColumnDataType.TimestampMillisecond)); - List columnNames = new LinkedList<>(Arrays.asList("monitor_id", "instance", "ts")); - - List fieldsList = metricsData.getFieldsList(); - for (CollectRep.Field field : fieldsList) { - semanticTypes.add(SemanticType.Field); - columnNames.add(field.getName()); - // handle field type - if (field.getType() == CommonConstants.TYPE_NUMBER) { - dataTypes.add(ColumnDataType.Float64); - } else if (field.getType() == CommonConstants.TYPE_STRING) { - dataTypes.add(ColumnDataType.String); - } - } - tableSchemaBuilder.semanticTypes(semanticTypes.toArray(new SemanticType[0])); - tableSchemaBuilder.dataTypes(dataTypes.toArray(new ColumnDataType[0])); - tableSchemaBuilder.columnNames(columnNames.toArray(new String[0])); - WriteRows rows = WriteRows.newBuilder(tableSchemaBuilder.build()).build(); - try { - long now = System.currentTimeMillis(); - Object[] values = new Object[3 + fieldsList.size()]; - values[0] = monitorId; - values[2] = now; - for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { - Map labels = new HashMap<>(8); - for (int i = 0; i < fieldsList.size(); i++) { - if (!CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) { - CollectRep.Field field = fieldsList.get(i); - if (field.getType() == CommonConstants.TYPE_NUMBER) { - values[3 + i] = Double.parseDouble(valueRow.getColumns(i)); - } else if (field.getType() == CommonConstants.TYPE_STRING) { - values[3 + i] = valueRow.getColumns(i); - } - if (field.getLabel()) { - labels.put(field.getName(), String.valueOf(values[3 + i])); - } - } else { - values[3 + i] = null; - } - } - values[1] = JsonUtil.toJson(labels); - rows.insert(values); - } - rows.finish(); - CompletableFuture> writeFuture = greptimeDb.write(rows); - try { - Result result = writeFuture.get(10, TimeUnit.SECONDS); - if (result.isOk()) { - log.debug("[warehouse greptime]-Write successful"); - } else { - log.warn("[warehouse greptime]--Write failed: {}", result.getErr().getFailedQl()); - } - } catch (Throwable throwable) { - log.error("[warehouse greptime]--Error occurred: {}", throwable.getMessage()); - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } - - @Override - public Map> getHistoryMetricData(Long monitorId, String app, String metrics, String metric, - String label, String history) { - Map> instanceValuesMap = new HashMap<>(8); - if (!isServerAvailable()) { - log.error("\n\t---------------Greptime Init Failed---------------\n" - + "\t--------------Please Config Greptime--------------\n" - + "\t----------Can Not Use Metric History Now----------\n"); - return instanceValuesMap; - } - long expireTime = getExpireTimeFromToken(history); - String table = app + "_" + metrics; - String selectSql = label == null - ? String.format(QUERY_HISTORY_SQL, metric, table, expireTime, monitorId) - : String.format(QUERY_HISTORY_WITH_INSTANCE_SQL, metric, table, expireTime, monitorId, label); - log.debug("selectSql: {}", selectSql); - QueryRequest request = QueryRequest.newBuilder() - .exprType(SelectExprType.Sql) - .databaseName(STORAGE_DATABASE) - .ql(selectSql) - .build(); - try { - CompletableFuture> future = greptimeDb.query(request); - Result result = future.get(); - if (result != null && result.isOk()) { - QueryOk queryOk = result.getOk(); - SelectRows rows = queryOk.getRows(); - List> maps = rows.collectToMaps(); - List valueList; - for (Map map : maps) { - String instanceValue = map.get("instance") == null ? "" : map.get("instance").toString(); - Object valueObj = map.get(metric); - if (valueObj == null) { - continue; - } - String strValue = new BigDecimal(valueObj.toString()).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString(); - valueList = instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>()); - valueList.add(new Value(strValue, (long) map.get("ts"))); - } - } - } catch (FlightRuntimeException e) { - String msg = e.getMessage(); - if (msg != null && msg.contains(TABLE_NOT_EXIST)) { - List valueList = instanceValuesMap.computeIfAbsent(metric, k -> new LinkedList<>()); - valueList.add(new Value(null, System.currentTimeMillis())); - log.info("[warehouse greptime]-TABLE_NOT_EXIST: {}", table); - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - return instanceValuesMap; - } - - private long getExpireTimeFromToken(String history) { - long expireTime; - try { - TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(history); - ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount); - expireTime = dateTime.toEpochSecond() * 1000L; - } catch (Exception e) { - log.error("parse history time error: {}. use default: 6h", e.getMessage()); - ZonedDateTime dateTime = ZonedDateTime.now().minus(Duration.ofHours(6)); - expireTime = dateTime.toEpochSecond() * 1000L; - } - return expireTime; - } - - @Override - public Map> getHistoryIntervalMetricData(Long monitorId, String app, String metrics, - String metric, String label, String history) { - Map> instanceValuesMap = new HashMap<>(8); - if (!isServerAvailable()) { - log.error("\n\t---------------Greptime Init Failed---------------\n" - + "\t--------------Please Config Greptime--------------\n" - + "\t----------Can Not Use Metric History Now----------\n"); - return instanceValuesMap; - } - String table = app + "_" + metrics; - List instances = new LinkedList<>(); - if (label != null) { - instances.add(label); - } - if (instances.isEmpty()) { - String selectSql = String.format(QUERY_INSTANCE_SQL, table); - log.debug("selectSql: {}", selectSql); - QueryRequest request = QueryRequest.newBuilder() - .exprType(SelectExprType.Sql) - .databaseName(STORAGE_DATABASE) - .ql(selectSql) - .build(); - try { - CompletableFuture> future = greptimeDb.query(request); - Result result = future.get(); - if (result != null && result.isOk()) { - QueryOk queryOk = result.getOk(); - SelectRows rows = queryOk.getRows(); - while (rows.hasNext()) { - Row row = rows.next(); - if (row != null) { - List values = row.values(); - for (io.greptime.models.Value value : values) { - log.debug("value:{}", value.value()); - Object instanceValue = value.value(); - if (instanceValue == null || "".equals(instanceValue)) { - instances.add("''"); - } else { - instances.add(instanceValue.toString()); - } - } - } - - } - } - } catch (FlightRuntimeException e) { - String msg = e.getMessage(); - if (msg != null && msg.contains(TABLE_NOT_EXIST)) { - log.info("[warehouse greptime]-TABLE_NOT_EXIST: {}", table); - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } - // TODO 'greptime' did not find the proper SQL function processing, temporarily using code implementation, future 'greptime' update documents using SQL implementation - long endTime; - long startTime = getExpireTimeFromToken(history); - - Calendar cal = Calendar.getInstance(); - - long interval = System.currentTimeMillis() - startTime; - long fourHourCount = TimeUnit.MILLISECONDS.toHours(interval) / 4; - for (int i = 0; i < fourHourCount; i++) { - cal.clear(); - cal.setTimeInMillis(startTime); - cal.add(Calendar.HOUR_OF_DAY, 4); - endTime = cal.getTimeInMillis(); - - for (String instanceValue : instances) { - String selectSql = String.format(QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL, metric, table, monitorId, startTime, endTime, metric, metric, metric, table, startTime, endTime); - - log.debug("selectSql: {}", selectSql); - QueryRequest request = QueryRequest.newBuilder() - .exprType(SelectExprType.Sql) - .databaseName(STORAGE_DATABASE) - .ql(selectSql) - .build(); - List values = instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>()); - try { - CompletableFuture> future = greptimeDb.query(request); - Result result = future.get(); - log.debug("result:{}", result); - if (result != null && result.isOk()) { - QueryOk queryOk = result.getOk(); - SelectRows rows = queryOk.getRows(); - String[] col = new String[4]; - while (rows.hasNext()) { - Row row = rows.next(); - if (!row.values().isEmpty()) { - for (int j = 0; j < row.values().size(); j++) { - log.debug("value:{}", row.values().get(j)); - String colStr = new BigDecimal(row.values().get(j).value().toString()).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString(); - col[j] = colStr; - } - Value valueBuild = Value.builder() - .origin(col[0]).mean(col[1]) - .min(col[2]).max(col[3]) - .time(System.currentTimeMillis()) - .build(); - values.add(valueBuild); - } - } - log.debug("[warehouse greptime] values:{}", values); - } - } catch (FlightRuntimeException e) { - String msg = e.getMessage(); - if (msg != null && msg.contains(TABLE_NOT_EXIST)) { - List valueList = instanceValuesMap.computeIfAbsent(metric, k -> new LinkedList<>()); - valueList.add(new Value(null, System.currentTimeMillis())); - log.info("[warehouse greptime]-TABLE_NOT_EXIST: {}", table); - } - } catch (Exception e) { - log.error(e.getMessage(), e); - } - } - startTime = endTime; - } - - return instanceValuesMap; - } - - @Override - public void destroy() { - if (this.greptimeDb != null) { - this.greptimeDb.shutdownGracefully(); - } - } -} diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeDbDataStorage.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeDbDataStorage.java new file mode 100644 index 00000000000..a44417121c0 --- /dev/null +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeDbDataStorage.java @@ -0,0 +1,423 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hertzbeat.warehouse.store.history.greptime; + +import com.mysql.cj.jdbc.Driver; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import io.greptime.GreptimeDB; +import io.greptime.models.AuthInfo; +import io.greptime.models.DataType; +import io.greptime.models.Err; +import io.greptime.models.Result; +import io.greptime.models.Table; +import io.greptime.models.TableSchema; +import io.greptime.models.WriteOk; +import io.greptime.options.GreptimeOptions; +import java.math.BigDecimal; +import java.math.RoundingMode; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.DriverPropertyInfo; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.ObjectUtils; +import org.apache.hertzbeat.common.constants.CommonConstants; +import org.apache.hertzbeat.common.entity.dto.Value; +import org.apache.hertzbeat.common.entity.message.CollectRep; +import org.apache.hertzbeat.common.util.JsonUtil; +import org.apache.hertzbeat.warehouse.store.history.AbstractHistoryDataStorage; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +/** + * GreptimeDB data storage, only supports GreptimeDB version >= v0.5 + * + */ +@Component +@ConditionalOnProperty(prefix = "warehouse.store.greptime", name = "enabled", havingValue = "true") +@Slf4j +public class GreptimeDbDataStorage extends AbstractHistoryDataStorage { + + private static final String QUERY_HISTORY_SQL = "SELECT CAST (ts AS Int64) ts, instance, `%s` FROM `%s` WHERE ts >= now() - interval '%s' and monitor_id = %s order by ts desc;"; + + @SuppressWarnings("checkstyle:LineLength") + private static final String QUERY_HISTORY_WITH_INSTANCE_SQL = "SELECT CAST (ts AS Int64) ts, instance, `%s` FROM `%s` WHERE ts >= now() - interval '%s' and monitor_id = %s and instance = '%s' order by ts desc;"; + + private static final String QUERY_INSTANCE_SQL = "SELECT DISTINCT instance FROM `%s` WHERE ts >= now() - interval '1 WEEK'"; + + @SuppressWarnings("checkstyle:LineLength") + private static final String QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL = "SELECT CAST (ts AS Int64) ts, first_value(`%s`) range '4h' first, avg(`%s`) range '4h' avg, min(`%s`) range '4h' min, max(`%s`) range '4h' max FROM `%s` WHERE instance = '%s' AND ts >= now() - interval '%s' ALIGN '4h'"; + + private static final String TABLE_NOT_EXIST = "not found"; + + private static final String CONSTANTS_CREATE_DATABASE = "CREATE DATABASE IF NOT EXISTS `%s`"; + + private static final Runnable INSTANCE_EXCEPTION_PRINT = () -> { + if (log.isErrorEnabled()) { + log.error(""" + \t---------------GreptimeDB Init Failed--------------- + \t--------------Please Config GreptimeDB-------------- + t-----------Can Not Use Metric History Now----------- + """); + } + }; + + private HikariDataSource hikariDataSource; + + private GreptimeDB greptimeDb; + + public GreptimeDbDataStorage(GreptimeProperties greptimeProperties) { + if (greptimeProperties == null) { + log.error("init error, please config Warehouse GreptimeDB props in application.yml"); + throw new IllegalArgumentException("please config Warehouse GreptimeDB props"); + } + + serverAvailable = initGreptimeDbClient(greptimeProperties) && initGreptimeDbDataSource(greptimeProperties); + } + + private void initGreptimeDb(final GreptimeProperties greptimeProperties) throws SQLException { + final DriverPropertyInfo[] properties = new Driver().getPropertyInfo(greptimeProperties.url(), null); + final String host = ObjectUtils.requireNonEmpty(properties[0].value); + final String port = ObjectUtils.requireNonEmpty(properties[1].value); + final String dbName = ObjectUtils.requireNonEmpty(properties[2].value); + + try (final Connection tempConnection = DriverManager.getConnection("jdbc:mysql://" + host + ":" + port, + greptimeProperties.username(), greptimeProperties.password()); + final PreparedStatement pstmt = tempConnection + .prepareStatement(String.format(CONSTANTS_CREATE_DATABASE, dbName))) { + log.info("[warehouse greptime] try to create database `{}` if not exists", dbName); + pstmt.execute(); + } + } + + private boolean initGreptimeDbClient(GreptimeProperties greptimeProperties) { + String endpoints = greptimeProperties.grpcEndpoints(); + try { + final DriverPropertyInfo[] properties = new Driver().getPropertyInfo(greptimeProperties.url(), null); + final String dbName = ObjectUtils.requireNonEmpty(properties[2].value); + + GreptimeOptions opts = GreptimeOptions.newBuilder(endpoints.split(","), dbName) // + .writeMaxRetries(3) // + .authInfo(new AuthInfo(greptimeProperties.username(), greptimeProperties.password())) + .routeTableRefreshPeriodSeconds(30) // + .build(); + + this.greptimeDb = GreptimeDB.create(opts); + } catch (Exception e) { + log.error("[warehouse greptime] Fail to start GreptimeDB client"); + return false; + } + + return true; + } + + private boolean initGreptimeDbDataSource(final GreptimeProperties greptimeProperties) { + try { + initGreptimeDb(greptimeProperties); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error(e.getMessage(), e); + } + + INSTANCE_EXCEPTION_PRINT.run(); + return false; + } + + final HikariConfig config = new HikariConfig(); + // jdbc properties + config.setJdbcUrl(greptimeProperties.url()); + config.setUsername(greptimeProperties.username()); + config.setPassword(greptimeProperties.password()); + config.setDriverClassName(greptimeProperties.driverClassName()); + // minimum number of idle connection + config.setMinimumIdle(10); + // maximum number of connection in the pool + config.setMaximumPoolSize(10); + // maximum wait milliseconds for get connection from pool + config.setConnectionTimeout(30000); + // maximum lifetime for each connection + config.setMaxLifetime(0); + // max idle time for recycle idle connection + config.setIdleTimeout(0); + // validation query + config.setConnectionTestQuery("select 1"); + try { + this.hikariDataSource = new HikariDataSource(config); + } catch (Exception e) { + INSTANCE_EXCEPTION_PRINT.run(); + return false; + } + return true; + } + + @Override + public void saveData(CollectRep.MetricsData metricsData) { + if (!isServerAvailable() || metricsData.getCode() != CollectRep.Code.SUCCESS) { + return; + } + if (metricsData.getValuesList().isEmpty()) { + log.info("[warehouse greptime] flush metrics data {} is null, ignore.", metricsData.getId()); + return; + } + String monitorId = String.valueOf(metricsData.getId()); + String tableName = getTableName(metricsData.getApp(), metricsData.getMetrics()); + TableSchema.Builder tableSchemaBuilder = TableSchema.newBuilder(tableName); + + tableSchemaBuilder.addTag("monitor_id", DataType.String) // + .addTag("instance", DataType.String) // + .addTimestamp("ts", DataType.TimestampMillisecond); + + List fieldsList = metricsData.getFieldsList(); + for (CollectRep.Field field : fieldsList) { + // handle field type + if (field.getType() == CommonConstants.TYPE_NUMBER) { + tableSchemaBuilder.addField(field.getName(), DataType.Float64); + } else if (field.getType() == CommonConstants.TYPE_STRING) { + tableSchemaBuilder.addField(field.getName(), DataType.String); + } + } + Table table = Table.from(tableSchemaBuilder.build()); + + try { + long now = System.currentTimeMillis(); + Object[] values = new Object[3 + fieldsList.size()]; + values[0] = monitorId; + values[2] = now; + for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { + Map labels = new HashMap<>(8); + for (int i = 0; i < fieldsList.size(); i++) { + if (!CommonConstants.NULL_VALUE.equals(valueRow.getColumns(i))) { + CollectRep.Field field = fieldsList.get(i); + if (field.getType() == CommonConstants.TYPE_NUMBER) { + values[3 + i] = Double.parseDouble(valueRow.getColumns(i)); + } else if (field.getType() == CommonConstants.TYPE_STRING) { + values[3 + i] = valueRow.getColumns(i); + } + if (field.getLabel()) { + labels.put(field.getName(), String.valueOf(values[3 + i])); + } + } else { + values[3 + i] = null; + } + } + values[1] = JsonUtil.toJson(labels); + table.addRow(values); + } + + CompletableFuture> writeFuture = greptimeDb.write(table); + try { + Result result = writeFuture.get(10, TimeUnit.SECONDS); + if (result.isOk()) { + log.debug("[warehouse greptime]-Write successful"); + } else { + log.warn("[warehouse greptime]--Write failed: {}", result.getErr()); + } + } catch (Throwable throwable) { + log.error("[warehouse greptime]--Error occurred: {}", throwable.getMessage()); + } + } catch (Exception e) { + log.error("[warehouse greptime]--Error: {}", e.getMessage(), e); + } + } + + @Override + public Map> getHistoryMetricData(Long monitorId, String app, String metrics, String metric, + String label, String history) { + Map> instanceValuesMap = new HashMap<>(8); + if (!isServerAvailable()) { + INSTANCE_EXCEPTION_PRINT.run(); + return instanceValuesMap; + } + + String table = getTableName(app, metrics); + + String interval = history2interval(history); + String selectSql = label == null ? String.format(QUERY_HISTORY_SQL, metric, table, interval, monitorId) + : String.format(QUERY_HISTORY_WITH_INSTANCE_SQL, metric, table, interval, monitorId, label); + + if (log.isDebugEnabled()) { + log.debug("[warehouse greptime] getHistoryMetricData SQL: {}", selectSql); + } + + try (Connection connection = hikariDataSource.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(selectSql)) { + while (resultSet.next()) { + long ts = resultSet.getLong(1); + if (ts == 0) { + if (log.isErrorEnabled()) { + log.error("[warehouse greptime] getHistoryMetricData query result timestamp is 0, ignore. {}.", + selectSql); + } + continue; + } + String instanceValue = resultSet.getString(2); + if (instanceValue == null || "".equals(instanceValue)) { + instanceValue = ""; + } + double value = resultSet.getDouble(3); + String strValue = double2decimalString(value); + + List valueList = instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>()); + valueList.add(new Value(strValue, ts)); + } + return instanceValuesMap; + } catch (SQLException sqlException) { + String msg = sqlException.getMessage(); + if (msg != null && !msg.contains(TABLE_NOT_EXIST)) { + if (log.isWarnEnabled()) { + log.warn("[warehouse greptime] failed to getHistoryMetricData: " + sqlException.getMessage()); + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("[warehouse greptime] failed to getHistoryMetricData:" + e.getMessage(), e); + } + } + return instanceValuesMap; + } + + private String getTableName(String app, String metrics) { + return app + "_" + metrics; + } + + @Override + public Map> getHistoryIntervalMetricData(Long monitorId, String app, String metrics, + String metric, String label, String history) { + if (!isServerAvailable()) { + INSTANCE_EXCEPTION_PRINT.run(); + return Collections.emptyMap(); + } + String table = getTableName(app, metrics); + List instances = new LinkedList<>(); + if (label != null && !"".equals(label)) { + instances.add(label); + } + if (instances.isEmpty()) { + String selectSql = String.format(QUERY_INSTANCE_SQL, table); + if (log.isDebugEnabled()) { + log.debug("[warehouse greptime] getHistoryIntervalMetricData sql: {}", selectSql); + } + + try (Connection connection = hikariDataSource.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(selectSql)) { + while (resultSet.next()) { + String instanceValue = resultSet.getString(1); + if (instanceValue == null || "".equals(instanceValue)) { + instances.add("''"); + } else { + instances.add(instanceValue); + } + } + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("[warehouse greptime] failed to query instances" + e.getMessage(), e); + } + } + } + + Map> instanceValuesMap = new HashMap<>(instances.size()); + for (String instanceValue : instances) { + String selectSql = String.format(QUERY_HISTORY_INTERVAL_WITH_INSTANCE_SQL, metric, metric, metric, metric, + table, instanceValue, history2interval(history)); + + if (log.isDebugEnabled()) { + log.debug("[warehouse greptime] getHistoryIntervalMetricData sql: {}", selectSql); + } + + List values = instanceValuesMap.computeIfAbsent(instanceValue, k -> new LinkedList<>()); + try (Connection connection = hikariDataSource.getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(selectSql);) { + while (resultSet.next()) { + long ts = resultSet.getLong(1); + if (ts == 0) { + if (log.isErrorEnabled()) { + log.error( + "[warehouse greptime] getHistoryIntervalMetricData query result timestamp is 0, ignore. {}.", + selectSql); + } + continue; + } + double origin = resultSet.getDouble(2); + String originStr = double2decimalString(origin); + double avg = resultSet.getDouble(3); + String avgStr = double2decimalString(avg); + double min = resultSet.getDouble(4); + String minStr = double2decimalString(min); + double max = resultSet.getDouble(5); + String maxStr = double2decimalString(max); + Value value = Value.builder().origin(originStr).mean(avgStr).min(minStr).max(maxStr).time(ts) + .build(); + values.add(value); + } + resultSet.close(); + } catch (Exception e) { + if (log.isErrorEnabled()) { + log.error("[warehouse greptime] failed to getHistoryIntervalMetricData: " + e.getMessage(), e); + } + } + } + return instanceValuesMap; + } + + // TODO(dennis): we can remove it when + // https://github.com/GreptimeTeam/greptimedb/issues/4168 is fixed. + // default 6h-6 hours: s-seconds, M-minutes, h-hours, d-days, w-weeks + private String history2interval(String history) { + if (history == null) { + return null; + } + history = history.trim().toLowerCase(); + + // Be careful, the order matters. + return history.replaceAll("d", " day") // + .replaceAll("s", " second") // + .replaceAll("w", " week") // + .replaceAll("h", " hour")// + .replaceAll("m", " minute"); + } + + private String double2decimalString(double d) { + return BigDecimal.valueOf(d).setScale(4, RoundingMode.HALF_UP).stripTrailingZeros().toPlainString(); + } + + @Override + public void destroy() { + if (this.greptimeDb != null) { + this.greptimeDb.shutdownGracefully(); + this.greptimeDb = null; + } + if (this.hikariDataSource != null) { + this.hikariDataSource.close(); + hikariDataSource = null; + } + } +} diff --git a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeProperties.java b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeProperties.java index 0561af78d9c..ac8e29f9f36 100644 --- a/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeProperties.java +++ b/warehouse/src/main/java/org/apache/hertzbeat/warehouse/store/history/greptime/GreptimeProperties.java @@ -25,7 +25,7 @@ */ @ConfigurationProperties(prefix = "warehouse.store.greptime") public record GreptimeProperties(@DefaultValue("false") boolean enabled, - @DefaultValue("127.0.0.1:4001") String endpoint, - String username, - String password) { + @DefaultValue("127.0.0.1:4001") String grpcEndpoints, + @DefaultValue("jdbc:mysql://127.0.0.1:4002/hertzbeat?connectionTimeZone=Asia/Shanghai&forceConnectionTimeZoneToSession=true") String url, + @DefaultValue("com.mysql.cj.jdbc.Driver") String driverClassName, String username, String password) { }