Skip to content

Commit

Permalink
[FLINK-34942] Add support for Flink 1.19, 1.20-SNAPSHOT
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Apr 13, 2024
1 parent 9e161cc commit 925be57
Show file tree
Hide file tree
Showing 10 changed files with 360 additions and 93 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ jobs:
compile_and_test:
strategy:
matrix:
flink: [1.17.1, 1.18-SNAPSHOT]
flink: [ 1.17-SNAPSHOT, 1.18-SNAPSHOT, 1.19-SNAPSHOT, 1.20-SNAPSHOT ]
jdk: [ '8, 11' ]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
jdk_version: ${{ matrix.jdk }}
16 changes: 13 additions & 3 deletions .github/workflows/weekly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,31 @@ jobs:
if: github.repository_owner == 'apache'
strategy:
matrix:
flink_branches: [{
flink_branches: [ {
flink: 1.17-SNAPSHOT,
branch: main
}, {
flink: 1.18-SNAPSHOT,
jdk: '8, 11',
branch: main
}, {
flink: 1.16.2,
flink: 1.19-SNAPSHOT,
jdk: '8, 11',
branch: main
}, {
flink: 1.20-SNAPSHOT,
jdk: '8, 11',
branch: main
}, {
flink: 1.17.2,
branch: v1.0
}, {
flink: 1.17.1,
flink: 1.18.1,
branch: v1.0
}]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink_branches.flink }}
jdk_version: ${{ matrix.flink_branches.jdk || '8, 11' }}
connector_branch: ${{ matrix.flink_branches.branch }}
run_dependency_convergence: false
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.core.execution;

import org.apache.flink.streaming.tests.OpensearchSinkE2ECase;

