From 89ef52f5205e5a28fb602ffa92a1ea70acde19cc Mon Sep 17 00:00:00 2001 From: Jakub Dardzinski Date: Mon, 8 Jul 2024 15:10:22 +0200 Subject: [PATCH 1/3] Add endpoint method and path to metrics name. Signed-off-by: Jakub Dardzinski Add MetricsIntegrationTest. Signed-off-by: Jakub Dardzinski --- api/src/main/java/marquez/MarquezApp.java | 3 +- .../marquez/logging/EndpointNameStrategy.java | 33 +++++++ .../marquez/logging/LoggingMdcFilter.java | 15 +++ .../logging/MarquezMetricNameStrategy.java | 20 ++++ .../java/marquez/BaseIntegrationTest.java | 10 ++ .../java/marquez/MetricsIntegrationTest.java | 92 +++++++++++++++++++ 6 files changed, 172 insertions(+), 1 deletion(-) create mode 100644 api/src/main/java/marquez/logging/EndpointNameStrategy.java create mode 100644 api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java create mode 100644 api/src/test/java/marquez/MetricsIntegrationTest.java diff --git a/api/src/main/java/marquez/MarquezApp.java b/api/src/main/java/marquez/MarquezApp.java index 0bc95bd4ae..70cb485706 100644 --- a/api/src/main/java/marquez/MarquezApp.java +++ b/api/src/main/java/marquez/MarquezApp.java @@ -36,6 +36,7 @@ import marquez.db.DbMigration; import marquez.jobs.DbRetentionJob; import marquez.logging.LoggingMdcFilter; +import marquez.logging.MarquezMetricNameStrategy; import marquez.tracing.SentryConfig; import marquez.tracing.TracingContainerResponseFilter; import marquez.tracing.TracingSQLLogger; @@ -162,7 +163,7 @@ private Jdbi newJdbi( .installPlugin(new SqlObjectPlugin()) .installPlugin(new PostgresPlugin()) .installPlugin(new Jackson2Plugin()); - SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics()); + SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics(), new MarquezMetricNameStrategy()); if (isSentryEnabled(config)) { sqlLogger = new TracingSQLLogger(sqlLogger); } diff --git a/api/src/main/java/marquez/logging/EndpointNameStrategy.java b/api/src/main/java/marquez/logging/EndpointNameStrategy.java new file mode 100644 index 0000000000..6dd55121df --- /dev/null +++ b/api/src/main/java/marquez/logging/EndpointNameStrategy.java @@ -0,0 +1,33 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.jdbi3.strategies.StatementNameStrategy; +import org.jdbi.v3.core.extension.ExtensionMethod; +import org.jdbi.v3.core.statement.StatementContext; +import org.slf4j.MDC; + +public class EndpointNameStrategy implements StatementNameStrategy { + + @Override + public String getStatementName(StatementContext statementContext) { + ExtensionMethod extensionMethod = statementContext.getExtensionMethod(); + if (extensionMethod != null) { + if (MDC.get("method") != null && MDC.get("pathWithParams") != null) { + StringBuilder builder = + new StringBuilder() + .append(extensionMethod.getMethod().getName()) + .append(".") + .append(MDC.get("method")) + .append(".") + .append(MDC.get("pathWithParams")); + return MetricRegistry.name(extensionMethod.getType(), builder.toString()); + } + } + return null; + } +} diff --git a/api/src/main/java/marquez/logging/LoggingMdcFilter.java b/api/src/main/java/marquez/logging/LoggingMdcFilter.java index e7a05662cc..fdd43d213b 100644 --- a/api/src/main/java/marquez/logging/LoggingMdcFilter.java +++ b/api/src/main/java/marquez/logging/LoggingMdcFilter.java @@ -6,14 +6,18 @@ package marquez.logging; import java.io.IOException; +import java.util.List; import java.util.UUID; import javax.ws.rs.container.CompletionCallback; import javax.ws.rs.container.ContainerRequestContext; import javax.ws.rs.container.ContainerRequestFilter; import javax.ws.rs.container.ContainerResponseContext; import javax.ws.rs.container.ContainerResponseFilter; +import javax.ws.rs.core.Context; import javax.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; +import org.glassfish.jersey.server.ExtendedUriInfo; +import org.glassfish.jersey.uri.UriTemplate; import org.slf4j.MDC; /** @@ -28,12 +32,16 @@ public class LoggingMdcFilter private static final String REQUEST_ID = "requestID"; private static final String METHOD = "method"; private static final String PATH = "path"; + private static final String PATH_WITH_PARAMETERS = "pathWithParams"; + + @Context private ExtendedUriInfo uriInfo; @Override public void onComplete(Throwable throwable) { MDC.remove(REQUEST_ID); MDC.remove(METHOD); MDC.remove(PATH); + MDC.remove(PATH_WITH_PARAMETERS); } @Override @@ -41,6 +49,13 @@ public void filter(ContainerRequestContext requestContext) throws IOException { MDC.put(REQUEST_ID, UUID.randomUUID().toString()); MDC.put(METHOD, requestContext.getMethod()); MDC.put(PATH, requestContext.getUriInfo().getPath()); + StringBuilder pathWithPlaceholders = new StringBuilder(); + + List matchedTemplates = uriInfo.getMatchedTemplates(); + for (int i = matchedTemplates.size() - 1; i >= 0; i--) { + pathWithPlaceholders.append(matchedTemplates.get(i).getTemplate()); + } + MDC.put(PATH_WITH_PARAMETERS, pathWithPlaceholders.toString()); } @Override diff --git a/api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java b/api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java new file mode 100644 index 0000000000..f28290a5f8 --- /dev/null +++ b/api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java @@ -0,0 +1,20 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import com.codahale.metrics.jdbi3.strategies.DefaultNameStrategy; +import com.codahale.metrics.jdbi3.strategies.DelegatingStatementNameStrategy; + +public class MarquezMetricNameStrategy extends DelegatingStatementNameStrategy { + + public MarquezMetricNameStrategy() { + super( + DefaultNameStrategy.CHECK_EMPTY, + new EndpointNameStrategy(), + DefaultNameStrategy.SQL_OBJECT, + DefaultNameStrategy.CONSTANT_SQL_RAW); + } +} diff --git a/api/src/test/java/marquez/BaseIntegrationTest.java b/api/src/test/java/marquez/BaseIntegrationTest.java index 30eb8bfa4c..d9e03d4aac 100644 --- a/api/src/test/java/marquez/BaseIntegrationTest.java +++ b/api/src/test/java/marquez/BaseIntegrationTest.java @@ -213,6 +213,16 @@ protected CompletableFuture> sendLineage(String body) { return http2.sendAsync(request, BodyHandlers.ofString()); } + protected CompletableFuture> getMetrics() { + HttpRequest request = + HttpRequest.newBuilder() + .uri(URI.create(baseUrl + "/metrics")) + .GET() + .build(); + + return http2.sendAsync(request, BodyHandlers.ofString()); + } + protected CompletableFuture> fetchLineage(String nodeId) { HttpRequest request = HttpRequest.newBuilder() diff --git a/api/src/test/java/marquez/MetricsIntegrationTest.java b/api/src/test/java/marquez/MetricsIntegrationTest.java new file mode 100644 index 0000000000..70c18327c6 --- /dev/null +++ b/api/src/test/java/marquez/MetricsIntegrationTest.java @@ -0,0 +1,92 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez; + +import static marquez.db.LineageTestUtils.PRODUCER_URL; +import static marquez.db.LineageTestUtils.SCHEMA_URL; +import static org.assertj.core.api.Assertions.as; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import com.google.common.base.Predicate; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import io.dropwizard.util.Resources; +import io.openlineage.client.OpenLineage; +import io.openlineage.client.OpenLineage.RunEvent; +import io.openlineage.client.OpenLineage.RunEvent.EventType; +import io.openlineage.client.OpenLineage.RunFacet; +import io.openlineage.client.OpenLineage.RunFacetsBuilder; +import java.io.IOException; +import java.net.URI; +import java.net.http.HttpResponse; +import java.nio.charset.Charset; +import java.time.Instant; +import java.time.ZoneId; +import java.time.ZonedDateTime; +import java.time.temporal.ChronoField; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import lombok.NonNull; +import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; +import marquez.api.JdbiUtils; +import marquez.client.MarquezClient; +import marquez.client.models.Dataset; +import marquez.client.models.DatasetVersion; +import marquez.client.models.Job; +import marquez.client.models.JobId; +import marquez.client.models.JobVersion; +import marquez.client.models.LineageEvent; +import marquez.client.models.Run; +import marquez.common.Utils; +import marquez.db.LineageTestUtils; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.jdbi.v3.core.Jdbi; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.LoggerFactory; + +@org.junit.jupiter.api.Tag("IntegrationTests") +@Slf4j +public class MetricsIntegrationTest extends BaseIntegrationTest { + @Test + public void testCheckMetricName() throws IOException { + client.listNamespaces(); + CompletableFuture response = + this.getMetrics() + .thenApply(HttpResponse::body) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + assertThat(response.join()) + .contains( + "marquez_db_NamespaceDao_findAll_GET__api_v1_namespaces"); + } +} From e83e96720f3cc46b9892a09453649790524434c0 Mon Sep 17 00:00:00 2001 From: Jakub Dardzinski Date: Tue, 16 Jul 2024 16:15:20 +0200 Subject: [PATCH 2/3] Introduce labels to metrics and add them to v2 metrics endpoint. Signed-off-by: Jakub Dardzinski --- api/src/main/java/marquez/MarquezApp.java | 14 ++- .../marquez/logging/DelegatingSqlLogger.java | 35 ++++++ .../marquez/logging/EndpointNameStrategy.java | 33 ------ .../marquez/logging/LabelledSqlLogger.java | 46 ++++++++ .../logging/MarquezMetricNameStrategy.java | 20 ---- .../main/java/marquez/service/SqlMetrics.java | 32 +++++ .../java/marquez/BaseIntegrationTest.java | 12 +- .../java/marquez/MetricsIntegrationTest.java | 78 ++++--------- .../logging/DelegatingSqlLoggerTest.java | 53 +++++++++ .../logging/LabelledSqlLoggerTest.java | 109 ++++++++++++++++++ .../marquez/logging/LoggingMdcFilterTest.java | 61 ++++++++++ 11 files changed, 376 insertions(+), 117 deletions(-) create mode 100644 api/src/main/java/marquez/logging/DelegatingSqlLogger.java delete mode 100644 api/src/main/java/marquez/logging/EndpointNameStrategy.java create mode 100644 api/src/main/java/marquez/logging/LabelledSqlLogger.java delete mode 100644 api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java create mode 100644 api/src/main/java/marquez/service/SqlMetrics.java create mode 100644 api/src/test/java/marquez/logging/DelegatingSqlLoggerTest.java create mode 100644 api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java create mode 100644 api/src/test/java/marquez/logging/LoggingMdcFilterTest.java diff --git a/api/src/main/java/marquez/MarquezApp.java b/api/src/main/java/marquez/MarquezApp.java index 70cb485706..96800f4533 100644 --- a/api/src/main/java/marquez/MarquezApp.java +++ b/api/src/main/java/marquez/MarquezApp.java @@ -35,8 +35,10 @@ import marquez.common.Utils; import marquez.db.DbMigration; import marquez.jobs.DbRetentionJob; +import marquez.logging.DelegatingSqlLogger; +import marquez.logging.LabelledSqlLogger; import marquez.logging.LoggingMdcFilter; -import marquez.logging.MarquezMetricNameStrategy; +import marquez.service.SqlMetrics; import marquez.tracing.SentryConfig; import marquez.tracing.TracingContainerResponseFilter; import marquez.tracing.TracingSQLLogger; @@ -58,7 +60,9 @@ public final class MarquezApp extends Application { // Monitoring private static final String PROMETHEUS = "prometheus"; + private static final String PROMETHEUS_V2 = "prometheus_v2"; private static final String PROMETHEUS_ENDPOINT = "/metrics"; + private static final String PROMETHEUS_ENDPOINT_V2 = "/metrics/v2"; public static void main(final String[] args) throws Exception { new MarquezApp().run(args); @@ -74,7 +78,9 @@ public void initialize(@NonNull Bootstrap bootstrap) { // Enable metric collection for prometheus. CollectorRegistry.defaultRegistry.register( new DropwizardExports(bootstrap.getMetricRegistry())); + SqlMetrics.registry.register(new DropwizardExports(bootstrap.getMetricRegistry())); DefaultExports.initialize(); // Add metrics for CPU, JVM memory, etc. + DefaultExports.register(SqlMetrics.registry); // Enable variable substitution with environment variables. bootstrap.setConfigurationSourceProvider( @@ -163,7 +169,8 @@ private Jdbi newJdbi( .installPlugin(new SqlObjectPlugin()) .installPlugin(new PostgresPlugin()) .installPlugin(new Jackson2Plugin()); - SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics(), new MarquezMetricNameStrategy()); + SqlLogger sqlLogger = + new DelegatingSqlLogger(new LabelledSqlLogger(), new InstrumentedSqlLogger(env.metrics())); if (isSentryEnabled(config)) { sqlLogger = new TracingSQLLogger(sqlLogger); } @@ -198,6 +205,9 @@ private void registerServlets(@NonNull Environment env) { // Expose metrics for monitoring. env.servlets().addServlet(PROMETHEUS, new MetricsServlet()).addMapping(PROMETHEUS_ENDPOINT); + env.servlets() + .addServlet(PROMETHEUS_V2, new MetricsServlet(SqlMetrics.registry)) + .addMapping(PROMETHEUS_ENDPOINT_V2); } private void registerFilters(@NonNull Environment env, MarquezContext marquezContext) { diff --git a/api/src/main/java/marquez/logging/DelegatingSqlLogger.java b/api/src/main/java/marquez/logging/DelegatingSqlLogger.java new file mode 100644 index 0000000000..8de6f3bc8a --- /dev/null +++ b/api/src/main/java/marquez/logging/DelegatingSqlLogger.java @@ -0,0 +1,35 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import org.jdbi.v3.core.statement.SqlLogger; +import org.jdbi.v3.core.statement.StatementContext; + +/** A {@link SqlLogger} implementation that delegates to multiple {@link SqlLogger}s. */ +public class DelegatingSqlLogger implements SqlLogger { + private final List sqlLoggers; + + public DelegatingSqlLogger(SqlLogger... sqlLoggers) { + this.sqlLoggers = Arrays.asList(sqlLoggers); + } + + @Override + public void logAfterExecution(StatementContext statementContext) { + for (SqlLogger sqlLogger : sqlLoggers) { + sqlLogger.logAfterExecution(statementContext); + } + } + + @Override + public void logException(StatementContext statementContext, SQLException ex) { + for (SqlLogger sqlLogger : sqlLoggers) { + sqlLogger.logException(statementContext, ex); + } + } +} diff --git a/api/src/main/java/marquez/logging/EndpointNameStrategy.java b/api/src/main/java/marquez/logging/EndpointNameStrategy.java deleted file mode 100644 index 6dd55121df..0000000000 --- a/api/src/main/java/marquez/logging/EndpointNameStrategy.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2018-2023 contributors to the Marquez project - * SPDX-License-Identifier: Apache-2.0 - */ - -package marquez.logging; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.jdbi3.strategies.StatementNameStrategy; -import org.jdbi.v3.core.extension.ExtensionMethod; -import org.jdbi.v3.core.statement.StatementContext; -import org.slf4j.MDC; - -public class EndpointNameStrategy implements StatementNameStrategy { - - @Override - public String getStatementName(StatementContext statementContext) { - ExtensionMethod extensionMethod = statementContext.getExtensionMethod(); - if (extensionMethod != null) { - if (MDC.get("method") != null && MDC.get("pathWithParams") != null) { - StringBuilder builder = - new StringBuilder() - .append(extensionMethod.getMethod().getName()) - .append(".") - .append(MDC.get("method")) - .append(".") - .append(MDC.get("pathWithParams")); - return MetricRegistry.name(extensionMethod.getType(), builder.toString()); - } - } - return null; - } -} diff --git a/api/src/main/java/marquez/logging/LabelledSqlLogger.java b/api/src/main/java/marquez/logging/LabelledSqlLogger.java new file mode 100644 index 0000000000..dca998ae56 --- /dev/null +++ b/api/src/main/java/marquez/logging/LabelledSqlLogger.java @@ -0,0 +1,46 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import java.sql.SQLException; +import java.time.temporal.ChronoUnit; +import marquez.service.SqlMetrics; +import org.jdbi.v3.core.extension.ExtensionMethod; +import org.jdbi.v3.core.statement.SqlLogger; +import org.jdbi.v3.core.statement.StatementContext; +import org.slf4j.MDC; + +/** + * A {@link SqlLogger} implementation for JDBI which uses the SQL objects' class names and method + * names for nanosecond-precision timers. + */ +public class LabelledSqlLogger implements SqlLogger { + + @Override + public void logAfterExecution(StatementContext context) { + log(context); + } + + @Override + public void logException(StatementContext context, SQLException ex) { + log(context); + } + + private void log(StatementContext context) { + ExtensionMethod extensionMethod = context.getExtensionMethod(); + if (extensionMethod != null) { + final long elapsed = context.getElapsedTime(ChronoUnit.NANOS); + if (MDC.get("method") != null && MDC.get("pathWithParams") != null) { + SqlMetrics.emitSqlDurationMetrics( + extensionMethod.getType().getName(), + extensionMethod.getMethod().getName(), + MDC.get("method"), + MDC.get("pathWithParams"), + elapsed / 1e9); + } + } + } +} diff --git a/api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java b/api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java deleted file mode 100644 index f28290a5f8..0000000000 --- a/api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2018-2023 contributors to the Marquez project - * SPDX-License-Identifier: Apache-2.0 - */ - -package marquez.logging; - -import com.codahale.metrics.jdbi3.strategies.DefaultNameStrategy; -import com.codahale.metrics.jdbi3.strategies.DelegatingStatementNameStrategy; - -public class MarquezMetricNameStrategy extends DelegatingStatementNameStrategy { - - public MarquezMetricNameStrategy() { - super( - DefaultNameStrategy.CHECK_EMPTY, - new EndpointNameStrategy(), - DefaultNameStrategy.SQL_OBJECT, - DefaultNameStrategy.CONSTANT_SQL_RAW); - } -} diff --git a/api/src/main/java/marquez/service/SqlMetrics.java b/api/src/main/java/marquez/service/SqlMetrics.java new file mode 100644 index 0000000000..c5336b7750 --- /dev/null +++ b/api/src/main/java/marquez/service/SqlMetrics.java @@ -0,0 +1,32 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Histogram; + +public class SqlMetrics { + public static final CollectorRegistry registry = new io.prometheus.client.CollectorRegistry(); + + public static final Histogram duration = + Histogram.build() + .namespace("marquez") + .labelNames("object_name", "method_name", "endpoint_method", "endpoint_path") + .name("sql_duration_seconds") + .help("SQL execution duration in seconds") + .register(registry); + + public static void emitSqlDurationMetrics( + String objectName, + String methodName, + String endpointMethod, + String endpointPath, + double duration) { + SqlMetrics.duration + .labels(objectName, methodName, endpointMethod, endpointPath) + .observe(duration); + } +} diff --git a/api/src/test/java/marquez/BaseIntegrationTest.java b/api/src/test/java/marquez/BaseIntegrationTest.java index d9e03d4aac..21032c7ff6 100644 --- a/api/src/test/java/marquez/BaseIntegrationTest.java +++ b/api/src/test/java/marquez/BaseIntegrationTest.java @@ -215,10 +215,14 @@ protected CompletableFuture> sendLineage(String body) { protected CompletableFuture> getMetrics() { HttpRequest request = - HttpRequest.newBuilder() - .uri(URI.create(baseUrl + "/metrics")) - .GET() - .build(); + HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics")).GET().build(); + + return http2.sendAsync(request, BodyHandlers.ofString()); + } + + protected CompletableFuture> getMetricsV2() { + HttpRequest request = + HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics/v2")).GET().build(); return http2.sendAsync(request, BodyHandlers.ofString()); } diff --git a/api/src/test/java/marquez/MetricsIntegrationTest.java b/api/src/test/java/marquez/MetricsIntegrationTest.java index 70c18327c6..2a7c64e116 100644 --- a/api/src/test/java/marquez/MetricsIntegrationTest.java +++ b/api/src/test/java/marquez/MetricsIntegrationTest.java @@ -5,76 +5,20 @@ package marquez; -import static marquez.db.LineageTestUtils.PRODUCER_URL; -import static marquez.db.LineageTestUtils.SCHEMA_URL; -import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.TextNode; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import io.dropwizard.util.Resources; -import io.openlineage.client.OpenLineage; -import io.openlineage.client.OpenLineage.RunEvent; -import io.openlineage.client.OpenLineage.RunEvent.EventType; -import io.openlineage.client.OpenLineage.RunFacet; -import io.openlineage.client.OpenLineage.RunFacetsBuilder; import java.io.IOException; -import java.net.URI; import java.net.http.HttpResponse; -import java.nio.charset.Charset; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.temporal.ChronoField; -import java.time.temporal.ChronoUnit; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import lombok.NonNull; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import marquez.api.JdbiUtils; -import marquez.client.MarquezClient; -import marquez.client.models.Dataset; -import marquez.client.models.DatasetVersion; -import marquez.client.models.Job; -import marquez.client.models.JobId; -import marquez.client.models.JobVersion; -import marquez.client.models.LineageEvent; -import marquez.client.models.Run; -import marquez.common.Utils; -import marquez.db.LineageTestUtils; -import org.assertj.core.api.InstanceOfAssertFactories; -import org.jdbi.v3.core.Jdbi; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; -import org.slf4j.LoggerFactory; @org.junit.jupiter.api.Tag("IntegrationTests") @Slf4j public class MetricsIntegrationTest extends BaseIntegrationTest { @Test - public void testCheckMetricName() throws IOException { + public void testCheckMetricV1Name() throws IOException { client.listNamespaces(); CompletableFuture response = this.getMetrics() @@ -85,8 +29,26 @@ public void testCheckMetricName() throws IOException { Assertions.fail("Could not complete request"); } }); + assertThat(response.join()).contains("marquez_db_NamespaceDao_findAll_count"); + } + + @Test + public void testCheckMetricV2Name() throws IOException { + client.listNamespaces(); + CompletableFuture response = + this.getMetricsV2() + .thenApply(HttpResponse::body) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); assertThat(response.join()) .contains( - "marquez_db_NamespaceDao_findAll_GET__api_v1_namespaces"); + "marquez_sql_duration_seconds_sum{object_name=\"marquez.db.NamespaceDao\"," + + "method_name=\"findAll\"," + + "endpoint_method=\"GET\"," + + "endpoint_path=\"/api/v1/namespaces\",}"); } } diff --git a/api/src/test/java/marquez/logging/DelegatingSqlLoggerTest.java b/api/src/test/java/marquez/logging/DelegatingSqlLoggerTest.java new file mode 100644 index 0000000000..4fee31f29f --- /dev/null +++ b/api/src/test/java/marquez/logging/DelegatingSqlLoggerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.sql.SQLException; +import org.jdbi.v3.core.statement.SqlLogger; +import org.jdbi.v3.core.statement.StatementContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class DelegatingSqlLoggerTest { + private SqlLogger logger1; + private SqlLogger logger2; + private DelegatingSqlLogger delegatingSqlLogger; + private StatementContext statementContext; + private SQLException sqlException; + + @BeforeEach + public void setUp() { + logger1 = mock(SqlLogger.class); + logger2 = mock(SqlLogger.class); + delegatingSqlLogger = new DelegatingSqlLogger(logger1, logger2); + statementContext = mock(StatementContext.class); + sqlException = new SQLException("Test SQL Exception"); + } + + @Test + public void testLogAfterExecution() { + // Act + delegatingSqlLogger.logAfterExecution(statementContext); + + // Assert + verify(logger1, times(1)).logAfterExecution(statementContext); + verify(logger2, times(1)).logAfterExecution(statementContext); + } + + @Test + public void testLogException() { + // Act + delegatingSqlLogger.logException(statementContext, sqlException); + + // Assert + verify(logger1, times(1)).logException(statementContext, sqlException); + verify(logger2, times(1)).logException(statementContext, sqlException); + } +} diff --git a/api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java b/api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java new file mode 100644 index 0000000000..59c2410ed1 --- /dev/null +++ b/api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2018-2024 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +import java.sql.SQLException; +import java.time.temporal.ChronoUnit; +import marquez.service.SqlMetrics; +import org.jdbi.v3.core.extension.ExtensionMethod; +import org.jdbi.v3.core.statement.StatementContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.slf4j.MDC; + +public class LabelledSqlLoggerTest { + + private LabelledSqlLogger logger; + private StatementContext context; + + @BeforeEach + public void setUp() { + logger = new LabelledSqlLogger(); + context = mock(StatementContext.class); + + // Setup default mock behavior + when(context.getElapsedTime(ChronoUnit.NANOS)) + .thenReturn(1000000000L); // 1 second in nanoseconds + } + + @AfterEach + public void tearDown() { + MDC.clear(); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + public void testLogAfterExecution() { + // Prepare MDC context + MDC.put("method", "GET"); + MDC.put("pathWithParams", "/test/path"); + + ExtensionMethod extensionMethod = mock(ExtensionMethod.class); + when(extensionMethod.getType()).thenReturn((Class) TestClass.class); + when(extensionMethod.getMethod()).thenReturn(TestClass.class.getMethods()[0]); + + when(context.getExtensionMethod()).thenReturn(extensionMethod); + + // Mock static method + try (MockedStatic mockedSqlMetrics = Mockito.mockStatic(SqlMetrics.class)) { + // Call the method under test + logger.logAfterExecution(context); + + // Verify the static method call + mockedSqlMetrics.verify( + () -> + SqlMetrics.emitSqlDurationMetrics( + "marquez.logging.LabelledSqlLoggerTest$TestClass", + "testMethod", + "GET", + "/test/path", + 1.0), + times(1)); + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + public void testLogException() { + // Prepare MDC context + MDC.put("method", "POST"); + MDC.put("pathWithParams", "/test/exception"); + + ExtensionMethod extensionMethod = mock(ExtensionMethod.class); + when(extensionMethod.getType()).thenReturn((Class) TestClass.class); + when(extensionMethod.getMethod()).thenReturn(TestClass.class.getMethods()[0]); + + when(context.getExtensionMethod()).thenReturn(extensionMethod); + + // Mock static method + try (MockedStatic mockedSqlMetrics = Mockito.mockStatic(SqlMetrics.class)) { + // Call the method under test + logger.logException(context, new SQLException("Test Exception")); + + // Verify the static method call + mockedSqlMetrics.verify( + () -> + SqlMetrics.emitSqlDurationMetrics( + "marquez.logging.LabelledSqlLoggerTest$TestClass", + "testMethod", + "POST", + "/test/exception", + 1.0)); + } + } + + // Dummy class for the purpose of mocking + private static class TestClass { + public void testMethod() {} + } +} diff --git a/api/src/test/java/marquez/logging/LoggingMdcFilterTest.java b/api/src/test/java/marquez/logging/LoggingMdcFilterTest.java new file mode 100644 index 0000000000..5e71a5b0a6 --- /dev/null +++ b/api/src/test/java/marquez/logging/LoggingMdcFilterTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; +import javax.ws.rs.container.ContainerRequestContext; +import org.glassfish.jersey.server.ExtendedUriInfo; +import org.glassfish.jersey.uri.UriTemplate; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.slf4j.MDC; + +public class LoggingMdcFilterTest { + + @Mock private ContainerRequestContext requestContext; + + @Mock private ExtendedUriInfo uriInfo; + + private LoggingMdcFilter filter; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + filter = new LoggingMdcFilter(); + Mockito.when(requestContext.getUriInfo()).thenReturn(uriInfo); + } + + @Test + public void testPathWithPlaceholders() + throws IOException, + NoSuchFieldException, + SecurityException, + IllegalArgumentException, + IllegalAccessException { + String expectedPath = "/api/users/{userId}"; + List templates = Collections.singletonList(new UriTemplate(expectedPath)); + + Field uriInfoField = LoggingMdcFilter.class.getDeclaredField("uriInfo"); + uriInfoField.setAccessible(true); + uriInfoField.set(filter, uriInfo); + + Mockito.when(requestContext.getUriInfo()).thenReturn(uriInfo); + Mockito.when(uriInfo.getMatchedTemplates()).thenReturn(templates); + + filter.filter(requestContext); + + assertEquals(expectedPath, MDC.get("pathWithParams")); + MDC.clear(); // Clean up MDC + } +} From d141c1fa327e6167547bd9c97546a81e5bc52938 Mon Sep 17 00:00:00 2001 From: Jakub Dardzinski Date: Wed, 24 Jul 2024 23:59:39 +0200 Subject: [PATCH 3/3] Rename metric name and labels. Rename endpoint name for v2 metrics. Signed-off-by: Jakub Dardzinski --- api/src/main/java/marquez/MarquezApp.java | 10 +++--- .../marquez/logging/LabelledSqlLogger.java | 4 +-- .../java/marquez/service/DatabaseMetrics.java | 26 +++++++++++++++ .../main/java/marquez/service/SqlMetrics.java | 32 ------------------- .../java/marquez/BaseIntegrationTest.java | 2 +- .../java/marquez/MetricsIntegrationTest.java | 8 ++--- .../logging/LabelledSqlLoggerTest.java | 14 ++++---- 7 files changed, 46 insertions(+), 50 deletions(-) create mode 100644 api/src/main/java/marquez/service/DatabaseMetrics.java delete mode 100644 api/src/main/java/marquez/service/SqlMetrics.java diff --git a/api/src/main/java/marquez/MarquezApp.java b/api/src/main/java/marquez/MarquezApp.java index 96800f4533..06ec8551d4 100644 --- a/api/src/main/java/marquez/MarquezApp.java +++ b/api/src/main/java/marquez/MarquezApp.java @@ -38,7 +38,7 @@ import marquez.logging.DelegatingSqlLogger; import marquez.logging.LabelledSqlLogger; import marquez.logging.LoggingMdcFilter; -import marquez.service.SqlMetrics; +import marquez.service.DatabaseMetrics; import marquez.tracing.SentryConfig; import marquez.tracing.TracingContainerResponseFilter; import marquez.tracing.TracingSQLLogger; @@ -62,7 +62,7 @@ public final class MarquezApp extends Application { private static final String PROMETHEUS = "prometheus"; private static final String PROMETHEUS_V2 = "prometheus_v2"; private static final String PROMETHEUS_ENDPOINT = "/metrics"; - private static final String PROMETHEUS_ENDPOINT_V2 = "/metrics/v2"; + private static final String PROMETHEUS_ENDPOINT_V2 = "/v2beta/metrics"; public static void main(final String[] args) throws Exception { new MarquezApp().run(args); @@ -78,9 +78,9 @@ public void initialize(@NonNull Bootstrap bootstrap) { // Enable metric collection for prometheus. CollectorRegistry.defaultRegistry.register( new DropwizardExports(bootstrap.getMetricRegistry())); - SqlMetrics.registry.register(new DropwizardExports(bootstrap.getMetricRegistry())); + DatabaseMetrics.registry.register(new DropwizardExports(bootstrap.getMetricRegistry())); DefaultExports.initialize(); // Add metrics for CPU, JVM memory, etc. - DefaultExports.register(SqlMetrics.registry); + DefaultExports.register(DatabaseMetrics.registry); // Enable variable substitution with environment variables. bootstrap.setConfigurationSourceProvider( @@ -206,7 +206,7 @@ private void registerServlets(@NonNull Environment env) { // Expose metrics for monitoring. env.servlets().addServlet(PROMETHEUS, new MetricsServlet()).addMapping(PROMETHEUS_ENDPOINT); env.servlets() - .addServlet(PROMETHEUS_V2, new MetricsServlet(SqlMetrics.registry)) + .addServlet(PROMETHEUS_V2, new MetricsServlet(DatabaseMetrics.registry)) .addMapping(PROMETHEUS_ENDPOINT_V2); } diff --git a/api/src/main/java/marquez/logging/LabelledSqlLogger.java b/api/src/main/java/marquez/logging/LabelledSqlLogger.java index dca998ae56..8d5c21695f 100644 --- a/api/src/main/java/marquez/logging/LabelledSqlLogger.java +++ b/api/src/main/java/marquez/logging/LabelledSqlLogger.java @@ -7,7 +7,7 @@ import java.sql.SQLException; import java.time.temporal.ChronoUnit; -import marquez.service.SqlMetrics; +import marquez.service.DatabaseMetrics; import org.jdbi.v3.core.extension.ExtensionMethod; import org.jdbi.v3.core.statement.SqlLogger; import org.jdbi.v3.core.statement.StatementContext; @@ -34,7 +34,7 @@ private void log(StatementContext context) { if (extensionMethod != null) { final long elapsed = context.getElapsedTime(ChronoUnit.NANOS); if (MDC.get("method") != null && MDC.get("pathWithParams") != null) { - SqlMetrics.emitSqlDurationMetrics( + DatabaseMetrics.recordDbDuration( extensionMethod.getType().getName(), extensionMethod.getMethod().getName(), MDC.get("method"), diff --git a/api/src/main/java/marquez/service/DatabaseMetrics.java b/api/src/main/java/marquez/service/DatabaseMetrics.java new file mode 100644 index 0000000000..4e4f5e12dd --- /dev/null +++ b/api/src/main/java/marquez/service/DatabaseMetrics.java @@ -0,0 +1,26 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Histogram; + +public class DatabaseMetrics { + public static final CollectorRegistry registry = new io.prometheus.client.CollectorRegistry(); + + public static final Histogram dbDurationSeconds = + Histogram.build() + .namespace("marquez") + .labelNames("sql_class", "sql_method", "http_method", "http_path") + .name("db_duration_seconds_by_http_call") + .help("The time to make the DB call for a given HTTP endpoint.") + .register(registry); + + public static void recordDbDuration( + String sqlClass, String sqlMethod, String httpMethod, String httpPath, double duration) { + dbDurationSeconds.labels(sqlClass, sqlMethod, httpMethod, httpPath).observe(duration); + } +} diff --git a/api/src/main/java/marquez/service/SqlMetrics.java b/api/src/main/java/marquez/service/SqlMetrics.java deleted file mode 100644 index c5336b7750..0000000000 --- a/api/src/main/java/marquez/service/SqlMetrics.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2018-2023 contributors to the Marquez project - * SPDX-License-Identifier: Apache-2.0 - */ - -package marquez.service; - -import io.prometheus.client.CollectorRegistry; -import io.prometheus.client.Histogram; - -public class SqlMetrics { - public static final CollectorRegistry registry = new io.prometheus.client.CollectorRegistry(); - - public static final Histogram duration = - Histogram.build() - .namespace("marquez") - .labelNames("object_name", "method_name", "endpoint_method", "endpoint_path") - .name("sql_duration_seconds") - .help("SQL execution duration in seconds") - .register(registry); - - public static void emitSqlDurationMetrics( - String objectName, - String methodName, - String endpointMethod, - String endpointPath, - double duration) { - SqlMetrics.duration - .labels(objectName, methodName, endpointMethod, endpointPath) - .observe(duration); - } -} diff --git a/api/src/test/java/marquez/BaseIntegrationTest.java b/api/src/test/java/marquez/BaseIntegrationTest.java index 21032c7ff6..bb9412a119 100644 --- a/api/src/test/java/marquez/BaseIntegrationTest.java +++ b/api/src/test/java/marquez/BaseIntegrationTest.java @@ -222,7 +222,7 @@ protected CompletableFuture> getMetrics() { protected CompletableFuture> getMetricsV2() { HttpRequest request = - HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics/v2")).GET().build(); + HttpRequest.newBuilder().uri(URI.create(baseUrl + "/v2beta/metrics")).GET().build(); return http2.sendAsync(request, BodyHandlers.ofString()); } diff --git a/api/src/test/java/marquez/MetricsIntegrationTest.java b/api/src/test/java/marquez/MetricsIntegrationTest.java index 2a7c64e116..533e970c1a 100644 --- a/api/src/test/java/marquez/MetricsIntegrationTest.java +++ b/api/src/test/java/marquez/MetricsIntegrationTest.java @@ -46,9 +46,9 @@ public void testCheckMetricV2Name() throws IOException { }); assertThat(response.join()) .contains( - "marquez_sql_duration_seconds_sum{object_name=\"marquez.db.NamespaceDao\"," - + "method_name=\"findAll\"," - + "endpoint_method=\"GET\"," - + "endpoint_path=\"/api/v1/namespaces\",}"); + "marquez_db_duration_seconds_by_http_call_sum{sql_class=\"marquez.db.NamespaceDao\"," + + "sql_method=\"findAll\"," + + "http_method=\"GET\"," + + "http_path=\"/api/v1/namespaces\",}"); } } diff --git a/api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java b/api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java index 59c2410ed1..b16c9c795a 100644 --- a/api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java +++ b/api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java @@ -11,7 +11,7 @@ import java.sql.SQLException; import java.time.temporal.ChronoUnit; -import marquez.service.SqlMetrics; +import marquez.service.DatabaseMetrics; import org.jdbi.v3.core.extension.ExtensionMethod; import org.jdbi.v3.core.statement.StatementContext; import org.junit.jupiter.api.AfterEach; @@ -55,14 +55,15 @@ public void testLogAfterExecution() { when(context.getExtensionMethod()).thenReturn(extensionMethod); // Mock static method - try (MockedStatic mockedSqlMetrics = Mockito.mockStatic(SqlMetrics.class)) { + try (MockedStatic mockedDatabaseMetrics = + Mockito.mockStatic(DatabaseMetrics.class)) { // Call the method under test logger.logAfterExecution(context); // Verify the static method call - mockedSqlMetrics.verify( + mockedDatabaseMetrics.verify( () -> - SqlMetrics.emitSqlDurationMetrics( + DatabaseMetrics.recordDbDuration( "marquez.logging.LabelledSqlLoggerTest$TestClass", "testMethod", "GET", @@ -86,14 +87,15 @@ public void testLogException() { when(context.getExtensionMethod()).thenReturn(extensionMethod); // Mock static method - try (MockedStatic mockedSqlMetrics = Mockito.mockStatic(SqlMetrics.class)) { + try (MockedStatic mockedSqlMetrics = + Mockito.mockStatic(DatabaseMetrics.class)) { // Call the method under test logger.logException(context, new SQLException("Test Exception")); // Verify the static method call mockedSqlMetrics.verify( () -> - SqlMetrics.emitSqlDurationMetrics( + DatabaseMetrics.recordDbDuration( "marquez.logging.LabelledSqlLoggerTest$TestClass", "testMethod", "POST",