[Transform][Rollup] add processing stats to record the time spent for processing results #53770

Merged
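This change threads two new counters, processing_time_in_ms and processing_total, through the shared IndexerJobStats base class and its Rollup and Transform subclasses, so stats responses report how long the indexer spends turning search results into documents to index and how many processing runs it has completed. A minimal usage sketch (not part of this PR) of reading the new counters from a client-side TransformIndexerStats instance, assuming it was already obtained from a transform stats response; the surrounding client call and imports are omitted:

    // Minimal sketch, assuming stats comes from a transform stats response;
    // only getters visible in this PR's diff are used.
    void logProcessingStats(TransformIndexerStats stats) {
        long processingMillis = stats.getProcessingTime();  // cumulative ms spent processing results
        long processingRuns   = stats.getProcessingTotal(); // number of processing runs
        long searchMillis     = stats.getSearchTime();      // for comparison with the search phase
        System.out.printf("processing: %d ms over %d runs (search: %d ms)%n",
            processingMillis, processingRuns, searchMillis);
    }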
@@ -31,8 +31,10 @@ public abstract class IndexerJobStats {
public static ParseField NUM_INVOCATIONS = new ParseField("trigger_count");
public static ParseField INDEX_TIME_IN_MS = new ParseField("index_time_in_ms");
public static ParseField SEARCH_TIME_IN_MS = new ParseField("search_time_in_ms");
public static ParseField PROCESSING_TIME_IN_MS = new ParseField("processing_time_in_ms");
public static ParseField INDEX_TOTAL = new ParseField("index_total");
public static ParseField SEARCH_TOTAL = new ParseField("search_total");
public static ParseField PROCESSING_TOTAL = new ParseField("processing_total");
public static ParseField SEARCH_FAILURES = new ParseField("search_failures");
public static ParseField INDEX_FAILURES = new ParseField("index_failures");

@@ -44,11 +46,14 @@ public abstract class IndexerJobStats {
protected final long indexTotal;
protected final long searchTime;
protected final long searchTotal;
protected final long processingTime;
protected final long processingTotal;
protected final long indexFailures;
protected final long searchFailures;

public IndexerJobStats(long numPages, long numInputDocuments, long numOutputDocuments, long numInvocations,
long indexTime, long searchTime, long indexTotal, long searchTotal, long indexFailures, long searchFailures) {
long indexTime, long searchTime, long processingTime, long indexTotal, long searchTotal, long processingTotal,
long indexFailures, long searchFailures) {
this.numPages = numPages;
this.numInputDocuments = numInputDocuments;
this.numOuputDocuments = numOutputDocuments;
@@ -57,6 +62,8 @@ public IndexerJobStats(long numPages, long numInputDocuments, long numOutputDocu
this.indexTotal = indexTotal;
this.searchTime = searchTime;
this.searchTotal = searchTotal;
this.processingTime = processingTime;
this.processingTotal = processingTotal;
this.indexFailures = indexFailures;
this.searchFailures = searchFailures;
}
@@ -117,6 +124,13 @@ public long getSearchTime() {
return searchTime;
}

/**
* Returns the time spent processing (cumulative) in milliseconds
*/
public long getProcessingTime() {
return processingTime;
}

/**
* Returns the total number of indexing requests that have been processed
* (Note: this is not the number of _documents_ that have been indexed)
@@ -132,6 +146,14 @@ public long getSearchTotal() {
return searchTotal;
}

/**
* Returns the total number of processing runs that have been made
*/
public long getProcessingTotal() {
return processingTotal;
}


@Override
public boolean equals(Object other) {
if (this == other) {
@@ -149,16 +171,19 @@ public boolean equals(Object other) {
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.processingTime, that.processingTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.processingTotal, that.processingTotal)
&& Objects.equals(this.indexTotal, that.indexTotal);
}

@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, searchTotal, indexTotal);
indexTime, searchTime, processingTime, indexFailures, searchFailures, searchTotal,
indexTotal, processingTotal);
}

@Override
@@ -172,6 +197,8 @@ public final String toString() {
+ ", index_time_in_ms=" + indexTime
+ ", index_total=" + indexTotal
+ ", search_time_in_ms=" + searchTime
+ ", search_total=" + searchTotal+ "}";
+ ", search_total=" + searchTotal
+ ", processing_time_in_ms=" + processingTime
+ ", processing_total=" + processingTotal + "}";
}
}

@@ -177,16 +177,18 @@ public final String toString() {
public static class RollupIndexerJobStats extends IndexerJobStats {

RollupIndexerJobStats(long numPages, long numInputDocuments, long numOuputDocuments, long numInvocations,
long indexTime, long indexTotal, long searchTime, long searchTotal, long indexFailures, long searchFailures) {
long indexTime, long indexTotal, long searchTime, long searchTotal, long processingTime,
long processingTotal, long indexFailures, long searchFailures) {
super(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexTotal, searchTotal, indexFailures, searchFailures);
indexTime, searchTime, processingTime, indexTotal, searchTotal, processingTotal, indexFailures, searchFailures);
}

private static final ConstructingObjectParser<RollupIndexerJobStats, Void> PARSER = new ConstructingObjectParser<>(
STATS.getPreferredName(),
true,
args -> new RollupIndexerJobStats((long) args[0], (long) args[1], (long) args[2], (long) args[3],
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9]));
(long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9],
(long) args[10], (long) args[11]));
static {
PARSER.declareLong(constructorArg(), NUM_PAGES);
PARSER.declareLong(constructorArg(), NUM_INPUT_DOCUMENTS);
@@ -196,6 +198,8 @@ public static class RollupIndexerJobStats extends IndexerJobStats {
PARSER.declareLong(constructorArg(), INDEX_TOTAL);
PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
PARSER.declareLong(constructorArg(), PROCESSING_TIME_IN_MS);
PARSER.declareLong(constructorArg(), PROCESSING_TOTAL);
PARSER.declareLong(constructorArg(), INDEX_FAILURES);
PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
}

@@ -41,7 +41,7 @@ public class TransformIndexerStats extends IndexerJobStats {
true,
args -> new TransformIndexerStats((long) args[0], (long) args[1], (long) args[2],
(long) args[3], (long) args[4], (long) args[5], (long) args[6], (long) args[7], (long) args[8], (long) args[9],
(Double) args[10], (Double) args[11], (Double) args[12]));
(long) args[10], (long) args[11], (Double) args[12], (Double) args[13], (Double) args[14]));

static {
LENIENT_PARSER.declareLong(constructorArg(), NUM_PAGES);
@@ -50,8 +50,10 @@ public class TransformIndexerStats extends IndexerJobStats {
LENIENT_PARSER.declareLong(constructorArg(), NUM_INVOCATIONS);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), PROCESSING_TIME_IN_MS);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), PROCESSING_TOTAL);
LENIENT_PARSER.declareLong(constructorArg(), INDEX_FAILURES);
LENIENT_PARSER.declareLong(constructorArg(), SEARCH_FAILURES);
LENIENT_PARSER.declareDouble(optionalConstructorArg(), EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS);
@@ -68,12 +70,12 @@ public static TransformIndexerStats fromXContent(XContentParser parser) throws I
private final double expAvgDocumentsProcessed;

public TransformIndexerStats(long numPages, long numInputDocuments, long numOuputDocuments,
long numInvocations, long indexTime, long searchTime,
long indexTotal, long searchTotal, long indexFailures, long searchFailures,
long numInvocations, long indexTime, long searchTime, long processingTime,
long indexTotal, long searchTotal, long processingTotal, long indexFailures, long searchFailures,
Double expAvgCheckpointDurationMs, Double expAvgDocumentsIndexed,
Double expAvgDocumentsProcessed) {
super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime,
indexTotal, searchTotal, indexFailures, searchFailures);
super(numPages, numInputDocuments, numOuputDocuments, numInvocations, indexTime, searchTime, processingTime,
indexTotal, searchTotal, processingTotal, indexFailures, searchFailures);
this.expAvgCheckpointDurationMs = expAvgCheckpointDurationMs == null ? 0.0 : expAvgCheckpointDurationMs;
this.expAvgDocumentsIndexed = expAvgDocumentsIndexed == null ? 0.0 : expAvgDocumentsIndexed;
this.expAvgDocumentsProcessed = expAvgDocumentsProcessed == null ? 0.0 : expAvgDocumentsProcessed;
@@ -109,10 +111,12 @@ public boolean equals(Object other) {
&& Objects.equals(this.numInvocations, that.numInvocations)
&& Objects.equals(this.indexTime, that.indexTime)
&& Objects.equals(this.searchTime, that.searchTime)
&& Objects.equals(this.processingTime, that.processingTime)
&& Objects.equals(this.indexFailures, that.indexFailures)
&& Objects.equals(this.searchFailures, that.searchFailures)
&& Objects.equals(this.indexTotal, that.indexTotal)
&& Objects.equals(this.searchTotal, that.searchTotal)
&& Objects.equals(this.processingTotal, that.processingTotal)
&& Objects.equals(this.expAvgCheckpointDurationMs, that.expAvgCheckpointDurationMs)
&& Objects.equals(this.expAvgDocumentsIndexed, that.expAvgDocumentsIndexed)
&& Objects.equals(this.expAvgDocumentsProcessed, that.expAvgDocumentsProcessed);
@@ -121,7 +125,7 @@ public boolean equals(Object other) {
@Override
public int hashCode() {
return Objects.hash(numPages, numInputDocuments, numOuputDocuments, numInvocations,
indexTime, searchTime, indexFailures, searchFailures, indexTotal, searchTotal,
expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed);
indexTime, searchTime, processingTime, indexFailures, searchFailures, indexTotal, searchTotal,
processingTotal, expAvgCheckpointDurationMs, expAvgDocumentsIndexed, expAvgDocumentsProcessed);
}
}

@@ -441,6 +441,8 @@ public void testGetStats() throws Exception {
0L,
0L,
0L,
0L,
0L,
0.0,
0.0,
0.0);

@@ -64,8 +64,9 @@ private GetRollupJobResponse createTestInstance() {

private RollupIndexerJobStats randomStats() {
return new RollupIndexerJobStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong());
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong());
}

private RollupJobStatus randomStatus() {

@@ -41,8 +41,9 @@ public void testFromXContent() throws IOException {

public static TransformIndexerStats randomStats() {
return new TransformIndexerStats(randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),
randomNonNegativeLong(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());
@@ -59,6 +60,8 @@ public static void toXContent(TransformIndexerStats stats, XContentBuilder build
builder.field(IndexerJobStats.INDEX_FAILURES.getPreferredName(), stats.getIndexFailures());
builder.field(IndexerJobStats.SEARCH_TIME_IN_MS.getPreferredName(), stats.getSearchTime());
builder.field(IndexerJobStats.SEARCH_TOTAL.getPreferredName(), stats.getSearchTotal());
builder.field(IndexerJobStats.PROCESSING_TIME_IN_MS.getPreferredName(), stats.getProcessingTime());
builder.field(IndexerJobStats.PROCESSING_TOTAL.getPreferredName(), stats.getProcessingTotal());
builder.field(IndexerJobStats.SEARCH_FAILURES.getPreferredName(), stats.getSearchFailures());
builder.field(TransformIndexerStats.EXPONENTIAL_AVG_CHECKPOINT_DURATION_MS.getPreferredName(),
stats.getExpAvgCheckpointDurationMs());

@@ -37,7 +37,7 @@ protected org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStat
return new org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());

@@ -57,7 +57,7 @@ public static org.elasticsearch.xpack.core.transform.transforms.TransformIndexer
return new org.elasticsearch.xpack.core.transform.transforms.TransformIndexerStats(randomLongBetween(10L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L),
randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L), randomLongBetween(0L, 10000L),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble(),
randomBoolean() ? null : randomDouble());

@@ -347,7 +347,7 @@ private void onSearchResponse(SearchResponse searchResponse) {

// allowPartialSearchResults is set to false, so we should never see shard failures here
assert (searchResponse.getShardFailures().length == 0);

stats.markStartProcessing();
stats.incrementNumPages(1);
IterationResult<JobPosition> iterationResult = doProcess(searchResponse);

@@ -359,7 +359,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
onFinish(ActionListener.wrap(
r -> doSaveState(finishAndSetState(), position.get(), () -> {}),
e -> doSaveState(finishAndSetState(), position.get(), () -> {})));

stats.markEndProcessing();
return;
}

@@ -369,7 +369,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
if (docs.isEmpty() == false) {
final BulkRequest bulkRequest = new BulkRequest();
docs.forEach(bulkRequest::add);

stats.markEndProcessing();
stats.markStartIndexing();
doNextBulk(bulkRequest, ActionListener.wrap(bulkResponse -> {
// TODO we should check items in the response and move after accordingly to
@@ -390,6 +390,7 @@ private void onSearchResponse(SearchResponse searchResponse) {
onBulkResponse(bulkResponse, newPosition);
}, this::finishWithIndexingFailure));
} else {
stats.markEndProcessing();
// no documents need to be indexed, continue with search
try {
JobPosition newPosition = iterationResult.getPosition();
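The AsyncTwoPhaseIndexer change brackets the result-processing phase: markStartProcessing() is called as soon as the search response arrives, and markEndProcessing() is called on every exit path once the iteration result has been handled, whether the indexer finishes, hands a bulk request to the indexing phase, or finds nothing to index, so the counters stay balanced. The stats implementation behind these markers is not part of this excerpt; the following is only a sketch of how such bracketing could accumulate processing_time_in_ms and processing_total:

    // Hypothetical sketch of the markStartProcessing/markEndProcessing pattern;
    // the real IndexerJobStats implementation is not shown in this diff.
    class ProcessingStatsSketch {
        private long processingTime;        // cumulative wall-clock ms spent processing
        private long processingTotal;       // number of completed processing runs
        private long processingStart = -1;  // start timestamp of the current run, -1 if none

        void markStartProcessing() {
            processingStart = System.currentTimeMillis();
        }

        void markEndProcessing() {
            assert processingStart != -1 : "markStartProcessing() must be called first";
            processingTime += System.currentTimeMillis() - processingStart;
            processingTotal++;
            processingStart = -1;
        }
    }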