From 1af566b151108b2771018c44823e941ceb6d958b Mon Sep 17 00:00:00 2001 From: swikar1 <123534169+swikar1@users.noreply.github.com> Date: Wed, 18 Dec 2024 16:24:29 -0800 Subject: [PATCH] # This is a combination of 5 commits. KAFKA-15443 Upgrade RocksDB to 9.7.3 # This is the 1st commit message: KAFKA-15443 Update rocksdb version and changes in java class # This is the commit message #2: KAFKA-15443 Upgrade RocksDB dependency and address compatibility issues # This is the commit message #3: Update RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java # This is the commit message #4: Update RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java # This is the commit message #5: To satisfy rocksdb 9.7.3 KAFKA-15443 Upgrade RocksDB to 9.7.3 KAFKA-15443 Upgrade RocksDB to 9.7.3 KAFKA-15443 Upgrade RocksDB to 9.7.3 --- LICENSE-binary | 2 +- docs/ops.html | 5 ---- docs/streams/upgrade-guide.html | 17 ++++++++++++-- gradle/dependencies.gradle | 2 +- ...ToDbOptionsColumnFamilyOptionsAdapter.java | 23 ++++++++++--------- .../metrics/RocksDBMetricsRecorder.java | 3 +-- ...OptionsColumnFamilyOptionsAdapterTest.java | 6 ++--- .../metrics/RocksDBMetricsRecorderTest.java | 8 +++---- 8 files changed, 35 insertions(+), 31 deletions(-) diff --git a/LICENSE-binary b/LICENSE-binary index 5383cf60ec6e5..686ebe39030e1 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -268,7 +268,7 @@ netty-transport-native-epoll-4.1.115.Final netty-transport-native-unix-common-4.1.115.Final opentelemetry-proto-1.0.0-alpha plexus-utils-3.5.1 -rocksdbjni-7.9.2 +rocksdbjni-9.7.3 scala-library-2.13.15 scala-logging_2.13-3.9.5 scala-reflect-2.13.15 diff --git a/docs/ops.html b/docs/ops.html index abb335afa5baf..5b44fcef30075 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -3453,11 +3453,6 @@
< More details about the new config StreamsConfig#TOPOLOGY_OPTIMIZATION_CONFIG can be found in KIP-295.

-

Streams API changes in 4.0.0

-

In this release the ClientInstanceIds instance stores the global consumerUuid for the KIP-714 @@ -173,6 +171,21 @@

Streams API See KIP-1112 for more details.

+

Streams API changes in 4.0.0

+ +

+ Upgraded RocksDB dependency to version 9.7.3 (from 7.9.2). This upgrade incorporates various improvements and optimizations within RocksDB. However, it also introduces some API changes. + The org.rocksdb.AccessHint class, along with its associated methods, has been removed. + + org.rocksdb.Options.setMemtableMaxRangeDeletions(int p1): ColumnFamilyOptionsInterface has been added, providing control over the maximum number of range deletions in the memtable. + org.rocksdb.Options.memtableMaxRangeDeletions(): int has been added, allowing retrieval of the configured maximum number of range deletions in the memtable. + + The org.rocksdb.Options.setLogger() method now accepts a LoggerInterface as a parameter instead of the previous Logger. + The NO_FILE_CLOSES field has been removed from the org.rocksdb.TickerTypeenum. + Some data types used in RocksDB's Java API have been modified. These changes, along with the removed class, field, and new methods, are primarily relevant to users implementing custom RocksDB configurations. + These changes are expected to be largely transparent to most Kafka Streams users. However, those employing advanced RocksDB customizations within their Streams applications, particularly through the rocksdb.config.setter, are advised to consult the detailed RocksDB 9.7.3 changelog to ensure a smooth transition and adapt their configurations as needed. Specifically, users leveraging the removed AccessHintclass, the NO_FILE_CLOSES field from TickerType, or relying on the previous signature of setLogger() will need to update their implementations. +

+

Streams API changes in 3.9.0

diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 52ef2fcfde1b4..473edec0c6619 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -119,7 +119,7 @@ versions += [ protobuf: "3.25.5", // a dependency of opentelemetryProto pcollections: "4.0.1", re2j: "1.7", - rocksDB: "7.9.2", + rocksDB: "9.7.3", // When updating the scalafmt version please also update the version field in checkstyle/.scalafmt.conf. scalafmt now // has the version field as mandatory in its configuration, see // https://github.com/scalameta/scalafmt/releases/tag/v3.1.0. diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java index b7b611f8be0d3..5a8083ac094c6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java @@ -22,7 +22,6 @@ import org.rocksdb.AbstractEventListener; import org.rocksdb.AbstractSlice; import org.rocksdb.AbstractWalFilter; -import org.rocksdb.AccessHint; import org.rocksdb.BuiltinComparator; import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; @@ -37,6 +36,7 @@ import org.rocksdb.DbPath; import org.rocksdb.Env; import org.rocksdb.InfoLogLevel; +import org.rocksdb.LoggerInterface; import org.rocksdb.MemTableConfig; import org.rocksdb.MergeOperator; import org.rocksdb.Options; @@ -571,16 +571,7 @@ public long dbWriteBufferSize() { return dbOptions.dbWriteBufferSize(); } - @Override - public Options setAccessHintOnCompactionStart(final AccessHint accessHint) { - dbOptions.setAccessHintOnCompactionStart(accessHint); - return this; - } - @Override - public AccessHint accessHintOnCompactionStart() { - return dbOptions.accessHintOnCompactionStart(); - } @Deprecated public Options setNewTableReaderForCompactionInputs(final boolean newTableReaderForCompactionInputs) { @@ -843,7 +834,7 @@ public Options setSstFileManager(final SstFileManager sstFileManager) { } @Override - public Options setLogger(final org.rocksdb.Logger logger) { + public Options setLogger(final LoggerInterface logger) { dbOptions.setLogger(logger); return this; } @@ -914,6 +905,16 @@ public Options setCompressionType(final CompressionType compressionType) { return this; } + @Override + public Options setMemtableMaxRangeDeletions(final int n) { + columnFamilyOptions.setMemtableMaxRangeDeletions(n); + return this; + } + + @Override + public int memtableMaxRangeDeletions() { + return columnFamilyOptions.memtableMaxRangeDeletions(); + } @Override public Options setBottommostCompressionType(final CompressionType bottommostCompressionType) { diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java index 03b1f7eaf02f1..10e8cb804fece 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorder.java @@ -462,8 +462,7 @@ public void record(final long now) { writeStallDuration += valueProviders.statistics.getAndResetTickerCount(TickerType.STALL_MICROS); bytesWrittenDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES); bytesReadDuringCompaction += valueProviders.statistics.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES); - numberOfOpenFiles += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_OPENS) - - valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_CLOSES); + numberOfOpenFiles = -1; numberOfFileErrors += valueProviders.statistics.getAndResetTickerCount(TickerType.NO_FILE_ERRORS); final HistogramData memtableFlushTimeData = valueProviders.statistics.getHistogramData(HistogramType.FLUSH_TIME); memtableFlushTimeSum += memtableFlushTimeData.getSum(); diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java index 5ddcf5bef551e..08248b020544e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest.java @@ -28,7 +28,6 @@ import org.rocksdb.AbstractCompactionFilter.Context; import org.rocksdb.AbstractCompactionFilterFactory; import org.rocksdb.AbstractWalFilter; -import org.rocksdb.AccessHint; import org.rocksdb.BuiltinComparator; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.CompactionPriority; @@ -112,6 +111,8 @@ public class RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapterTest { add("setMaxBackgroundCompactions"); add("maxBackgroundFlushes"); add("setMaxBackgroundFlushes"); + add("tablePropertiesCollectorFactory"); + add("setTablePropertiesCollectorFactory"); addAll(walRelatedMethods); } }; @@ -176,9 +177,6 @@ private Object[] getDBOptionsParameters(final Class[] parameterTypes) throws case "java.util.Collection": parameters[i] = new ArrayList<>(); break; - case "org.rocksdb.AccessHint": - parameters[i] = AccessHint.NONE; - break; case "org.rocksdb.Cache": parameters[i] = new LRUCache(1L); break; diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java index 7ec3f4bf38c76..a0c068b59ee5d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java @@ -474,10 +474,8 @@ public void shouldRecordStatisticsBasedMetrics() { final double expectedCompactionTimeMaxSensor = 24.0; when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L); - when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L); when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L); - when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L); - final double expectedNumberOfOpenFilesSensor = (5 + 7) - (3 + 4); + final double expectedNumberOfOpenFilesSensor = -1; when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L); when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L); @@ -485,8 +483,8 @@ public void shouldRecordStatisticsBasedMetrics() { recorder.record(now); - verify(statisticsToAdd1, times(17)).getAndResetTickerCount(isA(TickerType.class)); - verify(statisticsToAdd2, times(17)).getAndResetTickerCount(isA(TickerType.class)); + verify(statisticsToAdd1, times(15)).getAndResetTickerCount(isA(TickerType.class)); + verify(statisticsToAdd2, times(15)).getAndResetTickerCount(isA(TickerType.class)); verify(statisticsToAdd1, times(2)).getHistogramData(isA(HistogramType.class)); verify(statisticsToAdd2, times(2)).getHistogramData(isA(HistogramType.class)); verify(bytesWrittenToDatabaseSensor).record(expectedBytesWrittenToDatabaseSensor, now);