Skip to content

Commit

Permalink
HBASE-26581 Add metrics for failed replication edits - sink.failedBat…
Browse files Browse the repository at this point in the history
…ches and source.failedBatches
  • Loading branch information
Briana Augenreich committed Apr 25, 2022
1 parent 242a194 commit 49f8f80
Show file tree
Hide file tree
Showing 10 changed files with 127 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedBytesCounter;
private final MutableFastCounter logReadInBytesCounter;
Expand All @@ -62,6 +63,8 @@ public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms

shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);

failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L);

shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);

shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
Expand Down Expand Up @@ -119,6 +122,10 @@ public MetricsReplicationGlobalSourceSourceImpl(MetricsReplicationSourceImpl rms
shippedBatchesCounter.incr(batches);
}

/**
 * Records one failed batch on the global source metrics by incrementing the counter that the
 * constructor registers under {@code SOURCE_FAILED_BATCHES}.
 */
@Override
public void incrFailedBatches() {
  this.failedBatchesCounter.incr();
}

/**
 * Adds {@code ops} to the counter that the constructor registers under
 * {@code SOURCE_SHIPPED_OPS}.
 *
 * @param ops amount to add to the shipped-operations counter
 */
@Override
public void incrOpsShipped(long ops) {
  this.shippedOpsCounter.incr(ops);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@
public interface MetricsReplicationSinkSource {
  // Metric names. Interface fields are implicitly public, static and final.

  /** Name under which the age-of-last-applied-op metric is registered. */
  String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";

  /** Name under which the applied-batches counter is registered. */
  String SINK_APPLIED_BATCHES = "sink.appliedBatches";

  /** Name under which the failed-batches counter is registered. */
  String SINK_FAILED_BATCHES = "sink.failedBatches";

  /** Name under which the applied-operations counter is registered. */
  String SINK_APPLIED_OPS = "sink.appliedOps";

  /** Name under which the applied-HFiles counter is registered. */
  String SINK_APPLIED_HFILES = "sink.appliedHFiles";

  // Updates.

  /**
   * Reports the age of the most recently applied operation.
   *
   * @param age age value to record (unit is not visible here; presumably milliseconds, confirm
   *          against the caller)
   */
  void setLastAppliedOpAge(long age);

  /**
   * Adds to the applied-batches metric.
   *
   * @param batches amount to add
   */
  void incrAppliedBatches(long batches);

  /**
   * Adds to the applied-operations metric.
   *
   * @param batchsize amount to add
   */
  void incrAppliedOps(long batchsize);

  /**
   * Adds to the applied-HFiles metric.
   *
   * @param hfileSize amount to add
   */
  void incrAppliedHFiles(long hfileSize);

  /** Records one failed batch. */
  void incrFailedBatches();

  // Reads.

  /**
   * @return the last-applied-op age as tracked by the implementation
   */
  long getLastAppliedOpAge();

  /**
   * @return the current value of the applied-operations metric
   */
  long getSinkAppliedOps();

  /**
   * @return the current value of the failed-batches metric
   */
  long getFailedBatches();
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS

private final MutableHistogram ageHist;
private final MutableFastCounter batchesCounter;
private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter opsCounter;
private final MutableFastCounter hfilesCounter;

/**
 * Builds the sink metrics by obtaining one time histogram and four counters from the metrics
 * registry of {@code rms}, each under the matching {@code SINK_*} name.
 *
 * @param rms replication metrics source whose registry supplies the histogram and counters
 */
public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
ageHist = rms.getMetricsRegistry().newTimeHistogram(SINK_AGE_OF_LAST_APPLIED_OP);
// The 0L argument is presumably the value used when the counter does not exist yet; confirm
// against the registry's getCounter contract.
batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L);
failedBatchesCounter = rms.getMetricsRegistry().getCounter(SINK_FAILED_BATCHES, 0L);
opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L);
hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L);
}
Expand All @@ -49,6 +51,16 @@ public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
opsCounter.incr(batchsize);
}

