Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpoint method and path to metrics name. #2850

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,10 @@
import marquez.common.Utils;
import marquez.db.DbMigration;
import marquez.jobs.DbRetentionJob;
import marquez.logging.DelegatingSqlLogger;
import marquez.logging.LabelledSqlLogger;
import marquez.logging.LoggingMdcFilter;
import marquez.service.DatabaseMetrics;
import marquez.tracing.SentryConfig;
import marquez.tracing.TracingContainerResponseFilter;
import marquez.tracing.TracingSQLLogger;
Expand All @@ -57,7 +60,9 @@ public final class MarquezApp extends Application<MarquezConfig> {

// Monitoring
private static final String PROMETHEUS = "prometheus";
private static final String PROMETHEUS_V2 = "prometheus_v2";
private static final String PROMETHEUS_ENDPOINT = "/metrics";
private static final String PROMETHEUS_ENDPOINT_V2 = "/v2beta/metrics";

public static void main(final String[] args) throws Exception {
new MarquezApp().run(args);
Expand All @@ -73,7 +78,9 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
// Enable metric collection for prometheus.
CollectorRegistry.defaultRegistry.register(
new DropwizardExports(bootstrap.getMetricRegistry()));
DatabaseMetrics.registry.register(new DropwizardExports(bootstrap.getMetricRegistry()));
DefaultExports.initialize(); // Add metrics for CPU, JVM memory, etc.
DefaultExports.register(DatabaseMetrics.registry);

// Enable variable substitution with environment variables.
bootstrap.setConfigurationSourceProvider(
Expand Down Expand Up @@ -162,7 +169,8 @@ private Jdbi newJdbi(
.installPlugin(new SqlObjectPlugin())
.installPlugin(new PostgresPlugin())
.installPlugin(new Jackson2Plugin());
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics());
SqlLogger sqlLogger =
new DelegatingSqlLogger(new LabelledSqlLogger(), new InstrumentedSqlLogger(env.metrics()));
if (isSentryEnabled(config)) {
sqlLogger = new TracingSQLLogger(sqlLogger);
}
Expand Down Expand Up @@ -197,6 +205,9 @@ private void registerServlets(@NonNull Environment env) {

// Expose metrics for monitoring.
env.servlets().addServlet(PROMETHEUS, new MetricsServlet()).addMapping(PROMETHEUS_ENDPOINT);
env.servlets()
.addServlet(PROMETHEUS_V2, new MetricsServlet(DatabaseMetrics.registry))
.addMapping(PROMETHEUS_ENDPOINT_V2);
}

private void registerFilters(@NonNull Environment env, MarquezContext marquezContext) {
Expand Down
35 changes: 35 additions & 0 deletions api/src/main/java/marquez/logging/DelegatingSqlLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.logging;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import org.jdbi.v3.core.statement.SqlLogger;
import org.jdbi.v3.core.statement.StatementContext;

/** A {@link SqlLogger} implementation that delegates to multiple {@link SqlLogger}s. */
public class DelegatingSqlLogger implements SqlLogger {
private final List<SqlLogger> sqlLoggers;

public DelegatingSqlLogger(SqlLogger... sqlLoggers) {
this.sqlLoggers = Arrays.asList(sqlLoggers);
}

@Override
public void logAfterExecution(StatementContext statementContext) {
for (SqlLogger sqlLogger : sqlLoggers) {
sqlLogger.logAfterExecution(statementContext);
}
}

@Override
public void logException(StatementContext statementContext, SQLException ex) {
for (SqlLogger sqlLogger : sqlLoggers) {
sqlLogger.logException(statementContext, ex);
}
}
}
46 changes: 46 additions & 0 deletions api/src/main/java/marquez/logging/LabelledSqlLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.logging;

import java.sql.SQLException;
import java.time.temporal.ChronoUnit;
import marquez.service.DatabaseMetrics;
import org.jdbi.v3.core.extension.ExtensionMethod;
import org.jdbi.v3.core.statement.SqlLogger;
import org.jdbi.v3.core.statement.StatementContext;
import org.slf4j.MDC;

/**
* A {@link SqlLogger} implementation for JDBI which uses the SQL objects' class names and method
* names for nanosecond-precision timers.
*/
public class LabelledSqlLogger implements SqlLogger {

@Override
public void logAfterExecution(StatementContext context) {
log(context);
}

@Override
public void logException(StatementContext context, SQLException ex) {
log(context);
}

private void log(StatementContext context) {
ExtensionMethod extensionMethod = context.getExtensionMethod();
if (extensionMethod != null) {
final long elapsed = context.getElapsedTime(ChronoUnit.NANOS);
if (MDC.get("method") != null && MDC.get("pathWithParams") != null) {
DatabaseMetrics.recordDbDuration(
extensionMethod.getType().getName(),
extensionMethod.getMethod().getName(),
MDC.get("method"),
MDC.get("pathWithParams"),
elapsed / 1e9);
}
}
}
}
15 changes: 15 additions & 0 deletions api/src/main/java/marquez/logging/LoggingMdcFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@
package marquez.logging;

import java.io.IOException;
import java.util.List;
import java.util.UUID;
import javax.ws.rs.container.CompletionCallback;
import javax.ws.rs.container.ContainerRequestContext;
import javax.ws.rs.container.ContainerRequestFilter;
import javax.ws.rs.container.ContainerResponseContext;
import javax.ws.rs.container.ContainerResponseFilter;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import lombok.extern.slf4j.Slf4j;
import org.glassfish.jersey.server.ExtendedUriInfo;
import org.glassfish.jersey.uri.UriTemplate;
import org.slf4j.MDC;

/**
Expand All @@ -28,19 +32,30 @@ public class LoggingMdcFilter
private static final String REQUEST_ID = "requestID";
private static final String METHOD = "method";
private static final String PATH = "path";
private static final String PATH_WITH_PARAMETERS = "pathWithParams";

@Context private ExtendedUriInfo uriInfo;

@Override
public void onComplete(Throwable throwable) {
MDC.remove(REQUEST_ID);
MDC.remove(METHOD);
MDC.remove(PATH);
MDC.remove(PATH_WITH_PARAMETERS);
}

@Override
public void filter(ContainerRequestContext requestContext) throws IOException {
MDC.put(REQUEST_ID, UUID.randomUUID().toString());
MDC.put(METHOD, requestContext.getMethod());
MDC.put(PATH, requestContext.getUriInfo().getPath());
StringBuilder pathWithPlaceholders = new StringBuilder();

List<UriTemplate> matchedTemplates = uriInfo.getMatchedTemplates();
for (int i = matchedTemplates.size() - 1; i >= 0; i--) {
pathWithPlaceholders.append(matchedTemplates.get(i).getTemplate());
}
MDC.put(PATH_WITH_PARAMETERS, pathWithPlaceholders.toString());
pawel-big-lebowski marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
Expand Down
26 changes: 26 additions & 0 deletions api/src/main/java/marquez/service/DatabaseMetrics.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.service;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Histogram;

public class DatabaseMetrics {
public static final CollectorRegistry registry = new io.prometheus.client.CollectorRegistry();

public static final Histogram dbDurationSeconds =
Histogram.build()
.namespace("marquez")
.labelNames("sql_class", "sql_method", "http_method", "http_path")
.name("db_duration_seconds_by_http_call")
.help("The time to make the DB call for a given HTTP endpoint.")
.register(registry);

public static void recordDbDuration(
String sqlClass, String sqlMethod, String httpMethod, String httpPath, double duration) {
dbDurationSeconds.labels(sqlClass, sqlMethod, httpMethod, httpPath).observe(duration);
}
}
14 changes: 14 additions & 0 deletions api/src/test/java/marquez/BaseIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,20 @@ protected CompletableFuture<HttpResponse<String>> sendLineage(String body) {
return http2.sendAsync(request, BodyHandlers.ofString());
}

protected CompletableFuture<HttpResponse<String>> getMetrics() {
HttpRequest request =
HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics")).GET().build();

return http2.sendAsync(request, BodyHandlers.ofString());
}

protected CompletableFuture<HttpResponse<String>> getMetricsV2() {
HttpRequest request =
HttpRequest.newBuilder().uri(URI.create(baseUrl + "/v2beta/metrics")).GET().build();

return http2.sendAsync(request, BodyHandlers.ofString());
}

protected CompletableFuture<HttpResponse<String>> fetchLineage(String nodeId) {
HttpRequest request =
HttpRequest.newBuilder()
Expand Down
54 changes: 54 additions & 0 deletions api/src/test/java/marquez/MetricsIntegrationTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@org.junit.jupiter.api.Tag("IntegrationTests")
@Slf4j
public class MetricsIntegrationTest extends BaseIntegrationTest {
@Test
public void testCheckMetricV1Name() throws IOException {
client.listNamespaces();
CompletableFuture<String> response =
this.getMetrics()
.thenApply(HttpResponse::body)
.whenComplete(
(val, error) -> {
if (error != null) {
Assertions.fail("Could not complete request");
}
});
assertThat(response.join()).contains("marquez_db_NamespaceDao_findAll_count");
}

@Test
public void testCheckMetricV2Name() throws IOException {
client.listNamespaces();
CompletableFuture<String> response =
this.getMetricsV2()
.thenApply(HttpResponse::body)
.whenComplete(
(val, error) -> {
if (error != null) {
Assertions.fail("Could not complete request");
}
});
assertThat(response.join())
.contains(
"marquez_db_duration_seconds_by_http_call_sum{sql_class=\"marquez.db.NamespaceDao\","
+ "sql_method=\"findAll\","
+ "http_method=\"GET\","
+ "http_path=\"/api/v1/namespaces\",}");
}
}
53 changes: 53 additions & 0 deletions api/src/test/java/marquez/logging/DelegatingSqlLoggerTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.logging;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import java.sql.SQLException;
import org.jdbi.v3.core.statement.SqlLogger;
import org.jdbi.v3.core.statement.StatementContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class DelegatingSqlLoggerTest {
private SqlLogger logger1;
private SqlLogger logger2;
private DelegatingSqlLogger delegatingSqlLogger;
private StatementContext statementContext;
private SQLException sqlException;

@BeforeEach
public void setUp() {
logger1 = mock(SqlLogger.class);
logger2 = mock(SqlLogger.class);
delegatingSqlLogger = new DelegatingSqlLogger(logger1, logger2);
statementContext = mock(StatementContext.class);
sqlException = new SQLException("Test SQL Exception");
}

@Test
public void testLogAfterExecution() {
// Act
delegatingSqlLogger.logAfterExecution(statementContext);

// Assert
verify(logger1, times(1)).logAfterExecution(statementContext);
verify(logger2, times(1)).logAfterExecution(statementContext);
}

@Test
public void testLogException() {
// Act
delegatingSqlLogger.logException(statementContext, sqlException);

// Assert
verify(logger1, times(1)).logException(statementContext, sqlException);
verify(logger2, times(1)).logException(statementContext, sqlException);
}
}
Loading
Loading