diff --git a/docker/docker-compose/docker-compose.yml b/docker/docker-compose/docker-compose.yml index 1e894b00b85..2d4648654ff 100644 --- a/docker/docker-compose/docker-compose.yml +++ b/docker/docker-compose/docker-compose.yml @@ -129,6 +129,7 @@ services: - | FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager + - OTEL_EXPORTER_ENDPOINT=logcollector:4317 ports: - "8081:8081" command: jobmanager @@ -142,4 +143,53 @@ services: FLINK_PROPERTIES= jobmanager.rpc.address: jobmanager taskmanager.numberOfTaskSlots: 2 + - OTEL_EXPORTER_ENDPOINT=logcollector:4317 command: taskmanager + + # opentelemetry collector + logcollector: + image: otel/opentelemetry-collector-contrib:0.110.0 + container_name: logcollector + volumes: + - ./log-system/otel-config.yaml:/otel-config.yaml + command: [ "--config=/otel-config.yaml"] + ports: + - "4317:4317" + + # grafana loki + loki: + image: grafana/loki:3.0.0 + ports: + - "3100:3100" + volumes: + - ./log-system/loki.yaml:/etc/loki/local-config.yaml + command: -config.file=/etc/loki/local-config.yaml + + # grafana + grafana: + environment: + - GF_PATHS_PROVISIONING=/etc/grafana/provisioning + - GF_AUTH_ANONYMOUS_ENABLED=true + - GF_AUTH_ANONYMOUS_ORG_ROLE=Admin + entrypoint: + - sh + - -euc + - | + mkdir -p /etc/grafana/provisioning/datasources + cat < /etc/grafana/provisioning/datasources/ds.yaml + apiVersion: 1 + datasources: + - name: Loki + type: loki + access: proxy + orgId: 1 + url: http://loki:3100 + basicAuth: false + isDefault: true + version: 1 + editable: false + EOF + /run.sh + image: grafana/grafana:latest + ports: + - "3000:3000" \ No newline at end of file diff --git a/docker/docker-compose/log-system/loki.yaml b/docker/docker-compose/log-system/loki.yaml new file mode 100644 index 00000000000..746f8baac55 --- /dev/null +++ b/docker/docker-compose/log-system/loki.yaml @@ -0,0 +1,58 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +auth_enabled: false + +limits_config: + allow_structured_metadata: true + volume_enabled: true + otlp_config: + resource_attributes: + attributes_config: + - action: index_label + attributes: + - level +server: + http_listen_port: 3100 + +common: + ring: + instance_addr: 0.0.0.0 + kvstore: + store: inmemory + replication_factor: 1 + path_prefix: /tmp/loki + +schema_config: + configs: + - from: 2020-05-15 + store: tsdb + object_store: filesystem + schema: v13 + index: + prefix: index_ + period: 24h + +storage_config: + tsdb_shipper: + active_index_directory: /tmp/loki/index + cache_location: /tmp/loki/index_cache + filesystem: + directory: /tmp/loki/chunks + +pattern_ingester: + enabled: true diff --git a/docker/docker-compose/log-system/otel-config.yaml b/docker/docker-compose/log-system/otel-config.yaml new file mode 100644 index 00000000000..6942f11e0f9 --- /dev/null +++ b/docker/docker-compose/log-system/otel-config.yaml @@ -0,0 +1,39 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +receivers: + otlp: + protocols: + grpc: + endpoint: logcollector:4317 +processors: + batch: + +exporters: + logging: + verbosity: detailed + otlphttp: + endpoint: http://loki:3100/otlp + tls: + insecure: true + +service: + pipelines: + logs: + receivers: [otlp] + processors: [batch] + exporters: [otlphttp, logging] \ No newline at end of file diff --git a/docker/docker-compose/otel-config.yaml b/docker/docker-compose/otel-config.yaml new file mode 100644 index 00000000000..8cbd40321d9 --- /dev/null +++ b/docker/docker-compose/otel-config.yaml @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +receivers: + otlp: + protocols: + grpc: + endpoint: logcollector:4317 +exporters: + logging: + verbosity: detailed +service: + pipelines: + logs: + receivers: [otlp] + exporters: [logging] \ No newline at end of file diff --git a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java index 82c88cf5f7b..54dfefcf11e 100644 --- a/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java +++ b/inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/util/OpenTelemetryLogger.java @@ -38,6 +38,9 @@ import java.nio.charset.StandardCharsets; +/** + * OpenTelemetryLogger to collect logs and send to OpenTelemetry + */ public class OpenTelemetryLogger { private OpenTelemetrySdk SDK; // OpenTelemetry SDK @@ -50,33 +53,98 @@ public class OpenTelemetryLogger { private final Level logLevel; // Log4j Log Level + private final String localHostIp; // Local Host IP + private static final Logger LOG = LoggerFactory.getLogger(OpenTelemetryLogger.class); - public OpenTelemetryLogger() { - // Default Service Name - serviceName = "inlong-sort-connector"; - // Get OpenTelemetry Exporter Endpoint from Environment Variable - if (System.getenv("OTEL_EXPORTER_ENDPOINT") != null) { - endpoint = System.getenv("OTEL_EXPORTER_ENDPOINT"); - } else { - endpoint = "localhost:4317"; - } - // Default Log4j Layout - this.layout = PatternLayout.newBuilder() - .withPattern("%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n") - .withCharset(StandardCharsets.UTF_8) - .build(); - // Default Log4j Log Level - this.logLevel = Level.INFO; + public OpenTelemetryLogger(Builder builder) { + this.serviceName = builder.serviceName; + this.endpoint = builder.endpoint; + this.layout = builder.layout; + this.logLevel = builder.logLevel; + this.localHostIp = builder.localHostIp; } - public OpenTelemetryLogger(String serviceName, String endpoint, Layout layout, Level logLevel) { + public OpenTelemetryLogger(String serviceName, String endpoint, Layout layout, Level logLevel, + String localHostIp) { this.serviceName = serviceName; this.endpoint = endpoint; this.layout = layout; this.logLevel = logLevel; + this.localHostIp = localHostIp; + } + + /** + * OpenTelemetryLogger Builder + */ + public static final class Builder { + + private String endpoint; // OpenTelemetry Exporter Endpoint + + private String serviceName; // OpenTelemetry Service Name + + private Layout layout; // Log4j Layout + + private Level logLevel; // Log4j Log Level + + private String localHostIp; + + public Builder() { + } + + public Builder setServiceName(String serviceName) { + this.serviceName = serviceName; + return this; + } + + public Builder setLayout(Layout layout) { + this.layout = layout; + return this; + } + + public Builder setEndpoint(String endpoint) { + this.endpoint = endpoint; + return this; + } + + public Builder setLogLevel(Level logLevel) { + this.logLevel = logLevel; + return this; + } + + public Builder setLocalHostIp(String localHostIp) { + this.localHostIp = localHostIp; + return this; + } + + public OpenTelemetryLogger build() { + if (this.serviceName == null) { + this.serviceName = "unnamed_service"; + } + if (this.endpoint == null) { + if (System.getenv("OTEL_EXPORTER_ENDPOINT") != null) { + this.endpoint = System.getenv("OTEL_EXPORTER_ENDPOINT"); + } else { + this.endpoint = "localhost:4317"; + } + } + if (this.layout == null) { + this.layout = PatternLayout.newBuilder() + .withPattern("%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n") + .withCharset(StandardCharsets.UTF_8) + .build(); + } + if (this.logLevel == null) { + this.logLevel = Level.INFO; + } + return new OpenTelemetryLogger(this); + } + } + /** + * Create OpenTelemetry SDK with OpenTelemetry Exporter + */ private void createOpenTelemetrySdk() { // Create OpenTelemetry SDK OpenTelemetrySdkBuilder sdkBuilder = OpenTelemetrySdk.builder(); @@ -84,7 +152,9 @@ private void createOpenTelemetrySdk() { SdkLoggerProviderBuilder loggerProviderBuilder = SdkLoggerProvider.builder(); // get Resource Resource resource = Resource.getDefault().toBuilder() + .put(ResourceAttributes.SERVICE_NAMESPACE, "inlong_sort") .put(ResourceAttributes.SERVICE_NAME, this.serviceName) + .put(ResourceAttributes.HOST_NAME, this.localHostIp) .build(); // set Resource loggerProviderBuilder.setResource(resource); @@ -102,7 +172,10 @@ private void createOpenTelemetrySdk() { SDK = sdkBuilder.build(); } - public void addOpenTelemetryAppender() { + /** + * Add OpenTelemetryAppender to Log4j + */ + private void addOpenTelemetryAppender() { org.apache.logging.log4j.spi.LoggerContext context = LogManager.getContext(false); LoggerContext loggerContext = (LoggerContext) context; Configuration config = loggerContext.getConfiguration(); @@ -122,7 +195,10 @@ public void addOpenTelemetryAppender() { loggerContext.updateLoggers(); } - public void removeOpenTelemetryAppender() { + /** + * Remove OpenTelemetryAppender from Log4j + */ + private void removeOpenTelemetryAppender() { org.apache.logging.log4j.spi.LoggerContext context = LogManager.getContext(false); LoggerContext loggerContext = (LoggerContext) context; Configuration config = loggerContext.getConfiguration(); @@ -137,6 +213,9 @@ public void removeOpenTelemetryAppender() { loggerContext.updateLoggers(); } + /** + * Install OpenTelemetryLogger for the application + */ public void install() { addOpenTelemetryAppender(); createOpenTelemetrySdk(); @@ -144,6 +223,9 @@ public void install() { LOG.info("OpenTelemetryLogger installed"); } + /** + * Uninstall OpenTelemetryLogger + */ public void uninstall() { LOG.info("OpenTelemetryLogger uninstalled"); SDK.close(); diff --git a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java index 01f34f28b15..3e457147fb6 100644 --- a/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java +++ b/inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/source/reader/MySqlSourceReader.java @@ -17,6 +17,7 @@ package org.apache.inlong.sort.mysql.source.reader; +import org.apache.inlong.sort.base.util.OpenTelemetryLogger; import org.apache.inlong.sort.mysql.RowDataDebeziumDeserializeSchema; import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils; @@ -88,6 +89,7 @@ public class MySqlSourceReader private final MySqlSourceReaderContext mySqlSourceReaderContext; private MySqlBinlogSplit suspendedBinlogSplit; private final DebeziumDeserializationSchema metricSchema; + private final OpenTelemetryLogger openTelemetryLogger; public MySqlSourceReader( FutureCompletingBlockingQueue> elementQueue, @@ -109,15 +111,26 @@ public MySqlSourceReader( this.mySqlSourceReaderContext = context; this.suspendedBinlogSplit = null; this.metricSchema = metricSchema; + // initialize OpenTelemetryLogger + this.openTelemetryLogger = new OpenTelemetryLogger.Builder() + .setServiceName(this.getClass().getSimpleName()) + .setLocalHostIp(this.context.getLocalHostName()).build(); } @Override public void start() { + openTelemetryLogger.install(); // install OpenTelemetryLogger if (getNumberOfCurrentlyAssignedSplits() == 0) { context.sendSplitRequest(); } } + @Override + public void close() throws Exception { + openTelemetryLogger.uninstall(); // uninstall OpenTelemetryLogger + super.close(); + } + @Override protected MySqlSplitState initializedState(MySqlSplit split) { if (split.isSnapshotSplit()) {