Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize storage of incremental segments #13982

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ The `tuningConfig` object is optional. If you don't specify the `tuningConfig` o
|`intermediateHandoffPeriod`|ISO 8601 period|The period that determines how often tasks hand off segments. Handoff occurs if `maxRowsPerSegment` or `maxTotalRows` is reached or every `intermediateHandoffPeriod`, whichever happens first.|No|P2147483647D|
|`intermediatePersistPeriod`|ISO 8601 period|The period that determines the rate at which intermediate persists occur.|No|PT10M|
|`maxPendingPersists`|Integer|Maximum number of persists that can be pending but not started. If a new intermediate persist exceeds this limit, Druid blocks ingestion until the currently running persist finishes. With the default of 0, one persist can be running concurrently with ingestion, and none can be queued up. The maximum heap memory usage for indexing is `maxRowsInMemory * (2 + maxPendingPersists)`.|No|0|
|`numPersistThreads`|Integer|The number of threads to use to create and persist incremental segments on the disk. Higher ingestion data throughput results in a larger number of incremental segments, causing significant CPU time to be spent on the creation of the incremental segments on the disk. For datasources with column counts in the hundreds or thousands, creation of the incremental segments may take significant time, on the order of multiple seconds. In both of these scenarios, ingestion can stall or pause frequently, causing it to fall behind. You can use additional threads to parallelize the segment creation without blocking ingestion as long as there are sufficient CPU resources available.|No|1|
|`indexSpec`|Object|Defines how Druid indexes the data. See [IndexSpec](#indexspec) for more information.|No||
|`indexSpecForIntermediatePersists`|Object|Defines segment storage format options to use at indexing time for intermediate persisted temporary segments. You can use `indexSpecForIntermediatePersists` to disable dimension/metric compression on intermediate segments to reduce memory required for final merging. However, disabling compression on intermediate segments might increase page cache use while the intermediate segments are in use, before they are merged into the final published segment. See [IndexSpec](#indexspec) for possible values.|No|Same as `indexSpec`|
|`reportParseExceptions`|Boolean|DEPRECATED. If `true`, Druid throws exceptions encountered during parsing causing ingestion to halt. If `false`, Druid skips unparseable rows and fields. Setting `reportParseExceptions` to `true` overrides existing configurations for `maxParseExceptions` and `maxSavedParseExceptions`, setting `maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to not more than 1.|No|`false`|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public static Task getTask()
null,
null,
null,
1L
1L,
null
)
),
null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ public KafkaIndexTaskTuningConfig(
@Nullable Period intermediateHandoffPeriod,
@Nullable Boolean logParseExceptions,
@Nullable Integer maxParseExceptions,
@Nullable Integer maxSavedParseExceptions
@Nullable Integer maxSavedParseExceptions,
@Nullable Integer numPersistThreads
)
{
super(
Expand All @@ -74,7 +75,8 @@ public KafkaIndexTaskTuningConfig(
intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
numPersistThreads
);
}

Expand All @@ -97,7 +99,8 @@ private KafkaIndexTaskTuningConfig(
@JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads
)
{
this(
Expand All @@ -119,7 +122,8 @@ private KafkaIndexTaskTuningConfig(
intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
numPersistThreads
);
}

Expand All @@ -145,7 +149,8 @@ public KafkaIndexTaskTuningConfig withBasePersistDirectory(File dir)
getIntermediateHandoffPeriod(),
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions()
getMaxSavedParseExceptions(),
getNumPersistThreads()
);
}

Expand All @@ -171,7 +176,8 @@ public String toString()
", logParseExceptions=" + isLogParseExceptions() +
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
'}';
", numPersistThreads=" + getNumPersistThreads() +
'}';
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public static KafkaSupervisorTuningConfig defaultConfig()
null,
null,
null,
null,
null
);
}
Expand Down Expand Up @@ -91,7 +92,8 @@ public KafkaSupervisorTuningConfig(
@JsonProperty("intermediateHandoffPeriod") Period intermediateHandoffPeriod,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads
)
{
super(
Expand All @@ -113,7 +115,8 @@ public KafkaSupervisorTuningConfig(
intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
numPersistThreads
);
this.workerThreads = workerThreads;
this.chatRetries = (chatRetries != null ? chatRetries : DEFAULT_CHAT_RETRIES);
Expand Down Expand Up @@ -199,6 +202,7 @@ public String toString()
", logParseExceptions=" + isLogParseExceptions() +
", maxParseExceptions=" + getMaxParseExceptions() +
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
'}';
}

Expand All @@ -224,7 +228,8 @@ public KafkaIndexTaskTuningConfig convertToTaskTuningConfig()
getIntermediateHandoffPeriod(),
isLogParseExceptions(),
getMaxParseExceptions(),
getMaxSavedParseExceptions()
getMaxSavedParseExceptions(),
getNumPersistThreads()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2851,7 +2851,8 @@ private KafkaIndexTask createTask(
intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
null
);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
final TreeMap<Integer, Map<KafkaTopicPartition, Long>> checkpoints = new TreeMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public void testSerdeWithDefaults() throws Exception
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists());
Assert.assertEquals(false, config.isReportParseExceptions());
Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
Assert.assertEquals(1, config.getNumPersistThreads());
}

@Test
Expand All @@ -89,7 +90,8 @@ public void testSerdeWithNonDefaults() throws Exception
+ " \"handoffConditionTimeout\": 100,\n"
+ " \"indexSpec\": { \"metricCompression\" : \"NONE\" },\n"
+ " \"indexSpecForIntermediatePersists\": { \"dimensionCompression\" : \"uncompressed\" },\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" }\n"
+ " \"appendableIndexSpec\": { \"type\" : \"onheap\" },\n"
+ " \"numPersistThreads\": 2\n"
+ "}";

KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig) mapper.readValue(
Expand Down Expand Up @@ -120,6 +122,7 @@ public void testSerdeWithNonDefaults() throws Exception
IndexSpec.builder().withDimensionCompression(CompressionStrategy.UNCOMPRESSED).build(),
config.getIndexSpecForIntermediatePersists()
);
Assert.assertEquals(2, config.getNumPersistThreads());
}

@Test
Expand Down Expand Up @@ -148,7 +151,8 @@ public void testConvert()
null,
null,
null,
null
null,
2
);
KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();

Expand All @@ -163,6 +167,7 @@ public void testConvert()
Assert.assertEquals(IndexSpec.DEFAULT, copy.getIndexSpec());
Assert.assertEquals(true, copy.isReportParseExceptions());
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
Assert.assertEquals(2, copy.getNumPersistThreads());
}

@Test
Expand All @@ -187,7 +192,8 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
null,
true,
42,
42
42,
2
);

String serialized = mapper.writeValueAsString(base);
Expand All @@ -212,6 +218,7 @@ public void testSerdeWithModifiedTuningConfigAddedField() throws IOException
Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads());
}

@Test
Expand All @@ -236,6 +243,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
true,
42,
42,
2,
"extra string"
);

Expand All @@ -260,6 +268,7 @@ public void testSerdeWithModifiedTuningConfigRemovedField() throws IOException
Assert.assertEquals(base.isLogParseExceptions(), deserialized.isLogParseExceptions());
Assert.assertEquals(base.getMaxParseExceptions(), deserialized.getMaxParseExceptions());
Assert.assertEquals(base.getMaxSavedParseExceptions(), deserialized.getMaxSavedParseExceptions());
Assert.assertEquals(base.getNumPersistThreads(), deserialized.getNumPersistThreads());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,7 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
null,
null,
null,
null,
null
);

Expand Down Expand Up @@ -489,6 +490,7 @@ public void testCreateBaseTaskContexts() throws JsonProcessingException
null,
null,
null,
null,
null
),
null
Expand Down Expand Up @@ -3997,6 +3999,7 @@ public void testIsTaskCurrent()
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -4035,6 +4038,7 @@ public void testIsTaskCurrent()
null,
null,
null,
null,
null
);

Expand Down Expand Up @@ -4187,6 +4191,7 @@ public void testSequenceNameDoesNotChangeWithTaskId()
null,
null,
null,
null,
null
)
);
Expand Down Expand Up @@ -4661,7 +4666,8 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
null,
null,
null,
10
10,
null
);

return new TestableKafkaSupervisor(
Expand Down Expand Up @@ -4774,6 +4780,7 @@ public SeekableStreamIndexTaskClient<KafkaTopicPartition, Long> build(
null,
null,
null,
null,
null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public TestModifiedKafkaIndexTaskTuningConfig(
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("extra") String extra
)
{
Expand All @@ -77,7 +78,8 @@ public TestModifiedKafkaIndexTaskTuningConfig(
intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
numPersistThreads
);
this.extra = extra;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ public KinesisIndexTaskTuningConfig(
intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
null
);
this.recordBufferSize = recordBufferSize;
this.recordBufferSizeBytes = recordBufferSizeBytes;
Expand Down Expand Up @@ -362,6 +363,6 @@ public String toString()
", maxRecordsPerPoll=" + maxRecordsPerPoll +
", maxBytesPerPoll=" + maxBytesPerPoll +
", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() +
'}';
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ public class RealtimeAppenderatorTuningConfig implements AppenderatorConfig
private final int maxParseExceptions;
private final int maxSavedParseExceptions;

private final int numPersistThreads;

public RealtimeAppenderatorTuningConfig(
@Nullable AppendableIndexSpec appendableIndexSpec,
Integer maxRowsInMemory,
Expand All @@ -87,7 +89,8 @@ public RealtimeAppenderatorTuningConfig(
@Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@Nullable Boolean logParseExceptions,
@Nullable Integer maxParseExceptions,
@Nullable Integer maxSavedParseExceptions
@Nullable Integer maxSavedParseExceptions,
@Nullable Integer numPersistThreads
)
{
this.appendableIndexSpec = appendableIndexSpec == null ? DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
Expand Down Expand Up @@ -133,6 +136,8 @@ public RealtimeAppenderatorTuningConfig(
this.logParseExceptions = logParseExceptions == null
? TuningConfig.DEFAULT_LOG_PARSE_EXCEPTIONS
: logParseExceptions;
this.numPersistThreads = numPersistThreads == null ?
DEFAULT_NUM_PERSIST_THREADS : Math.max(numPersistThreads, DEFAULT_NUM_PERSIST_THREADS);
}

@JsonCreator
Expand All @@ -154,7 +159,8 @@ private RealtimeAppenderatorTuningConfig(
@JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
@JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
@JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions
@JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads
)
{
this(
Expand All @@ -176,7 +182,8 @@ private RealtimeAppenderatorTuningConfig(
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
numPersistThreads
);
}

Expand Down Expand Up @@ -314,6 +321,13 @@ public int getMaxSavedParseExceptions()
return maxSavedParseExceptions;
}

@Override
@JsonProperty
public int getNumPersistThreads()
{
  // Number of threads used to create and persist incremental segments to disk.
  // Normalized at construction to be at least DEFAULT_NUM_PERSIST_THREADS (1),
  // so callers can rely on a positive value.
  return numPersistThreads;
}

@Override
public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir)
{
Expand All @@ -336,7 +350,8 @@ public RealtimeAppenderatorTuningConfig withBasePersistDirectory(File dir)
segmentWriteOutMediumFactory,
logParseExceptions,
maxParseExceptions,
maxSavedParseExceptions
maxSavedParseExceptions,
numPersistThreads
);
}
}
Loading
Loading