/**
 * Records one failed batch by incrementing the counter that the constructor registers under
 * {@code SINK_FAILED_BATCHES}.
 */
@Override
public void incrFailedBatches() {
  this.failedBatchesCounter.incr();
}

/**
 * @return the current value of the counter that the constructor registers under
 *         {@code SINK_FAILED_BATCHES}
 */
@Override
public long getFailedBatches() {
  return this.failedBatchesCounter.value();
}

@Override
public long getLastAppliedOpAge() {
return ageHist.getMax();
Expand All @@ -62,4 +74,5 @@ public void incrAppliedHFiles(long hfiles) {
/**
 * @return the current value of the applied-operations counter
 */
@Override
public long getSinkAppliedOps() {
  return this.opsCounter.value();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
public static final String SOURCE_FAILED_BATCHES = "source.failedBatches";

public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes";
public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
Expand Down Expand Up @@ -57,6 +58,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void decrSizeOfLogQueue(int size);
void incrLogEditsFiltered(long size);
void incrBatchesShipped(int batches);
void incrFailedBatches();
void incrOpsShipped(long ops);
void incrShippedBytes(long size);
void incrLogReadInBytes(long size);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
private final String failedBatchesKey;
private String keyPrefix;

private final String shippedBytesKey;
Expand All @@ -48,6 +49,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedBytesCounter;
private final MutableFastCounter logReadInBytesCounter;
Expand Down Expand Up @@ -85,6 +87,9 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
shippedBatchesKey = this.keyPrefix + "shippedBatches";
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);

failedBatchesKey = this.keyPrefix + "failedBatches";
failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L);

shippedOpsKey = this.keyPrefix + "shippedOps";
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);

Expand Down Expand Up @@ -158,6 +163,10 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
shippedBatchesCounter.incr(batches);
}

/**
 * Records one failed batch for this source by incrementing the counter that the constructor
 * registers under {@code failedBatchesKey}.
 */
@Override
public void incrFailedBatches() {
  this.failedBatchesCounter.incr();
}

/**
 * Adds {@code ops} to the counter that the constructor registers under {@code shippedOpsKey}.
 *
 * @param ops amount to add to this source's shipped-operations counter
 */
@Override
public void incrOpsShipped(long ops) {
  this.shippedOpsCounter.incr(ops);
}
Expand All @@ -176,6 +185,7 @@ public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, Stri
rms.removeMetric(sizeOfLogQueueKey);

rms.removeMetric(shippedBatchesKey);
rms.removeMetric(failedBatchesKey);
rms.removeMetric(shippedOpsKey);
rms.removeMetric(shippedBytesKey);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,20 @@ public void applyBatch(long batchSize, long hfileSize) {
mss.incrAppliedHFiles(hfileSize);
}

/**
 * Records that a batch of operations has failed by delegating to the underlying sink metrics
 * source.
 */
public void incrementFailedBatches() {
  this.mss.incrFailedBatches();
}

/**
 * Gets the count of failed batches as reported by the underlying sink metrics source.
 * @return failedBatches
 */
protected long getFailedBatches() {
  final long failedBatches = mss.getFailedBatches();
  return failedBatches;
}

/**
* Get the Age of Last Applied Op
* @return ageOfLastAppliedOp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,15 @@ public void shipBatch(long batchSize, int sizeInBytes) {
globalSourceSource.incrShippedBytes(sizeInBytes);
}

/**
 * Records that a batch of operations has failed, on both the per-source metrics and the global
 * source metrics.
 */
public void incrementFailedBatches() {
  // Per-source metric first, then the global one, in the same order as the other updates here.
  this.singleSourceSource.incrFailedBatches();
  this.globalSourceSource.incrFailedBatches();
}


