Skip to content

Commit

Permalink
move serializer into Interface
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Jungblut <[email protected]>
  • Loading branch information
tjungblu committed Jul 1, 2024
1 parent 7e8d09b commit 7f65806
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 150 deletions.
8 changes: 3 additions & 5 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,8 @@ type DB struct {
rwtx *Tx
txs []*Tx

freelist fl.Interface
freelistSerializer fl.Serializable
freelistLoad sync.Once
freelist fl.Interface
freelistLoad sync.Once

pagePool sync.Pool

Expand Down Expand Up @@ -192,7 +191,6 @@ func Open(path string, mode os.FileMode, options *Options) (db *DB, err error) {
db.NoFreelistSync = options.NoFreelistSync
db.PreLoadFreelist = options.PreLoadFreelist
db.FreelistType = options.FreelistType
db.freelistSerializer = fl.Serializer{}
db.Mlock = options.Mlock

// Set default values for later DB operations.
Expand Down Expand Up @@ -425,7 +423,7 @@ func (db *DB) loadFreelist() {
db.freelist.Init(db.freepages())
} else {
// Read free list from freelist page.
db.freelistSerializer.Read(db.freelist, db.page(db.meta().Freelist()))
db.freelist.Read(db.page(db.meta().Freelist()))
}
db.stats.FreePageN = db.freelist.FreeCount()
db.stats.PendingPageN = db.freelist.PendingCount()
Expand Down
2 changes: 1 addition & 1 deletion internal/freelist/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,6 @@ func NewArrayFreelist() Interface {
},
}
// this loopy reference allows us to share the span merging via interfaces
a.shared.spanMerger = a
a.shared.sharedInterface = a
return a
}
18 changes: 16 additions & 2 deletions internal/freelist/freelist.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,21 @@ import (
"go.etcd.io/bbolt/internal/common"
)

type ReadWriter interface {
// Read calls Init with the page ids stored in te given page.
Read(page *common.Page)

// Write writes the freelist into the given page.
Write(page *common.Page)

// EstimatedWritePageSize returns the size of the freelist after serialization in Write.
// This should never underestimate the size.
EstimatedWritePageSize() int
}

type Interface interface {
ReadWriter

// Init initializes this freelist with the given list of pages.
Init(ids common.Pgids)

Expand Down Expand Up @@ -58,8 +72,8 @@ func Copyall(f Interface, dst []common.Pgid) {
}

// Reload reads the freelist from a page and filters out pending items.
func Reload(s Serializable, f Interface, p *common.Page) {
s.Read(f, p)
func Reload(f Interface, p *common.Page) {
f.Read(p)
NoSyncReload(f, p.FreelistPageIds())
}

Expand Down
47 changes: 47 additions & 0 deletions internal/freelist/freelist_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"reflect"
"sort"
"testing"
"unsafe"

"go.etcd.io/bbolt/internal/common"
)
Expand Down Expand Up @@ -178,6 +179,52 @@ func TestFreelist_releaseRange(t *testing.T) {
}
}

// Ensure that a freelist can deserialize from a freelist page.
func TestFreelist_read(t *testing.T) {
// Create a page.
var buf [4096]byte
page := (*common.Page)(unsafe.Pointer(&buf[0]))
page.SetFlags(common.FreelistPageFlag)
page.SetCount(2)

// Insert 2 page ids.
ids := (*[3]common.Pgid)(unsafe.Pointer(uintptr(unsafe.Pointer(page)) + unsafe.Sizeof(*page)))
ids[0] = 23
ids[1] = 50

// Deserialize page into a freelist.
f := newTestFreelist()
f.Read(page)

// Ensure that there are two page ids in the freelist.
if exp := common.Pgids([]common.Pgid{23, 50}); !reflect.DeepEqual(exp, f.FreePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f.FreePageIds())
}
}

// Ensure that a freelist can serialize into a freelist page.
func TestFreelist_write(t *testing.T) {
// Create a freelist and write it to a page.
var buf [4096]byte
f := newTestFreelist()

f.Init([]common.Pgid{12, 39})
f.pendingPageIds()[100] = &txPending{ids: []common.Pgid{28, 11}}
f.pendingPageIds()[101] = &txPending{ids: []common.Pgid{3}}
p := (*common.Page)(unsafe.Pointer(&buf[0]))
f.Write(p)

// Read the page back out.
f2 := newTestFreelist()
f2.Read(p)

// Ensure that the freelist is correct.
// All pages should be present and in reverse order.
if exp := common.Pgids([]common.Pgid{3, 11, 12, 28, 39}); !reflect.DeepEqual(exp, f2.FreePageIds()) {
t.Fatalf("exp=%v; got=%v", exp, f2.FreePageIds())
}
}