/**
* This is a copy of {@link CheckpointingMode} from flink-core module introduced in Flink 1.20. We
* need it here to make {@link OpensearchSinkE2ECase} compatible with earlier releases. Could be
* removed together with dropping support of Flink 1.19.
*/
public enum CheckpointingMode {
EXACTLY_ONCE,
AT_LEAST_ONCE;

private CheckpointingMode() {}

public static org.apache.flink.streaming.api.CheckpointingMode convertFromCheckpointingMode(
org.apache.flink.core.execution.CheckpointingMode semantic) {
switch (semantic) {
case EXACTLY_ONCE:
return org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
case AT_LEAST_ONCE:
return org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
default:
throw new IllegalArgumentException("Unsupported semantic: " + semantic);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.List;

import static org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
import static org.apache.flink.core.execution.CheckpointingMode.convertFromCheckpointingMode;
import static org.apache.flink.runtime.testutils.CommonTestUtils.waitUntilCondition;

/** End to end test for OpensearchSink based on connector testing framework. */
Expand Down Expand Up @@ -85,7 +86,8 @@ public OpensearchSinkE2ECase() throws Exception {}
.toUri()
.toURL()));

@Override
/** Could be removed together with dropping support of Flink 1.19. */
@Deprecated
protected void checkResultWithSemantic(
ExternalSystemDataReader<ComparableTuple2<Integer, String>> reader,
List<ComparableTuple2<Integer, String>> testData,
Expand All @@ -109,6 +111,30 @@ protected void checkResultWithSemantic(
READER_RETRY_ATTEMPTS);
}

protected void checkResultWithSemantic(
ExternalSystemDataReader<ComparableTuple2<Integer, String>> reader,
List<ComparableTuple2<Integer, String>> testData,
org.apache.flink.core.execution.CheckpointingMode semantic)
throws Exception {
waitUntilCondition(
() -> {
try {
List<ComparableTuple2<Integer, String>> result =
reader.poll(Duration.ofMillis(READER_TIMEOUT));
assertThat(sort(result).iterator())
.matchesRecordsFromSource(
Collections.singletonList(sort(testData)),
convertFromCheckpointingMode(semantic));
return true;
} catch (Throwable t) {
LOG.warn("Polled results not as expected", t);
return false;
}
},
5000,
READER_RETRY_ATTEMPTS);
}

private static <T extends Comparable<T>> List<T> sort(List<T> list) {
Collections.sort(list);
return list;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,17 @@

package org.apache.flink.connector.opensearch.sink;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.ThrowingRunnable;

import org.apache.http.HttpHost;
Expand All @@ -44,8 +36,8 @@
import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;

import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

Expand Down Expand Up @@ -138,9 +130,12 @@ void testOverrideFailureHandler() {
final OpensearchSink<Object> sink =
createMinimalBuilder().setFailureHandler(failureHandler).build();

final InitContext sinkInitContext = new MockInitContext();
final BulkResponseInspector bulkResponseInspector =
sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
sink.getBulkResponseInspectorFactory()
.apply(
() ->
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
new UnregisteredMetricsGroup()));
assertThat(bulkResponseInspector)
.isInstanceOf(DefaultBulkResponseInspector.class)
.extracting(
Expand All @@ -163,7 +158,20 @@ void testOverrideBulkResponseInspectorFactory() {
.setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
.build();

final InitContext sinkInitContext = new MockInitContext();
final InitContext sinkInitContext = Mockito.mock(InitContext.class);
Mockito.when(sinkInitContext.metricGroup())
.thenReturn(
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
new UnregisteredMetricsGroup()));

Mockito.when(sinkInitContext.getMailboxExecutor())
.thenReturn(new OpensearchSinkBuilderTest.DummyMailboxExecutor());
Mockito.when(sinkInitContext.getProcessingTimeService())
.thenReturn(new TestProcessingTimeService());
Mockito.when(sinkInitContext.getUserCodeClassLoader())
.thenReturn(
SimpleUserCodeClassLoader.create(
OpensearchSinkBuilderTest.class.getClassLoader()));

assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
assertThat(called).isTrue();
Expand All @@ -184,64 +192,6 @@ public boolean tryYield() throws FlinkRuntimeException {
}
}

private static class MockInitContext
implements Sink.InitContext, SerializationSchema.InitializationContext {

public UserCodeClassLoader getUserCodeClassLoader() {
return SimpleUserCodeClassLoader.create(
OpensearchSinkBuilderTest.class.getClassLoader());
}

public MailboxExecutor getMailboxExecutor() {
return new OpensearchSinkBuilderTest.DummyMailboxExecutor();
}

public ProcessingTimeService getProcessingTimeService() {
return new TestProcessingTimeService();
}

public int getSubtaskId() {
return 0;
}

public int getNumberOfParallelSubtasks() {
return 0;
}

public int getAttemptNumber() {
return 0;
}

public SinkWriterMetricGroup metricGroup() {
return InternalSinkWriterMetricGroup.mock(new UnregisteredMetricsGroup());
}

public MetricGroup getMetricGroup() {
return this.metricGroup();
}

public OptionalLong getRestoredCheckpointId() {
return OptionalLong.empty();
}

public SerializationSchema.InitializationContext
asSerializationSchemaInitializationContext() {
return this;
}

public boolean isObjectReuseEnabled() {
return false;
}

public <IN> TypeSerializer<IN> createInputSerializer() {
throw new UnsupportedOperationException();
}

public JobID getJobId() {
throw new UnsupportedOperationException();
}
}

private OpensearchSinkBuilder<Object> createEmptyBuilder() {
return new OpensearchSinkBuilder<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLoggerExtension;
Expand Down Expand Up @@ -168,17 +167,19 @@ void testIncrementByteOutMetric() throws Exception {
final String index = "test-inc-byte-out";
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
final InternalSinkWriterMetricGroup metricGroup =
InternalSinkWriterMetricGroup.mock(
metricListener.getMetricGroup(), operatorIOMetricGroup);
final int flushAfterNActions = 2;
final BulkProcessorConfig bulkProcessorConfig =
new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0);

try (final OpensearchWriter<Tuple2<Integer, String>> writer =
createWriter(index, false, bulkProcessorConfig, metricGroup)) {
createWriter(
index,
false,
bulkProcessorConfig,
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
operatorIOMetricGroup, metricListener.getMetricGroup()))) {
final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
assertThat(numBytesOut.getCount()).isEqualTo(0);
assertThat(numBytesOut.getCount()).isZero();
writer.write(Tuple2.of(1, buildMessage(1)), null);
writer.write(Tuple2.of(2, buildMessage(2)), null);

Expand Down Expand Up @@ -280,7 +281,8 @@ private OpensearchWriter<Tuple2<Integer, String>> createWriter(
index,
flushOnCheckpoint,
bulkProcessorConfig,
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
metricListener.getMetricGroup()),
new DefaultFailureHandler());
}

Expand All @@ -293,7 +295,8 @@ private OpensearchWriter<Tuple2<Integer, String>> createWriter(
index,
flushOnCheckpoint,
bulkProcessorConfig,
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
metricListener.getMetricGroup()),
failureHandler);
}

Expand Down
Loading

0 comments on commit 925be57

Please sign in to comment.