Skip to content

Commit

Permalink
Adapt RocksDB 9.4.0 (#160)
Browse files Browse the repository at this point in the history
  • Loading branch information
linxGnu authored Sep 7, 2024
1 parent 1f73882 commit 28d05a0
Show file tree
Hide file tree
Showing 12 changed files with 195 additions and 5 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,10 @@ See also: [doc](https://godoc.org/github.com/linxGnu/grocksdb)
## API Support
Almost C API, excepts:
- [ ] get_db_identity
- [ ] putv/mergev/deletev/delete_rangev
- [ ] compaction_filter/compaction_filter_factory/compaction_filter_context
- [ ] transactiondb_property_value/transactiondb_property_int
- [ ] optimistictransactiondb_property_value/optimistictransactiondb_property_int
- [ ] writebatch_update_timestamps/writebatch_wi_update_timestamps
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ cd $BUILD_PATH && wget https://github.com/facebook/zstd/archive/v${zstd_version}

# Note: if you don't have a good reason, please do not set -DPORTABLE=ON
# This one is set here on purpose of compatibility with github action runtime processor
rocksdb_version="9.3.1"
rocksdb_version="9.4.0"
cd $BUILD_PATH && wget https://github.com/facebook/rocksdb/archive/v${rocksdb_version}.tar.gz && tar xzf v${rocksdb_version}.tar.gz && cd rocksdb-${rocksdb_version}/ && \
mkdir -p build_place && cd build_place && cmake -DCMAKE_BUILD_TYPE=Release $CMAKE_REQUIRED_PARAMS -DCMAKE_PREFIX_PATH=$INSTALL_PREFIX -DWITH_TESTS=OFF -DWITH_GFLAGS=OFF \
-DWITH_BENCHMARK_TOOLS=OFF -DWITH_TOOLS=OFF -DWITH_MD_LIBRARY=OFF -DWITH_RUNTIME_DEBUG=OFF -DROCKSDB_BUILD_SHARED=OFF -DWITH_SNAPPY=ON -DWITH_LZ4=ON -DWITH_ZLIB=ON -DWITH_LIBURING=OFF \
Expand Down
38 changes: 38 additions & 0 deletions c.h
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,9 @@ rocksdb_create_column_family_with_ttl(
extern ROCKSDB_LIBRARY_API void rocksdb_drop_column_family(
rocksdb_t* db, rocksdb_column_family_handle_t* handle, char** errptr);

extern ROCKSDB_LIBRARY_API rocksdb_column_family_handle_t*
rocksdb_get_default_column_family_handle(rocksdb_t* db);

extern ROCKSDB_LIBRARY_API void rocksdb_column_family_handle_destroy(
rocksdb_column_family_handle_t*);

Expand Down Expand Up @@ -502,6 +505,13 @@ extern ROCKSDB_LIBRARY_API char* rocksdb_get_cf_with_ts(
rocksdb_column_family_handle_t* column_family, const char* key,
size_t keylen, size_t* vallen, char** ts, size_t* tslen, char** errptr);

/**
* Returns a malloc() buffer with the DB identity, assigning the length to
* *id_len. Returns NULL if an error occurred.
*/
extern ROCKSDB_LIBRARY_API char* rocksdb_get_db_identity(rocksdb_t* db,
size_t* id_len);

// if values_list[i] == NULL and errs[i] == NULL,
// then we got status.IsNotFound(), which we will not return.
// all errors except status status.ok() and status.IsNotFound() are returned.
Expand Down Expand Up @@ -727,6 +737,8 @@ extern ROCKSDB_LIBRARY_API const char* rocksdb_iter_timestamp(
const rocksdb_iterator_t*, size_t* tslen);
extern ROCKSDB_LIBRARY_API void rocksdb_iter_get_error(
const rocksdb_iterator_t*, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_iter_refresh(
const rocksdb_iterator_t* iter, char** errptr);

extern ROCKSDB_LIBRARY_API void rocksdb_wal_iter_next(
rocksdb_wal_iterator_t* iter);
Expand All @@ -747,6 +759,10 @@ extern ROCKSDB_LIBRARY_API rocksdb_writebatch_t* rocksdb_writebatch_create(
void);
extern ROCKSDB_LIBRARY_API rocksdb_writebatch_t* rocksdb_writebatch_create_from(
const char* rep, size_t size);
extern ROCKSDB_LIBRARY_API rocksdb_writebatch_t*
rocksdb_writebatch_create_with_params(size_t reserved_bytes, size_t max_bytes,
size_t protection_bytes_per_key,
size_t default_cf_ts_sz);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_destroy(
rocksdb_writebatch_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_clear(rocksdb_writebatch_t*);
Expand Down Expand Up @@ -842,6 +858,9 @@ extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_rollback_to_save_point(
rocksdb_writebatch_t*, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_pop_save_point(
rocksdb_writebatch_t*, char** errptr);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_update_timestamps(
rocksdb_writebatch_t* wb, const char* ts, size_t tslen, void* state,
size_t (*get_ts_size)(void*, uint32_t), char** errptr);

/* Write batch with index */

Expand All @@ -850,6 +869,11 @@ rocksdb_writebatch_wi_create(size_t reserved_bytes,
unsigned char overwrite_keys);
extern ROCKSDB_LIBRARY_API rocksdb_writebatch_wi_t*
rocksdb_writebatch_wi_create_from(const char* rep, size_t size);
extern ROCKSDB_LIBRARY_API rocksdb_writebatch_wi_t*
rocksdb_writebatch_wi_create_with_params(
rocksdb_comparator_t* backup_index_comparator, size_t reserved_bytes,
unsigned char overwrite_key, size_t max_bytes,
size_t protection_bytes_per_key);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_wi_destroy(
rocksdb_writebatch_wi_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_wi_clear(
Expand Down Expand Up @@ -960,6 +984,9 @@ extern ROCKSDB_LIBRARY_API rocksdb_iterator_t*
rocksdb_writebatch_wi_create_iterator_with_base_cf(
rocksdb_writebatch_wi_t* wbwi, rocksdb_iterator_t* base_iterator,
rocksdb_column_family_handle_t* cf);
extern ROCKSDB_LIBRARY_API void rocksdb_writebatch_wi_update_timestamps(
rocksdb_writebatch_wi_t* wbwi, const char* ts, size_t tslen, void* state,
size_t (*get_ts_size)(void*, uint32_t), char** errptr);

/* Options utils */

Expand Down Expand Up @@ -1588,6 +1615,17 @@ extern ROCKSDB_LIBRARY_API void rocksdb_options_set_plain_table_factory(
rocksdb_options_t*, uint32_t, int, double, size_t, size_t, char,
unsigned char, unsigned char);

extern ROCKSDB_LIBRARY_API unsigned char
rocksdb_options_get_write_dbid_to_manifest(rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API void rocksdb_options_set_write_dbid_to_manifest(
rocksdb_options_t*, unsigned char);

extern ROCKSDB_LIBRARY_API unsigned char
rocksdb_options_get_track_and_verify_wals_in_manifest(rocksdb_options_t*);
extern ROCKSDB_LIBRARY_API void
rocksdb_options_set_track_and_verify_wals_in_manifest(rocksdb_options_t*,
unsigned char);

extern ROCKSDB_LIBRARY_API void rocksdb_options_set_min_level_to_compress(
rocksdb_options_t* opt, int level);

Expand Down
5 changes: 5 additions & 0 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -1269,6 +1269,11 @@ func (db *DB) CreateColumnFamily(opts *Options, name string) (handle *ColumnFami
return
}

// GetDefaultColumnFamily gets default column family handle.
func (db *DB) GetDefaultColumnFamily() *ColumnFamilyHandle {
return newNativeColumnFamilyHandle(C.rocksdb_get_default_column_family_handle(db.c))
}

// CreateColumnFamilies creates new column families.
func (db *DB) CreateColumnFamilies(opts *Options, names []string) (handles []*ColumnFamilyHandle, err error) {
if len(names) == 0 {
Expand Down
10 changes: 10 additions & 0 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func TestDBCRUD(t *testing.T) {
ro = NewDefaultReadOptions()
)

df := db.GetDefaultColumnFamily()
require.NotNil(t, df)

// create
require.Nil(t, db.Put(wo, givenKey, givenVal1))

Expand All @@ -68,6 +71,13 @@ func TestDBCRUD(t *testing.T) {
require.Nil(t, err)
require.EqualValues(t, v1.Data(), givenVal1)

{
_v1, err := db.GetCF(ro, df, givenKey)
defer _v1.Free()
require.Nil(t, err)
require.EqualValues(t, _v1.Data(), givenVal1)
}

// retrieve bytes
_v1, err := db.GetBytes(ro, givenKey)
require.Nil(t, err)
Expand Down
15 changes: 15 additions & 0 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,21 @@ func (iter *Iterator) Err() (err error) {
return
}

// Refresh if supported, the DB state that the iterator reads from is updated to
// the latest state. The iterator will be invalidated after the call.
// Regardless of whether the iterator was created/refreshed previously
// with or without a snapshot, the iterator will be reading the
// latest DB state after this call.
// Note that you will need to call a Seek*() function to get the iterator
// back into a valid state before calling a function that assumes the
// state is already valid, like Next().
func (iter *Iterator) Refresh() (err error) {
var cErr *C.char
C.rocksdb_iter_refresh(iter.c, &cErr)
err = fromCError(cErr)
return
}

// Close closes the iterator.
func (iter *Iterator) Close() {
C.rocksdb_iter_destroy(iter.c)
Expand Down
2 changes: 2 additions & 0 deletions iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ func TestIterator(t *testing.T) {
}
require.Nil(t, iter.Err())
require.EqualValues(t, actualKeys, givenKeys)

require.NoError(t, iter.Refresh())
}

func TestIteratorWriteManyThenIter(t *testing.T) {
Expand Down
44 changes: 44 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2044,6 +2044,50 @@ func (opts *Options) SetPlainTableFactory(
)
}

// WriteDBIDToManifest writes/does not write historically DB ID has always been stored in Identity File in DB folder.
// If this flag is true, the DB ID is written to Manifest file in addition
// to the Identity file. By doing this 2 problems are solved
// 1. We don't checksum the Identity file where as Manifest file is.
// 2. Since the source of truth for DB is Manifest file DB ID will sit with
// the source of truth. Previously the Identity file could be copied
// independent of Manifest and that can result in wrong DB ID.
//
// We recommend setting this flag to true.
//
// Default: false
func (opts *Options) WriteDBIDToManifest(v bool) {
C.rocksdb_options_set_write_dbid_to_manifest(opts.c, boolToChar(v))
}

// IsDBIDWrittenToManifest returns if historically DB ID has always been stored in Identity File in DB folder.
func (opts *Options) IsDBIDWrittenToManifest() bool {
return charToBool(C.rocksdb_options_get_write_dbid_to_manifest(opts.c))
}

// ToggleTrackAndVerifyWALsInManifestFlag if true, the log numbers and sizes of the synced WALs are tracked
// in MANIFEST. During DB recovery, if a synced WAL is missing
// from disk, or the WAL's size does not match the recorded size in
// MANIFEST, an error will be reported and the recovery will be aborted.
//
// This is one additional protection against WAL corruption besides the
// per-WAL-entry checksum.
//
// Note that this option does not work with secondary instance.
// Currently, only syncing closed WALs are tracked. Calling `DB::SyncWAL()`,
// etc. or writing with `WriteOptions::sync=true` to sync the live WAL is not
// tracked for performance/efficiency reasons.
//
// Default: false
func (opts *Options) ToggleTrackAndVerifyWALsInManifestFlag(v bool) {
C.rocksdb_options_set_track_and_verify_wals_in_manifest(opts.c, boolToChar(v))
}

// TrackAndVerifyWALsInManifestFlag checks if the log numbers and sizes of the synced WALs are tracked
// in MANIFEST.
func (opts *Options) TrackAndVerifyWALsInManifestFlag() bool {
return charToBool(C.rocksdb_options_get_track_and_verify_wals_in_manifest(opts.c))
}

// SetWriteBufferManager binds with a WriteBufferManager.
//
// The memory usage of memtable will report to this object. The same object
Expand Down
8 changes: 8 additions & 0 deletions options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,14 @@ func TestOptions(t *testing.T) {
opts.SetMaxSubcompactions(3)
require.EqualValues(t, 3, opts.GetMaxSubcompactions())

require.False(t, opts.IsDBIDWrittenToManifest())
opts.WriteDBIDToManifest(true)
require.True(t, opts.IsDBIDWrittenToManifest())

require.False(t, opts.TrackAndVerifyWALsInManifestFlag())
opts.ToggleTrackAndVerifyWALsInManifestFlag(true)
require.True(t, opts.TrackAndVerifyWALsInManifestFlag())

opts.SetMaxBytesForLevelMultiplierAdditional([]int{2 << 20})

opts.SetDbLogDir("./abc")
Expand Down
9 changes: 7 additions & 2 deletions write_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ func NewWriteBatch() *WriteBatch {
return newNativeWriteBatch(C.rocksdb_writebatch_create())
}

// NewWriteBatchWithParams with params.
func NewWriteBatchWithParams(reservedBytes, maxBytes, protectionBytesPerKey, defaultCFTs int) *WriteBatch {
return newNativeWriteBatch(C.rocksdb_writebatch_create_with_params(C.size_t(reservedBytes), C.size_t(maxBytes), C.size_t(protectionBytesPerKey), C.size_t(defaultCFTs)))
}

// NewNativeWriteBatch create a WriteBatch object.
func newNativeWriteBatch(c *C.rocksdb_writebatch_t) *WriteBatch {
return &WriteBatch{
Expand Down Expand Up @@ -129,15 +134,15 @@ func (wb *WriteBatch) SingleDeleteCFWithTS(cf *ColumnFamilyHandle, key, ts []byt
}

// DeleteRange deletes keys that are between [startKey, endKey)
func (wb *WriteBatch) DeleteRange(startKey []byte, endKey []byte) {
func (wb *WriteBatch) DeleteRange(startKey, endKey []byte) {
cStartKey := refGoBytes(startKey)
cEndKey := refGoBytes(endKey)
C.rocksdb_writebatch_delete_range(wb.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey)))
}

// DeleteRangeCF deletes keys that are between [startKey, endKey) and
// belong to a given column family
func (wb *WriteBatch) DeleteRangeCF(cf *ColumnFamilyHandle, startKey []byte, endKey []byte) {
func (wb *WriteBatch) DeleteRangeCF(cf *ColumnFamilyHandle, startKey, endKey []byte) {
cStartKey := refGoBytes(startKey)
cEndKey := refGoBytes(endKey)
C.rocksdb_writebatch_delete_range_cf(wb.c, cf.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey)))
Expand Down
49 changes: 49 additions & 0 deletions write_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,55 @@ func TestWriteBatch(t *testing.T) {
require.True(t, v1.Data() == nil)
}

func TestWriteBatchWithParams(t *testing.T) {
t.Parallel()

db := newTestDB(t, nil)
defer db.Close()

var (
givenKey1 = []byte("key1")
givenVal1 = []byte("val1")
givenKey2 = []byte("key2")
)
wo := NewDefaultWriteOptions()
require.Nil(t, db.Put(wo, givenKey2, []byte("foo")))

// create and fill the write batch
wb := NewWriteBatchWithParams(10000, 200000, 10, 0)
defer wb.Destroy()
wb.Put(givenKey1, givenVal1)
wb.Delete(givenKey2)
require.EqualValues(t, wb.Count(), 2)

// perform the batch
require.Nil(t, db.Write(wo, wb))

// check changes
ro := NewDefaultReadOptions()
v1, err := db.Get(ro, givenKey1)
defer v1.Free()
require.Nil(t, err)
require.EqualValues(t, v1.Data(), givenVal1)

v2, err := db.Get(ro, givenKey2)
defer v2.Free()
require.Nil(t, err)
require.True(t, v2.Data() == nil)

// DeleteRange test
wb.Clear()
wb.DeleteRange(givenKey1, givenKey2)

// perform the batch
require.Nil(t, db.Write(wo, wb))

v1, err = db.Get(ro, givenKey1)
defer v1.Free()
require.Nil(t, err)
require.True(t, v1.Data() == nil)
}

func TestWriteBatchIterator(t *testing.T) {
t.Parallel()

Expand Down
15 changes: 13 additions & 2 deletions write_batch_wi.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ func NewWriteBatchWI(reservedBytes uint, overwriteKeys bool) *WriteBatchWI {
return newNativeWriteBatchWI(cWB)
}

// NewWriteBatchWIWithParams with params.
func NewWriteBatchWIWithParams(cp *Comparator, reservedBytes int, overwriteKey bool, maxBytes, protectionBytesPerKey int) *WriteBatchWI {
return newNativeWriteBatchWI(C.rocksdb_writebatch_wi_create_with_params(
cp.c,
C.size_t(reservedBytes),
boolToChar(overwriteKey),
C.size_t(maxBytes),
C.size_t(protectionBytesPerKey),
))
}

// NewNativeWriteBatchWI create a WriteBatchWI object.
func newNativeWriteBatchWI(c *C.rocksdb_writebatch_wi_t) *WriteBatchWI {
return &WriteBatchWI{c: c}
Expand Down Expand Up @@ -99,15 +110,15 @@ func (wb *WriteBatchWI) SingleDeleteCF(cf *ColumnFamilyHandle, key []byte) {
}

// DeleteRange deletes keys that are between [startKey, endKey)
func (wb *WriteBatchWI) DeleteRange(startKey []byte, endKey []byte) {
func (wb *WriteBatchWI) DeleteRange(startKey, endKey []byte) {
cStartKey := refGoBytes(startKey)
cEndKey := refGoBytes(endKey)
C.rocksdb_writebatch_wi_delete_range(wb.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey)))
}

// DeleteRangeCF deletes keys that are between [startKey, endKey) and
// belong to a given column family
func (wb *WriteBatchWI) DeleteRangeCF(cf *ColumnFamilyHandle, startKey []byte, endKey []byte) {
func (wb *WriteBatchWI) DeleteRangeCF(cf *ColumnFamilyHandle, startKey, endKey []byte) {
cStartKey := refGoBytes(startKey)
cEndKey := refGoBytes(endKey)
C.rocksdb_writebatch_wi_delete_range_cf(wb.c, cf.c, cStartKey, C.size_t(len(startKey)), cEndKey, C.size_t(len(endKey)))
Expand Down

0 comments on commit 28d05a0

Please sign in to comment.