Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increment records/failed counter for failed writes #445

Merged
merged 6 commits into from
Aug 12, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
- [improvement] [#435](https://github.com/datastax/dsbulk/issues/435): Auto-detect cloud server-side rate-limit.
- [improvement] [#438](https://github.com/datastax/dsbulk/issues/438): Ability to limit throughput in bytes per second.
- [bug] [#440](https://github.com/datastax/dsbulk/issues/440): Properly close files when unload is interrupted.
- [bug] [#444](https://github.com/datastax/dsbulk/issues/444): Increment records/failed counter for failed writes.

## 1.9.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,11 @@ void load_errors() throws Exception {
ExitStatus status = new DataStaxBulkLoader(addCommonSettings(args)).run();
assertStatus(status, STATUS_COMPLETED_WITH_ERRORS);

// Verify that the console reporter has the correct number of errors
List<String> streamLines = stdErr.getStreamLinesPlain();
assertThat(streamLines).anyMatch(line -> line.startsWith("total | failed"));
assertThat(streamLines).anyMatch(line -> line.startsWith(" 24 | 4"));

// There are 24 rows of data, but two extra queries due to the retry for the write timeout and
// the unavailable.
validateQueryCount(simulacron, 26, "INSERT INTO ip_by_country", LOCAL_ONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@
import com.codahale.metrics.jmx.JmxReporter;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.datastax.oss.driver.shaded.guava.common.util.concurrent.MoreExecutors;
import com.datastax.oss.dsbulk.connectors.api.ErrorRecord;
import com.datastax.oss.dsbulk.connectors.api.Record;
import com.datastax.oss.dsbulk.executor.api.listener.AbstractMetricsReportingExecutionListenerBuilder;
import com.datastax.oss.dsbulk.executor.api.listener.LogSink;
import com.datastax.oss.dsbulk.executor.api.listener.MetricsCollectingExecutionListener;
Expand Down Expand Up @@ -457,13 +459,31 @@ public <T> Function<Flux<T>, Flux<T>> newTotalItemsMonitor() {
return upstream -> upstream.doOnNext(item -> totalItems.inc());
}

public <T> Function<Flux<T>, Flux<T>> newFailedItemsMonitor() {
public Function<Flux<Record>, Flux<Record>> newFailedRecordsMonitor() {
adutra marked this conversation as resolved.
Show resolved Hide resolved
return upstream ->
upstream.doOnNext(
item -> {
if (item instanceof ErrorRecord
|| item instanceof UnmappableStatement
|| (item instanceof Result && !((Result) item).isSuccess())) {
if (item instanceof ErrorRecord) {
failedItems.inc();
}
});
}

public Function<Flux<BatchableStatement<?>>, Flux<BatchableStatement<?>>> newUnmappableStatementsMonitor() {
return upstream ->
upstream.doOnNext(
item -> {
if (item instanceof UnmappableStatement) {
failedItems.inc();
}
});
}

public <T extends Result> Function<Flux<T>, Flux<T>> newFailedResultsMonitor() {
return upstream ->
upstream.doOnNext(
item -> {
if (!item.isSuccess()) {
failedItems.inc();
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.slf4j.event.Level.DEBUG;
import static org.slf4j.event.Level.INFO;
import static org.slf4j.event.Level.WARN;
Expand All @@ -38,6 +39,8 @@
import com.datastax.oss.dsbulk.connectors.api.DefaultRecord;
import com.datastax.oss.dsbulk.connectors.api.Record;
import com.datastax.oss.dsbulk.executor.api.listener.WritesReportingExecutionListener;
import com.datastax.oss.dsbulk.executor.api.result.Result;
import com.datastax.oss.dsbulk.executor.api.result.WriteResult;
import com.datastax.oss.dsbulk.tests.logging.LogCapture;
import com.datastax.oss.dsbulk.tests.logging.LogConfigurationResource;
import com.datastax.oss.dsbulk.tests.logging.LogInterceptingExtension;
Expand All @@ -59,8 +62,14 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.mockito.junit.jupiter.MockitoSettings;
import org.mockito.quality.Strictness;
import reactor.core.publisher.Flux;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
@ExtendWith(LogInterceptingExtension.class)
@ExtendWith(StreamInterceptingExtension.class)
@LogConfigurationResource("logback.xml")
Expand All @@ -70,7 +79,18 @@ class MetricsManagerTest {
private Record record2;
private Record record3;

private Statement<?> stmt3;
@Mock
private WriteResult result1;
@Mock
private WriteResult result2;
@Mock
private WriteResult result3;

@Mock
private BatchableStatement<?> stmt1;
@Mock
private BatchableStatement<?> stmt2;
private BatchableStatement<?> stmt3;

private BatchStatement batch;

Expand Down Expand Up @@ -127,7 +147,98 @@ void should_increment_records(
Flux<Record> records = Flux.just(record1, record2, record3);
records
.transform(manager.newTotalItemsMonitor())
.transform(manager.newFailedItemsMonitor())
.transform(manager.newFailedRecordsMonitor())
.blockLast();
manager.stop(Duration.ofSeconds(123), true);
MetricRegistry registry =
(MetricRegistry) ReflectionUtils.getInternalState(manager, "registry");
assertThat(registry.counter("records/total").getCount()).isEqualTo(3);
assertThat(registry.counter("records/failed").getCount()).isEqualTo(1);
assertThat(logs.getLoggedEvents()).isEmpty();
assertThat(stderr.getStreamLinesPlain())
.anySatisfy(line -> assertThat(line).startsWith(" 3 | 1 |"));
}
}

@Test
void should_increment_mapped_statements(
@LogCapture(value = MetricsManager.class, level = INFO) LogInterceptor logs,
@StreamCapture(STDERR) StreamInterceptor stderr) {
try (MetricsManager manager =
new MetricsManager(
new MetricRegistry(),
false,
"test",
Executors.newSingleThreadScheduledExecutor(),
SECONDS,
MILLISECONDS,
-1,
-1,
true,
false,
false,
true,
null,
null,
LogSettings.Verbosity.normal,
Duration.ofSeconds(5),
false,
protocolVersion,
codecRegistry,
RowType.REGULAR)) {
manager.init();
manager.start();
Flux<BatchableStatement<?>> records = Flux.just(stmt1, stmt2, stmt3);
records
.transform(manager.newTotalItemsMonitor())
.transform(manager.newUnmappableStatementsMonitor())
.blockLast();
manager.stop(Duration.ofSeconds(123), true);
MetricRegistry registry =
(MetricRegistry) ReflectionUtils.getInternalState(manager, "registry");
assertThat(registry.counter("records/total").getCount()).isEqualTo(3);
assertThat(registry.counter("records/failed").getCount()).isEqualTo(1);
assertThat(logs.getLoggedEvents()).isEmpty();
assertThat(stderr.getStreamLinesPlain())
.anySatisfy(line -> assertThat(line).startsWith(" 3 | 1 |"));
}
}

@Test
void should_increment_results(
@LogCapture(value = MetricsManager.class, level = INFO) LogInterceptor logs,
@StreamCapture(STDERR) StreamInterceptor stderr) {
when(result1.isSuccess()).thenReturn(true);
when(result2.isSuccess()).thenReturn(true);
when(result3.isSuccess()).thenReturn(false);
try (MetricsManager manager =
new MetricsManager(
new MetricRegistry(),
false,
"test",
Executors.newSingleThreadScheduledExecutor(),
SECONDS,
MILLISECONDS,
-1,
-1,
true,
false,
false,
true,
null,
null,
LogSettings.Verbosity.normal,
Duration.ofSeconds(5),
false,
protocolVersion,
codecRegistry,
RowType.REGULAR)) {
manager.init();
manager.start();
Flux<Result> records = Flux.just(result1, result2, result3);
records
.transform(manager.newTotalItemsMonitor())
.transform(manager.newFailedResultsMonitor())
.blockLast();
manager.stop(Duration.ofSeconds(123), true);
MetricRegistry registry =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void init() throws Exception {
closed.set(false);
success = false;
totalItemsMonitor = metricsManager.newTotalItemsMonitor();
failedItemsMonitor = metricsManager.newFailedItemsMonitor();
failedItemsMonitor = metricsManager.newFailedResultsMonitor();
totalItemsCounter = logManager.newTotalItemsCounter();
failedReadsHandler = logManager.newFailedReadsHandler();
queryWarningsHandler = logManager.newQueryWarningsHandler();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ public class LoadWorkflow implements Workflow {
private Function<Flux<Record>, Flux<Record>> totalItemsCounter;
private Function<Flux<Record>, Flux<Record>> failedRecordsMonitor;
private Function<Flux<BatchableStatement<?>>, Flux<BatchableStatement<?>>>
failedStatementsMonitor;
unmappableStatementsMonitor;
private Function<Flux<WriteResult>, Flux<WriteResult>>
failedWritesMonitor;
private Function<Flux<Record>, Flux<Record>> failedRecordsHandler;
private Function<Flux<BatchableStatement<?>>, Flux<BatchableStatement<?>>>
unmappableStatementsHandler;
Expand Down Expand Up @@ -193,8 +195,9 @@ public void init() throws Exception {
}
closed.set(false);
totalItemsMonitor = metricsManager.newTotalItemsMonitor();
failedRecordsMonitor = metricsManager.newFailedItemsMonitor();
failedStatementsMonitor = metricsManager.newFailedItemsMonitor();
failedRecordsMonitor = metricsManager.newFailedRecordsMonitor();
unmappableStatementsMonitor = metricsManager.newUnmappableStatementsMonitor();
failedWritesMonitor = metricsManager.newFailedResultsMonitor();
batcherMonitor = metricsManager.newBatcherMonitor();
totalItemsCounter = logManager.newTotalItemsCounter();
failedRecordsHandler = logManager.newFailedRecordsHandler();
Expand Down Expand Up @@ -234,6 +237,7 @@ public boolean execute() {
statements
.transform(this::executeStatements)
.transform(queryWarningsHandler)
.transform(failedWritesMonitor)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was the root cause of the bug: the monitor that increments the records/failed metric wasn't being invoked here, and so failed writes weren't being accounted for.

.transform(failedWritesHandler)
.transform(resultPositionsHandler)
.transform(terminationHandler)
Expand Down Expand Up @@ -270,7 +274,7 @@ private Flux<Statement<?>> manyReaders() {
.transform(failedRecordsMonitor)
.transform(failedRecordsHandler)
.flatMap(mapper)
.transform(failedStatementsMonitor)
.transform(unmappableStatementsMonitor)
.transform(unmappableStatementsHandler)
.transform(this::bufferAndBatch)
.subscribeOn(scheduler),
Expand Down Expand Up @@ -301,7 +305,7 @@ private Flux<Statement<?>> fewReaders() {
.transform(failedRecordsMonitor)
.transform(failedRecordsHandler)
.flatMap(mapper)
.transform(failedStatementsMonitor)
.transform(unmappableStatementsMonitor)
.transform(unmappableStatementsHandler)
.transform(this::batchBuffered)
.subscribeOn(scheduler),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ public void init() throws Exception {
closed.set(false);
writer = connector.write();
totalItemsMonitor = metricsManager.newTotalItemsMonitor();
failedRecordsMonitor = metricsManager.newFailedItemsMonitor();
failedReadResultsMonitor = metricsManager.newFailedItemsMonitor();
failedRecordsMonitor = metricsManager.newFailedRecordsMonitor();
failedReadResultsMonitor = metricsManager.newFailedResultsMonitor();
failedRecordsHandler = logManager.newFailedRecordsHandler();
totalItemsCounter = logManager.newTotalItemsCounter();
failedReadsHandler = logManager.newFailedReadsHandler();
Expand Down