
Add a configurable bufferPeriod between when a segment is marked unused and deleted by KillUnusedSegments duty #12599

Merged
merged 50 commits into master from implements-12526 on Aug 18, 2023
Changes from 4 commits
Commits
c836aa8
Add new configurable buffer period to create gap between mark unused …
capistrant May 26, 2022
e18c6f1
Changes after testing
capistrant May 27, 2022
f974016
fixes and improvements
capistrant Jun 2, 2022
7b69b68
changes after initial self review
capistrant Jun 2, 2022
c78b282
Merge branch 'master' into implements-12526
capistrant Jun 7, 2022
d8b4cc7
self review changes
capistrant Jun 8, 2022
c571162
update sql statement that was lacking last_used
capistrant Jun 9, 2022
95ccd4e
shore up some code in SqlMetadataConnector after self review
capistrant Jun 9, 2022
5d38b4c
Merge branch 'master' into implements-12526
capistrant Jun 10, 2022
8a8e518
fix derby compatibility and improve testing/docs
capistrant Jun 13, 2022
7b52874
fix checkstyle violations
capistrant Jun 13, 2022
2fb019e
Merge branch 'master' into implements-12526
capistrant Jun 22, 2022
4e0efb0
Fixes post merge with master
capistrant Jun 22, 2022
0621295
add some unit tests to improve coverage
capistrant Jun 22, 2022
a1e9735
ignore test coverage on new UpdateTools cli tool
capistrant Jun 23, 2022
d8cb285
another attempt to ignore UpdateTables in coverage check
capistrant Jun 24, 2022
2fc7f19
Merge branch 'master' into implements-12526
capistrant Jun 28, 2022
bab850e
change column name to used_flag_last_updated
capistrant Jun 28, 2022
91828c3
fix a method signature after column name switch
capistrant Jun 28, 2022
f5b934f
update docs spelling
capistrant Jul 5, 2022
539cc35
Merge branch 'master' into implements-12526
capistrant Jul 7, 2022
acfbe06
Update spelling dictionary
capistrant Jul 8, 2022
51f4632
Merge branch 'master' into implements-12526
capistrant Jul 21, 2022
df53363
Merge branch 'master' into implements-12526
capistrant Aug 5, 2022
788dafc
Merge branch 'master' into implements-12526
capistrant Aug 19, 2022
d2f16a2
Fixing up docs/spelling and integrating altering tasks table with my …
capistrant Aug 22, 2022
de2922e
Update NULL values for used_flag_last_updated in the background
capistrant Aug 23, 2022
537c598
Remove logic to allow segs with null used_flag_last_updated to be kil…
capistrant Aug 23, 2022
3d642f0
remove unneeded things now that the new column is automatically updated
capistrant Aug 23, 2022
cb9ef38
Test new background row updater method
capistrant Aug 23, 2022
60fcee5
fix broken tests
capistrant Aug 23, 2022
6ae30bd
Merge branch 'master' into implements-12526
capistrant Aug 24, 2022
82454d8
fix create table statement
capistrant Aug 24, 2022
720fa0d
cleanup DDL formatting
capistrant Aug 25, 2022
8d36eb3
Merge branch 'master' into implements-12526
capistrant Aug 26, 2022
ec59ed6
Merge branch 'master' into implements-12526
capistrant Sep 1, 2022
edd9db8
Merge branch 'master' into implements-12526
capistrant Sep 7, 2022
ddc5e8f
Merge branch 'master' into implements-12526
capistrant Oct 6, 2022
9fa11d3
Revert adding columns to entry table by default
capistrant Oct 6, 2022
58e0942
fix compilation issues after merge with master
capistrant Oct 11, 2022
666837e
Merge branch 'master' into implements-12526
capistrant Oct 20, 2022
4647bef
Merge branch 'master' into implements-12526
capistrant Nov 10, 2022
2e4c6ec
discovered and fixed metastore inserts that were breaking integration…
capistrant Nov 10, 2022
225b8be
fixup forgotten insert by using pattern of sharing now timestamp acro…
capistrant Nov 14, 2022
2899c11
Merge branch 'master' into implements-12526
capistrant Dec 15, 2022
cd43ba5
Merge branch 'master' into implements-12526
capistrant Dec 16, 2022
862f6e9
fix issue introduced by merge
capistrant Dec 16, 2022
3e0f21f
Merge branch 'master' into implements-12526
capistrant Aug 11, 2023
5940760
fixup after merge with master
capistrant Aug 11, 2023
c30817c
add some directions to docs in the case of segment table validation i…
capistrant Aug 15, 2023
@@ -88,4 +88,9 @@ default void exportTable(
void createSupervisorsTable();

void deleteAllRecords(String tableName);

void alterSegmentTableAddLastUsed();

void updateSegmentTablePopulateLastUsed();

}
1 change: 1 addition & 0 deletions docs/configuration/index.md
@@ -814,6 +814,7 @@ These Coordinator static configurations can be defined in the `coordinator/runtime.properties` file.
|`druid.coordinator.kill.period`|How often to send kill tasks to the indexing service. Value must be greater than `druid.coordinator.period.indexingPeriod`. Only applies if kill is turned on.|P1D (1 Day)|
|`druid.coordinator.kill.durationToRetain`|Only applies if you set `druid.coordinator.kill.on` to `true`. This value is ignored if `druid.coordinator.kill.ignoreDurationToRetain` is `true`. Valid configurations must be an ISO8601 period. Druid will not kill unused segments whose interval end date is beyond `now - durationToRetain`. `durationToRetain` can be a negative ISO8601 period, which would result in `now - durationToRetain` being in the future.|`P90D`|
|`druid.coordinator.kill.ignoreDurationToRetain`|A way to override `druid.coordinator.kill.durationToRetain` and tell the coordinator that you do not care about the end date of unused segment intervals when it comes to killing them. If true, the coordinator considers all unused segments as eligible to be killed.|false|
|`druid.coordinator.kill.bufferPeriod`|The amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This can serve as a buffer period to prevent data loss if data ends up being needed after being marked unused.|`PT24H`|

Contributor comment:
24h seems aggressive as a default. Personally I'd be more comfortable with 30 days, if it's a cluster I'm operating.

Contributor comment:
We should describe that if you updated from an earlier version, this won't apply to all segments. We can also link from this to the doc about how to do the manual update. Maybe like:

The amount of time that a segment must be unused before it is able to be permanently removed from metadata and deep storage. This can serve as a buffer period to prevent data loss if data ends up being needed after being marked unused.<br /><br />Segments created with Druid 0.23.x and earlier do not have the necessary metadata required to respect this configuration, and may be permanently removed even if marked used or unused within the buffer period. To address this, you can [populate this field in existing segment records](../operations/upgrade-prep.md#populate-used-flag-last-updated) using a command-line tool or SQL command.

|`druid.coordinator.kill.maxSegments`|Kill at most n unused segments per kill task submission, must be greater than 0. Only applies and MUST be specified if kill is turned on.|100|
|`druid.coordinator.balancer.strategy`|Specify the type of balancing strategy for the coordinator to use to distribute segments among the historicals. `cachingCost` is logically equivalent to `cost` but is more CPU-efficient on large clusters. `diskNormalized` weights the costs according to the servers' disk usage ratios - there are known issues with this strategy distributing segments unevenly across the cluster. `random` distributes segments among services randomly.|`cost`|
|`druid.coordinator.balancer.cachingCost.awaitInitialization`|Whether to wait for segment view initialization before creating the `cachingCost` balancing strategy. This property is used only when `druid.coordinator.balancer.strategy` is `cachingCost`. If set to `true`, the Coordinator will not start assigning segments until the segment view is initialized. If set to `false`, the Coordinator will fall back to the `cost` balancing strategy only if the segment view is not initialized yet. Note that initialization may take a long time, since building the `cachingCost` strategy involves significant computation.|false|
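
Putting the kill-related settings above together, a minimal Coordinator `runtime.properties` sketch using the new buffer period might look like the following. The values here are illustrative assumptions only, not recommendations; `P30D` echoes the reviewer's suggestion above.

```properties
# Hypothetical example values -- tune for your own cluster.
druid.coordinator.kill.on=true
druid.coordinator.kill.period=P1D
druid.coordinator.kill.durationToRetain=P90D
# Keep segments for 30 days after they are marked unused before killing them.
druid.coordinator.kill.bufferPeriod=P30D
druid.coordinator.kill.maxSegments=100
```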
4 changes: 3 additions & 1 deletion docs/dependencies/metadata-storage.md
@@ -99,7 +99,9 @@ system. The table has two main functional columns; the other columns are for indexing purposes.
Value 1 in the `used` column means that the segment should be "used" by the cluster (i.e., it should be loaded and
available for requests). Value 0 means that the segment should not be loaded into the cluster. We do this as a means of
unloading segments from the cluster without actually removing their metadata (which allows for simpler rolling back if
that is ever an issue).
that is ever an issue). The `used` column has a corresponding `last_used` column that records the timestamp at which
the segment's `used` status was last updated. The Coordinator can use this information to determine whether a segment
is a candidate for deletion (if automated segment killing is enabled).
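
For illustration, a buffer-period check that combines these two columns could be expressed as a query along the following lines. This is a sketch only, and the actual statement the Coordinator issues may differ; note that because `last_used` stores fixed-format ISO 8601 UTC strings, lexicographic comparison matches chronological order:

```sql
-- Hypothetical kill-candidate lookup: unused segments whose used flag was
-- last flipped before now - druid.coordinator.kill.bufferPeriod.
SELECT id
FROM druid_segments
WHERE used = false
  AND last_used < '2023-01-01T00:00:00.000Z'; -- placeholder for now() - bufferPeriod
```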

The `payload` column stores a JSON blob that has all of the metadata for the segment.
Some of the data in the `payload` column intentionally duplicates data from other columns in the segments table.
114 changes: 114 additions & 0 deletions docs/operations/upgrade-prep.md
@@ -0,0 +1,114 @@
---
id: upgrade-prep
title: "Upgrade Prep"
---

<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing,
~ software distributed under the License is distributed on an
~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
~ KIND, either express or implied. See the License for the
~ specific language governing permissions and limitations
~ under the License.
-->

## Upgrade to `0.24+` from `0.23` and earlier

### Altering segments table

**If you have set `druid.metadata.storage.connector.createTables` to `true` (the default) and your metadata storage connector user has DDL privileges, you can disregard this section. You are still urged to evaluate the optional section below.**

**The Coordinator and Overlord services will fail if you do not execute this change prior to the upgrade.**

A new column, `last_used`, is needed in the segments table to support new
segment killing functionality. You can manually alter the table, or you can use
a pre-written tool to perform the update.

#### Pre-written tool

Druid provides a `metadata-update` tool for updating Druid's metadata tables.

In the example commands below:

- `lib` is the Druid lib directory
- `extensions` is the Druid extensions directory
- The `--base` parameter corresponds to the value of `druid.metadata.storage.tables.base` in the configuration (`druid` by default).
- The `--connectURI` parameter corresponds to the value of `druid.metadata.storage.connector.connectURI`.
- The `--user` parameter corresponds to the value of `druid.metadata.storage.connector.user`.
- The `--password` parameter corresponds to the value of `druid.metadata.storage.connector.password`.
- The `--action` parameter corresponds to the update action you are executing. In this case it is: `add-last-used-to-segments`

##### MySQL

```bash
cd ${DRUID_ROOT}
java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[\"mysql-metadata-storage\"] -Ddruid.metadata.storage.type=mysql org.apache.druid.cli.Main tools metadata-update --connectURI="<mysql-uri>" --user <user> --password <pass> --base druid --action add-last-used-to-segments
```

##### PostgreSQL

```bash
cd ${DRUID_ROOT}
java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[\"postgresql-metadata-storage\"] -Ddruid.metadata.storage.type=postgresql org.apache.druid.cli.Main tools metadata-update --connectURI="<postgresql-uri>" --user <user> --password <pass> --base druid --action add-last-used-to-segments
```


#### Manual ALTER TABLE

```SQL
ALTER TABLE druid_segments
ADD last_used varchar(255);
```
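
To verify the column exists after the `ALTER`, you can check the information schema. This is a sketch for MySQL and PostgreSQL with the default `druid` base name; Derby exposes the same information through `SYS` catalog tables instead:

```SQL
-- Should return one row once last_used has been added.
SELECT column_name
FROM information_schema.columns
WHERE table_name = 'druid_segments'
  AND column_name = 'last_used';
```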

### Populating `last_used` column of the segments table after upgrade (Optional)

This is an optional step to take **after** you upgrade the Overlord and Coordinator to `0.24+` (from `0.23` and earlier). If you do not take this action and are also using `druid.coordinator.kill.on=true`, the logic that identifies killable segments will not honor `druid.coordinator.kill.bufferPeriod` for rows in the segments table where `last_used` is `NULL`.

#### Pre-written tool

Druid provides a `metadata-update` tool for updating Druid's metadata tables. Note that this tool will update `last_used` for all rows that match `used = false` in one transaction.

In the example commands below:

- `lib` is the Druid lib directory
- `extensions` is the Druid extensions directory
- The `--base` parameter corresponds to the value of `druid.metadata.storage.tables.base` in the configuration (`druid` by default).
- The `--connectURI` parameter corresponds to the value of `druid.metadata.storage.connector.connectURI`.
- The `--user` parameter corresponds to the value of `druid.metadata.storage.connector.user`.
- The `--password` parameter corresponds to the value of `druid.metadata.storage.connector.password`.
- The `--action` parameter corresponds to the update action you are executing. In this case it is: `populate-last-used-column-in-segments`

##### MySQL

```bash
cd ${DRUID_ROOT}
java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[\"mysql-metadata-storage\"] -Ddruid.metadata.storage.type=mysql org.apache.druid.cli.Main tools metadata-update --connectURI="<mysql-uri>" --user <user> --password <pass> --base druid --action populate-last-used-column-in-segments
```

##### PostgreSQL

```bash
cd ${DRUID_ROOT}
java -classpath "lib/*" -Dlog4j.configurationFile=conf/druid/cluster/_common/log4j2.xml -Ddruid.extensions.directory="extensions" -Ddruid.extensions.loadList=[\"postgresql-metadata-storage\"] -Ddruid.metadata.storage.type=postgresql org.apache.druid.cli.Main tools metadata-update --connectURI="<postgresql-uri>" --user <user> --password <pass> --base druid --action populate-last-used-column-in-segments
```


#### Manual UPDATE

Note that the date string in this example is arbitrary. We recommend using the current UTC time when you invoke the command. If many rows match the condition `used = false`, you may want to update the table incrementally using a `LIMIT` clause, as sketched below.

```SQL
UPDATE druid_segments
SET last_used = '2022-01-01T00:00:00.000Z'
WHERE used = false;
```
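
If you take the incremental approach mentioned above, one possible batching pattern is sketched here. MySQL supports `LIMIT` directly on `UPDATE`, while PostgreSQL does not and needs a subquery; the `last_used IS NULL` condition lets each rerun pick up the remaining rows, so repeat the statement until it reports zero affected rows:

```SQL
-- MySQL: update unused rows in batches of 10000.
UPDATE druid_segments
SET last_used = '2022-01-01T00:00:00.000Z'
WHERE used = false AND last_used IS NULL
LIMIT 10000;

-- PostgreSQL: UPDATE has no LIMIT clause, so batch via a subquery on id.
UPDATE druid_segments
SET last_used = '2022-01-01T00:00:00.000Z'
WHERE id IN (
  SELECT id FROM druid_segments
  WHERE used = false AND last_used IS NULL
  LIMIT 10000
);
```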
@@ -1006,8 +1006,8 @@ private Set<DataSegment> announceHistoricalSegmentBatch(

PreparedBatch preparedBatch = handle.prepareBatch(
StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload, last_used) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload, :last_used)",
dbTables.getSegmentsTable(),
connector.getQuoteString()
)
@@ -1024,7 +1024,8 @@ private Set<DataSegment> announceHistoricalSegmentBatch(
.bind("partitioned", (segment.getShardSpec() instanceof NoneShardSpec) ? false : true)
.bind("version", segment.getVersion())
.bind("used", usedSegments.contains(segment))
.bind("payload", jsonMapper.writeValueAsBytes(segment));
.bind("payload", jsonMapper.writeValueAsBytes(segment))
.bind("last_used", DateTimes.nowUtc().toString());
}
final int[] affectedRows = preparedBatch.execute();
final boolean succeeded = Arrays.stream(affectedRows).allMatch(eachAffectedRows -> eachAffectedRows == 1);
@@ -25,6 +25,7 @@
import com.google.common.collect.ImmutableList;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.dbcp2.BasicDataSourceFactory;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
@@ -44,6 +45,8 @@

import javax.annotation.Nullable;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLRecoverableException;
import java.sql.SQLTransientException;
@@ -208,6 +211,41 @@ public Void withHandle(Handle handle)
}
}

/**
* Execute the desired ALTER statements on the desired table.
*
* @param tableName The name of the table being altered
* @param sql       ALTER statements to be executed
*/
public void alterTable(final String tableName, final Iterable<String> sql)
{
try {
retryWithHandle(
new HandleCallback<Void>()
{
@Override
public Void withHandle(Handle handle)
{
if (tableExists(handle, tableName)) {
log.info("Altering table[%s]", tableName);
final Batch batch = handle.createBatch();
for (String s : sql) {
batch.add(s);
}
batch.execute();
} else {
log.info("Table[%s] doesn't exists", tableName);
}
return null;
}
}
);
}
catch (Exception e) {
log.warn(e, "Exception Altering table[%s]", tableName);
}
}

public void createPendingSegmentsTable(final String tableName)
{
createTable(
@@ -277,6 +315,7 @@ public void createSegmentTable(final String tableName)
+ " version VARCHAR(255) NOT NULL,\n"
+ " used BOOLEAN NOT NULL,\n"
+ " payload %2$s NOT NULL,\n"
+ " last_used VARCHAR(255) NOT NULL,\n"
+ " PRIMARY KEY (id)\n"
+ ")",
tableName, getPayloadType(), getQuoteString(), getCollation()
@@ -408,6 +447,50 @@ tableName, getSerialType(), getPayloadType()
);
}

/**
* Adds the last_used column to the Druid segment table.
*
* This is public to allow the UpdateTables CLI tool to use it for upgrade prep.
*/
@Override
public void alterSegmentTableAddLastUsed()
{
String tableName = tablesConfigSupplier.get().getSegmentsTable();
if (!tableHasColumn(tableName, "last_used")) {
log.info("Adding last_used column to %s", tableName);
alterTable(
tableName,
ImmutableList.of(
StringUtils.format(
"ALTER TABLE %1$s \n"
+ "ADD last_used varchar(255)",
tableName
)
)
);
} else {
log.info("%s already has last_used column", tableName);
}
}

/**
* Populates the last_used column for all unused segments in the Druid segment table.
*
* The current UTC timestamp string is used as the value for each updated row.
* This is public to allow the UpdateTables CLI tool to use it in an optional post-upgrade action.
*/
@Override
public void updateSegmentTablePopulateLastUsed()
{
String tableName = tablesConfigSupplier.get().getSegmentsTable();
getDBI().withHandle(
(Handle handle) -> handle
.createStatement(StringUtils.format("UPDATE %s SET last_used = :last_used WHERE used = false", tableName))
.bind("last_used", DateTimes.nowUtc().toString())
.execute()
);
}

@Override
public Void insertOrUpdate(
final String tableName,
@@ -553,7 +636,11 @@ public void createSegmentTable()
{
if (config.get().isCreateTables()) {
createSegmentTable(tablesConfigSupplier.get().getSegmentsTable());
alterSegmentTableAddLastUsed();
}
// Called outside of the above conditional because we want to validate the table
// regardless of cluster configuration for creating tables.
validateSegmentTable();
}

@Override
@@ -765,4 +852,52 @@ public Void withHandle(Handle handle)
log.warn(e, "Exception while deleting records from table");
}
}

/**
* Interrogate table metadata and return true or false depending on the existence of the indicated column.
*
* @param tableName The table being interrogated
* @param columnName The column being looked for
* @return boolean indicating the existence of the column in question
*/
private boolean tableHasColumn(String tableName, String columnName)
{
return getDBI().withHandle(
new HandleCallback<Boolean>()
{
@Override
public Boolean withHandle(Handle handle)
{
try {
DatabaseMetaData dbMetaData = handle.getConnection().getMetaData();
ResultSet columns = dbMetaData.getColumns(
null,
null,
tableName,
columnName
);
return columns.next();
}
catch (SQLException e) {
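// If the metadata lookup itself fails, conservatively report the column as absent.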
return false;
}
}
}
);
}

/**
* Ensure that the segment table has the schema required to run Druid.
*
* Throws RuntimeException if the column does not exist. There is no recovering from an invalid
* schema; the program should crash.
*/
private void validateSegmentTable()
{
if (tableHasColumn(tablesConfigSupplier.get().getSegmentsTable(), "last_used")) {
return;
} else {
throw new RuntimeException("Invalid Segment Table Schema! No last_used column!");
}
}
}
@@ -55,8 +55,8 @@ public SQLMetadataSegmentPublisher(
this.config = config;
this.connector = connector;
this.statement = StringUtils.format(
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)",
"INSERT INTO %1$s (id, dataSource, created_date, start, %2$send%2$s, partitioned, version, used, payload, last_used) "
+ "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload, :last_used)",
config.getSegmentsTable(), connector.getQuoteString()
);
}
@@ -73,7 +73,8 @@ public void publishSegment(final DataSegment segment) throws IOException
(segment.getShardSpec() instanceof NoneShardSpec) ? false : true,
segment.getVersion(),
true,
jsonMapper.writeValueAsBytes(segment)
jsonMapper.writeValueAsBytes(segment),
DateTimes.nowUtc().toString()
Contributor comment:
Would be tidier to call DateTimes.nowUtc() just once.

);
}
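
A minimal sketch of the reviewer's suggestion: capture `DateTimes.nowUtc()` once and reuse the resulting string at every bind site, so values that should share one instant (e.g. `created_date` and `last_used`) cannot drift apart. The class name is illustrative, and the snippet assumes the Druid core library on the classpath:

```java
import org.apache.druid.java.util.common.DateTimes;

public class SingleTimestampSketch
{
  public static void main(String[] args)
  {
    // One call, one instant: every column bound from this value agrees exactly.
    final String now = DateTimes.nowUtc().toString();
    System.out.println("created_date = " + now);
    System.out.println("last_used    = " + now);
  }
}
```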

@@ -87,7 +88,8 @@ void publishSegment(
final boolean partitioned,
final String version,
final boolean used,
final byte[] payload
final byte[] payload,
final String lastUsed
)
{
try {
@@ -128,6 +130,7 @@ public Void withHandle(Handle handle)
.bind("version", version)
.bind("used", used)
.bind("payload", payload)
.bind("last_used", lastUsed)
.execute();

return null;