Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional metadata to spooling tracing #23560

Merged
merged 4 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
458 changes: 20 additions & 438 deletions core/trino-spi/pom.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@
package io.trino.spi.protocol;

import io.airlift.slice.Slice;
import io.trino.spi.Experimental;

import java.net.URI;
import java.util.List;
import java.util.Map;

import static java.util.Objects.requireNonNull;

@Experimental(eta = "2025-05-31")
public sealed interface SpooledLocation
{
Map<String, List<String>> headers();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
*/
package io.trino.spi.protocol;

import io.trino.spi.Experimental;
import io.trino.spi.QueryId;

import java.time.Instant;

/**
* SpooledSegmentHandle is used to uniquely identify a spooled segment and to manage its lifecycle.
*/
@Experimental(eta = "2025-05-31")
wendigo marked this conversation as resolved.
Show resolved Hide resolved
public interface SpooledSegmentHandle
{
QueryId queryId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@
*/
package io.trino.spi.protocol;

import io.trino.spi.Experimental;
import io.trino.spi.QueryId;

import static java.util.Objects.requireNonNull;

public record SpoolingContext(String encoding, QueryId queryId, long rowCount, long size)
@Experimental(eta = "2025-05-31")
public record SpoolingContext(String encoding, QueryId queryId, long rows, long size)
{
public SpoolingContext
{
requireNonNull(queryId, "queryId is null");
requireNonNull(encoding, "encoding is null");
if (rowCount < 0) {
throw new IllegalArgumentException("rowCount is negative");
if (rows < 0) {
throw new IllegalArgumentException("rows is negative");
}
if (size < 0) {
throw new IllegalArgumentException("size is negative");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@
*/
package io.trino.spi.protocol;

import io.trino.spi.Experimental;
import io.trino.spi.protocol.SpooledLocation.DirectLocation;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Optional;

@Experimental(eta = "2025-05-31")
public interface SpoolingManager
{
SpooledSegmentHandle create(SpoolingContext context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import io.trino.spi.Experimental;

@Experimental(eta = "2025-05-31")
public interface SpoolingManagerContext
{
default OpenTelemetry getOpenTelemetry()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
*/
package io.trino.spi.protocol;

import io.trino.spi.Experimental;

import java.util.Map;

@Experimental(eta = "2025-05-31")
public interface SpoolingManagerFactory
{
String getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
import static com.google.common.base.Verify.verify;
import static java.util.Objects.requireNonNull;

public record FileSystemSpooledSegmentHandle(@Override String encoding, @Override QueryId queryId, byte[] uuid, Optional<EncryptionKey> encryptionKey)
public record FileSystemSpooledSegmentHandle(
@Override String encoding,
@Override QueryId queryId,
byte[] uuid,
Optional<EncryptionKey> encryptionKey)
implements SpooledSegmentHandle
{
private static final String OBJECT_NAME_SEPARATOR = "::";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,6 @@ public SpooledLocation location(SpooledSegmentHandle handle)
output.writeBytes(fileHandle.queryId().toString().getBytes(UTF_8));
output.writeBytes(fileHandle.encoding().getBytes(UTF_8));
output.writeBoolean(fileHandle.encryptionKey().isPresent());

return coordinatorLocation(output.slice(), headers(fileHandle));
}

Expand All @@ -202,7 +201,6 @@ public SpooledSegmentHandle handle(SpooledLocation location)

QueryId queryId = QueryId.valueOf(input.readSlice(queryLength).toStringUtf8());
String encoding = input.readSlice(encodingLength).toStringUtf8();

if (!input.readBoolean()) {
return new FileSystemSpooledSegmentHandle(encoding, queryId, uuid, Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.OutputStream;
import java.util.Optional;

import static io.opentelemetry.api.common.AttributeKey.longKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static java.util.Objects.requireNonNull;

Expand All @@ -39,6 +40,9 @@ public class TracingSpoolingManager
public static final AttributeKey<String> SEGMENT_ID = stringKey("trino.segment.id");
public static final AttributeKey<String> SEGMENT_QUERY_ID = stringKey("trino.segment.query_id");
public static final AttributeKey<String> SEGMENT_ENCODING = stringKey("trino.segment.encoding");
public static final AttributeKey<Long> SEGMENT_SIZE = longKey("trino.segment.size");
public static final AttributeKey<Long> SEGMENT_ROWS = longKey("trino.segment.rows");
public static final AttributeKey<String> SEGMENT_EXPIRATION = stringKey("trino.segment.expiration");

private final Tracer tracer;
private final SpoolingManager delegate;
Expand All @@ -55,6 +59,8 @@ public SpooledSegmentHandle create(SpoolingContext context)
Span span = tracer.spanBuilder("SpoolingManager.create")
.setAttribute(SEGMENT_QUERY_ID, context.queryId().toString())
.setAttribute(SEGMENT_ENCODING, context.encoding())
.setAttribute(SEGMENT_ROWS, context.rows())
.setAttribute(SEGMENT_SIZE, context.size())
.startSpan();
return withTracing(span, () -> delegate.create(context));
}
Expand All @@ -63,48 +69,28 @@ public SpooledSegmentHandle create(SpoolingContext context)
public OutputStream createOutputStream(SpooledSegmentHandle handle)
throws IOException
{
Span span = tracer.spanBuilder("SpoolingManager.createOutputStream")
.setAttribute(SEGMENT_QUERY_ID, handle.queryId().toString())
.setAttribute(SEGMENT_ID, handle.identifier())
.setAttribute(SEGMENT_ENCODING, handle.encoding())
.startSpan();
return withTracing(span, () -> delegate.createOutputStream(handle));
return withTracing(span(tracer, handle, "createOutputStream"), () -> delegate.createOutputStream(handle));
}

@Override
public InputStream openInputStream(SpooledSegmentHandle handle)
throws IOException
{
Span span = tracer.spanBuilder("SpoolingManager.openInputStream")
.setAttribute(SEGMENT_QUERY_ID, handle.queryId().toString())
.setAttribute(SEGMENT_ID, handle.identifier())
.setAttribute(SEGMENT_ENCODING, handle.encoding())
.startSpan();
return withTracing(span, () -> delegate.openInputStream(handle));
return withTracing(span(tracer, handle, "openInputStream"), () -> delegate.openInputStream(handle));
}

@Override
public void acknowledge(SpooledSegmentHandle handle)
throws IOException
{
Span span = tracer.spanBuilder("SpoolingManager.acknowledge")
.setAttribute(SEGMENT_QUERY_ID, handle.queryId().toString())
.setAttribute(SEGMENT_ID, handle.identifier())
.setAttribute(SEGMENT_ENCODING, handle.encoding())
.startSpan();
withTracing(span, () -> delegate.acknowledge(handle));
withTracing(span(tracer, handle, "acknowledge"), () -> delegate.acknowledge(handle));
}

@Override
public Optional<DirectLocation> directLocation(SpooledSegmentHandle handle)
throws IOException
{
Span span = tracer.spanBuilder("SpoolingManager.directLocation")
.setAttribute(SEGMENT_QUERY_ID, handle.queryId().toString())
.setAttribute(SEGMENT_ID, handle.identifier())
.setAttribute(SEGMENT_ENCODING, handle.encoding())
.startSpan();
return withTracing(span, () -> delegate.directLocation(handle));
return withTracing(span(tracer, handle, "directLocation"), () -> delegate.directLocation(handle));
}

// Methods below do not need to be traced as they are not doing any I/O
Expand All @@ -129,6 +115,17 @@ public static <E extends Exception> void withTracing(Span span, CheckedRunnable<
});
}

public static Span span(Tracer tracer, SpooledSegmentHandle handle, String name)
{
return tracer
.spanBuilder("SpoolingManager." + name)
.setAttribute(SEGMENT_ID, handle.identifier())
.setAttribute(SEGMENT_QUERY_ID, handle.queryId().toString())
.setAttribute(SEGMENT_ENCODING, handle.encoding())
.setAttribute(SEGMENT_EXPIRATION, handle.expirationTime().toString())
.startSpan();
}

public static <T, E extends Exception> T withTracing(Span span, CheckedSupplier<T, E> supplier)
throws E
{
Expand Down