Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Log SDK to implement OTEP-0150 #3759

Merged
merged 16 commits into from
Nov 1, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
Comparing source compatibility of against
No changes.
+++ NEW CLASS: PUBLIC(+) FINAL(+) io.opentelemetry.exporter.logging.otlp.OtlpJsonLoggingLogExporter (not serializable)
+++ CLASS FILE FORMAT VERSION: 52.0 <- n.a.
+++ NEW SUPERCLASS: java.lang.Object
+++ NEW METHOD: PUBLIC(+) STATIC(+) io.opentelemetry.sdk.logs.export.LogExporter create()
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.CompletableResultCode export(java.util.Collection)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.sdk.common.CompletableResultCode shutdown()
Original file line number Diff line number Diff line change
Expand Up @@ -6,24 +6,68 @@
package io.opentelemetry.sdk.logs;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogRecord;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.logs.data.LogData;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public interface LogProcessor {
public interface LogProcessor extends Closeable {

void addLogRecord(LogRecord record);
/**
* Process a log.
*
* @param logData the log
*/
void process(LogData logData);

/**
* Called when {@link SdkTracerProvider#shutdown()} is called.
* Shutdown the log processor.
*
* @return result
*/
CompletableResultCode shutdown();
default CompletableResultCode shutdown() {
return forceFlush();
}

/**
* Processes all span events that have not yet been processed.
* Process all logs that have not yet been processed.
*
* @return result
*/
CompletableResultCode forceFlush();
default CompletableResultCode forceFlush() {
return CompletableResultCode.ofSuccess();
}

@Override
default void close() {
shutdown().join(10, TimeUnit.SECONDS);
}

/**
* Returns a {@link LogProcessor} which simply delegates to all processing to the {@code
* processors} in order.
*/
static LogProcessor composite(LogProcessor... processors) {
jack-berg marked this conversation as resolved.
Show resolved Hide resolved
return composite(Arrays.asList(processors));
}

/**
* Returns a {@link LogProcessor} which simply delegates to all processing to the {@code
* processors} in order.
*/
static LogProcessor composite(Iterable<LogProcessor> processors) {
List<LogProcessor> processorList = new ArrayList<>();
for (LogProcessor processor : processors) {
processorList.add(processor);
}
if (processorList.isEmpty()) {
return NoopLogProcessor.getInstance();
}
if (processorList.size() == 1) {
return processorList.get(0);
}
return MultiLogProcessor.create(processorList);
}
}
17 changes: 13 additions & 4 deletions sdk/logs/src/main/java/io/opentelemetry/sdk/logs/LogSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,23 @@

package io.opentelemetry.sdk.logs;

import io.opentelemetry.sdk.logs.data.LogRecord;
import io.opentelemetry.sdk.logs.data.LogBuilder;
import io.opentelemetry.sdk.logs.data.LogData;

/** A LogSink accepts logging records for transmission to an aggregator or log processing system. */
public interface LogSink {

/**
* Create a log builder. {@link LogBuilder#build()} can be passed to {@link #offer(LogData)}.
*
* @return the builder
*/
LogBuilder builder();

/**
* Pass a record to the SDK for transmission to a logging exporter.
* Pass the {@link LogData} to the sink.
*
* @param record record to transmit
* @param logData the log
*/
void offer(LogRecord record);
void offer(LogData logData);
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,49 @@

package io.opentelemetry.sdk.logs;

import static java.util.Objects.requireNonNull;

import io.opentelemetry.sdk.resources.Resource;
import java.util.ArrayList;
import java.util.List;

public final class LogSinkSdkProviderBuilder {

private final List<LogProcessor> logProcessors = new ArrayList<>();
private Resource resource = Resource.getDefault();

LogSinkSdkProviderBuilder() {}

public LogSinkSdkProvider build() {
return new LogSinkSdkProvider();
/**
* Set the resource.
*
* @param resource the resource
* @return this
*/
public LogSinkSdkProviderBuilder setResource(Resource resource) {
requireNonNull(resource, "resource");
this.resource = resource;
return this;
}

/**
* Add a log processor.
*
* @param processor the log processor
* @return this
*/
public LogSinkSdkProviderBuilder addLogProcessor(LogProcessor processor) {
requireNonNull(processor, "processor");
logProcessors.add(processor);
return this;
}

/**
* Create a {@link SdkLogSinkProvider} instance.
*
* @return an instance configured with the provided options
*/
public SdkLogSinkProvider build() {
return new SdkLogSinkProvider(resource, logProcessors);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.logs;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.resources.Resource;
import java.util.List;
import javax.annotation.Nullable;

final class LogSinkSharedState {
private final Object lock = new Object();
private final Resource resource;
private final LogProcessor activeLogProcessor;
@Nullable private volatile CompletableResultCode shutdownResult = null;

LogSinkSharedState(Resource resource, List<LogProcessor> logProcessors) {
this.resource = resource;
this.activeLogProcessor = LogProcessor.composite(logProcessors);
}

Resource getResource() {
return resource;
}

LogProcessor getActiveLogProcessor() {
return activeLogProcessor;
}

boolean hasBeenShutdown() {
return shutdownResult != null;
}

CompletableResultCode shutdown() {
synchronized (lock) {
if (shutdownResult != null) {
return shutdownResult;
}
shutdownResult = activeLogProcessor.shutdown();
return shutdownResult;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.logs;

import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.data.LogData;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

final class MultiLogProcessor implements LogProcessor {

private final List<LogProcessor> logProcessors;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

static LogProcessor create(List<LogProcessor> logProcessorsList) {
return new MultiLogProcessor(
new ArrayList<>(Objects.requireNonNull(logProcessorsList, "logProcessorsList")));
}

@Override
public void process(LogData logData) {
for (LogProcessor logProcessor : logProcessors) {
logProcessor.process(logData);
}
}

@Override
public CompletableResultCode shutdown() {
if (isShutdown.getAndSet(true)) {
return CompletableResultCode.ofSuccess();
}
List<CompletableResultCode> results = new ArrayList<>(logProcessors.size());
for (LogProcessor logProcessor : logProcessors) {
results.add(logProcessor.shutdown());
}
return CompletableResultCode.ofAll(results);
}

@Override
public CompletableResultCode forceFlush() {
List<CompletableResultCode> results = new ArrayList<>(logProcessors.size());
for (LogProcessor logProcessor : logProcessors) {
results.add(logProcessor.forceFlush());
}
return CompletableResultCode.ofAll(results);
}

private MultiLogProcessor(List<LogProcessor> logProcessorsList) {
this.logProcessors = logProcessorsList;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.sdk.logs;

import io.opentelemetry.sdk.logs.data.LogData;

final class NoopLogProcessor implements LogProcessor {
private static final NoopLogProcessor INSTANCE = new NoopLogProcessor();

static LogProcessor getInstance() {
return INSTANCE;
}

private NoopLogProcessor() {}

@Override
public void process(LogData logData) {}
}
Loading