Skip to content

Commit

Permalink
[FLINK-34942] Add support for Flink 1.19, 1.20-SNAPSHOT
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed Mar 26, 2024
1 parent 9e161cc commit fb9b14e
Show file tree
Hide file tree
Showing 7 changed files with 263 additions and 82 deletions.
10 changes: 9 additions & 1 deletion .github/workflows/push_pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@ jobs:
compile_and_test:
strategy:
matrix:
flink: [1.17.1, 1.18-SNAPSHOT]
flink: [ 1.17-SNAPSHOT ]
jdk: [ '8, 11' ]
include:
- flink: 1.18-SNAPSHOT
jdk: '8, 11, 17'
- flink: 1.19-SNAPSHOT
jdk: '8, 11, 17, 21'
- flink: 1.20-SNAPSHOT
jdk: '8, 11, 17, 21'
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
with:
flink_version: ${{ matrix.flink }}
15 changes: 12 additions & 3 deletions .github/workflows/weekly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,26 @@ jobs:
if: github.repository_owner == 'apache'
strategy:
matrix:
flink_branches: [{
flink_branches: [ {
flink: 1.17-SNAPSHOT,
branch: main
}, {
flink: 1.18-SNAPSHOT,
jdk: '8, 11, 17',
branch: main
}, {
flink: 1.16.2,
flink: 1.19-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
}, {
flink: 1.20-SNAPSHOT,
jdk: '8, 11, 17, 21',
branch: main
}, {
flink: 1.17.2,
branch: v1.0
}, {
flink: 1.17.1,
flink: 1.18.1,
branch: v1.0
}]
uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,17 @@

package org.apache.flink.connector.opensearch.sink;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.Sink.InitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.opensearch.sink.BulkResponseInspector.BulkResponseInspectorFactory;
import org.apache.flink.connector.opensearch.sink.OpensearchWriter.DefaultBulkResponseInspector;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.SimpleUserCodeClassLoader;
import org.apache.flink.util.TestLoggerExtension;
import org.apache.flink.util.UserCodeClassLoader;
import org.apache.flink.util.function.ThrowingRunnable;

import org.apache.http.HttpHost;
Expand All @@ -44,8 +36,8 @@
import org.junit.jupiter.api.TestFactory;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mockito;

import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

Expand Down Expand Up @@ -138,9 +130,12 @@ void testOverrideFailureHandler() {
final OpensearchSink<Object> sink =
createMinimalBuilder().setFailureHandler(failureHandler).build();

final InitContext sinkInitContext = new MockInitContext();
final BulkResponseInspector bulkResponseInspector =
sink.getBulkResponseInspectorFactory().apply(sinkInitContext::metricGroup);
sink.getBulkResponseInspectorFactory()
.apply(
() ->
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
new UnregisteredMetricsGroup()));
assertThat(bulkResponseInspector)
.isInstanceOf(DefaultBulkResponseInspector.class)
.extracting(
Expand All @@ -163,7 +158,20 @@ void testOverrideBulkResponseInspectorFactory() {
.setBulkResponseInspectorFactory(bulkResponseInspectorFactory)
.build();

final InitContext sinkInitContext = new MockInitContext();
final InitContext sinkInitContext = Mockito.mock(InitContext.class);
Mockito.when(sinkInitContext.metricGroup())
.thenReturn(
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
new UnregisteredMetricsGroup()));

Mockito.when(sinkInitContext.getMailboxExecutor())
.thenReturn(new OpensearchSinkBuilderTest.DummyMailboxExecutor());
Mockito.when(sinkInitContext.getProcessingTimeService())
.thenReturn(new TestProcessingTimeService());
Mockito.when(sinkInitContext.getUserCodeClassLoader())
.thenReturn(
SimpleUserCodeClassLoader.create(
OpensearchSinkBuilderTest.class.getClassLoader()));

assertThatCode(() -> sink.createWriter(sinkInitContext)).doesNotThrowAnyException();
assertThat(called).isTrue();
Expand All @@ -184,64 +192,6 @@ public boolean tryYield() throws FlinkRuntimeException {
}
}

private static class MockInitContext
implements Sink.InitContext, SerializationSchema.InitializationContext {

public UserCodeClassLoader getUserCodeClassLoader() {
return SimpleUserCodeClassLoader.create(
OpensearchSinkBuilderTest.class.getClassLoader());
}

public MailboxExecutor getMailboxExecutor() {
return new OpensearchSinkBuilderTest.DummyMailboxExecutor();
}

public ProcessingTimeService getProcessingTimeService() {
return new TestProcessingTimeService();
}

public int getSubtaskId() {
return 0;
}

public int getNumberOfParallelSubtasks() {
return 0;
}

public int getAttemptNumber() {
return 0;
}

public SinkWriterMetricGroup metricGroup() {
return InternalSinkWriterMetricGroup.mock(new UnregisteredMetricsGroup());
}

public MetricGroup getMetricGroup() {
return this.metricGroup();
}

public OptionalLong getRestoredCheckpointId() {
return OptionalLong.empty();
}

public SerializationSchema.InitializationContext
asSerializationSchemaInitializationContext() {
return this;
}

public boolean isObjectReuseEnabled() {
return false;
}

public <IN> TypeSerializer<IN> createInputSerializer() {
throw new UnsupportedOperationException();
}

public JobID getJobId() {
throw new UnsupportedOperationException();
}
}

private OpensearchSinkBuilder<Object> createEmptyBuilder() {
return new OpensearchSinkBuilder<>();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.metrics.MetricNames;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLoggerExtension;
Expand Down Expand Up @@ -168,17 +167,19 @@ void testIncrementByteOutMetric() throws Exception {
final String index = "test-inc-byte-out";
final OperatorIOMetricGroup operatorIOMetricGroup =
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
final InternalSinkWriterMetricGroup metricGroup =
InternalSinkWriterMetricGroup.mock(
metricListener.getMetricGroup(), operatorIOMetricGroup);
final int flushAfterNActions = 2;
final BulkProcessorConfig bulkProcessorConfig =
new BulkProcessorConfig(flushAfterNActions, -1, -1, FlushBackoffType.NONE, 0, 0);

try (final OpensearchWriter<Tuple2<Integer, String>> writer =
createWriter(index, false, bulkProcessorConfig, metricGroup)) {
createWriter(
index,
false,
bulkProcessorConfig,
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
operatorIOMetricGroup, metricListener.getMetricGroup()))) {
final Counter numBytesOut = operatorIOMetricGroup.getNumBytesOutCounter();
assertThat(numBytesOut.getCount()).isEqualTo(0);
assertThat(numBytesOut.getCount()).isZero();
writer.write(Tuple2.of(1, buildMessage(1)), null);
writer.write(Tuple2.of(2, buildMessage(2)), null);

Expand Down Expand Up @@ -280,7 +281,8 @@ private OpensearchWriter<Tuple2<Integer, String>> createWriter(
index,
flushOnCheckpoint,
bulkProcessorConfig,
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
metricListener.getMetricGroup()),
new DefaultFailureHandler());
}

Expand All @@ -293,7 +295,8 @@ private OpensearchWriter<Tuple2<Integer, String>> createWriter(
index,
flushOnCheckpoint,
bulkProcessorConfig,
InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
TestingSinkWriterMetricGroup.getSinkWriterMetricGroup(
metricListener.getMetricGroup()),
failureHandler);
}

Expand Down
Loading

0 comments on commit fb9b14e

Please sign in to comment.