Skip to content

Commit

Permalink
Introduce labels to metrics and add them to v2 metrics endpoint.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <[email protected]>
  • Loading branch information
JDarDagran committed Jul 16, 2024
1 parent 89ef52f commit dfea8a7
Show file tree
Hide file tree
Showing 9 changed files with 144 additions and 117 deletions.
16 changes: 14 additions & 2 deletions api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@
import marquez.common.Utils;
import marquez.db.DbMigration;
import marquez.jobs.DbRetentionJob;
import marquez.logging.DelegatingSqlLogger;
import marquez.logging.LabelledSqlLogger;
import marquez.logging.LoggingMdcFilter;
import marquez.logging.MarquezMetricNameStrategy;
import marquez.tracing.SentryConfig;
import marquez.tracing.TracingContainerResponseFilter;
import marquez.tracing.TracingSQLLogger;
Expand All @@ -58,7 +59,11 @@ public final class MarquezApp extends Application<MarquezConfig> {

// Monitoring
private static final String PROMETHEUS = "prometheus";
private static final String PROMETHEUS_V2 = "prometheus_v2";
private static final String PROMETHEUS_ENDPOINT = "/metrics";
private static final String PROMETHEUS_ENDPOINT_V2 = "/metrics/v2";

private static final CollectorRegistry registry = new CollectorRegistry();

public static void main(final String[] args) throws Exception {
new MarquezApp().run(args);
Expand All @@ -74,7 +79,9 @@ public void initialize(@NonNull Bootstrap<MarquezConfig> bootstrap) {
// Enable metric collection for prometheus.
CollectorRegistry.defaultRegistry.register(
new DropwizardExports(bootstrap.getMetricRegistry()));
registry.register(new DropwizardExports(bootstrap.getMetricRegistry()));
DefaultExports.initialize(); // Add metrics for CPU, JVM memory, etc.
DefaultExports.register(registry);

// Enable variable substitution with environment variables.
bootstrap.setConfigurationSourceProvider(
Expand Down Expand Up @@ -163,7 +170,9 @@ private Jdbi newJdbi(
.installPlugin(new SqlObjectPlugin())
.installPlugin(new PostgresPlugin())
.installPlugin(new Jackson2Plugin());
SqlLogger sqlLogger = new InstrumentedSqlLogger(env.metrics(), new MarquezMetricNameStrategy());
SqlLogger sqlLogger =
new DelegatingSqlLogger(
new LabelledSqlLogger(registry), new InstrumentedSqlLogger(env.metrics()));
if (isSentryEnabled(config)) {
sqlLogger = new TracingSQLLogger(sqlLogger);
}
Expand Down Expand Up @@ -198,6 +207,9 @@ private void registerServlets(@NonNull Environment env) {

// Expose metrics for monitoring.
env.servlets().addServlet(PROMETHEUS, new MetricsServlet()).addMapping(PROMETHEUS_ENDPOINT);
env.servlets()
.addServlet(PROMETHEUS_V2, new MetricsServlet(registry))
.addMapping(PROMETHEUS_ENDPOINT_V2);
}

private void registerFilters(@NonNull Environment env, MarquezContext marquezContext) {
Expand Down
2 changes: 2 additions & 0 deletions api/src/main/java/marquez/api/DatasetResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import marquez.common.models.NamespaceName;
import marquez.common.models.TagName;
import marquez.common.models.Version;
import marquez.service.JobMetrics;
import marquez.service.ServiceFactory;
import marquez.service.models.Dataset;
import marquez.service.models.DatasetMeta;
Expand Down Expand Up @@ -151,6 +152,7 @@ public Response list(
datasetService.findAllWithTags(namespaceName.getValue(), limit, offset);
columnLineageService.enrichWithColumnLineage(datasets);
final int totalCount = datasetService.countFor(namespaceName.getValue());
JobMetrics.emitJobCreationMetric("test", "mymeta");
return Response.ok(new ResultsPage<>("datasets", datasets, totalCount)).build();
}

Expand Down
2 changes: 2 additions & 0 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import marquez.db.mappers.JobRowMapper;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.service.JobMetrics;
import marquez.service.models.Job;
import marquez.service.models.JobMeta;
import marquez.service.models.Run;
Expand Down Expand Up @@ -313,6 +314,7 @@ default void setJobData(Run run, Job j) {

default JobRow upsertJobMeta(
NamespaceName namespaceName, JobName jobName, JobMeta jobMeta, ObjectMapper mapper) {
JobMetrics.emitJobCreationMetric(namespaceName.getValue(), jobMeta.getType().toString());
return upsertJobMeta(namespaceName, jobName, null, jobMeta, mapper);
}

Expand Down
35 changes: 35 additions & 0 deletions api/src/main/java/marquez/logging/DelegatingSqlLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.logging;

import java.sql.SQLException;
import java.util.Arrays;
import java.util.List;
import org.jdbi.v3.core.statement.SqlLogger;
import org.jdbi.v3.core.statement.StatementContext;

/** A {@link SqlLogger} implementation that delegates to multiple {@link SqlLogger}s. */
public class DelegatingSqlLogger implements SqlLogger {
private final List<SqlLogger> sqlLoggers;

public DelegatingSqlLogger(SqlLogger... sqlLoggers) {
this.sqlLoggers = Arrays.asList(sqlLoggers);
}

@Override
public void logAfterExecution(StatementContext statementContext) {
for (SqlLogger sqlLogger : sqlLoggers) {
sqlLogger.logAfterExecution(statementContext);
}
}

@Override
public void logException(StatementContext statementContext, SQLException ex) {
for (SqlLogger sqlLogger : sqlLoggers) {
sqlLogger.logException(statementContext, ex);
}
}
}
33 changes: 0 additions & 33 deletions api/src/main/java/marquez/logging/EndpointNameStrategy.java

This file was deleted.

63 changes: 63 additions & 0 deletions api/src/main/java/marquez/logging/LabelledSqlLogger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.logging;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Histogram;
import java.sql.SQLException;
import java.time.temporal.ChronoUnit;
import org.jdbi.v3.core.extension.ExtensionMethod;
import org.jdbi.v3.core.statement.SqlLogger;
import org.jdbi.v3.core.statement.StatementContext;
import org.slf4j.MDC;

/**
* A {@link SqlLogger} implementation for JDBI which uses the SQL objects' class names and method
* names for nanosecond-precision timers.
*/
public class LabelledSqlLogger implements SqlLogger {
private final Histogram metric;

public LabelledSqlLogger() {
this(CollectorRegistry.defaultRegistry);
}

public LabelledSqlLogger(CollectorRegistry registry) {
this.metric =
Histogram.build()
.namespace("marquez")
.labelNames("object_name", "method_name", "endpoint_method", "endpoint_path")
.name("sql_duration_seconds")
.help("SQL execution duration in seconds")
.register(registry);
}

@Override
public void logAfterExecution(StatementContext context) {
log(context);
}

@Override
public void logException(StatementContext context, SQLException ex) {
log(context);
}

private void log(StatementContext context) {
ExtensionMethod extensionMethod = context.getExtensionMethod();
if (extensionMethod != null) {
final long elapsed = context.getElapsedTime(ChronoUnit.NANOS);
if (MDC.get("method") != null && MDC.get("pathWithParams") != null) {
metric
.labels(
extensionMethod.getType().getName(),
extensionMethod.getMethod().getName(),
MDC.get("method"),
MDC.get("pathWithParams"))
.observe(elapsed / 1e9);
}
}
}
}
20 changes: 0 additions & 20 deletions api/src/main/java/marquez/logging/MarquezMetricNameStrategy.java

This file was deleted.

12 changes: 8 additions & 4 deletions api/src/test/java/marquez/BaseIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -215,10 +215,14 @@ protected CompletableFuture<HttpResponse<String>> sendLineage(String body) {

protected CompletableFuture<HttpResponse<String>> getMetrics() {
HttpRequest request =
HttpRequest.newBuilder()
.uri(URI.create(baseUrl + "/metrics"))
.GET()
.build();
HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics")).GET().build();

return http2.sendAsync(request, BodyHandlers.ofString());
}

protected CompletableFuture<HttpResponse<String>> getMetricsV2() {
HttpRequest request =
HttpRequest.newBuilder().uri(URI.create(baseUrl + "/metrics/v2")).GET().build();

return http2.sendAsync(request, BodyHandlers.ofString());
}
Expand Down
78 changes: 20 additions & 58 deletions api/src/test/java/marquez/MetricsIntegrationTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,76 +5,20 @@

package marquez;

import static marquez.db.LineageTestUtils.PRODUCER_URL;
import static marquez.db.LineageTestUtils.SCHEMA_URL;
import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.node.TextNode;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.dropwizard.util.Resources;
import io.openlineage.client.OpenLineage;
import io.openlineage.client.OpenLineage.RunEvent;
import io.openlineage.client.OpenLineage.RunEvent.EventType;
import io.openlineage.client.OpenLineage.RunFacet;
import io.openlineage.client.OpenLineage.RunFacetsBuilder;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpResponse;
import java.nio.charset.Charset;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import marquez.api.JdbiUtils;
import marquez.client.MarquezClient;
import marquez.client.models.Dataset;
import marquez.client.models.DatasetVersion;
import marquez.client.models.Job;
import marquez.client.models.JobId;
import marquez.client.models.JobVersion;
import marquez.client.models.LineageEvent;
import marquez.client.models.Run;
import marquez.common.Utils;
import marquez.db.LineageTestUtils;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.jdbi.v3.core.Jdbi;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.LoggerFactory;

@org.junit.jupiter.api.Tag("IntegrationTests")
@Slf4j
public class MetricsIntegrationTest extends BaseIntegrationTest {
@Test
public void testCheckMetricName() throws IOException {
public void testCheckMetricV1Name() throws IOException {
client.listNamespaces();
CompletableFuture<String> response =
this.getMetrics()
Expand All @@ -85,8 +29,26 @@ public void testCheckMetricName() throws IOException {
Assertions.fail("Could not complete request");
}
});
assertThat(response.join()).contains("marquez_db_NamespaceDao_findAll_count");
}

@Test
public void testCheckMetricV2Name() throws IOException {
client.listNamespaces();
CompletableFuture<String> response =
this.getMetricsV2()
.thenApply(HttpResponse::body)
.whenComplete(
(val, error) -> {
if (error != null) {
Assertions.fail("Could not complete request");
}
});
assertThat(response.join())
.contains(
"marquez_db_NamespaceDao_findAll_GET__api_v1_namespaces");
"marquez_sql_duration_seconds_sum{object_name=\"marquez.db.NamespaceDao\","
+ "method_name=\"findAll\","
+ "endpoint_method=\"GET\","
+ "endpoint_path=\"/api/v1/namespaces\",}");
}
}

0 comments on commit dfea8a7

Please sign in to comment.