# This is a combination of 5 commits.
KAFKA-15443 Upgrade RocksDB to 9.7.3

# This is the 1st commit message:

KAFKA-15443 Update rocksdb version and changes in java class

# This is the commit message apache#2:

KAFKA-15443 Upgrade RocksDB dependency and address compatibility issues

# This is the commit message apache#3:

Update RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java

# This is the commit message apache#4:

Update RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java

# This is the commit message apache#5:

To satisfy rocksdb 9.7.3

KAFKA-15443 Upgrade RocksDB to 9.7.3

KAFKA-15443 Upgrade RocksDB to 9.7.3

KAFKA-15443 Upgrade RocksDB to 9.7.3
swikarpat committed Jan 3, 2025
1 parent 40e0543 commit 1af566b
Showing 8 changed files with 35 additions and 31 deletions.
2 changes: 1 addition & 1 deletion LICENSE-binary
@@ -268,7 +268,7 @@ netty-transport-native-epoll-4.1.115.Final
netty-transport-native-unix-common-4.1.115.Final
opentelemetry-proto-1.0.0-alpha
plexus-utils-3.5.1
-rocksdbjni-7.9.2
+rocksdbjni-9.7.3
scala-library-2.13.15
scala-logging_2.13-3.9.5
scala-reflect-2.13.15
5 changes: 0 additions & 5 deletions docs/ops.html
@@ -3453,11 +3453,6 @@ <h5 class="anchor-heading"><a id="kafka_streams_rocksdb_monitoring" class="ancho
<td>The maximum duration in ms of disk compactions.</td>
<td>kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)</td>
</tr>
-<tr>
-<td>number-open-files</td>
-<td>The number of current open files.</td>
-<td>kafka.streams:type=stream-state-metrics,thread-id=([-.\w]+),task-id=([-.\w]+),[store-scope]-id=([-.\w]+)</td>
-</tr>
<tr>
<td>number-file-errors-total</td>
<td>The total number of file errors that occurred.</td>
17 changes: 15 additions & 2 deletions docs/streams/upgrade-guide.html
@@ -139,8 +139,6 @@ <h3 class="anchor-heading"><a id="streams_notable_changes" class="anchor-link"><
More details about the new config <code>StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG</code> can be found in <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-295%3A+Add+Streams+Configuration+Allowing+for+Optional+Topology+Optimization">KIP-295</a>.
</p>

-<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
-
<p>
In this release the <code>ClientInstanceIds</code> instance stores the global consumer <code>Uuid</code> for the
<a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-714%3A+Client+metrics+and+observability#KIP714:Clientmetricsandobservability-Clientidentificationandtheclientinstanceid">KIP-714</a>
@@ -173,6 +171,21 @@ <h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API
See <a href="https://cwiki.apache.org/confluence/x/TZCMEw">KIP-1112</a> for more details.
</p>

+<h3><a id="streams_api_changes_400" href="#streams_api_changes_400">Streams API changes in 4.0.0</a></h3>
+
+<p>
+The RocksDB dependency has been upgraded to version 9.7.3 (from 7.9.2). This upgrade incorporates various improvements and optimizations within RocksDB, but it also introduces some API changes.
+The <code>org.rocksdb.AccessHint</code> class, along with its associated methods, has been removed.
+
+<code>org.rocksdb.Options.setMemtableMaxRangeDeletions(int p1): ColumnFamilyOptionsInterface</code> has been added, providing control over the maximum number of range deletions in the memtable.
+<code>org.rocksdb.Options.memtableMaxRangeDeletions(): int</code> has been added, allowing retrieval of the configured maximum number of range deletions in the memtable.
+
+The <code>org.rocksdb.Options.setLogger()</code> method now accepts a <code>LoggerInterface</code> as a parameter instead of the previous <code>Logger</code>.
+The <code>NO_FILE_CLOSES</code> field has been removed from the <code>org.rocksdb.TickerType</code> enum.
+Some data types used in RocksDB's Java API have been modified. These changes, along with the removed class, field, and new methods, are primarily relevant to users implementing custom RocksDB configurations.
+These changes are expected to be largely transparent to most Kafka Streams users. However, users employing advanced RocksDB customizations within their Streams applications, particularly through <code>rocksdb.config.setter</code>, are advised to consult the detailed RocksDB 9.7.3 changelog to ensure a smooth transition and to adapt their configurations as needed. Specifically, users leveraging the removed <code>AccessHint</code> class, the <code>NO_FILE_CLOSES</code> field from <code>TickerType</code>, or relying on the previous signature of <code>setLogger()</code> will need to update their implementations.
+</p>
+
<h3><a id="streams_api_changes_390" href="#streams_api_changes_390">Streams API changes in 3.9.0</a></h3>

<p>
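As the new upgrade-guide text notes, code referencing the removed `AccessHint` class no longer compiles against RocksDB 9.7.3. Tooling that must tolerate either major version sometimes probes for removed classes reflectively; the class below is a purely illustrative sketch (not part of this commit, and the class name is hypothetical):

```java
public final class RocksDbCompatProbe {

    // Returns true when the pre-9.x org.rocksdb.AccessHint class is on the
    // classpath; under RocksDB 9.7.3 (or with no RocksDB jar at all) the
    // lookup fails and this returns false.
    public static boolean hasAccessHint() {
        try {
            Class.forName("org.rocksdb.AccessHint");
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("AccessHint present: " + hasAccessHint());
    }
}
```

A caller can branch on the probe once at startup instead of scattering version checks through configuration code.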
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
@@ -119,7 +119,7 @@ versions += [
protobuf: "3.25.5", // a dependency of opentelemetryProto
pcollections: "4.0.1",
re2j: "1.7",
-rocksDB: "7.9.2",
+rocksDB: "9.7.3",
// When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now
// has the version field as mandatory in its configuration, see
// https://github.com/scalameta/scalafmt/releases/tag/v3.1.0.
@@ -22,7 +22,6 @@
import org.rocksdb.AbstractEventListener;
import org.rocksdb.AbstractSlice;
import org.rocksdb.AbstractWalFilter;
-import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
@@ -37,6 +36,7 @@
import org.rocksdb.DbPath;
import org.rocksdb.Env;
import org.rocksdb.InfoLogLevel;
+import org.rocksdb.LoggerInterface;
import org.rocksdb.MemTableConfig;
import org.rocksdb.MergeOperator;
import org.rocksdb.Options;
@@ -571,16 +571,7 @@ public long dbWriteBufferSize() {
return dbOptions.dbWriteBufferSize();
}

-@Override
-public Options setAccessHintOnCompactionStart(final AccessHint accessHint) {
-    dbOptions.setAccessHintOnCompactionStart(accessHint);
-    return this;
-}

-@Override
-public AccessHint accessHintOnCompactionStart() {
-    return dbOptions.accessHintOnCompactionStart();
-}

@Deprecated
public Options setNewTableReaderForCompactionInputs(final boolean newTableReaderForCompactionInputs) {
@@ -843,7 +834,7 @@ public Options setSstFileManager(final SstFileManager sstFileManager) {
}

@Override
-public Options setLogger(final org.rocksdb.Logger logger) {
+public Options setLogger(final LoggerInterface logger) {
dbOptions.setLogger(logger);
return this;
}
@@ -914,6 +905,16 @@ public Options setCompressionType(final CompressionType compressionType) {
return this;
}

+@Override
+public Options setMemtableMaxRangeDeletions(final int n) {
+    columnFamilyOptions.setMemtableMaxRangeDeletions(n);
+    return this;
+}
+
+@Override
+public int memtableMaxRangeDeletions() {
+    return columnFamilyOptions.memtableMaxRangeDeletions();
+}

@Override
public Options setBottommostCompressionType(final CompressionType bottommostCompressionType) {
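The new `memtableMaxRangeDeletions` plumbing above follows the adapter's existing pattern: `Options`-level calls are forwarded to the underlying `ColumnFamilyOptions`. A simplified stand-in (hypothetical types, deliberately not the real RocksDB classes) shows the shape of that delegation:

```java
// Hypothetical stand-in for org.rocksdb.ColumnFamilyOptions; illustration only.
class ColumnFamilyOpts {
    private int memtableMaxRangeDeletions = 0;

    ColumnFamilyOpts setMemtableMaxRangeDeletions(final int n) {
        this.memtableMaxRangeDeletions = n;
        return this;
    }

    int memtableMaxRangeDeletions() {
        return memtableMaxRangeDeletions;
    }
}

// Facade mirroring the adapter in this file: each accessor is forwarded to
// whichever underlying options object owns the setting.
public final class OptionsFacade {
    private final ColumnFamilyOpts columnFamilyOptions = new ColumnFamilyOpts();

    public OptionsFacade setMemtableMaxRangeDeletions(final int n) {
        columnFamilyOptions.setMemtableMaxRangeDeletions(n);
        return this; // fluent style, as in org.rocksdb.Options
    }

    public int memtableMaxRangeDeletions() {
        return columnFamilyOptions.memtableMaxRangeDeletions();
    }

    public static void main(String[] args) {
        final OptionsFacade opts = new OptionsFacade().setMemtableMaxRangeDeletions(100);
        System.out.println(opts.memtableMaxRangeDeletions());
    }
}
```

The fluent `return this` keeps the facade chainable, which is why the added setter in the real adapter returns `Options` rather than `void`.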
@@ -462,8 +462,7 @@ public void record(final long now) {
writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS);
bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES);
bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES);
-numberOfOpenFiles += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS)
-    - valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES);
+numberOfOpenFiles = -1;
numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS);
final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME);
memtableFlushTimeSum += memtableFlushTimeData.getSum();
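With `NO_FILE_CLOSES` removed from `TickerType`, the recorder can no longer derive the number of open files as accumulated file opens minus accumulated file closes, so it now reports a `-1` sentinel (matching the metric's removal from docs/ops.html). The arithmetic being given up can be sketched with plain longs (a hypothetical helper, not Kafka code):

```java
public final class OpenFilesArithmetic {

    // Pre-upgrade formula: accumulated NO_FILE_OPENS minus accumulated NO_FILE_CLOSES.
    static long legacyOpenFiles(final long fileOpens, final long fileCloses) {
        return fileOpens - fileCloses;
    }

    // Post-upgrade: NO_FILE_CLOSES no longer exists, so the recorder
    // reports -1, meaning "value not available".
    static long currentOpenFiles() {
        return -1L;
    }

    public static void main(String[] args) {
        // Ticker values from the updated unit test: opens 5 + 7, closes 3 + 4.
        System.out.println(legacyOpenFiles(5 + 7, 3 + 4)); // 5 under the old formula
        System.out.println(currentOpenFiles());            // -1 sentinel now
    }
}
```

Dashboards scraping the `number-open-files` metric should treat `-1` as "unavailable" rather than as a file count.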
@@ -28,7 +28,6 @@
import org.rocksdb.AbstractCompactionFilter.Context;
import org.rocksdb.AbstractCompactionFilterFactory;
import org.rocksdb.AbstractWalFilter;
-import org.rocksdb.AccessHint;
import org.rocksdb.BuiltinComparator;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.CompactionPriority;
@@ -112,6 +111,8 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest {
add("setMaxBackgroundCompactions");
add("maxBackgroundFlushes");
add("setMaxBackgroundFlushes");
+add("tablePropertiesCollectorFactory");
+add("setTablePropertiesCollectorFactory");
addAll(walRelatedMethods);
}
};
@@ -176,9 +177,6 @@ private Object[] getDBOptionsParameters(final Class<?>[] parameterTypes) throws
case "java.util.Collection":
parameters[i] = new ArrayList<>();
break;
-case "org.rocksdb.AccessHint":
-    parameters[i] = AccessHint.NONE;
-    break;
case "org.rocksdb.Cache":
parameters[i] = new LRUCache(1L);
break;
@@ -474,19 +474,17 @@ public void shouldRecordStatisticsBasedMetrics() {
final double expectedCompactionTimeMaxSensor = 24.0;

when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L);
-when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L);
-when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L);
-final double expectedNumberOfOpenFilesSensor = (5 + 7) - (3 + 4);
+final double expectedNumberOfOpenFilesSensor = -1;

when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L);
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L);
final double expectedNumberOfFileErrorsSensor = 11 + 34;

recorder.record(now);

-verify(statisticsToAdd1, times(17)).getAndResetTickerCount(isA(TickerType.class));
-verify(statisticsToAdd2, times(17)).getAndResetTickerCount(isA(TickerType.class));
+verify(statisticsToAdd1, times(15)).getAndResetTickerCount(isA(TickerType.class));
+verify(statisticsToAdd2, times(15)).getAndResetTickerCount(isA(TickerType.class));
verify(statisticsToAdd1, times(2)).getHistogramData(isA(HistogramType.class));
verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class));
verify(bytesWrittenToDatabaseSensor).record(expectedBytesWrittenToDatabaseSensor, now);
