diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java
index f4b39d8686dd..62f524fdb1dd 100644
--- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java
+++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java
@@ -76,6 +76,7 @@
 import io.confluent.ksql.planner.plan.FilterNode;
 import io.confluent.ksql.planner.plan.PlanNode;
 import io.confluent.ksql.planner.plan.ProjectNode;
+import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.schema.ksql.Column;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.PersistenceSchema;
@@ -186,6 +187,8 @@ public void init() {
     when(queryBuilder.getKsqlConfig()).thenReturn(ksqlConfig);
     when(queryBuilder.getFunctionRegistry()).thenReturn(functionRegistry);
+    when(queryBuilder.getProcessingLogContext()).thenReturn(processingLogContext);
+    when(queryBuilder.getQueryId()).thenReturn(new QueryId("foo"));
 
     planBuilder = new KSPlanBuilder(
         queryBuilder,
         mock(SqlPredicateFactory.class),
diff --git a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/EngineProcessingLogMessageFactory.java b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/EngineProcessingLogMessageFactory.java
index 90f4760066a2..299287699b5b 100644
--- a/ksql-execution/src/main/java/io/confluent/ksql/execution/util/EngineProcessingLogMessageFactory.java
+++ b/ksql-execution/src/main/java/io/confluent/ksql/execution/util/EngineProcessingLogMessageFactory.java
@@ -21,7 +21,9 @@
 import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema;
 import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType;
 import io.confluent.ksql.util.ErrorMessageUtil;
+import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.function.Function;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
@@ -38,6 +40,18 @@ private EngineProcessingLogMessageFactory() {
 
   public static Function<ProcessingLogConfig, SchemaAndValue> recordProcessingError(
       final String errorMsg, final Throwable exception, final GenericRow record
+  ) {
+    return recordProcessingError(errorMsg, Optional.of(exception), record);
+  }
+
+  public static Function<ProcessingLogConfig, SchemaAndValue> recordProcessingError(
+      final String errorMsg, final GenericRow record
+  ) {
+    return recordProcessingError(errorMsg, Optional.empty(), record);
+  }
+
+  private static Function<ProcessingLogConfig, SchemaAndValue> recordProcessingError(
+      final String errorMsg, final Optional<Throwable> exception, final GenericRow record
   ) {
     return (config) -> {
       final Struct struct = new Struct(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA);
@@ -49,8 +63,8 @@ public static Function recordProcessingErro
           ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_MESSAGE,
           errorMsg
       );
-      final List<String> cause = ErrorMessageUtil.getErrorMessages(exception);
-      cause.remove(0);
+      final List<String> cause = exception.map(EngineProcessingLogMessageFactory::getCause)
+          .orElse(Collections.emptyList());
       recordProcessingError.put(
           ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_CAUSE,
           cause
@@ -66,6 +80,12 @@ public static Function recordProcessingErro
     };
   }
 
+  private static List<String> getCause(final Throwable e) {
+    final List<String> cause = ErrorMessageUtil.getErrorMessages(e);
+    cause.remove(0);
+    return cause;
+  }
+
   private static String serializeRow(final ProcessingLogConfig config, final GenericRow record) {
     if (!config.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)) {
       return null;
diff --git a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/EngineProcessingLogMessageFactoryTest.java b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/EngineProcessingLogMessageFactoryTest.java
index d59818f8694a..f883e3006df3 100644
--- a/ksql-execution/src/test/java/io/confluent/ksql/execution/util/EngineProcessingLogMessageFactoryTest.java
+++ b/ksql-execution/src/test/java/io/confluent/ksql/execution/util/EngineProcessingLogMessageFactoryTest.java
@@ -80,6 +80,23 @@ public void shouldBuildRecordProcessingErrorCorrectlyIfRowNull() {
         nullValue());
   }
 
+  @Test
+  public void shouldBuildRecordProcessingErrorCorrectlyIfNoException() {
+    // When:
+    final SchemaAndValue msgAndSchema = EngineProcessingLogMessageFactory.recordProcessingError(
+        errorMsg, null
+    ).apply(config);
+
+    // Then:
+    final Struct msg = (Struct) msgAndSchema.value();
+    final Struct recordProcessingError =
+        msg.getStruct(ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR);
+    assertThat(
+        recordProcessingError.get(
+            ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_CAUSE),
+        equalTo(Collections.emptyList()));
+  }
+
   @Test
   public void shouldBuildRecordProcessingErrorWithNullRowIfIncludeRowsFalse() {
     // Given:
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java
index e1fcd25b88a6..57c7a0d44eec 100644
--- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/GroupByParamsFactory.java
@@ -20,13 +20,16 @@
 import com.google.common.collect.ImmutableList;
 import io.confluent.ksql.GenericRow;
 import io.confluent.ksql.execution.codegen.ExpressionMetadata;
+import io.confluent.ksql.execution.util.EngineProcessingLogMessageFactory;
 import io.confluent.ksql.execution.util.StructKeyUtil;
 import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder;
+import io.confluent.ksql.logging.processing.ProcessingLogger;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.types.SqlType;
 import io.confluent.ksql.schema.ksql.types.SqlTypes;
 import io.confluent.ksql.util.SchemaUtil;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Function;
 import org.apache.kafka.connect.data.Struct;
 import org.slf4j.Logger;
@@ -41,17 +44,25 @@ private GroupByParamsFactory() {
   }
 
-  public static GroupByParams build(
+  public static LogicalSchema buildSchema(
       final LogicalSchema sourceSchema,
       final List<ExpressionMetadata> expressions
   ) {
-    final Function<GenericRow, Struct> mapper = expressions.size() == 1
-        ? new SingleExpressionGrouper(expressions.get(0))::apply
-        : new MultiExpressionGrouper(expressions)::apply;
-
-    final LogicalSchema schema = expressions.size() == 1
+    return expressions.size() == 1
         ? singleExpressionSchema(sourceSchema, expressions.get(0).getExpressionType())
         : multiExpressionSchema(sourceSchema);
+  }
+
+  public static GroupByParams build(
+      final LogicalSchema sourceSchema,
+      final List<ExpressionMetadata> expressions,
+      final ProcessingLogger logger
+  ) {
+    final Function<GenericRow, Struct> mapper = expressions.size() == 1
+        ? new SingleExpressionGrouper(expressions.get(0), logger)::apply
+        : new MultiExpressionGrouper(expressions, logger)::apply;
+
+    final LogicalSchema schema = buildSchema(sourceSchema, expressions);
     return new GroupByParams(schema, mapper);
   }
 
@@ -59,17 +70,17 @@ public static GroupByParams build(
   private static LogicalSchema multiExpressionSchema(
       final LogicalSchema sourceSchema
   ) {
-    return buildSchema(sourceSchema, SqlTypes.STRING);
+    return buildSchemaWithKeyType(sourceSchema, SqlTypes.STRING);
   }
 
   private static LogicalSchema singleExpressionSchema(
       final LogicalSchema sourceSchema,
       final SqlType rowKeyType
   ) {
-    return buildSchema(sourceSchema, rowKeyType);
+    return buildSchemaWithKeyType(sourceSchema, rowKeyType);
   }
 
-  private static LogicalSchema buildSchema(
+  private static LogicalSchema buildSchemaWithKeyType(
       final LogicalSchema sourceSchema,
       final SqlType rowKeyType
   ) {
@@ -82,18 +93,34 @@ private static LogicalSchema buildSchema(
   private static Object processColumn(
       final int index,
       final ExpressionMetadata exp,
-      final GenericRow row
+      final GenericRow row,
+      final ProcessingLogger logger
   ) {
     try {
       final Object result = exp.evaluate(row);
       if (result == null) {
-        LOG.error("Group-by column with index {} resolved to null. "
-            + "The source row will be excluded from the table.", index);
+        logger.error(
+            EngineProcessingLogMessageFactory.recordProcessingError(
+                String.format(
+                    "Group-by column with index %d resolved to null."
+                        + " The source row will be excluded from the table.",
+                    index),
+                row
+            )
+        );
       }
       return result;
     } catch (final Exception e) {
-      LOG.error("Error calculating group-by column with index {}. "
-          + "The source row will be excluded from the table.", index, e);
+      logger.error(
+          EngineProcessingLogMessageFactory.recordProcessingError(
+              String.format(
+                  "Error calculating group-by column with index %d. "
+                      + "The source row will be excluded from the table: %s",
+                  index, e.getMessage()),
+              e,
+              row
+          )
+      );
       return null;
     }
   }
@@ -102,14 +129,18 @@ private static final class SingleExpressionGrouper {
 
     private final KeyBuilder keyBuilder;
     private final ExpressionMetadata expression;
+    private final ProcessingLogger logger;
 
-    SingleExpressionGrouper(final ExpressionMetadata expression) {
+    SingleExpressionGrouper(
+        final ExpressionMetadata expression,
+        final ProcessingLogger logger) {
      this.expression = requireNonNull(expression, "expression");
      this.keyBuilder = StructKeyUtil.keyBuilder(expression.getExpressionType());
+      this.logger = Objects.requireNonNull(logger, "logger");
     }
 
     public Struct apply(final GenericRow row) {
-      final Object rowKey = processColumn(0, expression, row);
+      final Object rowKey = processColumn(0, expression, row, logger);
       if (rowKey == null) {
         return null;
       }
@@ -121,10 +152,15 @@ private static final class MultiExpressionGrouper {
 
     private final KeyBuilder keyBuilder;
     private final ImmutableList<ExpressionMetadata> expressions;
+    private final ProcessingLogger logger;
 
-    MultiExpressionGrouper(final List<ExpressionMetadata> expressions) {
+    MultiExpressionGrouper(
+        final List<ExpressionMetadata> expressions,
+        final ProcessingLogger logger
+    ) {
       this.expressions = ImmutableList.copyOf(requireNonNull(expressions, "expressions"));
       this.keyBuilder = StructKeyUtil.keyBuilder(SqlTypes.STRING);
+      this.logger = Objects.requireNonNull(logger, "logger");
 
       if (expressions.isEmpty()) {
         throw new IllegalArgumentException("Empty group by");
@@ -134,7 +170,7 @@ private static final class MultiExpressionGrouper {
     public Struct apply(final GenericRow row) {
       final StringBuilder rowKey = new StringBuilder();
       for (int i = 0; i < expressions.size(); i++) {
-        final Object result = processColumn(i, expressions.get(i), row);
+        final Object result = processColumn(i, expressions.get(i), row, logger);
         if (result == null) {
           return null;
         }
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java
index f9d27e8b9909..2c0bfaebfcb0 100644
--- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StepSchemaResolver.java
@@ -182,7 +182,7 @@ private LogicalSchema handleStreamGroupBy(
         functionRegistry
     );
 
-    return GroupByParamsFactory.build(sourceSchema, compiledGroupBy).getSchema();
+    return GroupByParamsFactory.buildSchema(sourceSchema, compiledGroupBy);
   }
 
   private LogicalSchema handleTableGroupBy(
@@ -197,7 +197,7 @@ private LogicalSchema handleTableGroupBy(
         functionRegistry
     );
 
-    return GroupByParamsFactory.build(sourceSchema, compiledGroupBy).getSchema();
+    return GroupByParamsFactory.buildSchema(sourceSchema, compiledGroupBy);
   }
 
   private LogicalSchema handleStreamSelect(
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java
index b1c689192ade..1aa4c1ce2677 100644
--- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/StreamGroupByBuilder.java
@@ -20,11 +20,13 @@
 import io.confluent.ksql.execution.codegen.CodeGenRunner;
 import io.confluent.ksql.execution.codegen.ExpressionMetadata;
 import io.confluent.ksql.execution.context.QueryContext;
+import io.confluent.ksql.execution.context.QueryLoggerUtil;
 import io.confluent.ksql.execution.plan.Formats;
 import io.confluent.ksql.execution.plan.KGroupedStreamHolder;
 import io.confluent.ksql.execution.plan.KStreamHolder;
 import io.confluent.ksql.execution.plan.StreamGroupBy;
 import io.confluent.ksql.execution.plan.StreamGroupByKey;
+import io.confluent.ksql.logging.processing.ProcessingLogger;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.PhysicalSchema;
 import java.util.List;
@@ -74,7 +76,14 @@ public static KGroupedStreamHolder build(
         queryBuilder.getFunctionRegistry()
     );
 
-    final GroupByParams params = GroupByParamsFactory.build(sourceSchema, groupBy);
+    final ProcessingLogger logger = queryBuilder
+        .getProcessingLogContext()
+        .getLoggerFactory()
+        .getLogger(
+            QueryLoggerUtil.queryLoggerName(queryBuilder.getQueryId(), queryContext)
+        );
+
+    final GroupByParams params = GroupByParamsFactory.build(sourceSchema, groupBy, logger);
 
     final Grouped<Struct, GenericRow> grouped = buildGrouped(
         formats,
diff --git a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java
index 43f52b245ae0..6ff9fdb225ac 100644
--- a/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java
+++ b/ksql-streams/src/main/java/io/confluent/ksql/execution/streams/TableGroupByBuilder.java
@@ -20,10 +20,12 @@
 import io.confluent.ksql.execution.codegen.CodeGenRunner;
 import io.confluent.ksql.execution.codegen.ExpressionMetadata;
 import io.confluent.ksql.execution.context.QueryContext;
+import io.confluent.ksql.execution.context.QueryLoggerUtil;
 import io.confluent.ksql.execution.plan.Formats;
 import io.confluent.ksql.execution.plan.KGroupedTableHolder;
 import io.confluent.ksql.execution.plan.KTableHolder;
 import io.confluent.ksql.execution.plan.TableGroupBy;
+import io.confluent.ksql.logging.processing.ProcessingLogger;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.PhysicalSchema;
 import java.util.List;
@@ -58,7 +60,14 @@ public static KGroupedTableHolder build(
         queryBuilder.getFunctionRegistry()
     );
 
-    final GroupByParams params = GroupByParamsFactory.build(sourceSchema, groupBy);
+    final ProcessingLogger logger = queryBuilder
+        .getProcessingLogContext()
+        .getLoggerFactory()
+        .getLogger(
+            QueryLoggerUtil.queryLoggerName(queryBuilder.getQueryId(), queryContext)
+        );
+
+    final GroupByParams params = GroupByParamsFactory.build(sourceSchema, groupBy, logger);
 
     final PhysicalSchema physicalSchema = PhysicalSchema.from(
         params.getSchema(),
diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/GroupByParamsFactoryTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/GroupByParamsFactoryTest.java
index 2f47919a1a3e..641d136de504 100644
--- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/GroupByParamsFactoryTest.java
+++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/GroupByParamsFactoryTest.java
@@ -16,6 +16,7 @@
 package io.confluent.ksql.execution.streams;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.nullValue;
 import static org.mockito.ArgumentMatchers.any;
@@ -29,16 +30,24 @@
 import io.confluent.ksql.execution.codegen.ExpressionMetadata;
 import io.confluent.ksql.execution.util.StructKeyUtil;
 import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder;
+import io.confluent.ksql.logging.processing.ProcessingLogConfig;
+import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema;
+import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema.MessageType;
+import io.confluent.ksql.logging.processing.ProcessingLogger;
 import io.confluent.ksql.name.ColumnName;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.types.SqlType;
 import io.confluent.ksql.schema.ksql.types.SqlTypes;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Function;
+import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.Struct;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.Mock;
 import org.mockito.junit.MockitoJUnitRunner;
@@ -54,22 +63,31 @@ public class GroupByParamsFactoryTest {
 
   @Mock
   private ExpressionMetadata groupBy0;
-
   @Mock
   private ExpressionMetadata groupBy1;
-
   @Mock
   private GenericRow value;
+  @Mock
+  private ProcessingLogger logger;
+  @Captor
+  private ArgumentCaptor<Function<ProcessingLogConfig, SchemaAndValue>> msgCaptor;
+  @Mock
+  private ProcessingLogConfig processingLogConfig;
 
-  private GroupByParams singlePrams;
+  private GroupByParams singleParams;
   private GroupByParams multiParams;
 
   @Before
   public void setUp() {
     when(groupBy0.getExpressionType()).thenReturn(SqlTypes.INTEGER);
+    when(processingLogConfig.getBoolean(ProcessingLogConfig.INCLUDE_ROWS)).thenReturn(false);
 
-    singlePrams = GroupByParamsFactory.build(SOURCE_SCHEMA, ImmutableList.of(groupBy0));
-    multiParams = GroupByParamsFactory.build(SOURCE_SCHEMA, ImmutableList.of(groupBy0, groupBy1));
+    singleParams = GroupByParamsFactory.build(SOURCE_SCHEMA, ImmutableList.of(groupBy0), logger);
+    multiParams = GroupByParamsFactory.build(
+        SOURCE_SCHEMA,
+        ImmutableList.of(groupBy0, groupBy1),
+        logger
+    );
 
     when(groupBy0.evaluate(any())).thenReturn(0);
     when(groupBy1.evaluate(any())).thenReturn(0L);
@@ -87,7 +105,7 @@ public void shouldThrowOnNullParam() {
 
   @Test(expected = IllegalArgumentException.class)
   public void shouldThrowOnEmptyParam() {
-    GroupByParamsFactory.build(SOURCE_SCHEMA, Collections.emptyList());
+    GroupByParamsFactory.build(SOURCE_SCHEMA, Collections.emptyList(), logger);
   }
 
   @Test
@@ -106,7 +124,7 @@ public void shouldGenerateSingleExpressionGroupByKey() {
     when(groupBy0.evaluate(any())).thenReturn(10);
 
     // When:
-    final Struct result = singlePrams.getMapper().apply(value);
+    final Struct result = singleParams.getMapper().apply(value);
 
     // Then:
     assertThat(result, is(INT_KEY_BUILDER.build(10)));
@@ -131,12 +149,31 @@ public void shouldReturnNullIfSingleExpressionResolvesToNull() {
     when(groupBy0.evaluate(any())).thenReturn(null);
 
     // When:
-    final Struct result = singlePrams.getMapper().apply(value);
+    final Struct result = singleParams.getMapper().apply(value);
 
     // Then:
     assertThat(result, is(nullValue()));
   }
 
+  @Test
+  public void shouldLogProcessingErrorIfSingleExpressionResolvesToNull() {
+    // Given
+    when(groupBy0.evaluate(any())).thenReturn(null);
+
+    // When:
+    singleParams.getMapper().apply(value);
+
+    // Then:
+    final Struct msg = verifyAndGetLogMsg();
+    assertThat(
+        msg.get(ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_MESSAGE),
+        equalTo(
+            "Group-by column with index 0 resolved to null. "
+                + "The source row will be excluded from the table."
+        )
+    );
+  }
+
   @Test
   public void shouldReturnNullIfAnyMultiExpressionResolvesToNull() {
     // Given:
@@ -149,18 +186,56 @@ public void shouldReturnNullIfAnyMultiExpressionResolvesToNull() {
     assertThat(result, is(nullValue()));
   }
 
+  @Test
+  public void shouldLogProcessingErrorIfAnyMultiExpressionResolvesToNull() {
+    // Given
+    when(groupBy0.evaluate(any())).thenReturn(null);
+
+    // When:
+    multiParams.getMapper().apply(value);
+
+    // Then:
+    final Struct msg = verifyAndGetLogMsg();
+    assertThat(
+        msg.get(ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_MESSAGE),
+        equalTo(
+            "Group-by column with index 0 resolved to null. "
+                + "The source row will be excluded from the table."
+        )
+    );
+  }
+
   @Test
   public void shouldReturnNullIfExpressionThrowsInSingle() {
     // Given:
     when(groupBy0.evaluate(any())).thenThrow(new RuntimeException("Boom"));
 
     // When:
-    final Struct result = singlePrams.getMapper().apply(value);
+    final Struct result = singleParams.getMapper().apply(value);
 
     // Then:
     assertThat(result, is(nullValue()));
   }
 
+  @Test
+  public void shouldLogProcessingErrorIfExpressionThrowsInSingle() {
+    // Given
+    when(groupBy0.evaluate(any())).thenThrow(new RuntimeException("Bap"));
+
+    // When:
+    singleParams.getMapper().apply(value);
+
+    // Then:
+    final Struct msg = verifyAndGetLogMsg();
+    assertThat(
+        msg.get(ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_MESSAGE),
+        equalTo(
+            "Error calculating group-by column with index 0. "
+                + "The source row will be excluded from the table: Bap"
+        )
+    );
+  }
+
   @Test
   public void shouldReturnNullExpressionThrowsInMulti() {
     // Given:
@@ -172,4 +247,36 @@ public void shouldReturnNullExpressionThrowsInMulti() {
     // Then:
     assertThat(result, is(nullValue()));
   }
+
+  @Test
+  public void shouldLogProcessingErrorIfExpressionThrowsInMulti() {
+    // Given
+    when(groupBy0.evaluate(any())).thenThrow(new RuntimeException("Bap"));
+
+    // When:
+    multiParams.getMapper().apply(value);
+
+    // Then:
+    final Struct msg = verifyAndGetLogMsg();
+    assertThat(
+        msg.get(ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR_FIELD_MESSAGE),
+        equalTo(
+            "Error calculating group-by column with index 0. "
+                + "The source row will be excluded from the table: Bap"
+        )
+    );
+  }
+
+  private Struct verifyAndGetLogMsg() {
+    verify(logger).error(msgCaptor.capture());
+    final Function<ProcessingLogConfig, SchemaAndValue> msgSupplier = msgCaptor.getValue();
+    final SchemaAndValue schemaAndValue = msgSupplier.apply(processingLogConfig);
+    assertThat(schemaAndValue.schema(), equalTo(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA));
+    final Struct msg = (Struct) schemaAndValue.value();
+    assertThat(
+        msg.get(ProcessingLogMessageSchema.TYPE),
+        equalTo(MessageType.RECORD_PROCESSING_ERROR.getTypeId())
+    );
+    return msg.getStruct(ProcessingLogMessageSchema.RECORD_PROCESSING_ERROR);
+  }
 }
diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java
index 6dc38fd459d6..62f140497faa 100644
--- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java
+++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/StreamGroupByBuilderTest.java
@@ -26,8 +26,12 @@
 import io.confluent.ksql.execution.util.StructKeyUtil;
 import io.confluent.ksql.execution.util.StructKeyUtil.KeyBuilder;
 import io.confluent.ksql.function.FunctionRegistry;
+import io.confluent.ksql.logging.processing.ProcessingLogContext;
+import io.confluent.ksql.logging.processing.ProcessingLogger;
+import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
 import io.confluent.ksql.name.ColumnName;
 import io.confluent.ksql.name.SourceName;
+import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
 import io.confluent.ksql.schema.ksql.PhysicalSchema;
 import io.confluent.ksql.schema.ksql.types.SqlTypes;
@@ -120,6 +124,10 @@ public class StreamGroupByBuilderTest {
   private KGroupedStream groupedStream;
   @Captor
   private ArgumentCaptor<Predicate<Struct, GenericRow>> predicateCaptor;
+  @Mock
+  private ProcessingLogContext processingLogContext;
+  @Mock
+  private ProcessingLoggerFactory processingLoggerFactory;
 
   private PlanBuilder planBuilder;
   private StreamGroupBy streamGroupBy;
@@ -135,6 +143,8 @@ public void init() {
     when(queryBuilder.getFunctionRegistry()).thenReturn(functionRegistry);
     when(queryBuilder.buildKeySerde(any(), any(), any())).thenReturn(keySerde);
     when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valueSerde);
+    when(queryBuilder.getProcessingLogContext()).thenReturn(processingLogContext);
+    when(queryBuilder.getQueryId()).thenReturn(new QueryId("qid"));
     when(groupedFactory.create(any(), any(Serde.class), any())).thenReturn(grouped);
     when(sourceStream.groupByKey(any(Grouped.class))).thenReturn(groupedStream);
     when(sourceStream.filter(any())).thenReturn(filteredStream);
@@ -143,6 +153,8 @@
     when(sourceStep.getProperties()).thenReturn(SOURCE_PROPERTIES);
     when(sourceStep.build(any())).thenReturn(
         new KStreamHolder<>(sourceStream, SCHEMA, mock(KeySerdeFactory.class)));
+    when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory);
+    when(processingLoggerFactory.getLogger(any())).thenReturn(mock(ProcessingLogger.class));
     streamGroupBy = new StreamGroupBy<>(
         PROPERTIES,
         sourceStep,
diff --git a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java
index 8bc4ac8efb17..ad6e30d2066f 100644
--- a/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java
+++ b/ksql-streams/src/test/java/io/confluent/ksql/execution/streams/TableGroupByBuilderTest.java
@@ -25,6 +25,9 @@
 import io.confluent.ksql.execution.streams.TableGroupByBuilder.TableKeyValueMapper;
 import io.confluent.ksql.execution.util.StructKeyUtil;
 import io.confluent.ksql.function.FunctionRegistry;
+import io.confluent.ksql.logging.processing.ProcessingLogContext;
+import io.confluent.ksql.logging.processing.ProcessingLogger;
+import io.confluent.ksql.logging.processing.ProcessingLoggerFactory;
 import io.confluent.ksql.name.ColumnName;
 import io.confluent.ksql.query.QueryId;
 import io.confluent.ksql.schema.ksql.LogicalSchema;
@@ -118,6 +121,10 @@ public class TableGroupByBuilderTest {
   private ArgumentCaptor<TableKeyValueMapper<Struct>> mapperCaptor;
   @Captor
   private ArgumentCaptor<Predicate<Struct, GenericRow>> predicateCaptor;
+  @Mock
+  private ProcessingLogContext processingLogContext;
+  @Mock
+  private ProcessingLoggerFactory processingLoggerFactory;
 
   private PlanBuilder planBuilder;
   private TableGroupBy groupBy;
@@ -133,6 +140,7 @@ public void init() {
     when(queryBuilder.getFunctionRegistry()).thenReturn(functionRegistry);
     when(queryBuilder.buildKeySerde(any(), any(), any())).thenReturn(keySerde);
     when(queryBuilder.buildValueSerde(any(), any(), any())).thenReturn(valueSerde);
+    when(queryBuilder.getProcessingLogContext()).thenReturn(processingLogContext);
     when(groupedFactory.create(any(), any(Serde.class), any())).thenReturn(grouped);
     when(sourceTable.filter(any())).thenReturn(filteredTable);
     when(filteredTable.groupBy(any(KeyValueMapper.class), any(Grouped.class)))
@@ -140,6 +148,8 @@
     when(sourceStep.getProperties()).thenReturn(SOURCE_PROPERTIES);
     when(sourceStep.build(any())).thenReturn(
         KTableHolder.unmaterialized(sourceTable, SCHEMA, mock(KeySerdeFactory.class)));
+    when(processingLogContext.getLoggerFactory()).thenReturn(processingLoggerFactory);
+    when(processingLoggerFactory.getLogger(any())).thenReturn(mock(ProcessingLogger.class));
     groupBy = new TableGroupBy<>(
         PROPERTIES,
         sourceStep,