Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: changes to sync debezium metrics to hevo #5

Open
wants to merge 2 commits into
base: 2.3
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package io.debezium.connector.oracle;

import io.debezium.pipeline.source.spi.HevoBatchStats;

import java.math.BigInteger;

public class HevoOracleBatchStats implements HevoBatchStats {
BigInteger currentScn;

Integer lastCapturedDmlCount;

Long lastBatchProcessingDuration;

Long commitedTransactions;

BigInteger commitedScn;

Long commitedDmlCount;

Long skippedDmlCount;

BigInteger startSCN;

BigInteger batchEndScn;

public HevoOracleBatchStats(BigInteger currentScn, Integer lastCapturedDmlCount, Long lastBatchProcessingDuration, Long commitedTransactions, BigInteger commitedScn, Long commitedDmlCount, Long skippedDmlCount, BigInteger startSCN, BigInteger batchEndScn) {
this.currentScn = currentScn;
this.lastCapturedDmlCount = lastCapturedDmlCount;
this.lastBatchProcessingDuration = lastBatchProcessingDuration;
this.commitedTransactions = commitedTransactions;
this.commitedScn = commitedScn;
this.commitedDmlCount = commitedDmlCount;
this.skippedDmlCount = skippedDmlCount;
this.startSCN = startSCN;
this.batchEndScn = batchEndScn;
}

public String getSourceCurrentPosition(){
return currentScn.toString();
}

public Integer getDmlsCapturedCountInDBZBatch(){
return lastCapturedDmlCount;
}

public Long getDBZBatchProcessingDuration(){
return lastBatchProcessingDuration;
}


public Long getProcessedDmlRecordsCount(){
return commitedDmlCount;
}

public Long getSkippedDmlRecordsCount(){
return skippedDmlCount;
}

public String getDBZBatchStartPosition(){
return startSCN.toString();
}

public String getDBZBatchEndPosition(){
return batchEndScn.toString();
}

public Long getDBZBatchCommitedTransactionsCount(){
return commitedTransactions;
}

public String getLastCommitPosition(){
return commitedScn.toString();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class OracleStreamingChangeEventSourceMetrics extends DefaultStreamingCha
private static final int TRANSACTION_ID_SET_SIZE = 10;

private final AtomicReference<Scn> currentScn = new AtomicReference<>();

private final AtomicReference<Scn> startScn = new AtomicReference<>();

private final AtomicReference<Scn> endScn = new AtomicReference<>();
private final AtomicInteger logMinerQueryCount = new AtomicInteger();
private final AtomicInteger totalCapturedDmlCount = new AtomicInteger();
private final AtomicReference<Duration> totalDurationOfFetchingQuery = new AtomicReference<>();
Expand Down Expand Up @@ -85,6 +89,9 @@ public class OracleStreamingChangeEventSourceMetrics extends DefaultStreamingCha
private final AtomicReference<LRUCacheMap<String, String>> abandonedTransactionIds = new AtomicReference<>();
private final AtomicReference<LRUCacheMap<String, String>> rolledBackTransactionIds = new AtomicReference<>();
private final AtomicLong registeredDmlCount = new AtomicLong();

private final AtomicLong skippedDmlCount = new AtomicLong();

private final AtomicLong committedDmlCount = new AtomicLong();
private final AtomicInteger errorCount = new AtomicInteger();
private final AtomicInteger warningCount = new AtomicInteger();
Expand Down Expand Up @@ -137,6 +144,8 @@ public OracleStreamingChangeEventSourceMetrics(CdcSourceTaskContext taskContext,
zoneOffset.set(ZoneOffset.UTC);

currentScn.set(Scn.NULL);
startScn.set(Scn.NULL);
endScn.set(Scn.NULL);
oldestScn.set(Scn.NULL);
offsetScn.set(Scn.NULL);
committedScn.set(Scn.NULL);
Expand Down Expand Up @@ -173,6 +182,7 @@ public void reset() {
totalDurationOfFetchingQuery.set(Duration.ZERO);
lastCapturedDmlCount.set(0);
maxCapturedDmlCount.set(0);
skippedDmlCount.set(0);
totalBatchProcessingDuration.set(Duration.ZERO);
maxBatchProcessingThroughput.set(0);
lastBatchProcessingDuration.set(Duration.ZERO);
Expand Down Expand Up @@ -213,6 +223,10 @@ public void setCurrentScn(Scn scn) {
currentScn.set(scn);
}

public void setStartScn(Scn scn){startScn.set(scn);}

public void setEndScn(Scn scn){endScn.set(scn);}

public void setCurrentLogFileName(Set<String> names) {
currentLogFileName.set(names.stream().toArray(String[]::new));
if (names.size() < minimumLogsMined.get()) {
Expand Down Expand Up @@ -626,6 +640,14 @@ public long getMiningSessionProcessGlobalAreaMemoryInBytes() {
return miningSessionProcessGlobalAreaMemory.get();
}

public long getCommitedDmlCount(){return committedDmlCount.get();}

public long getSkippedDmlCount(){return skippedDmlCount.get();}

public BigInteger getBatchStartScn(){return startScn.get().asBigInteger();}

public BigInteger getBatchEndScn(){return endScn.get().asBigInteger();}

@Override
public long getMiningSessionProcessGlobalAreaMaxMemoryInBytes() {
return miningSessionProcessGlobalAreaMaxMemory.get();
Expand Down Expand Up @@ -667,6 +689,10 @@ public void incrementCommittedDmlCount(long counter) {
committedDmlCount.getAndAdd(counter);
}

    // Counts one DML event that was skipped because its table did not match the
    // connector's table filters (called from the event processor's processRow).
    public void incrementSkippedDmlCount(){
        skippedDmlCount.incrementAndGet();
    }

    // Counts one streaming error surfaced by the LogMiner event source.
    public void incrementErrorCount() {
        errorCount.incrementAndGet();
    }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import static io.debezium.connector.oracle.logminer.LogMinerHelper.setLogFilesForMining;

import java.lang.reflect.InvocationTargetException;
import java.math.BigInteger;
import java.sql.SQLException;
import java.text.DecimalFormat;
Expand All @@ -20,10 +21,14 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import io.debezium.config.Instantiator;
import io.debezium.connector.oracle.HevoOracleBatchStats;
import io.debezium.pipeline.source.spi.HevoStatsConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -91,6 +96,8 @@ public class LogMinerStreamingChangeEventSource implements StreamingChangeEventS
private List<BigInteger> currentRedoLogSequences;
private OracleOffsetContext effectiveOffset;

HevoStatsConsumer statsConsumer;

public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
OracleConnection jdbcConnection, EventDispatcher<OraclePartition, TableId> dispatcher,
ErrorHandler errorHandler, Clock clock, OracleDatabaseSchema schema,
Expand All @@ -111,13 +118,26 @@ public LogMinerStreamingChangeEventSource(OracleConnectorConfig connectorConfig,
this.logFileQueryMaxRetries = connectorConfig.getMaximumNumberOfLogQueryRetries();
this.initialDelay = connectorConfig.getLogMiningInitialDelay();
this.maxDelay = connectorConfig.getLogMiningMaxDelay();
this.statsConsumer = initialiseStatsConsumer();
}

@Override
public void init(OracleOffsetContext offsetContext) throws InterruptedException {
this.effectiveOffset = offsetContext == null ? emptyContext() : offsetContext;
}

private HevoStatsConsumer initialiseStatsConsumer(){
try {
String statsConsumerClassName = connectorConfig.getStatsConsumer();
Class<? extends HevoStatsConsumer> statsConsumerClass = (Class<HevoStatsConsumer>) Instantiator.getClassLoader().loadClass(statsConsumerClassName);
return statsConsumerClass.getDeclaredConstructor().newInstance();
}catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException |
InvocationTargetException e){
LOGGER.warn("Observability stats consumer could not be initialised for hevo batch id: {}", connectorConfig.getHevoBatchId(), e);
}
return null;
}

private OracleOffsetContext emptyContext() {
return OracleOffsetContext.create().logicalName(connectorConfig)
.snapshotPendingTransactions(Collections.emptyMap())
Expand Down Expand Up @@ -174,6 +194,7 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,

int retryAttempts = 1;
Stopwatch sw = Stopwatch.accumulating().start();
int iterations = 1;
while (context.isRunning()) {
// Calculate time difference before each mining session to detect time zone offset changes (e.g. DST) on database server
streamingMetrics.calculateTimeDifference(getDatabaseSystemTime(jdbcConnection));
Expand Down Expand Up @@ -233,14 +254,20 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
}

if (context.isRunning()) {
LOGGER.info("Observability metrics tracking: iteration: {}, starting logminer for SCN range - startScn: {}, endScn: {}", iterations++, startScn, endScn);
if (!startMiningSession(jdbcConnection, startScn, endScn, retryAttempts)) {
retryAttempts++;
}
else {
retryAttempts = 1;
LOGGER.info("Observability metrics tracking: iteration: {}, starting mining in SCN range - startScn: {}, endScn: {}", iterations, startScn, endScn);
streamingMetrics.setStartScn(startScn);
streamingMetrics.setEndScn(endScn);
startScn = processor.process(startScn, endScn);
streamingMetrics.setCurrentBatchProcessingTime(Duration.between(start, Instant.now()));
captureSessionMemoryStatistics(jdbcConnection);
publishBatchStats(iterations);
LOGGER.info("Observability metrics tracking: iteration: {}, mining complete for batch in SCN range - startScn: {}, endScn: {}, total records processed: {}, last commit SCN: {}", iterations++, startScn, endScn, streamingMetrics.getTotalProcessedRows(), streamingMetrics.getCommittedScn());
}
pauseBetweenMiningSessions();
}
Expand All @@ -260,6 +287,14 @@ public void execute(ChangeEventSourceContext context, OraclePartition partition,
}
}

private void publishBatchStats(long runNumber){
if(Objects.nonNull(statsConsumer)) {
statsConsumer.publishStats(new HevoOracleBatchStats(streamingMetrics.getCurrentScn(), streamingMetrics.getLastCapturedDmlCount(), streamingMetrics.getLastBatchProcessingTimeInMilliseconds(), streamingMetrics.getNumberOfCommittedTransactions(), streamingMetrics.getCommittedScn(), streamingMetrics.getCommitedDmlCount(), streamingMetrics.getSkippedDmlCount(), streamingMetrics.getBatchStartScn(), streamingMetrics.getBatchEndScn()));
}else {
LOGGER.warn("Observability stats could not be published for batch id: {}, on run number: {}", connectorConfig.getHevoBatchId(), runNumber);
}
}

private void prepareConnection(boolean closeAndReconnect) throws SQLException {
if (closeAndReconnect) {
// Close and reconnect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,7 @@ protected void processRow(OraclePartition partition, LogMinerEventRow row) throw
// We do the non-DDL ones here to cover multiple switch handlers in one place.
if (!EventType.DDL.equals(row.getEventType()) && !tableFilter.isIncluded(row.getTableId())) {
LOGGER.trace("Skipping change associated with table '{}' which does not match filters.", row.getTableId());
metrics.incrementSkippedDmlCount();
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package io.debezium.pipeline.source.spi;

import org.apache.kafka.common.protocol.types.Field;

/**
* The interface Hevo batch stats.
*/
/**
 * Per-batch observability statistics reported by a Debezium streaming source so
 * they can be forwarded to Hevo via a {@link HevoStatsConsumer}.
 *
 * <p>Positions are connector-specific offsets rendered as strings (for Oracle,
 * decimal SCN values). NOTE(review): the "Commited" spellings are preserved for
 * compatibility with existing implementations. Interface methods are implicitly
 * {@code public abstract}, so the redundant modifiers were removed.
 */
public interface HevoBatchStats {

    /**
     * Gets source current position.
     *
     * @return the source current position
     */
    String getSourceCurrentPosition();

    /**
     * Gets dmls captured count in dbz batch.
     *
     * @return the dmls captured count in dbz batch
     */
    Integer getDmlsCapturedCountInDBZBatch();

    /**
     * Gets dbz batch processing duration (milliseconds).
     *
     * @return the dbz batch processing duration
     */
    Long getDBZBatchProcessingDuration();

    /**
     * Gets processed dml records count.
     *
     * @return the processed dml records count
     */
    Long getProcessedDmlRecordsCount();

    /**
     * Gets skipped dml records count.
     *
     * @return the skipped dml records count
     */
    Long getSkippedDmlRecordsCount();

    /**
     * Gets dbz batch start position.
     *
     * @return the dbz batch start position
     */
    String getDBZBatchStartPosition();

    /**
     * Gets dbz batch end position.
     *
     * @return the dbz batch end position
     */
    String getDBZBatchEndPosition();

    /**
     * Gets dbz batch commited transactions count.
     *
     * @return the dbz batch commited transactions count
     */
    Long getDBZBatchCommitedTransactionsCount();

    /**
     * Gets last commit position.
     *
     * @return the last commit position
     */
    String getLastCommitPosition();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package io.debezium.pipeline.source.spi;

/**
 * Callback that receives per-batch observability statistics from a Debezium
 * streaming source so they can be forwarded to Hevo. Implementations are loaded
 * reflectively by class name (config key {@code stats.consumer}) and must
 * provide a public no-arg constructor.
 */
@FunctionalInterface
public interface HevoStatsConsumer {

    /**
     * Publishes the statistics captured for one completed batch.
     *
     * @param hevoBatchStats the batch statistics to publish
     */
    void publishStats(HevoBatchStats hevoBatchStats);
}
Original file line number Diff line number Diff line change
Expand Up @@ -528,13 +528,29 @@ public static SnapshotTablesRowCountOrder parse(String value, String defaultValu
+ "A `ascending` value will order the tables by row count ascending. "
+ "A value of `disabled` (the default) will disable ordering by row count.");

    // Config "batch.id": identifies the Hevo batch this connector run belongs
    // to, so emitted observability stats/logs can be correlated to a batch.
    public static final Field HEVO_BATCH_ID = Field.create("batch.id")
            .withDisplayName("Hevo Batch Id")
            .withType(Type.STRING)
            .withWidth(Width.MEDIUM)
            .withImportance(Importance.MEDIUM)
            .withDescription("Batch id to recognize the batch for which the connector is running currently");

    // Config "stats.consumer": fully-qualified class name of the
    // HevoStatsConsumer implementation to load reflectively at runtime.
    public static final Field HEVO_STATS_CONSUMER = Field.create("stats.consumer")
            .withDisplayName("Stats Consumer class")
            .withType(Type.STRING)
            .withWidth(Width.MEDIUM)
            .withImportance(Importance.MEDIUM)
            .withDescription("Stats consumer");

protected static final ConfigDefinition CONFIG_DEFINITION = CommonConnectorConfig.CONFIG_DEFINITION.edit()
.type(
CommonConnectorConfig.TOPIC_PREFIX)
.connector(
DECIMAL_HANDLING_MODE,
TIME_PRECISION_MODE,
SNAPSHOT_LOCK_TIMEOUT_MS)
SNAPSHOT_LOCK_TIMEOUT_MS,
HEVO_BATCH_ID,
HEVO_STATS_CONSUMER)
.events(
COLUMN_INCLUDE_LIST,
COLUMN_EXCLUDE_LIST,
Expand Down Expand Up @@ -652,6 +668,20 @@ public Duration snapshotLockTimeout() {
return Duration.ofMillis(getConfig().getLong(SNAPSHOT_LOCK_TIMEOUT_MS));
}

/**
* Returns corresponding Hevo batch id associated with the connector run.
*
* @return the hevo batch id .
*/
public String getHevoBatchId() { return getConfig().getString(HEVO_BATCH_ID);}

/**
* Returns hevo's batch stats consumer class.
*
* @return class name.
*/
public String getStatsConsumer() {return getConfig().getString(HEVO_STATS_CONSUMER);}

    // Returns the raw configured schema exclude list string (semantics are
    // defined by the SCHEMA_EXCLUDE_LIST field declared elsewhere in this class).
    public String schemaExcludeList() {
        return getConfig().getString(SCHEMA_EXCLUDE_LIST);
    }
Expand Down
Loading