Increment records/failed counter for failed writes #445

Merged
merged 6 commits into from
Aug 12, 2022
1 change: 1 addition & 0 deletions changelog/README.md
@@ -10,6 +10,7 @@
- [improvement] [#435](https://github.com/datastax/dsbulk/issues/435): Auto-detect cloud server-side rate-limit.
- [improvement] [#438](https://github.com/datastax/dsbulk/issues/438): Ability to limit throughput in bytes per second.
- [bug] [#440](https://github.com/datastax/dsbulk/issues/440): Properly close files when unload is interrupted.
- [bug] [#444](https://github.com/datastax/dsbulk/issues/444): Increment records/failed counter for failed writes.

## 1.9.0

@@ -411,6 +411,11 @@ void load_errors() throws Exception {
ExitStatus status = new DataStaxBulkLoader(addCommonSettings(args)).run();
assertStatus(status, STATUS_COMPLETED_WITH_ERRORS);

// Verify that the console reporter has the correct number of errors
List<String> streamLines = stdErr.getStreamLinesPlain();
assertThat(streamLines).anyMatch(line -> line.startsWith("total | failed"));
assertThat(streamLines).anyMatch(line -> line.startsWith(" 24 | 4"));

// There are 24 rows of data, but two extra queries due to the retry for the write timeout and
// the unavailable.
validateQueryCount(simulacron, 26, "INSERT INTO ip_by_country", LOCAL_ONE);
@@ -733,7 +738,7 @@ public Publisher<Publisher<Record>> read(
assertStatus(status, STATUS_COMPLETED_WITH_ERRORS);
assertThat(logs.getAllMessagesAsString())
.contains("completed with 11000 errors")
.contains("Records: total: 100,000, successful: 90,000, failed: 10,000")
.contains("Records: total: 100,000, successful: 89,000, failed: 11,000")
Contributor Author:
This bug report unveiled a flaw in this existing assertion.
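Reading the asserted figures: 10,000 of the 100,000 records fail at the connector (read) stage and never reach the database, so only 90,000 writes are attempted; 1,000 of those writes fail, leaving 89,000 successful records and 10,000 + 1,000 = 11,000 failed records overall. The old assertion counted only the read failures.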

.contains("Writes: total: 90,000, successful: 89,000, failed: 1,000, in-flight: 0");
validateExceptionsLog(10_000, "Record could not be read:", "connector-errors.log");
validateExceptionsLog(
@@ -27,10 +27,12 @@
import com.codahale.metrics.jmx.JmxReporter;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.MoreExecutors;
import com.datastax.oss.dsbulk.connectors.api.ErrorRecord;
import com.datastax.oss.dsbulk.connectors.api.Record;
import com.datastax.oss.dsbulk.executor.api.listener.AbstractMetricsReportingExecutionListenerBuilder;
import com.datastax.oss.dsbulk.executor.api.listener.LogSink;
import com.datastax.oss.dsbulk.executor.api.listener.MetricsCollectingExecutionListener;
@@ -457,18 +459,85 @@ public <T> Function<Flux<T>, Flux<T>> newTotalItemsMonitor() {
return upstream -> upstream.doOnNext(item -> totalItems.inc());
}

- public <T> Function<Flux<T>, Flux<T>> newFailedItemsMonitor() {
/**
* Returns a new monitor that will increment the records/failed metric when the record is a
* rejected record (that is, an instance of {@link ErrorRecord}).
*
* <p>This monitor is suitable for use in the following cases:
*
* <ul>
* <li>when loading, to count records that the connector could not read properly;
* <li>when unloading, to count records that could not be mapped from database rows;
* <li>when unloading, to count records that the connector could not write properly.
* </ul>
*/
public Function<Flux<Record>, Flux<Record>> newFailedRecordsMonitor() {
return upstream ->
upstream.doOnNext(
item -> {
- if (item instanceof ErrorRecord
-     || item instanceof UnmappableStatement
-     || (item instanceof Result && !((Result) item).isSuccess())) {
+ if (item instanceof ErrorRecord) {
failedItems.inc();
}
});
}

/**
* Returns a new monitor that will increment the records/failed metric when a record cannot be
* mapped to a bound statement (that is, the resulting statement is an instance of {@link
* UnmappableStatement}).
*
* <p>This monitor is suitable for use in the following cases:
*
* <ul>
* <li>when loading, to count records that the mapper could not map.
* </ul>
*/
public Function<Flux<BatchableStatement<?>>, Flux<BatchableStatement<?>>>
newUnmappableStatementsMonitor() {
return upstream ->
upstream.doOnNext(
item -> {
if (item instanceof UnmappableStatement) {
failedItems.inc();
}
});
}

/**
* Returns a new monitor that will increment the records/failed metric when a record cannot be
* written to or read from the database (that is, when {@link Result#isSuccess()} returns false).
*
* <p>This monitor is suitable for use in the following cases:
*
* <ul>
* <li>when loading, to count records that could not be written to the database.
* <li>when unloading, to count token range reads that could not be executed.
* </ul>
*
* TODO: we currently increment the records/failed counter by 1 for a token range read failure. It
* would be better to not increment the counter at all, since this is a global failure; this
* counter should be specialized in counting problems affecting individual records/rows only.
*/
public <T extends Result> Function<Flux<T>, Flux<T>> newFailedResultsMonitor() {
return upstream ->
upstream.doOnNext(
item -> {
if (!item.isSuccess()) {
failedItems.inc();
}
});
}

/**
* Returns a new monitor that will increment the "batches" histogram.
*
* <p>This monitor is suitable for use in the following cases:
*
* <ul>
* <li>when loading, to track batching efficiency of statements grouped together by the batcher
* component.
* </ul>
*/
public Function<Flux<Statement<?>>, Flux<Statement<?>>> newBatcherMonitor() {
return upstream ->
upstream.doOnNext(
@@ -22,6 +22,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.slf4j.event.Level.DEBUG;
import static org.slf4j.event.Level.INFO;
import static org.slf4j.event.Level.WARN;
@@ -38,6 +39,8 @@
import com.datastax.oss.dsbulk.connectors.api.DefaultRecord;
import com.datastax.oss.dsbulk.connectors.api.Record;
import com.datastax.oss.dsbulk.executor.api.listener.WritesReportingExecutionListener;
import com.datastax.oss.dsbulk.executor.api.result.Result;
import com.datastax.oss.dsbulk.executor.api.result.WriteResult;
import com.datastax.oss.dsbulk.tests.logging.LogCapture;
import com.datastax.oss.dsbulk.tests.logging.LogConfigurationResource;
import com.datastax.oss.dsbulk.tests.logging.LogInterceptingExtension;
@@ -59,8 +62,14 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import reactor.core.publisher.Flux;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@ExtendWith(LogInterceptingExtension.class)
@ExtendWith(StreamInterceptingExtension.class)
@LogConfigurationResource("logback.xml")
@@ -70,7 +79,13 @@ class MetricsManagerTest {
private Record record2;
private Record record3;

- private Statement<?> stmt3;
@Mock private WriteResult result1;
@Mock private WriteResult result2;
@Mock private WriteResult result3;

@Mock private BatchableStatement<?> stmt1;
@Mock private BatchableStatement<?> stmt2;
private BatchableStatement<?> stmt3;

private BatchStatement batch;

@@ -127,7 +142,98 @@ void should_increment_records(
Flux<Record> records = Flux.just(record1, record2, record3);
records
.transform(manager.newTotalItemsMonitor())
- .transform(manager.newFailedItemsMonitor())
+ .transform(manager.newFailedRecordsMonitor())
.blockLast();
manager.stop(Duration.ofSeconds(123), true);
MetricRegistry registry =
(MetricRegistry) ReflectionUtils.getInternalState(manager, "registry");
assertThat(registry.counter("records/total").getCount()).isEqualTo(3);
assertThat(registry.counter("records/failed").getCount()).isEqualTo(1);
assertThat(logs.getLoggedEvents()).isEmpty();
assertThat(stderr.getStreamLinesPlain())
.anySatisfy(line -> assertThat(line).startsWith(" 3 | 1 |"));
}
}

@Test
void should_increment_mapped_statements(
@LogCapture(value = MetricsManager.class, level = INFO) LogInterceptor logs,
@StreamCapture(STDERR) StreamInterceptor stderr) {
try (MetricsManager manager =
new MetricsManager(
new MetricRegistry(),
false,
"test",
Executors.newSingleThreadScheduledExecutor(),
SECONDS,
MILLISECONDS,
-1,
-1,
true,
false,
false,
true,
null,
null,
LogSettings.Verbosity.normal,
Duration.ofSeconds(5),
false,
protocolVersion,
codecRegistry,
RowType.REGULAR)) {
manager.init();
manager.start();
Flux<BatchableStatement<?>> records = Flux.just(stmt1, stmt2, stmt3);
records
.transform(manager.newTotalItemsMonitor())
.transform(manager.newUnmappableStatementsMonitor())
.blockLast();
manager.stop(Duration.ofSeconds(123), true);
MetricRegistry registry =
(MetricRegistry) ReflectionUtils.getInternalState(manager, "registry");
assertThat(registry.counter("records/total").getCount()).isEqualTo(3);
assertThat(registry.counter("records/failed").getCount()).isEqualTo(1);
assertThat(logs.getLoggedEvents()).isEmpty();
assertThat(stderr.getStreamLinesPlain())
.anySatisfy(line -> assertThat(line).startsWith(" 3 | 1 |"));
}
}

@Test
void should_increment_results(
@LogCapture(value = MetricsManager.class, level = INFO) LogInterceptor logs,
@StreamCapture(STDERR) StreamInterceptor stderr) {
when(result1.isSuccess()).thenReturn(true);
when(result2.isSuccess()).thenReturn(true);
when(result3.isSuccess()).thenReturn(false);
try (MetricsManager manager =
new MetricsManager(
new MetricRegistry(),
false,
"test",
Executors.newSingleThreadScheduledExecutor(),
SECONDS,
MILLISECONDS,
-1,
-1,
true,
false,
false,
true,
null,
null,
LogSettings.Verbosity.normal,
Duration.ofSeconds(5),
false,
protocolVersion,
codecRegistry,
RowType.REGULAR)) {
manager.init();
manager.start();
Flux<Result> records = Flux.just(result1, result2, result3);
records
.transform(manager.newTotalItemsMonitor())
.transform(manager.newFailedResultsMonitor())
.blockLast();
manager.stop(Duration.ofSeconds(123), true);
MetricRegistry registry =
@@ -144,7 +144,7 @@ public void init() throws Exception {
closed.set(false);
success = false;
totalItemsMonitor = metricsManager.newTotalItemsMonitor();
- failedItemsMonitor = metricsManager.newFailedItemsMonitor();
+ failedItemsMonitor = metricsManager.newFailedResultsMonitor();
totalItemsCounter = logManager.newTotalItemsCounter();
failedReadsHandler = logManager.newFailedReadsHandler();
queryWarningsHandler = logManager.newQueryWarningsHandler();
@@ -99,7 +99,8 @@ public class LoadWorkflow implements Workflow {
private Function<Flux<Record>, Flux<Record>> totalItemsCounter;
private Function<Flux<Record>, Flux<Record>> failedRecordsMonitor;
private Function<Flux<BatchableStatement<?>>, Flux<BatchableStatement<?>>>
- failedStatementsMonitor;
+ unmappableStatementsMonitor;
private Function<Flux<WriteResult>, Flux<WriteResult>> failedWritesMonitor;
private Function<Flux<Record>, Flux<Record>> failedRecordsHandler;
private Function<Flux<BatchableStatement<?>>, Flux<BatchableStatement<?>>>
unmappableStatementsHandler;
@@ -193,8 +194,9 @@ public void init() throws Exception {
}
closed.set(false);
totalItemsMonitor = metricsManager.newTotalItemsMonitor();
- failedRecordsMonitor = metricsManager.newFailedItemsMonitor();
- failedStatementsMonitor = metricsManager.newFailedItemsMonitor();
+ failedRecordsMonitor = metricsManager.newFailedRecordsMonitor();
+ unmappableStatementsMonitor = metricsManager.newUnmappableStatementsMonitor();
+ failedWritesMonitor = metricsManager.newFailedResultsMonitor();
batcherMonitor = metricsManager.newBatcherMonitor();
totalItemsCounter = logManager.newTotalItemsCounter();
failedRecordsHandler = logManager.newFailedRecordsHandler();
@@ -234,6 +236,7 @@ public boolean execute() {
statements
.transform(this::executeStatements)
.transform(queryWarningsHandler)
.transform(failedWritesMonitor)
Contributor Author:
This was the root cause of the bug: the monitor that increments the records/failed metric wasn't being invoked here, and so failed writes weren't being accounted for.
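To make the mechanics concrete, here is a minimal, self-contained sketch (the class name and the nested Result interface are hypothetical stand-ins, not DSBulk's actual MetricsManager or WriteResult): the monitor returned by newFailedResultsMonitor() is a pass-through Flux transform that counts unsuccessful results, so it only takes effect if the pipeline actually applies it via transform() — which is what the line added above does.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import reactor.core.publisher.Flux;

public class FailedResultsMonitorSketch {

  /** Hypothetical stand-in for the executor's WriteResult; only the success flag matters here. */
  interface Result {
    boolean isSuccess();
  }

  private final AtomicLong failedItems = new AtomicLong();

  /** Pass-through transform that increments the failed counter for every unsuccessful result. */
  <T extends Result> Function<Flux<T>, Flux<T>> newFailedResultsMonitor() {
    return upstream ->
        upstream.doOnNext(
            result -> {
              if (!result.isSuccess()) {
                failedItems.incrementAndGet();
              }
            });
  }

  public static void main(String[] args) {
    FailedResultsMonitorSketch metrics = new FailedResultsMonitorSketch();
    Result ok = () -> true;
    Result failed = () -> false;

    // Without the transform below, the failing result would flow through unnoticed and the
    // counter would stay at 0 -- the shape of the bug fixed by this PR.
    Flux.just(ok, ok, failed).transform(metrics.newFailedResultsMonitor()).blockLast();

    System.out.println("records/failed = " + metrics.failedItems.get()); // prints 1
  }
}
```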

.transform(failedWritesHandler)
.transform(resultPositionsHandler)
.transform(terminationHandler)
@@ -270,7 +273,7 @@ private Flux<Statement<?>> manyReaders() {
.transform(failedRecordsMonitor)
.transform(failedRecordsHandler)
.flatMap(mapper)
- .transform(failedStatementsMonitor)
+ .transform(unmappableStatementsMonitor)
.transform(unmappableStatementsHandler)
.transform(this::bufferAndBatch)
.subscribeOn(scheduler),
@@ -301,7 +304,7 @@ private Flux<Statement<?>> fewReaders() {
.transform(failedRecordsMonitor)
.transform(failedRecordsHandler)
.flatMap(mapper)
- .transform(failedStatementsMonitor)
+ .transform(unmappableStatementsMonitor)
.transform(unmappableStatementsHandler)
.transform(this::batchBuffered)
.subscribeOn(scheduler),
@@ -162,8 +162,8 @@ public void init() throws Exception {
closed.set(false);
writer = connector.write();
totalItemsMonitor = metricsManager.newTotalItemsMonitor();
- failedRecordsMonitor = metricsManager.newFailedItemsMonitor();
- failedReadResultsMonitor = metricsManager.newFailedItemsMonitor();
+ failedRecordsMonitor = metricsManager.newFailedRecordsMonitor();
+ failedReadResultsMonitor = metricsManager.newFailedResultsMonitor();
failedRecordsHandler = logManager.newFailedRecordsHandler();
totalItemsCounter = logManager.newTotalItemsCounter();
failedReadsHandler = logManager.newFailedReadsHandler();