Skip to content

Commit

Permalink
feat: log groupby errors to processing logger (#4575)
Browse files Browse the repository at this point in the history
* feat: log groupby errors to processing logger
  • Loading branch information
rodesai authored Feb 20, 2020
1 parent d7bc85a commit b503d25
Show file tree
Hide file tree
Showing 10 changed files with 256 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@
import io.confluent.ksql.planner.plan.FilterNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.ProjectNode;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.Column;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PersistenceSchema;
Expand Down Expand Up @@ -186,6 +187,8 @@ public void init() {

when(queryBuilder.getKsqlConfig()).thenReturn(ksqlConfig);
when(queryBuilder.getFunctionRegistry()).thenReturn(functionRegistry);
when(queryBuilder.getProcessingLogContext()).thenReturn(processingLogContext);
when(queryBuilder.getQueryId()).thenReturn(new QueryId("foo"));
planBuilder = new KSPlanBuilder(
queryBuilder,
mock(SqlPredicateFactory.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema;
import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType;
import io.confluent.ksql.util.ErrorMessageUtil;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Struct;
Expand All @@ -38,6 +40,18 @@ private EngineProcessingLogMessageFactory() {

/**
 * Builds a RECORD_PROCESSING_ERROR log message for a record that failed processing.
 *
 * @param errorMsg  human-readable description of the failure.
 * @param exception the exception that caused the failure; tolerated if {@code null},
 *                  in which case no cause list is included.
 * @param record    the record being processed when the failure happened; may be null.
 * @return a function from the processing-log config to the serialized log message.
 */
public static Function<ProcessingLogConfig, SchemaAndValue> recordProcessingError(
    final String errorMsg, final Throwable exception, final GenericRow record
) {
  // ofNullable rather than of: the logging path should never itself throw,
  // even if a caller hands us a null exception.
  return recordProcessingError(errorMsg, Optional.ofNullable(exception), record);
}

/**
 * Builds a RECORD_PROCESSING_ERROR log message for a failure that has no
 * associated exception (e.g. a group-by column resolving to null).
 *
 * @param errorMsg human-readable description of the failure.
 * @param record   the record being processed when the failure happened; may be null.
 * @return a function from the processing-log config to the serialized log message.
 */
public static Function<ProcessingLogConfig, SchemaAndValue> recordProcessingError(
    final String errorMsg, final GenericRow record
) {
  return recordProcessingError(errorMsg, Optional.empty(), record);
}

private static Function<ProcessingLogConfig, SchemaAndValue> recordProcessingError(
final String errorMsg, final Optional<Throwable> exception, final GenericRow record
) {
return (config) -> {
final Struct struct = new Struct(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA);
Expand All @@ -49,8 +63,8 @@ public static Function<ProcessingLogConfig, SchemaAndValue> recordProcessingErro
ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_MESSAGE,
errorMsg
);
final List<String> cause = ErrorMessageUtil.getErrorMessages(exception);
cause.remove(0);
final List<String> cause = exception.map(EngineProcessingLogMessageFactory::getCause)
.orElse(Collections.emptyList());
recordProcessingError.put(
ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_CAUSE,
cause
Expand All @@ -66,6 +80,12 @@ public static Function<ProcessingLogConfig, SchemaAndValue> recordProcessingErro
};
}

/**
 * Extracts the cause chain for a throwable, excluding the throwable's own
 * message (which is already captured in the top-level error message field).
 *
 * @param e the throwable whose cause chain to extract.
 * @return the messages of the causes, outermost first; empty if there are none.
 */
private static List<String> getCause(final Throwable e) {
  final List<String> messages = ErrorMessageUtil.getErrorMessages(e);
  if (messages.isEmpty()) {
    // Defensive: not expected for a non-null throwable, but remove(0)/subList(1, ...)
    // on an empty list would throw from within the logging path.
    return Collections.emptyList();
  }
  // subList rather than remove(0): avoids mutating the helper's returned list
  // and is O(1). The view is safe to hand out since `messages` is local.
  return messages.subList(1, messages.size());
}

private static String serializeRow(final ProcessingLogConfig config, final GenericRow record) {
if (!config.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,23 @@ public void shouldBuildRecordProcessingErrorCorrectlyIfRowNull() {
nullValue());
}

@Test
public void shouldBuildRecordProcessingErrorCorrectlyIfNoException() {
  // When: the message is built via the overload that takes no Throwable:
  final SchemaAndValue msgAndSchema = EngineProcessingLogMessageFactory
      .recordProcessingError(errorMsg, null)
      .apply(config);

  // Then: the cause field of the record-processing-error struct is empty:
  final Struct recordProcessingError = ((Struct) msgAndSchema.value())
      .getStruct(ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR);
  assertThat(
      recordProcessingError.get(
          ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_CAUSE),
      equalTo(Collections.emptyList()));
}

@Test
public void shouldBuildRecordProcessingErrorWithNullRowIfIncludeRowsFalse() {
// Given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.util.EngineProcessingLogMessageFactory;
import io.confluent.ksql.execution.util.StructKeyUtil;
import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlType;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.util.SchemaUtil;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
Expand All @@ -41,35 +44,43 @@ final class GroupByParamsFactory {
private GroupByParamsFactory() {
}

public static GroupByParams build(
public static LogicalSchema buildSchema(
final LogicalSchema sourceSchema,
final List<ExpressionMetadata> expressions
) {
final Function<GenericRow, Struct> mapper = expressions.size() == 1
? new SingleExpressionGrouper(expressions.get(0))::apply
: new MultiExpressionGrouper(expressions)::apply;

final LogicalSchema schema = expressions.size() == 1
return expressions.size() == 1
? singleExpressionSchema(sourceSchema, expressions.get(0).getExpressionType())
: multiExpressionSchema(sourceSchema);
}

/**
 * Builds the parameters (schema + key mapper) used to perform a group-by.
 *
 * @param sourceSchema the schema of the rows being grouped.
 * @param expressions  the compiled group-by expressions; must not be empty.
 * @param logger       processing logger used to report rows that cannot be grouped.
 * @return the group-by params.
 */
public static GroupByParams build(
    final LogicalSchema sourceSchema,
    final List<ExpressionMetadata> expressions,
    final ProcessingLogger logger
) {
  // A single expression keys directly on its evaluated value; multiple
  // expressions are combined into a single string key.
  final Function<GenericRow, Struct> mapper;
  if (expressions.size() == 1) {
    mapper = new SingleExpressionGrouper(expressions.get(0), logger)::apply;
  } else {
    mapper = new MultiExpressionGrouper(expressions, logger)::apply;
  }

  return new GroupByParams(buildSchema(sourceSchema, expressions), mapper);
}

private static LogicalSchema multiExpressionSchema(
final LogicalSchema sourceSchema
) {
return buildSchema(sourceSchema, SqlTypes.STRING);
return buildSchemaWithKeyType(sourceSchema, SqlTypes.STRING);
}

private static LogicalSchema singleExpressionSchema(
final LogicalSchema sourceSchema,
final SqlType rowKeyType
) {
return buildSchema(sourceSchema, rowKeyType);
return buildSchemaWithKeyType(sourceSchema, rowKeyType);
}

private static LogicalSchema buildSchema(
private static LogicalSchema buildSchemaWithKeyType(
final LogicalSchema sourceSchema,
final SqlType rowKeyType
) {
Expand All @@ -82,18 +93,34 @@ private static LogicalSchema buildSchema(
private static Object processColumn(
final int index,
final ExpressionMetadata exp,
final GenericRow row
final GenericRow row,
final ProcessingLogger logger
) {
try {
final Object result = exp.evaluate(row);
if (result == null) {
LOG.error("Group-by column with index {} resolved to null. "
+ "The source row will be excluded from the table.", index);
logger.error(
EngineProcessingLogMessageFactory.recordProcessingError(
String.format(
"Group-by column with index %d resolved to null."
+ " The source row will be excluded from the table.",
index),
row
)
);
}
return result;
} catch (final Exception e) {
LOG.error("Error calculating group-by column with index {}. "
+ "The source row will be excluded from the table.", index, e);
logger.error(
EngineProcessingLogMessageFactory.recordProcessingError(
String.format(
"Error calculating group-by column with index %d. "
+ "The source row will be excluded from the table: %s",
index, e.getMessage()),
e,
row
)
);
return null;
}
}
Expand All @@ -102,14 +129,18 @@ private static final class SingleExpressionGrouper {

private final KeyBuilder keyBuilder;
private final ExpressionMetadata expression;
private final ProcessingLogger logger;

SingleExpressionGrouper(final ExpressionMetadata expression) {
SingleExpressionGrouper(
final ExpressionMetadata expression,
final ProcessingLogger logger) {
this.expression = requireNonNull(expression, "expression");
this.keyBuilder = StructKeyUtil.keyBuilder(expression.getExpressionType());
this.logger = Objects.requireNonNull(logger, "logger");
}

public Struct apply(final GenericRow row) {
final Object rowKey = processColumn(0, expression, row);
final Object rowKey = processColumn(0, expression, row, logger);
if (rowKey == null) {
return null;
}
Expand All @@ -121,10 +152,15 @@ private static final class MultiExpressionGrouper {

private final KeyBuilder keyBuilder;
private final ImmutableList<ExpressionMetadata> expressions;
private final ProcessingLogger logger;

MultiExpressionGrouper(final List<ExpressionMetadata> expressions) {
MultiExpressionGrouper(
final List<ExpressionMetadata> expressions,
final ProcessingLogger logger
) {
this.expressions = ImmutableList.copyOf(requireNonNull(expressions, "expressions"));
this.keyBuilder = StructKeyUtil.keyBuilder(SqlTypes.STRING);
this.logger = Objects.requireNonNull(logger, "logger");

if (expressions.isEmpty()) {
throw new IllegalArgumentException("Empty group by");
Expand All @@ -134,7 +170,7 @@ private static final class MultiExpressionGrouper {
public Struct apply(final GenericRow row) {
final StringBuilder rowKey = new StringBuilder();
for (int i = 0; i < expressions.size(); i++) {
final Object result = processColumn(i, expressions.get(i), row);
final Object result = processColumn(i, expressions.get(i), row, logger);
if (result == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ private LogicalSchema handleStreamGroupBy(
functionRegistry
);

return GroupByParamsFactory.build(sourceSchema, compiledGroupBy).getSchema();
return GroupByParamsFactory.buildSchema(sourceSchema, compiledGroupBy);
}

private LogicalSchema handleTableGroupBy(
Expand All @@ -197,7 +197,7 @@ private LogicalSchema handleTableGroupBy(
functionRegistry
);

return GroupByParamsFactory.build(sourceSchema, compiledGroupBy).getSchema();
return GroupByParamsFactory.buildSchema(sourceSchema, compiledGroupBy);
}

private LogicalSchema handleStreamSelect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import io.confluent.ksql.execution.codegen.CodeGenRunner;
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.KGroupedStreamHolder;
import io.confluent.ksql.execution.plan.KStreamHolder;
import io.confluent.ksql.execution.plan.StreamGroupBy;
import io.confluent.ksql.execution.plan.StreamGroupByKey;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import java.util.List;
Expand Down Expand Up @@ -74,7 +76,14 @@ public static <K> KGroupedStreamHolder build(
queryBuilder.getFunctionRegistry()
);

final GroupByParams params = GroupByParamsFactory.build(sourceSchema, groupBy);
final ProcessingLogger logger = queryBuilder
.getProcessingLogContext()
.getLoggerFactory()
.getLogger(
QueryLoggerUtil.queryLoggerName(queryBuilder.getQueryId(), queryContext)
);

final GroupByParams params = GroupByParamsFactory.build(sourceSchema, groupBy, logger);

final Grouped<Struct, GenericRow> grouped = buildGrouped(
formats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import io.confluent.ksql.execution.codegen.CodeGenRunner;
import io.confluent.ksql.execution.codegen.ExpressionMetadata;
import io.confluent.ksql.execution.context.QueryContext;
import io.confluent.ksql.execution.context.QueryLoggerUtil;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.execution.plan.KGroupedTableHolder;
import io.confluent.ksql.execution.plan.KTableHolder;
import io.confluent.ksql.execution.plan.TableGroupBy;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import java.util.List;
Expand Down Expand Up @@ -58,7 +60,14 @@ public static <K> KGroupedTableHolder build(
queryBuilder.getFunctionRegistry()
);

final GroupByParams params = GroupByParamsFactory.build(sourceSchema, groupBy);
final ProcessingLogger logger = queryBuilder
.getProcessingLogContext()
.getLoggerFactory()
.getLogger(
QueryLoggerUtil.queryLoggerName(queryBuilder.getQueryId(), queryContext)
);

final GroupByParams params = GroupByParamsFactory.build(sourceSchema, groupBy, logger);

final PhysicalSchema physicalSchema = PhysicalSchema.from(
params.getSchema(),
Expand Down
Loading

0 comments on commit b503d25

Please sign in to comment.