From e83e96720f3cc46b9892a09453649790524434c0 Mon Sep 17 00:00:00 2001 From: Jakub Dardzinski Date: Tue, 16 Jul 2024 16:15:20 +0200 Subject: [PATCH] Introduce labels to metrics and add them to v2 metrics endpoint. Signed-off-by: Jakub Dardzinski --- api/src/main/java/marquez/MarquezApp.java | 14 ++- .../marquez/logging/DelegatingSqlLogger.java | 35 ++++++ .../marquez/logging/EndpointNameStrategy.java | 33 ------ .../marquez/logging/LabelledSqlLogger.java | 46 ++++++++ .../logging/MarquezMetricNameStrategy.java | 20 ---- .../main/java/marquez/service/SqlMetrics.java | 32 +++++ .../java/marquez/BaseIntegrationTest.java | 12 +- .../java/marquez/MetricsIntegrationTest.java | 78 ++++--------- .../logging/DelegatingSqlLoggerTest.java | 53 +++++++++ .../logging/LabelledSqlLoggerTest.java | 109 ++++++++++++++++++ .../marquez/logging/LoggingMdcFilterTest.java | 61 ++++++++++ 11 files changed, 376 insertions(+), 117 deletions(-) create mode 100644 api/src/main/java/marquez/logging/DelegatingSqlLogger.java delete mode 100644 api/src/main/java/marquez/logging/EndpointNameStrategy.java create mode 100644 api/src/main/java/marquez/logging/LabelledSqlLogger.java delete mode 100644 api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java create mode 100644 api/src/main/java/marquez/service/SqlMetrics.java create mode 100644 api/src/test/java/marquez/logging/DelegatingSqlLoggerTest.java create mode 100644 api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java create mode 100644 api/src/test/java/marquez/logging/LoggingMdcFilterTest.java diff --git a/api/src/main/java/marquez/MarquezApp.java b/api/src/main/java/marquez/MarquezApp.java index 70cb485706..96800f4533 100644 --- a/api/src/main/java/marquez/MarquezApp.java +++ b/api/src/main/java/marquez/MarquezApp.java @@ -35,8 +35,10 @@ import marquez.common.Utils; import marquez.db.DbMigration; import marquez.jobs.DbRetentionJob; +import marquez.logging.DelegatingSqlLogger; +import marquez.logging.LabelledSqlLogger; import marquez.logging.LoggingMdcFilter; -import marquez.logging.MarquezMetricNameStrategy; +import marquez.service.SqlMetrics; import marquez.tracing.SentryConfig; import marquez.tracing.TracingContainerResponseFilter; import marquez.tracing.TracingSQLLogger; @@ -58,7 +60,9 @@ public final class MarquezApp extends Application { // Monitoring private static final String PROMETHEUS = "prometheus"; + private static final String PROMETHEUS_V2 = "prometheus_v2"; private static final String PROMETHEUS_ENDPOINT = "/metrics"; + private static final String PROMETHEUS_ENDPOINT_V2 = "/metrics/v2"; public static void main(final String[] args) throws Exception { new MarquezApp().run(args); @@ -74,7 +78,9 @@ public void initialize(@NonNull Bootstrap bootstrap) { // Enable metric collection for prometheus. CollectorRegistry.defaultRegistry.register( new DropwizardExports(bootstrap.getMetricRegistry())); + SqlMetrics.registry.register(new DropwizardExports(bootstrap.getMetricRegistry())); DefaultExports.initialize(); // Add metrics for CPU, JVM memory, etc. + DefaultExports.register(SqlMetrics.registry); // Enable variable substitution with environment variables. bootstrap.setConfigurationSourceProvider( @@ -163,7 +169,8 @@ private Jdbi newJdbi( .installPlugin(new SqlObjectPlugin()) .installPlugin(new PostgresPlugin()) .installPlugin(new Jackson2Plugin()); - SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics(), new MarquezMetricNameStrategy()); + SqlLogger sqlLogger = + new DelegatingSqlLogger(new LabelledSqlLogger(), new InstrumentedSqlLogger(env.metrics())); if (isSentryEnabled(config)) { sqlLogger = new TracingSQLLogger(sqlLogger); } @@ -198,6 +205,9 @@ private void registerServlets(@NonNull Environment env) { // Expose metrics for monitoring. env.servlets().addServlet(PROMETHEUS, new MetricsServlet()).addMapping(PROMETHEUS_ENDPOINT); + env.servlets() + .addServlet(PROMETHEUS_V2, new MetricsServlet(SqlMetrics.registry)) + .addMapping(PROMETHEUS_ENDPOINT_V2); } private void registerFilters(@NonNull Environment env, MarquezContext marquezContext) { diff --git a/api/src/main/java/marquez/logging/DelegatingSqlLogger.java b/api/src/main/java/marquez/logging/DelegatingSqlLogger.java new file mode 100644 index 0000000000..8de6f3bc8a --- /dev/null +++ b/api/src/main/java/marquez/logging/DelegatingSqlLogger.java @@ -0,0 +1,35 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import java.sql.SQLException; +import java.util.Arrays; +import java.util.List; +import org.jdbi.v3.core.statement.SqlLogger; +import org.jdbi.v3.core.statement.StatementContext; + +/** A {@link SqlLogger} implementation that delegates to multiple {@link SqlLogger}s. */ +public class DelegatingSqlLogger implements SqlLogger { + private final List sqlLoggers; + + public DelegatingSqlLogger(SqlLogger... sqlLoggers) { + this.sqlLoggers = Arrays.asList(sqlLoggers); + } + + @Override + public void logAfterExecution(StatementContext statementContext) { + for (SqlLogger sqlLogger : sqlLoggers) { + sqlLogger.logAfterExecution(statementContext); + } + } + + @Override + public void logException(StatementContext statementContext, SQLException ex) { + for (SqlLogger sqlLogger : sqlLoggers) { + sqlLogger.logException(statementContext, ex); + } + } +} diff --git a/api/src/main/java/marquez/logging/EndpointNameStrategy.java b/api/src/main/java/marquez/logging/EndpointNameStrategy.java deleted file mode 100644 index 6dd55121df..0000000000 --- a/api/src/main/java/marquez/logging/EndpointNameStrategy.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright 2018-2023 contributors to the Marquez project - * SPDX-License-Identifier: Apache-2.0 - */ - -package marquez.logging; - -import com.codahale.metrics.MetricRegistry; -import com.codahale.metrics.jdbi3.strategies.StatementNameStrategy; -import org.jdbi.v3.core.extension.ExtensionMethod; -import org.jdbi.v3.core.statement.StatementContext; -import org.slf4j.MDC; - -public class EndpointNameStrategy implements StatementNameStrategy { - - @Override - public String getStatementName(StatementContext statementContext) { - ExtensionMethod extensionMethod = statementContext.getExtensionMethod(); - if (extensionMethod != null) { - if (MDC.get("method") != null && MDC.get("pathWithParams") != null) { - StringBuilder builder = - new StringBuilder() - .append(extensionMethod.getMethod().getName()) - .append(".") - .append(MDC.get("method")) - .append(".") - .append(MDC.get("pathWithParams")); - return MetricRegistry.name(extensionMethod.getType(), builder.toString()); - } - } - return null; - } -} diff --git a/api/src/main/java/marquez/logging/LabelledSqlLogger.java b/api/src/main/java/marquez/logging/LabelledSqlLogger.java new file mode 100644 index 0000000000..dca998ae56 --- /dev/null +++ b/api/src/main/java/marquez/logging/LabelledSqlLogger.java @@ -0,0 +1,46 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import java.sql.SQLException; +import java.time.temporal.ChronoUnit; +import marquez.service.SqlMetrics; +import org.jdbi.v3.core.extension.ExtensionMethod; +import org.jdbi.v3.core.statement.SqlLogger; +import org.jdbi.v3.core.statement.StatementContext; +import org.slf4j.MDC; + +/** + * A {@link SqlLogger} implementation for JDBI which uses the SQL objects' class names and method + * names for nanosecond-precision timers. + */ +public class LabelledSqlLogger implements SqlLogger { + + @Override + public void logAfterExecution(StatementContext context) { + log(context); + } + + @Override + public void logException(StatementContext context, SQLException ex) { + log(context); + } + + private void log(StatementContext context) { + ExtensionMethod extensionMethod = context.getExtensionMethod(); + if (extensionMethod != null) { + final long elapsed = context.getElapsedTime(ChronoUnit.NANOS); + if (MDC.get("method") != null && MDC.get("pathWithParams") != null) { + SqlMetrics.emitSqlDurationMetrics( + extensionMethod.getType().getName(), + extensionMethod.getMethod().getName(), + MDC.get("method"), + MDC.get("pathWithParams"), + elapsed / 1e9); + } + } + } +} diff --git a/api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java b/api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java deleted file mode 100644 index f28290a5f8..0000000000 --- a/api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java +++ /dev/null @@ -1,20 +0,0 @@ -/* - * Copyright 2018-2023 contributors to the Marquez project - * SPDX-License-Identifier: Apache-2.0 - */ - -package marquez.logging; - -import com.codahale.metrics.jdbi3.strategies.DefaultNameStrategy; -import com.codahale.metrics.jdbi3.strategies.DelegatingStatementNameStrategy; - -public class MarquezMetricNameStrategy extends DelegatingStatementNameStrategy { - - public MarquezMetricNameStrategy() { - super( - DefaultNameStrategy.CHECK_EMPTY, - new EndpointNameStrategy(), - DefaultNameStrategy.SQL_OBJECT, - DefaultNameStrategy.CONSTANT_SQL_RAW); - } -} diff --git a/api/src/main/java/marquez/service/SqlMetrics.java b/api/src/main/java/marquez/service/SqlMetrics.java new file mode 100644 index 0000000000..c5336b7750 --- /dev/null +++ b/api/src/main/java/marquez/service/SqlMetrics.java @@ -0,0 +1,32 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.service; + +import io.prometheus.client.CollectorRegistry; +import io.prometheus.client.Histogram; + +public class SqlMetrics { + public static final CollectorRegistry registry = new io.prometheus.client.CollectorRegistry(); + + public static final Histogram duration = + Histogram.build() + .namespace("marquez") + .labelNames("object_name", "method_name", "endpoint_method", "endpoint_path") + .name("sql_duration_seconds") + .help("SQL execution duration in seconds") + .register(registry); + + public static void emitSqlDurationMetrics( + String objectName, + String methodName, + String endpointMethod, + String endpointPath, + double duration) { + SqlMetrics.duration + .labels(objectName, methodName, endpointMethod, endpointPath) + .observe(duration); + } +} diff --git a/api/src/test/java/marquez/BaseIntegrationTest.java b/api/src/test/java/marquez/BaseIntegrationTest.java index d9e03d4aac..21032c7ff6 100644 --- a/api/src/test/java/marquez/BaseIntegrationTest.java +++ b/api/src/test/java/marquez/BaseIntegrationTest.java @@ -215,10 +215,14 @@ protected CompletableFuture> sendLineage(String body) { protected CompletableFuture> getMetrics() { HttpRequest request = - HttpRequest.newBuilder() - .uri(URI.create(baseUrl + "/metrics")) - .GET() - .build(); + HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics")).GET().build(); + + return http2.sendAsync(request, BodyHandlers.ofString()); + } + + protected CompletableFuture> getMetricsV2() { + HttpRequest request = + HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics/v2")).GET().build(); return http2.sendAsync(request, BodyHandlers.ofString()); } diff --git a/api/src/test/java/marquez/MetricsIntegrationTest.java b/api/src/test/java/marquez/MetricsIntegrationTest.java index 70c18327c6..2a7c64e116 100644 --- a/api/src/test/java/marquez/MetricsIntegrationTest.java +++ b/api/src/test/java/marquez/MetricsIntegrationTest.java @@ -5,76 +5,20 @@ package marquez; -import static marquez.db.LineageTestUtils.PRODUCER_URL; -import static marquez.db.LineageTestUtils.SCHEMA_URL; -import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.assertEquals; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ObjectNode; -import com.fasterxml.jackson.databind.node.TextNode; -import com.google.common.base.Predicate; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Maps; -import io.dropwizard.util.Resources; -import io.openlineage.client.OpenLineage; -import io.openlineage.client.OpenLineage.RunEvent; -import io.openlineage.client.OpenLineage.RunEvent.EventType; -import io.openlineage.client.OpenLineage.RunFacet; -import io.openlineage.client.OpenLineage.RunFacetsBuilder; import java.io.IOException; -import java.net.URI; import java.net.http.HttpResponse; -import java.nio.charset.Charset; -import java.time.Instant; -import java.time.ZoneId; -import java.time.ZonedDateTime; -import java.time.temporal.ChronoField; -import java.time.temporal.ChronoUnit; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import lombok.NonNull; -import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; -import marquez.api.JdbiUtils; -import marquez.client.MarquezClient; -import marquez.client.models.Dataset; -import marquez.client.models.DatasetVersion; -import marquez.client.models.Job; -import marquez.client.models.JobId; -import marquez.client.models.JobVersion; -import marquez.client.models.LineageEvent; -import marquez.client.models.Run; -import marquez.common.Utils; -import marquez.db.LineageTestUtils; -import org.assertj.core.api.InstanceOfAssertFactories; -import org.jdbi.v3.core.Jdbi; -import org.jetbrains.annotations.NotNull; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; -import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.MethodSource; -import org.junit.jupiter.params.provider.ValueSource; -import org.slf4j.LoggerFactory; @org.junit.jupiter.api.Tag("IntegrationTests") @Slf4j public class MetricsIntegrationTest extends BaseIntegrationTest { @Test - public void testCheckMetricName() throws IOException { + public void testCheckMetricV1Name() throws IOException { client.listNamespaces(); CompletableFuture response = this.getMetrics() @@ -85,8 +29,26 @@ public void testCheckMetricName() throws IOException { Assertions.fail("Could not complete request"); } }); + assertThat(response.join()).contains("marquez_db_NamespaceDao_findAll_count"); + } + + @Test + public void testCheckMetricV2Name() throws IOException { + client.listNamespaces(); + CompletableFuture response = + this.getMetricsV2() + .thenApply(HttpResponse::body) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); assertThat(response.join()) .contains( - "marquez_db_NamespaceDao_findAll_GET__api_v1_namespaces"); + "marquez_sql_duration_seconds_sum{object_name=\"marquez.db.NamespaceDao\"," + + "method_name=\"findAll\"," + + "endpoint_method=\"GET\"," + + "endpoint_path=\"/api/v1/namespaces\",}"); } } diff --git a/api/src/test/java/marquez/logging/DelegatingSqlLoggerTest.java b/api/src/test/java/marquez/logging/DelegatingSqlLoggerTest.java new file mode 100644 index 0000000000..4fee31f29f --- /dev/null +++ b/api/src/test/java/marquez/logging/DelegatingSqlLoggerTest.java @@ -0,0 +1,53 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.sql.SQLException; +import org.jdbi.v3.core.statement.SqlLogger; +import org.jdbi.v3.core.statement.StatementContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +public class DelegatingSqlLoggerTest { + private SqlLogger logger1; + private SqlLogger logger2; + private DelegatingSqlLogger delegatingSqlLogger; + private StatementContext statementContext; + private SQLException sqlException; + + @BeforeEach + public void setUp() { + logger1 = mock(SqlLogger.class); + logger2 = mock(SqlLogger.class); + delegatingSqlLogger = new DelegatingSqlLogger(logger1, logger2); + statementContext = mock(StatementContext.class); + sqlException = new SQLException("Test SQL Exception"); + } + + @Test + public void testLogAfterExecution() { + // Act + delegatingSqlLogger.logAfterExecution(statementContext); + + // Assert + verify(logger1, times(1)).logAfterExecution(statementContext); + verify(logger2, times(1)).logAfterExecution(statementContext); + } + + @Test + public void testLogException() { + // Act + delegatingSqlLogger.logException(statementContext, sqlException); + + // Assert + verify(logger1, times(1)).logException(statementContext, sqlException); + verify(logger2, times(1)).logException(statementContext, sqlException); + } +} diff --git a/api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java b/api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java new file mode 100644 index 0000000000..59c2410ed1 --- /dev/null +++ b/api/src/test/java/marquez/logging/LabelledSqlLoggerTest.java @@ -0,0 +1,109 @@ +/* + * Copyright 2018-2024 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.when; + +import java.sql.SQLException; +import java.time.temporal.ChronoUnit; +import marquez.service.SqlMetrics; +import org.jdbi.v3.core.extension.ExtensionMethod; +import org.jdbi.v3.core.statement.StatementContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.slf4j.MDC; + +public class LabelledSqlLoggerTest { + + private LabelledSqlLogger logger; + private StatementContext context; + + @BeforeEach + public void setUp() { + logger = new LabelledSqlLogger(); + context = mock(StatementContext.class); + + // Setup default mock behavior + when(context.getElapsedTime(ChronoUnit.NANOS)) + .thenReturn(1000000000L); // 1 second in nanoseconds + } + + @AfterEach + public void tearDown() { + MDC.clear(); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + public void testLogAfterExecution() { + // Prepare MDC context + MDC.put("method", "GET"); + MDC.put("pathWithParams", "/test/path"); + + ExtensionMethod extensionMethod = mock(ExtensionMethod.class); + when(extensionMethod.getType()).thenReturn((Class) TestClass.class); + when(extensionMethod.getMethod()).thenReturn(TestClass.class.getMethods()[0]); + + when(context.getExtensionMethod()).thenReturn(extensionMethod); + + // Mock static method + try (MockedStatic mockedSqlMetrics = Mockito.mockStatic(SqlMetrics.class)) { + // Call the method under test + logger.logAfterExecution(context); + + // Verify the static method call + mockedSqlMetrics.verify( + () -> + SqlMetrics.emitSqlDurationMetrics( + "marquez.logging.LabelledSqlLoggerTest$TestClass", + "testMethod", + "GET", + "/test/path", + 1.0), + times(1)); + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + @Test + public void testLogException() { + // Prepare MDC context + MDC.put("method", "POST"); + MDC.put("pathWithParams", "/test/exception"); + + ExtensionMethod extensionMethod = mock(ExtensionMethod.class); + when(extensionMethod.getType()).thenReturn((Class) TestClass.class); + when(extensionMethod.getMethod()).thenReturn(TestClass.class.getMethods()[0]); + + when(context.getExtensionMethod()).thenReturn(extensionMethod); + + // Mock static method + try (MockedStatic mockedSqlMetrics = Mockito.mockStatic(SqlMetrics.class)) { + // Call the method under test + logger.logException(context, new SQLException("Test Exception")); + + // Verify the static method call + mockedSqlMetrics.verify( + () -> + SqlMetrics.emitSqlDurationMetrics( + "marquez.logging.LabelledSqlLoggerTest$TestClass", + "testMethod", + "POST", + "/test/exception", + 1.0)); + } + } + + // Dummy class for the purpose of mocking + private static class TestClass { + public void testMethod() {} + } +} diff --git a/api/src/test/java/marquez/logging/LoggingMdcFilterTest.java b/api/src/test/java/marquez/logging/LoggingMdcFilterTest.java new file mode 100644 index 0000000000..5e71a5b0a6 --- /dev/null +++ b/api/src/test/java/marquez/logging/LoggingMdcFilterTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2018-2023 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.logging; + +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.lang.reflect.Field; +import java.util.Collections; +import java.util.List; +import javax.ws.rs.container.ContainerRequestContext; +import org.glassfish.jersey.server.ExtendedUriInfo; +import org.glassfish.jersey.uri.UriTemplate; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.slf4j.MDC; + +public class LoggingMdcFilterTest { + + @Mock private ContainerRequestContext requestContext; + + @Mock private ExtendedUriInfo uriInfo; + + private LoggingMdcFilter filter; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + filter = new LoggingMdcFilter(); + Mockito.when(requestContext.getUriInfo()).thenReturn(uriInfo); + } + + @Test + public void testPathWithPlaceholders() + throws IOException, + NoSuchFieldException, + SecurityException, + IllegalArgumentException, + IllegalAccessException { + String expectedPath = "/api/users/{userId}"; + List templates = Collections.singletonList(new UriTemplate(expectedPath)); + + Field uriInfoField = LoggingMdcFilter.class.getDeclaredField("uriInfo"); + uriInfoField.setAccessible(true); + uriInfoField.set(filter, uriInfo); + + Mockito.when(requestContext.getUriInfo()).thenReturn(uriInfo); + Mockito.when(uriInfo.getMatchedTemplates()).thenReturn(templates); + + filter.filter(requestContext); + + assertEquals(expectedPath, MDC.get("pathWithParams")); + MDC.clear(); // Clean up MDC + } +}