[INLONG-11355][Sort] Add new source metrics for sort-connector-mongodb-cdc-v1.15 #11356

Merged · 2 commits · Oct 16, 2024
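This change threads a MetricOption from the table layer down through both MongoDB CDC source paths: DebeziumSourceFunction now builds a SourceExactlyMetric in open(), counts snapshot creation/completion, and records the snapshot-to-checkpoint-complete delay, while MongoDBConnectorDeserializationSchema counts deserialize successes and errors and records per-record deserialize delay. A rough end-to-end sketch of the legacy builder path follows; the MetricOption builder calls and label format are assumptions about sort-base, and the host value is a placeholder.

```java
import org.apache.flink.table.data.RowData;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
import org.apache.inlong.sort.mongodb.DebeziumSourceFunction;
import org.apache.inlong.sort.mongodb.MongoDBSource;

final class MetricWiringSketch {
    static DebeziumSourceFunction<RowData> buildSource(
            DebeziumDeserializationSchema<RowData> deserializer) {
        // Assumed MetricOption builder API and label format; check sort-base.
        MetricOption metricOption = MetricOption.builder()
                .withInlongLabels("groupId=g1&streamId=s1&nodeId=n1")
                .build();
        return MongoDBSource.<RowData>builder()
                .hosts("localhost:27017") // placeholder host
                .deserializer(deserializer)
                .metricOption(metricOption) // new in this PR
                .build();
    }
}
```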
DebeziumSourceFunction.java

@@ -17,6 +17,9 @@

package org.apache.inlong.sort.mongodb;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;

import com.ververica.cdc.debezium.Validator;
import com.ververica.cdc.debezium.internal.DebeziumChangeConsumer;
import com.ververica.cdc.debezium.internal.DebeziumOffset;
@@ -61,6 +64,8 @@
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -197,17 +202,25 @@ public class DebeziumSourceFunction<T> extends RichSourceFunction<T>
/** Buffer the events from the source and record the errors from the debezium. */
private transient Handover handover;

private transient SourceExactlyMetric sourceExactlyMetric;

private final MetricOption metricOption;

private transient Map<Long, Long> checkpointStartTimeMap;

// ---------------------------------------------------------------------------------------

public DebeziumSourceFunction(
DebeziumDeserializationSchema<T> deserializer,
Properties properties,
@Nullable DebeziumOffset specificOffset,
Validator validator) {
Validator validator,
MetricOption metricOption) {
this.deserializer = deserializer;
this.properties = properties;
this.specificOffset = specificOffset;
this.validator = validator;
this.metricOption = metricOption;
}

@Override
@@ -220,6 +233,14 @@ public void open(Configuration parameters) throws Exception {
this.executor = Executors.newSingleThreadExecutor(threadFactory);
this.handover = new Handover();
this.changeConsumer = new DebeziumChangeConsumer(handover);
if (metricOption != null) {
sourceExactlyMetric = new SourceExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}
this.checkpointStartTimeMap = new HashMap<>();
// set sourceExactlyMetric for deserializer
if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
((MongoDBConnectorDeserializationSchema) deserializer).setSourceExactlyMetric(sourceExactlyMetric);
}
}

// ------------------------------------------------------------------------
@@ -304,17 +325,32 @@ private void restoreHistoryRecordsState() throws Exception {

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException(
"Call snapshotState() on closed source, checkpoint failed.");
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
}
if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
((MongoDBConnectorDeserializationSchema) deserializer)
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
try {
if (handover.hasError()) {
LOG.debug("snapshotState() called on closed source");
throw new FlinkRuntimeException(
"Call snapshotState() on closed source, checkpoint failed.");
} else {
snapshotOffsetState(functionSnapshotContext.getCheckpointId());
snapshotHistoryRecordsState();
}
if (deserializer instanceof MongoDBConnectorDeserializationSchema) {
((MongoDBConnectorDeserializationSchema) deserializer)
.updateCurrentCheckpointId(functionSnapshotContext.getCheckpointId());
}
if (checkpointStartTimeMap != null) {
checkpointStartTimeMap.put(functionSnapshotContext.getCheckpointId(), System.currentTimeMillis());
} else {
LOG.error("checkpointStartTimeMap is null, can't record the start time of checkpoint");
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotCreate();
}
} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeError();
}
throw e;
}
}

@@ -496,6 +532,16 @@ public void notifyCheckpointComplete(long checkpointId) {
schema.flushAudit();
schema.updateLastCheckpointId(checkpointId);
}
if (checkpointStartTimeMap != null) {
Long snapShotStartTimeById = checkpointStartTimeMap.remove(checkpointId);
if (snapShotStartTimeById != null && sourceExactlyMetric != null) {
sourceExactlyMetric.incNumSnapshotComplete();
sourceExactlyMetric
.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - snapShotStartTimeById);
}
} else {
LOG.error("checkpointStartTimeMap is null, can't get the start time of checkpoint");
}
} catch (Exception e) {
// ignore exception if we are no longer running
LOG.warn("Ignore error when committing offset to database.", e);
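The snapshot-delay bookkeeping added above reduces to a small pattern: store System.currentTimeMillis() per checkpointId in snapshotState(), then remove the entry in notifyCheckpointComplete() and report the difference. A standalone sketch of just that pattern (the MetricSink interface is a stand-in for SourceExactlyMetric, not the connector's API):

```java
import java.util.HashMap;
import java.util.Map;

final class CheckpointDelayTracker {
    // Stand-in for the SourceExactlyMetric methods used by the PR.
    interface MetricSink {
        void incNumSnapshotCreate();
        void incNumSnapshotComplete();
        void recordSnapshotToCheckpointDelay(long millis);
    }

    private final Map<Long, Long> startTimes = new HashMap<>();
    private final MetricSink metric;

    CheckpointDelayTracker(MetricSink metric) {
        this.metric = metric;
    }

    // Call from snapshotState(...): remember when this checkpoint started.
    void onSnapshot(long checkpointId) {
        startTimes.put(checkpointId, System.currentTimeMillis());
        metric.incNumSnapshotCreate();
    }

    // Call from notifyCheckpointComplete(...): entries for aborted
    // checkpoints stay behind, since the PR removes them on completion only.
    void onComplete(long checkpointId) {
        Long start = startTimes.remove(checkpointId);
        if (start != null) {
            metric.incNumSnapshotComplete();
            metric.recordSnapshotToCheckpointDelay(System.currentTimeMillis() - start);
        }
    }
}
```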
MongoDBConnectorDeserializationSchema.java

@@ -137,53 +137,66 @@ public void open() {

@Override
public void deserialize(SourceRecord record, Collector<RowData> out) throws Exception {
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();

OperationType op = operationTypeFor(record);
BsonDocument documentKey =
checkNotNull(
extractBsonDocument(
value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD));
BsonDocument fullDocument =
extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD);
switch (op) {
case INSERT:
GenericRowData insert = extractRowData(fullDocument);
insert.setRowKind(RowKind.INSERT);
emit(record, insert,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case DELETE:
GenericRowData delete = extractRowData(documentKey);
delete.setRowKind(RowKind.DELETE);
emit(record, delete,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case UPDATE:
// It’s null if another operation deletes the document
// before the lookup operation happens. Ignore it.
if (fullDocument == null) {
long deserializeStartTime = System.currentTimeMillis();
try {
Struct value = (Struct) record.value();
Schema valueSchema = record.valueSchema();

OperationType op = operationTypeFor(record);
BsonDocument documentKey =
checkNotNull(
extractBsonDocument(
value, valueSchema, MongoDBEnvelope.DOCUMENT_KEY_FIELD));
BsonDocument fullDocument =
extractBsonDocument(value, valueSchema, MongoDBEnvelope.FULL_DOCUMENT_FIELD);
switch (op) {
case INSERT:
GenericRowData insert = extractRowData(fullDocument);
insert.setRowKind(RowKind.INSERT);
emit(record, insert,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
}
GenericRowData updateAfter = extractRowData(fullDocument);
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, updateAfter,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case REPLACE:
GenericRowData replaceAfter = extractRowData(fullDocument);
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, replaceAfter,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case INVALIDATE:
case DROP:
case DROP_DATABASE:
case RENAME:
case OTHER:
default:
break;
case DELETE:
GenericRowData delete = extractRowData(documentKey);
delete.setRowKind(RowKind.DELETE);
emit(record, delete,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case UPDATE:
// It’s null if another operation deletes the document
// before the lookup operation happens. Ignore it.
if (fullDocument == null) {
break;
}
GenericRowData updateAfter = extractRowData(fullDocument);
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, updateAfter,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case REPLACE:
GenericRowData replaceAfter = extractRowData(fullDocument);
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
emit(record, replaceAfter,
sourceExactlyMetric == null ? out : new MetricsCollector<>(out, sourceExactlyMetric));
break;
case INVALIDATE:
case DROP:
case DROP_DATABASE:
case RENAME:
case OTHER:
default:
break;
}
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeSuccess();
sourceExactlyMetric.recordDeserializeDelay(System.currentTimeMillis() - deserializeStartTime);
}

} catch (Exception e) {
if (sourceExactlyMetric != null) {
sourceExactlyMetric.incNumDeserializeError();
}
throw e;
}
}

@@ -827,4 +840,9 @@ public void updateLastCheckpointId(long checkpointId) {
sourceExactlyMetric.updateLastCheckpointId(checkpointId);
}
}

/** Setter used by DebeziumSourceFunction to inject the SourceExactlyMetric. */
public void setSourceExactlyMetric(SourceExactlyMetric sourceExactlyMetric) {
this.sourceExactlyMetric = sourceExactlyMetric;
}
}
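The emit(...) calls above route records through a MetricsCollector whenever metrics are enabled; conceptually that wrapper is a Collector decorator that counts each record as it passes through. A minimal sketch of the decorator idea (the actual MetricsCollector lives in sort-connector-base and may track more than this):

```java
import org.apache.flink.util.Collector;

final class CountingCollector<T> implements Collector<T> {
    // Stand-in for the record-count side of SourceExactlyMetric.
    interface Counter {
        void incNumRecordsOut();
    }

    private final Collector<T> inner;
    private final Counter counter;

    CountingCollector(Collector<T> inner, Counter counter) {
        this.inner = inner;
        this.counter = counter;
    }

    @Override
    public void collect(T record) {
        counter.incNumRecordsOut(); // count every record as it passes through
        inner.collect(record);
    }

    @Override
    public void close() {
        inner.close();
    }
}
```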
MongoDBSource.java

@@ -17,6 +17,8 @@

package org.apache.inlong.sort.mongodb;

import org.apache.inlong.sort.base.metric.MetricOption;

import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.kafka.connect.source.MongoSourceConfig;
import com.mongodb.kafka.connect.source.MongoSourceConfig.ErrorTolerance;
@@ -35,7 +37,11 @@
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBConnectorSourceTask.DATABASE_INCLUDE_LIST;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.HEARTBEAT_TOPIC_NAME;
import static com.ververica.cdc.connectors.mongodb.internal.MongoDBEnvelope.OUTPUT_SCHEMA;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.*;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.COPY_EXISTING;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.HEARTBEAT_INTERVAL_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_AWAIT_TIME_MILLIS;
import static com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceOptions.POLL_MAX_BATCH_SIZE;
import static com.ververica.cdc.connectors.mongodb.source.utils.MongoUtils.buildConnectionString;
import static org.apache.flink.util.Preconditions.checkArgument;

@@ -76,6 +82,7 @@ public static class Builder<T> {
private String copyExistingPipeline;
private Integer heartbeatIntervalMillis = HEARTBEAT_INTERVAL_MILLIS.defaultValue();
private DebeziumDeserializationSchema<T> deserializer;
private MetricOption metricOption;

/** The comma-separated list of hostname and port pairs of mongodb servers. */
public Builder<T> hosts(String hosts) {
@@ -243,6 +250,11 @@ public Builder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
return this;
}

public Builder<T> metricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

/**
* The properties of mongodb kafka connector.
* https://docs.mongodb.com/kafka-connector/current/kafka-source
Expand Down Expand Up @@ -338,7 +350,7 @@ public DebeziumSourceFunction<T> build() {
MongoSourceConfig.ERRORS_TOLERANCE_CONFIG, ErrorTolerance.NONE.value());

return new DebeziumSourceFunction<>(
deserializer, props, null, Validator.getDefaultValidator());
deserializer, props, null, Validator.getDefaultValidator(), metricOption);
}
}
}
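The widened constructor is the only signature change on the legacy path: build() now forwards the builder's metricOption and passes null when it was never set, which open() tolerates because every metric touchpoint is null-guarded. A minimal sketch of constructing the function directly under those assumptions:

```java
import java.util.Properties;

import com.ververica.cdc.debezium.Validator;
import org.apache.flink.table.data.RowData;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;
import org.apache.inlong.sort.mongodb.DebeziumSourceFunction;

final class DirectConstructionSketch {
    // metricOption may be null: open() only creates a SourceExactlyMetric
    // when one was supplied, so metric reporting stays strictly opt-in.
    static DebeziumSourceFunction<RowData> create(
            DebeziumDeserializationSchema<RowData> deserializer,
            Properties props,
            MetricOption metricOption) {
        return new DebeziumSourceFunction<>(
                deserializer, props, /* specificOffset */ null,
                Validator.getDefaultValidator(), metricOption);
    }
}
```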
MongoDBTableSource.java

@@ -191,13 +191,15 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.ifPresent(builder::heartbeatIntervalMillis);
Optional.ofNullable(splitMetaGroupSize).ifPresent(builder::splitMetaGroupSize);
Optional.ofNullable(splitSizeMB).ifPresent(builder::splitSizeMB);
Optional.ofNullable(metricOption).ifPresent(builder::metricOption);

return SourceProvider.of(builder.build());
} else {
org.apache.inlong.sort.mongodb.MongoDBSource.Builder<RowData> builder =
org.apache.inlong.sort.mongodb.MongoDBSource.<RowData>builder()
.hosts(hosts)
.deserializer(deserializer);
.deserializer(deserializer)
.metricOption(metricOption);

Optional.ofNullable(databaseList).ifPresent(builder::databaseList);
Optional.ofNullable(collectionList).ifPresent(builder::collectionList);
MongoDBSourceBuilder.java

@@ -17,6 +17,7 @@

package org.apache.inlong.sort.mongodb.source;

import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.mongodb.DebeziumDeserializationSchema;

import com.ververica.cdc.connectors.base.options.StartupOptions;
Expand Down Expand Up @@ -54,6 +55,7 @@ public class MongoDBSourceBuilder<T> {

private final MongoDBSourceConfigFactory configFactory = new MongoDBSourceConfigFactory();
private DebeziumDeserializationSchema<T> deserializer;
private MetricOption metricOption;

/** The comma-separated list of hostname and port pairs of mongodb servers. */
public MongoDBSourceBuilder<T> hosts(String hosts) {
@@ -189,6 +191,12 @@ public MongoDBSourceBuilder<T> deserializer(DebeziumDeserializationSchema<T> deserializer) {
return this;
}

/** The metric option used to collect metrics when inlong.metric.labels is present in the Flink SQL options. */
public MongoDBSourceBuilder<T> metricOption(MetricOption metricOption) {
this.metricOption = metricOption;
return this;
}

/**
* Build the {@link MongoDBSource}.
*
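With the new setter, attaching metrics to the incremental (FLIP-27) builder is one extra call, which MongoDBTableSource performs via Optional.ofNullable(metricOption).ifPresent(builder::metricOption). A hedged sketch of applying it to an already-configured builder (obtaining the builder instance is outside this diff; MetricOption.builder() is assumed from sort-base):

```java
import org.apache.flink.table.data.RowData;
import org.apache.inlong.sort.base.metric.MetricOption;
import org.apache.inlong.sort.mongodb.source.MongoDBSourceBuilder;

final class IncrementalMetricSketch {
    // Attach metrics to an incremental-source builder; the label string is
    // a placeholder supplied by the caller.
    static MongoDBSourceBuilder<RowData> withMetrics(
            MongoDBSourceBuilder<RowData> builder, String inlongLabels) {
        MetricOption metricOption = MetricOption.builder()
                .withInlongLabels(inlongLabels) // assumed builder API from sort-base
                .build();
        return builder.metricOption(metricOption);
    }
}
```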