Motivation
In Druid, segment size is important because it affects query performance. If segments are too small, Druid spawns too many threads, each of which reads only a few bytes. If they are too large, query execution might not be fully parallelized.
Since the size of each segment can vary with the data distribution, it is difficult to optimize segment size at ingestion time, especially for realtime indexing.
To handle this problem, operators usually have to set up compaction tasks manually, which still has problems such as manual segment handling and no support for running indexing tasks and compaction tasks concurrently.
Goals
This proposal introduces a new feature: automatic background segment compaction.
The coordinator runs compaction tasks periodically.
A compaction task reads a set of segments and writes new, size-optimized segments if necessary.
Compaction tasks must support atomic segment update: for the same interval, either the new segments or the old segments are visible at any given time, never a mix.
Compaction tasks must not block indexing tasks from being started.
Challenge & solution
Druid uses a versioning system for atomic segment updates. Whenever new segments are written for an interval, they get a higher version than the old segments.
This versioning system works well for normal indexing tasks, but it makes it difficult to run compaction tasks at the same time as index tasks that append data to existing data sources (hereafter, appending tasks).
For example, it would be very common to run compaction tasks while appending tasks (e.g., realtime tasks) are running for the same data source. In this case, the results of the appending tasks must remain visible, which means their segments must have the same version as the existing segments of the destination data source. If the compaction task then increases the segment version, the result segments of the appending tasks will be overshadowed because they have a lower version.
A simple solution might be to increase the version of the appending tasks' result segments accordingly whenever a compaction task finishes. However, this would block appending tasks, as well as query execution, while segment versions are being updated.
As a result, we need a mechanism that overshadows only the pre-compaction segments. The proposed solution is lazy overshadowing: overshadowed segments are marked as such, and the Druid coordinator cleans them up periodically. The new segments created by compaction tasks are in the inactive (i.e., not used) state when they are created. The coordinator atomically activates them when it cleans up the segments marked as overshadowed. More details are presented in the sections below.
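To make the idea concrete, here is a minimal Java sketch of the coordinator-side cleanup. The MetadataStore interface, the method names, and the transaction helper are all hypothetical placeholders rather than Druid's actual metadata API; the point is only that activation of a compacted segment and deactivation of the segments it overshadows happen in a single transaction.

```java
import java.util.List;

public class LazyOvershadowingCleaner {
  // Hypothetical metadata-store interface; not Druid's real API.
  interface MetadataStore {
    List<Segment> findInactiveCompactedSegments(String dataSource);
    void runTransaction(Runnable tx);
    void setUsed(Segment segment, boolean used);
  }

  static class Segment {
    final String id;
    final List<Segment> overshadowedSegments; // recorded at compaction time
    Segment(String id, List<Segment> overshadowed) {
      this.id = id;
      this.overshadowedSegments = overshadowed;
    }
  }

  private final MetadataStore store;

  LazyOvershadowingCleaner(MetadataStore store) {
    this.store = store;
  }

  /** Called periodically by the coordinator. */
  void cleanOvershadowed(String dataSource) {
    for (Segment compacted : store.findInactiveCompactedSegments(dataSource)) {
      // Activate the compacted segment and deactivate the segments it
      // overshadows in one transaction, so queries see either the old
      // set or the new segment, never both.
      store.runTransaction(() -> {
        store.setUsed(compacted, true);
        for (Segment old : compacted.overshadowedSegments) {
          store.setUsed(old, false);
        }
      });
    }
  }
}
```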
Segment compaction
Segment compaction modes
Major compaction: this mode reads existing segments (e.g., via IngestSegmentFirehose) and writes optimized segments. In this mode, compaction can produce more than one output segment from a subset of the input segments. For example, segments A, B, and C can be merged into D and E. This mode can generate better-optimized segments, but requires more complicated locks across the entire cluster.
Minor compaction: this mode simply merges existing segments. Each merge combines a subset of the input segments into a single segment. For example, segments A, B, and C can be merged into D. This mode might produce sub-optimal segments, but can run with a small lock.
This proposal focuses on minor compaction.
1. Compact segment generation
1. The coordinator runs a compaction task.
2. The compaction task acquires a lock on all input segments. This lock prevents the input segments from being removed or modified by other tasks during compaction. Read-only tasks can still read the segments without any segment locks.
3. Merge a group of old segments into a new segment. The new segment is in the inactive (not used) state, and it records which old segments it overshadows. This information is stored in the payload column of the druid_segments table of the metastore.
4. Repeat step 3 until all input segments are processed. (A rough sketch of this loop follows the list.)
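A rough sketch of this loop, using invented TaskLockClient and Merger placeholders rather than real Druid task APIs:

```java
import java.util.ArrayList;
import java.util.List;

public class MinorCompactionTaskSketch {
  // Hypothetical client for requesting segment locks from the overlord.
  interface TaskLockClient {
    boolean tryAcquireSegmentLock(List<String> segmentIds);
  }

  // Hypothetical merger: merges one group into a single new segment and
  // publishes it in the inactive (used = false) state, recording the
  // merged segments as overshadowed in its payload.
  interface Merger {
    void mergeAndPublishInactive(List<String> group);
  }

  void run(TaskLockClient lockClient, Merger merger, List<List<String>> groups) {
    List<String> allSegmentIds = new ArrayList<>();
    for (List<String> group : groups) {
      allSegmentIds.addAll(group);
    }
    if (!lockClient.tryAcquireSegmentLock(allSegmentIds)) {
      return; // blocked by an existing interval or segment lock; retry later
    }
    for (List<String> group : groups) {
      merger.mergeAndPublishInactive(group); // step 3, repeated per group
    }
    // The segment lock is intentionally NOT released here: per the proposal,
    // it is held until the coordinator has replaced all old segments.
  }
}
```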
Changes in Payload
DataSegments are serialized as bytes and stored in the payload column of the druid_segments table of the metastore. A new field, overshadowedSegments, will be added to DataSegment; it contains information about the segments overshadowed by this segment. This is represented as a range whose start and end are the <interval, version, shardSpec> combinations of the first and last overshadowed segments.
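A sketch of how the range might be represented, assuming Jackson-style serialization as is already used for DataSegment; the class and field names are illustrative, and the shardSpec is reduced to a partition number for brevity:

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;

// Illustrative type for the new overshadowedSegments field; not Druid's
// actual class names.
public class OvershadowedSegmentsRange {
  /** One endpoint of the range: the <interval, version, shardSpec> triple. */
  public static class SegmentBoundary {
    @JsonProperty public final String interval;  // e.g. "2017-01-01/2017-01-02"
    @JsonProperty public final String version;
    @JsonProperty public final int partitionNum; // stands in for the shardSpec

    @JsonCreator
    public SegmentBoundary(
        @JsonProperty("interval") String interval,
        @JsonProperty("version") String version,
        @JsonProperty("partitionNum") int partitionNum) {
      this.interval = interval;
      this.version = version;
      this.partitionNum = partitionNum;
    }
  }

  @JsonProperty public final SegmentBoundary start;
  @JsonProperty public final SegmentBoundary end;

  @JsonCreator
  public OvershadowedSegmentsRange(
      @JsonProperty("start") SegmentBoundary start,
      @JsonProperty("end") SegmentBoundary end) {
    this.start = start;
    this.end = end;
  }
}
```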
Segment locking
This proposal requires a new locking mechanism, segment locking.
The overlord maintains locks for segments in addition to the existing interval locks.
When a replacing task (such as a compaction or rebuilding task) is submitted, it requests a lock from the overlord.
If any segment or interval locks already exist for the requested segments, the task waits until it can acquire the lock.
If there are no locks for the requested segments, the task acquires the lock and starts its job.
When the lock is released depends on the application. Compaction tasks release the segment lock when all old segments have been replaced with new segments.
Note: Segment locks can be viewed as an extension of interval locks. While the current interval locks can be acquired only for an entire interval, segment locks can be acquired for a subset of the segments within an interval. Segment locks and interval locks block each other. If some tasks hold interval locks for some intervals, segment locks for segments falling into those intervals cannot be acquired, and the requesting tasks must wait for the interval locks to be released. Similarly, an interval lock cannot be acquired if segment locks already exist within the requested interval. The sketch below illustrates this compatibility rule.
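The following sketch shows the mutual-blocking rule; the lock bookkeeping and method names are invented for exposition and are not the overlord's actual data structures.

```java
import java.util.ArrayList;
import java.util.List;

public class LockCompatibilitySketch {
  static class IntervalLock {
    final long start, end; // interval endpoints in millis
    IntervalLock(long start, long end) { this.start = start; this.end = end; }
  }

  static class SegmentLock {
    final long start, end;  // the locked segment's interval
    final String segmentId;
    SegmentLock(long start, long end, String segmentId) {
      this.start = start; this.end = end; this.segmentId = segmentId;
    }
  }

  private final List<IntervalLock> intervalLocks = new ArrayList<>();
  private final List<SegmentLock> segmentLocks = new ArrayList<>();

  /** A segment lock is blocked by any overlapping interval lock, or by an
      existing segment lock on the same segment. */
  boolean canGrantSegmentLock(long segStart, long segEnd, String segmentId) {
    for (IntervalLock l : intervalLocks) {
      if (l.start < segEnd && segStart < l.end) {
        return false; // an interval lock covers this segment's interval
      }
    }
    for (SegmentLock l : segmentLocks) {
      if (l.segmentId.equals(segmentId)) {
        return false; // the segment itself is already locked
      }
    }
    return true;
  }

  /** An interval lock is blocked by any overlapping interval or segment lock. */
  boolean canGrantIntervalLock(long start, long end) {
    for (IntervalLock l : intervalLocks) {
      if (l.start < end && start < l.end) {
        return false;
      }
    }
    for (SegmentLock l : segmentLocks) {
      if (l.start < end && start < l.end) {
        return false; // segment locks within the requested interval block it
      }
    }
    return true;
  }
}
```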
2. Atomic segment update
This algorithm achieves atomic segment updates while holding only a small lock.
When a compaction task finishes, it pushes the new segments to deep storage and writes their metadata to the metastore.
The coordinator assigns the new segments to historicals by running load rules for them.
When a historical loads a new segment, it checks whether it holds any old segments overshadowed by the newly loaded segment. If so, it immediately removes those segments. Finally, it activates the new segment and announces the change. The segment replacement must be atomic.
Coordinators and brokers update their timelines according to the historicals' announcements. Once one detects that a new segment has been loaded by a historical, it immediately removes the old segments overshadowed by the new one from its timeline. This replacement must also be atomic.
The coordinator removes the remaining overshadowed segments in the background. When this is done, the segment lock for the removed segments is released.
Note that locking is required only when historicals and coordinators replace old segments with new ones.
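For illustration, a minimal sketch of the historical-side atomic swap described in the steps above; the Timeline interface and the method names are hypothetical, and a plain monitor lock stands in for whatever synchronization the real server uses.

```java
import java.util.List;

public class HistoricalSegmentSwapper {
  // Hypothetical view of the historical's served-segment timeline.
  interface Timeline {
    void add(String segmentId);
    void remove(String segmentId);
  }

  private final Timeline timeline;
  private final Object swapLock = new Object();

  HistoricalSegmentSwapper(Timeline timeline) {
    this.timeline = timeline;
  }

  /** Atomically replaces overshadowed segments with a newly loaded one. */
  void onSegmentLoaded(String newSegmentId, List<String> overshadowed) {
    synchronized (swapLock) {
      // Within the lock, queries see either the old set or the new
      // segment, never a mix of both.
      for (String old : overshadowed) {
        timeline.remove(old);
      }
      timeline.add(newSegmentId); // activate; announce the change afterwards
    }
  }
}
```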
Implementation plan for compaction task
As a first step, I'm going to add a compaction task that runs in a single process. As a second step, I'll parallelize it; I'll raise a separate issue for task parallelization soon.
The compaction task divides the set of input segments into several groups and runs a sub task for each group, similar to the existing merge task. Each sub task simply merges its input segments into a single segment. Note that a merge can span segments from different intervals. A sketch of one possible grouping strategy is given below.
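As an illustration of the grouping step, here is a simple greedy strategy that packs consecutive segments until a target size is reached. The threshold and the strategy are just one possibility, not the planned implementation; note that a group may contain segments from different intervals, matching the note above.

```java
import java.util.ArrayList;
import java.util.List;

public class SegmentGrouper {
  static class Seg {
    final String id;
    final long sizeBytes;
    Seg(String id, long sizeBytes) { this.id = id; this.sizeBytes = sizeBytes; }
  }

  /** Greedily packs consecutive segments until a group reaches targetBytes. */
  static List<List<Seg>> group(List<Seg> inputs, long targetBytes) {
    List<List<Seg>> groups = new ArrayList<>();
    List<Seg> current = new ArrayList<>();
    long currentSize = 0;
    for (Seg s : inputs) {
      current.add(s);
      currentSize += s.sizeBytes;
      if (currentSize >= targetBytes) { // group is large enough; start a new one
        groups.add(current);
        current = new ArrayList<>();
        currentSize = 0;
      }
    }
    if (!current.isEmpty()) {
      groups.add(current); // trailing, possibly undersized group
    }
    return groups;
  }
}
```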