func Benchmark_FreelistRelease10K(b *testing.B) { benchmark_FreelistRelease(b, 10000) }
func Benchmark_FreelistRelease100K(b *testing.B) { benchmark_FreelistRelease(b, 100000) }
func Benchmark_FreelistRelease1000K(b *testing.B) { benchmark_FreelistRelease(b, 1000000) }
Expand Down
2 changes: 1 addition & 1 deletion internal/freelist/hashmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,6 @@ func NewHashMapFreelist() Interface {
backwardMap: make(map[common.Pgid]uint64),
}
// this loopy reference allows us to share the span merging via interfaces
hm.shared.spanMerger = hm
hm.shared.sharedInterface = hm
return hm
}
79 changes: 0 additions & 79 deletions internal/freelist/serde.go

This file was deleted.

55 changes: 0 additions & 55 deletions internal/freelist/serde_test.go

This file was deleted.

80 changes: 77 additions & 3 deletions internal/freelist/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,19 @@ package freelist

import (
"fmt"
"sort"
"unsafe"

"go.etcd.io/bbolt/internal/common"
)

type spanMerger interface {
type sharedInterface interface {
// Init initializes this freelist with the given list of pages.
Init(ids common.Pgids)
// Count returns the number of free and pending pages.
Count() int
// FreePageIds returns all free pages.
FreePageIds() common.Pgids
// mergeSpans is merging the given pages into the freelist
mergeSpans(ids common.Pgids)
}
Expand All @@ -18,7 +26,7 @@ type txPending struct {
}

type shared struct {
spanMerger
sharedInterface

allocs map[common.Pgid]common.Txid // mapping of Txid that allocated a pgid.
cache map[common.Pgid]struct{} // fast lookup of all free and pending page ids.
Expand Down Expand Up @@ -96,7 +104,7 @@ func (t *shared) Rollback(txid common.Txid) {
}
// Remove pages from pending list and mark as free if allocated by txid.
delete(t.pending, txid)
t.spanMerger.mergeSpans(m)
t.sharedInterface.mergeSpans(m)
}

func (t *shared) Release(txid common.Txid) {
Expand Down Expand Up @@ -144,6 +152,72 @@ func (t *shared) ReleaseRange(begin, end common.Txid) {
t.mergeSpans(m)
}

func (t *shared) Read(p *common.Page) {
if !p.IsFreelistPage() {
panic(fmt.Sprintf("invalid freelist page: %d, page type is %s", p.Id(), p.Typ()))
}

ids := p.FreelistPageIds()

// Copy the list of page ids from the freelist.
if len(ids) == 0 {
t.Init(nil)
} else {
// copy the ids, so we don't modify on the freelist page directly
idsCopy := make([]common.Pgid, len(ids))
copy(idsCopy, ids)
// Make sure they're sorted.
sort.Sort(common.Pgids(idsCopy))

t.Init(idsCopy)
}
}

func (t *shared) EstimatedWritePageSize() int {
n := t.Count()
if n >= 0xFFFF {
// The first element will be used to store the count. See freelist.write.
n++
}
return int(common.PageHeaderSize) + (int(unsafe.Sizeof(common.Pgid(0))) * n)
}

func (t *shared) Write(p *common.Page) {
// Combine the old free pgids and pgids waiting on an open transaction.

// Update the header flag.
p.SetFlags(common.FreelistPageFlag)

// The page.count can only hold up to 64k elements so if we overflow that
// number then we handle it by putting the size in the first element.
l := t.Count()
if l == 0 {
p.SetCount(uint16(l))
} else if l < 0xFFFF {
p.SetCount(uint16(l))
data := common.UnsafeAdd(unsafe.Pointer(p), unsafe.Sizeof(*p))
ids := unsafe.Slice((*common.Pgid)(data), l)
t.copyall(ids)
} else {
p.SetCount(0xFFFF)
data := common.UnsafeAdd(unsafe.Pointer(p), unsafe.Sizeof(*p))
ids := unsafe.Slice((*common.Pgid)(data), l+1)
ids[0] = common.Pgid(l)
t.copyall(ids[1:])
}
}

// Copyall copies a list of all free ids and all pending ids in one sorted list.
// f.count returns the minimum length required for dst.
func (t *shared) copyall(dst []common.Pgid) {
m := make(common.Pgids, 0, t.PendingCount())
for _, txp := range t.pendingPageIds() {
m = append(m, txp.ids...)
}
sort.Sort(m)
common.Mergepgids(dst, t.FreePageIds(), m)
}

// reindex rebuilds the free cache based on available and pending free lists.
func (t *shared) reindex(free common.Pgids, pending map[common.Txid]*txPending) {
t.cache = make(map[common.Pgid]struct{}, len(free))
Expand Down
Loading

0 comments on commit 7f65806

Please sign in to comment.