From fb9b14e56e7c2c8549b859b1abeaf38cea2780d9 Mon Sep 17 00:00:00 2001 From: Sergey Nuyanzin Date: Tue, 26 Mar 2024 23:34:47 +0100 Subject: [PATCH] [FLINK-34942] Add support for Flink 1.19, 1.20-SNAPSHOT --- .github/workflows/push_pr.yml | 10 +- .github/workflows/weekly.yml | 15 +- .../sink/OpensearchSinkBuilderTest.java | 90 ++------ .../sink/OpensearchWriterITCase.java | 19 +- .../sink/TestingSinkWriterMetricGroup.java | 198 ++++++++++++++++++ flink-sql-connector-opensearch/pom.xml | 6 + pom.xml | 7 + 7 files changed, 263 insertions(+), 82 deletions(-) create mode 100644 flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestingSinkWriterMetricGroup.java diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index 6c2dc05..577f472 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -25,7 +25,15 @@ jobs: compile_and_test: strategy: matrix: - flink: [1.17.1, 1.18-SNAPSHOT] + flink: [ 1.17-SNAPSHOT ] + jdk: [ '8, 11' ] + include: + - flink: 1.18-SNAPSHOT + jdk: '8, 11, 17' + - flink: 1.19-SNAPSHOT + jdk: '8, 11, 17, 21' + - flink: 1.20-SNAPSHOT + jdk: '8, 11, 17, 21' uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} diff --git a/.github/workflows/weekly.yml b/.github/workflows/weekly.yml index 937446e..bb03f1f 100644 --- a/.github/workflows/weekly.yml +++ b/.github/workflows/weekly.yml @@ -26,17 +26,26 @@ jobs: if: github.repository_owner == 'apache' strategy: matrix: - flink_branches: [{ + flink_branches: [ { flink: 1.17-SNAPSHOT, branch: main }, { flink: 1.18-SNAPSHOT, + jdk: '8, 11, 17', branch: main }, { - flink: 1.16.2, + flink: 1.19-SNAPSHOT, + jdk: '8, 11, 17, 21', + branch: main + }, { + flink: 1.20-SNAPSHOT, + jdk: '8, 11, 17, 21', + branch: main + }, { + flink: 1.17.2, branch: v1.0 }, { - flink: 1.17.1, + flink: 1.18.1, branch: v1.0 }] uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java index 693ae44..ce4278b 100644 --- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchSinkBuilderTest.java @@ -17,25 +17,17 @@ package org.apache.flink.connector.opensearch.sink; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.operators.MailboxExecutor; -import org.apache.flink.api.common.operators.ProcessingTimeService; -import org.apache.flink.api.common.serialization.SerializationSchema; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.connector.sink2.Sink; import org.apache.flink.api.connector.sink2.Sink.InitContext; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory; import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.SimpleUserCodeClassLoader; import org.apache.flink.util.TestLoggerExtension; -import org.apache.flink.util.UserCodeClassLoader; import org.apache.flink.util.function.ThrowingRunnable; import org.apache.http.HttpHost; @@ -44,8 +36,8 @@ import org.junit.jupiter.api.TestFactory; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mockito; -import java.util.OptionalLong; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; @@ -138,9 +130,12 @@ void testOverrideFailureHandler() { final OpensearchSink sink = createMinimalBuilder().setFailureHandler(failureHandler).build(); - final InitContext sinkInitContext = new MockInitContext(); final BulkResponseInspector bulkResponseInspector = - sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup); + sink.getBulkResponseInspectorFactory() + .apply( + () -> + TestingSinkWriterMetricGroup.getSinkWriterMetricGroup( + new UnregisteredMetricsGroup())); assertThat(bulkResponseInspector) .isInstanceOf(DefaultBulkResponseInspector.class) .extracting( @@ -163,7 +158,20 @@ void testOverrideBulkResponseInspectorFactory() { .setBulkResponseInspectorFactory(bulkResponseInspectorFactory) .build(); - final InitContext sinkInitContext = new MockInitContext(); + final InitContext sinkInitContext = Mockito.mock(InitContext.class); + Mockito.when(sinkInitContext.metricGroup()) + .thenReturn( + TestingSinkWriterMetricGroup.getSinkWriterMetricGroup( + new UnregisteredMetricsGroup())); + + Mockito.when(sinkInitContext.getMailboxExecutor()) + .thenReturn(new OpensearchSinkBuilderTest.DummyMailboxExecutor()); + Mockito.when(sinkInitContext.getProcessingTimeService()) + .thenReturn(new TestProcessingTimeService()); + Mockito.when(sinkInitContext.getUserCodeClassLoader()) + .thenReturn( + SimpleUserCodeClassLoader.create( + OpensearchSinkBuilderTest.class.getClassLoader())); assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException(); assertThat(called).isTrue(); @@ -184,64 +192,6 @@ public boolean tryYield() throws FlinkRuntimeException { } } - private static class MockInitContext - implements Sink.InitContext, SerializationSchema.InitializationContext { - - public UserCodeClassLoader getUserCodeClassLoader() { - return SimpleUserCodeClassLoader.create( - OpensearchSinkBuilderTest.class.getClassLoader()); - } - - public MailboxExecutor getMailboxExecutor() { - return new OpensearchSinkBuilderTest.DummyMailboxExecutor(); - } - - public ProcessingTimeService getProcessingTimeService() { - return new TestProcessingTimeService(); - } - - public int getSubtaskId() { - return 0; - } - - public int getNumberOfParallelSubtasks() { - return 0; - } - - public int getAttemptNumber() { - return 0; - } - - public SinkWriterMetricGroup metricGroup() { - return InternalSinkWriterMetricGroup.mock(new UnregisteredMetricsGroup()); - } - - public MetricGroup getMetricGroup() { - return this.metricGroup(); - } - - public OptionalLong getRestoredCheckpointId() { - return OptionalLong.empty(); - } - - public SerializationSchema.InitializationContext - asSerializationSchemaInitializationContext() { - return this; - } - - public boolean isObjectReuseEnabled() { - return false; - } - - public TypeSerializer createInputSerializer() { - throw new UnsupportedOperationException(); - } - - public JobID getJobId() { - throw new UnsupportedOperationException(); - } - } - private OpensearchSinkBuilder createEmptyBuilder() { return new OpensearchSinkBuilder<>(); } diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java index fc083a4..838c6bd 100644 --- a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/OpensearchWriterITCase.java @@ -30,7 +30,6 @@ import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.testutils.MetricListener; import org.apache.flink.runtime.metrics.MetricNames; -import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.TestLoggerExtension; @@ -168,17 +167,19 @@ void testIncrementByteOutMetric() throws Exception { final String index = "test-inc-byte-out"; final OperatorIOMetricGroup operatorIOMetricGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); - final InternalSinkWriterMetricGroup metricGroup = - InternalSinkWriterMetricGroup.mock( - metricListener.getMetricGroup(), operatorIOMetricGroup); final int flushAfterNActions = 2; final BulkProcessorConfig bulkProcessorConfig = new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0); try (final OpensearchWriter> writer = - createWriter(index, false, bulkProcessorConfig, metricGroup)) { + createWriter( + index, + false, + bulkProcessorConfig, + TestingSinkWriterMetricGroup.getSinkWriterMetricGroup( + operatorIOMetricGroup, metricListener.getMetricGroup()))) { final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter(); - assertThat(numBytesOut.getCount()).isEqualTo(0); + assertThat(numBytesOut.getCount()).isZero(); writer.write(Tuple2.of(1, buildMessage(1)), null); writer.write(Tuple2.of(2, buildMessage(2)), null); @@ -280,7 +281,8 @@ private OpensearchWriter> createWriter( index, flushOnCheckpoint, bulkProcessorConfig, - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), + TestingSinkWriterMetricGroup.getSinkWriterMetricGroup( + metricListener.getMetricGroup()), new DefaultFailureHandler()); } @@ -293,7 +295,8 @@ private OpensearchWriter> createWriter( index, flushOnCheckpoint, bulkProcessorConfig, - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), + TestingSinkWriterMetricGroup.getSinkWriterMetricGroup( + metricListener.getMetricGroup()), failureHandler); } diff --git a/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestingSinkWriterMetricGroup.java b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestingSinkWriterMetricGroup.java new file mode 100644 index 0000000..0ad609f --- /dev/null +++ b/flink-connector-opensearch/src/test/java/org/apache/flink/connector/opensearch/sink/TestingSinkWriterMetricGroup.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.opensearch.sink; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.SinkWriterMetricGroup; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup; +import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; + +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** Testing implementation for {@link SinkWriterMetricGroup}. */ +public class TestingSinkWriterMetricGroup extends ProxyMetricGroup + implements SinkWriterMetricGroup { + + private final Supplier numRecordsOutErrorsCounterSupplier; + + private final Supplier numRecordsSendErrorsCounterSupplier; + + private final Supplier numRecordsSendCounterSupplier; + + private final Supplier numBytesSendCounterSupplier; + + private final Consumer> currentSendTimeGaugeConsumer; + + private final Supplier ioMetricGroupSupplier; + + public TestingSinkWriterMetricGroup( + MetricGroup parentMetricGroup, + Supplier numRecordsOutErrorsCounterSupplier, + Supplier numRecordsSendErrorsCounterSupplier, + Supplier numRecordsSendCounterSupplier, + Supplier numBytesSendCounterSupplier, + Consumer> currentSendTimeGaugeConsumer, + Supplier ioMetricGroupSupplier) { + super(parentMetricGroup); + this.numRecordsOutErrorsCounterSupplier = numRecordsOutErrorsCounterSupplier; + this.numRecordsSendErrorsCounterSupplier = numRecordsSendErrorsCounterSupplier; + this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier; + this.numBytesSendCounterSupplier = numBytesSendCounterSupplier; + this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer; + this.ioMetricGroupSupplier = ioMetricGroupSupplier; + } + + @Override + public Counter getNumRecordsOutErrorsCounter() { + return numRecordsOutErrorsCounterSupplier.get(); + } + + @Override + public Counter getNumRecordsSendErrorsCounter() { + return numRecordsSendErrorsCounterSupplier.get(); + } + + @Override + public Counter getNumRecordsSendCounter() { + return numRecordsSendCounterSupplier.get(); + } + + @Override + public Counter getNumBytesSendCounter() { + return numBytesSendCounterSupplier.get(); + } + + @Override + public void setCurrentSendTimeGauge(Gauge gauge) { + currentSendTimeGaugeConsumer.accept(gauge); + } + + @Override + public OperatorIOMetricGroup getIOMetricGroup() { + return ioMetricGroupSupplier.get(); + } + + static TestingSinkWriterMetricGroup getSinkWriterMetricGroup(MetricGroup parentMetricGroup) { + final OperatorIOMetricGroup operatorIOMetricGroup = + UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); + return getSinkWriterMetricGroup(operatorIOMetricGroup, parentMetricGroup); + } + + static TestingSinkWriterMetricGroup getSinkWriterMetricGroup( + OperatorIOMetricGroup operatorIOMetricGroup, MetricGroup parentMetricGroup) { + Counter numRecordsOutErrors = parentMetricGroup.counter(MetricNames.NUM_RECORDS_OUT_ERRORS); + Counter numRecordsSendErrors = + parentMetricGroup.counter(MetricNames.NUM_RECORDS_SEND_ERRORS, numRecordsOutErrors); + Counter numRecordsWritten = + parentMetricGroup.counter( + MetricNames.NUM_RECORDS_SEND, + operatorIOMetricGroup.getNumRecordsOutCounter()); + Counter numBytesWritten = + parentMetricGroup.counter( + MetricNames.NUM_BYTES_SEND, operatorIOMetricGroup.getNumBytesOutCounter()); + Consumer> currentSendTimeGaugeConsumer = + currentSendTimeGauge -> + parentMetricGroup.gauge( + MetricNames.CURRENT_SEND_TIME, currentSendTimeGauge); + return new TestingSinkWriterMetricGroup.Builder() + .setParentMetricGroup(parentMetricGroup) + .setIoMetricGroupSupplier(() -> operatorIOMetricGroup) + .setNumRecordsOutErrorsCounterSupplier(() -> numRecordsOutErrors) + .setNumRecordsSendErrorsCounterSupplier(() -> numRecordsSendErrors) + .setNumRecordsSendCounterSupplier(() -> numRecordsWritten) + .setNumBytesSendCounterSupplier(() -> numBytesWritten) + .setCurrentSendTimeGaugeConsumer(currentSendTimeGaugeConsumer) + .build(); + } + + /** Builder for {@link TestingSinkWriterMetricGroup}. */ + public static class Builder { + + private MetricGroup parentMetricGroup = null; + + private Supplier numRecordsOutErrorsCounterSupplier = () -> null; + + private Supplier numRecordsSendErrorsCounterSupplier = () -> null; + + private Supplier numRecordsSendCounterSupplier = () -> null; + + private Supplier numBytesSendCounterSupplier = () -> null; + + private Consumer> currentSendTimeGaugeConsumer = counter -> {}; + + private Supplier ioMetricGroupSupplier = () -> null; + + public Builder setParentMetricGroup(MetricGroup parentMetricGroup) { + this.parentMetricGroup = parentMetricGroup; + return this; + } + + public Builder setNumRecordsOutErrorsCounterSupplier( + Supplier numRecordsOutErrorsCounterSupplier) { + this.numRecordsOutErrorsCounterSupplier = numRecordsOutErrorsCounterSupplier; + return this; + } + + public Builder setNumRecordsSendErrorsCounterSupplier( + Supplier numRecordsSendErrorsCounterSupplier) { + this.numRecordsSendErrorsCounterSupplier = numRecordsSendErrorsCounterSupplier; + return this; + } + + public Builder setNumRecordsSendCounterSupplier( + Supplier numRecordsSendCounterSupplier) { + this.numRecordsSendCounterSupplier = numRecordsSendCounterSupplier; + return this; + } + + public Builder setNumBytesSendCounterSupplier( + Supplier numBytesSendCounterSupplier) { + this.numBytesSendCounterSupplier = numBytesSendCounterSupplier; + return this; + } + + public Builder setCurrentSendTimeGaugeConsumer( + Consumer> currentSendTimeGaugeConsumer) { + this.currentSendTimeGaugeConsumer = currentSendTimeGaugeConsumer; + return this; + } + + public Builder setIoMetricGroupSupplier( + Supplier ioMetricGroupSupplier) { + this.ioMetricGroupSupplier = ioMetricGroupSupplier; + return this; + } + + public TestingSinkWriterMetricGroup build() { + return new TestingSinkWriterMetricGroup( + parentMetricGroup, + numRecordsOutErrorsCounterSupplier, + numRecordsSendErrorsCounterSupplier, + numRecordsSendCounterSupplier, + numBytesSendCounterSupplier, + currentSendTimeGaugeConsumer, + ioMetricGroupSupplier); + } + } +} diff --git a/flink-sql-connector-opensearch/pom.xml b/flink-sql-connector-opensearch/pom.xml index e3f4cd4..8f3cdc1 100644 --- a/flink-sql-connector-opensearch/pom.xml +++ b/flink-sql-connector-opensearch/pom.xml @@ -40,6 +40,12 @@ under the License. flink-connector-opensearch ${project.version} + + org.apache.flink + flink-test-utils + ${flink.version} + test + diff --git a/pom.xml b/pom.xml index d6dbd4c..676663c 100644 --- a/pom.xml +++ b/pom.xml @@ -297,6 +297,13 @@ under the License. import + + + org.xerial.snappy + snappy-java + 1.1.10.5 + + org.assertj assertj-core