Skip to content

Commit

Permalink
Merge pull request #158 from /issues/156
Browse files Browse the repository at this point in the history
[fix]: Ensure checkpointing triggers records flush to ClickHouse #156
  • Loading branch information
czy006 authored Dec 15, 2024
2 parents 165a969 + ea5d1ec commit 5f879c6
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
package org.apache.flink.connector.clickhouse;

import org.apache.flink.connector.clickhouse.internal.AbstractClickHouseOutputFormat;
import org.apache.flink.connector.clickhouse.internal.ClickHouseRowDataSinkFunction;
import org.apache.flink.connector.clickhouse.internal.options.ClickHouseDmlOptions;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.OutputFormatProvider;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;
Expand Down Expand Up @@ -98,7 +99,8 @@ public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
.withPrimaryKey(primaryKeys)
.withPartitionKey(partitionKeys)
.build();
return OutputFormatProvider.of(outputFormat, options.getParallelism());
return SinkFunctionProvider.of(
new ClickHouseRowDataSinkFunction(outputFormat), options.getParallelism());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.clickhouse.internal;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;

import java.io.IOException;

/** A rich sink function to write {@link RowData} records into ClickHouse. */
@Internal
public class ClickHouseRowDataSinkFunction extends RichSinkFunction<RowData>
implements CheckpointedFunction {

private final AbstractClickHouseOutputFormat outputFormat;

public ClickHouseRowDataSinkFunction(@Nonnull AbstractClickHouseOutputFormat outputFormat) {
this.outputFormat = Preconditions.checkNotNull(outputFormat);
}

@Override
public void open(Configuration parameters) throws Exception {
outputFormat.configure(parameters);
RuntimeContext runtimeContext = getRuntimeContext();
outputFormat.setRuntimeContext(runtimeContext);
outputFormat.open(
runtimeContext.getIndexOfThisSubtask(),
runtimeContext.getNumberOfParallelSubtasks());
}

@Override
public void invoke(RowData value, Context context) throws IOException {
outputFormat.writeRecord(value);
}

@Override
public void initializeState(FunctionInitializationContext context) {}

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
outputFormat.flush();
}

@Override
public void close() {
outputFormat.close();
}
}

0 comments on commit 5f879c6

Please sign in to comment.