/**
* Gets the number of edits not eligible for replication this source queue logs so far.
* @return logEditsFiltered non-replicable edits filtered from this queue logs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
this.metrics.incrementFailedBatches();
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
}
Expand All @@ -205,6 +206,7 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
this.metrics.incrementFailedBatches();
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
Expand Down Expand Up @@ -279,8 +281,9 @@ public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
this.totalReplicatedEdits.addAndGet(totalReplicated);
} catch (IOException ex) {
} catch (Exception ex) {
LOG.error("Unable to accept edit because:", ex);
this.metrics.incrementFailedBatches();
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ private void shipEdits(WALEntryBatch entryBatch) {
}
break;
} catch (Exception ex) {
source.getSourceMetrics().incrementFailedBatches();
LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), ex);
if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import static org.junit.Assert.assertEquals;


import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -69,15 +71,14 @@
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;


@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink {

Expand Down Expand Up @@ -427,6 +428,67 @@ public void testReplicateEntriesForHFiles() throws Exception {
// Clean up the created hfiles or it will mess up subsequent tests
}

/**
 * Checks that each failed {@code replicateEntries} call raises the sink's failed-batches metric
 * by exactly one. Three failures are provoked in turn: a cell scanner with fewer cells than the
 * entries announce, entries for a table that does not exist, and entries for a disabled table.
 */
@Test
public void testFailedReplicationSinkMetrics() throws IOException {
  final long baseline = SINK.getSinkMetrics().getFailedBatches();
  long expectedFailures = 0L;
  List<WALEntry> walEntries = new ArrayList<>(BATCH_SIZE);
  List<Cell> cellList = new ArrayList<>();

  // Case 1: the entries announce cells, but the scanner handed to the sink is empty.
  for (int row = 0; row < BATCH_SIZE; row++) {
    walEntries.add(createEntry(TABLE_NAME1, row, KeyValue.Type.Put, cellList));
  }
  cellList.clear(); // cause IndexOutOfBoundsException
  try {
    SINK.replicateEntries(walEntries, CellUtil.createCellScanner(cellList.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Assert.fail("Should re-throw ArrayIndexOutOfBoundsException.");
  } catch (ArrayIndexOutOfBoundsException e) {
    expectedFailures++;
    assertEquals(baseline + expectedFailures, SINK.getSinkMetrics().getFailedBatches());
  }

  // Case 2: the entries target a table that was never created.
  walEntries.clear();
  cellList.clear();
  TableName missingTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException
  for (int row = 0; row < BATCH_SIZE; row++) {
    walEntries.add(createEntry(missingTable, row, KeyValue.Type.Put, cellList));
  }
  try {
    SINK.replicateEntries(walEntries, CellUtil.createCellScanner(cellList.iterator()),
      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
    Assert.fail("Should re-throw TableNotFoundException.");
  } catch (TableNotFoundException e) {
    expectedFailures++;
    assertEquals(baseline + expectedFailures, SINK.getSinkMetrics().getFailedBatches());
  }

  // Case 3: the target table exists but is disabled while the batch is applied.
  walEntries.clear();
  cellList.clear();
  for (int row = 0; row < BATCH_SIZE; row++) {
    walEntries.add(createEntry(TABLE_NAME1, row, KeyValue.Type.Put, cellList));
  }
  // cause IOException in batch()
  try (Connection connection = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
    Admin admin = connection.getAdmin()) {
    admin.disableTable(TABLE_NAME1);
    try {
      SINK.replicateEntries(walEntries, CellUtil.createCellScanner(cellList.iterator()),
        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
      Assert.fail("Should re-throw IOException.");
    } catch (IOException e) {
      expectedFailures++;
      assertEquals(baseline + expectedFailures, SINK.getSinkMetrics().getFailedBatches());
    } finally {
      // Re-enable the table so later tests find it usable.
      admin.enableTable(TABLE_NAME1);
    }
  }
}


private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row);
Expand Down

0 comments on commit 49f8f80

Please sign in to comment.