Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve file utilization when using a JetStream stream as a KV. #2456

Merged
merged 4 commits into from
Aug 20, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/klauspost/compress v1.11.12
github.com/minio/highwayhash v1.0.1
github.com/nats-io/jwt/v2 v2.0.3
github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19 h1:9WQzXoYc37xB
github.com/nats-io/nats.go v1.11.1-0.20210813172934-9c00d13a8a19/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9 h1:aJYmbbVrq6rsFGAvQnAvoChjkjUOJGqVBdQ47vbEWD4=
github.com/nats-io/nats.go v1.11.1-0.20210817011318-78b4cc260af9/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.11.1-0.20210817171728-9d3a000c8a66 h1:J3LNTmD/AUgjKJjZK2IEsGl2GD1znemMOq64ZKu83ok=
github.com/nats-io/nats.go v1.11.1-0.20210817171728-9d3a000c8a66/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0 h1:lffRgFiHXqxwf8lYNSXXeOZdOgAIOabGwOSwdttqCn0=
github.com/nats-io/nats.go v1.11.1-0.20210819195927-9053aa4200f0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nkeys v0.2.0/go.mod h1:XdZpAbhgyyODYqjTawOnIOI7VlbKSarI9Gfy1tqEu/s=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
Expand Down
138 changes: 131 additions & 7 deletions server/filestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ const (
purgeDir = "__msgs__"
// used to scan blk file names.
blkScan = "%d.blk"
// used for compacted blocks that are staged.
newScan = "%d.new"
// used to scan index file names.
indexScan = "%d.idx"
// used to load per subject meta information.
Expand Down Expand Up @@ -216,8 +218,12 @@ const (
defaultStreamBlockSize = 16 * 1024 * 1024 // 16MB
// Default for workqueue or interest based.
defaultOtherBlockSize = 8 * 1024 * 1024 // 8MB
// Default for KV based
defaultKVBlockSize = 8 * 1024 * 1024 // 8MB
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
// max block size for now.
maxBlockSize = defaultStreamBlockSize
// Compact minimum threshold.
compactMinimum = 2 * 1024 * 1024 // 2MB
// FileStoreMinBlkSize is minimum size we will do for a blk size.
FileStoreMinBlkSize = 32 * 1000 // 32kib
// FileStoreMaxBlkSize is maximum size we will do for a blk size.
Expand Down Expand Up @@ -495,7 +501,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e
ekey, err := ioutil.ReadFile(path.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
if err != nil {
// We do not seem to have keys even though we should. Could be a plaintext conversion.
// Create the keys and we will doubel check below.
// Create the keys and we will double check below.
if err := fs.genEncryptionKeysForBlock(mb); err != nil {
return nil, err
}
Expand Down Expand Up @@ -635,7 +641,10 @@ func (fs *fileStore) rebuildState(ld *LostStreamData) {
func (mb *msgBlock) rebuildState() (*LostStreamData, error) {
mb.mu.Lock()
defer mb.mu.Unlock()
return mb.rebuildStateLocked()
}

func (mb *msgBlock) rebuildStateLocked() (*LostStreamData, error) {
startLastSeq := mb.last.seq

// Clear state we need to rebuild.
Expand Down Expand Up @@ -791,7 +800,6 @@ func (mb *msgBlock) rebuildState() (*LostStreamData, error) {

mb.msgs++
mb.bytes += uint64(rl)
mb.rbytes += uint64(rl)
derekcollison marked this conversation as resolved.
Show resolved Hide resolved

// Do per subject info.
if mb.fss != nil {
Expand Down Expand Up @@ -1812,12 +1820,23 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
}
}
} else {
// Out of order delete.
if mb.dmap == nil {
mb.dmap = make(map[uint64]struct{})
// Check if we are empty first, as long as not the last message block.
if isLast := mb != fs.lmb; isLast && mb.msgs == 0 {
fs.removeMsgBlock(mb)
firstSeqNeedsUpdate = seq == fs.state.FirstSeq
} else {
// Out of order delete.
shouldWriteIndex = true
if mb.dmap == nil {
mb.dmap = make(map[uint64]struct{})
}
mb.dmap[seq] = struct{}{}
}
mb.dmap[seq] = struct{}{}
shouldWriteIndex = true
}

// Check if <50% utilization and minimum size met.
if mb.rbytes > compactMinimum && mb.rbytes>>1 > mb.bytes {
mb.compact()
}

var qch, fch chan struct{}
Expand Down Expand Up @@ -1870,6 +1889,95 @@ func (fs *fileStore) removeMsg(seq uint64, secure, needFSLock bool) (bool, error
return true, nil
}

// This will compact and rewrite this block. This should only be called when we know we want to rewrite this block.
// This should not be called on the lmb since we will prune tail deleted messages which could cause issues with
// writing new messages. We will silently bail on any issues with the underlying block and let someone else detect.
// Write lock needs to be held.
func (mb *msgBlock) compact() {
if !mb.cacheAlreadyLoaded() {
if err := mb.loadMsgsWithLock(); err != nil {
return
}
}

buf := mb.cache.buf
nbuf := make([]byte, 0, len(buf))

var le = binary.LittleEndian
var firstSet bool

isDeleted := func(seq uint64) bool {
if seq == 0 || seq&ebit != 0 || seq < mb.first.seq {
return true
}
if mb.dmap != nil {
if _, ok := mb.dmap[seq]; ok {
return true
}
}
return false
}

for index, lbuf := uint32(0), uint32(len(buf)); index < lbuf; {
if index+msgHdrSize >= lbuf {
return
}
hdr := buf[index : index+msgHdrSize]
rl, slen := le.Uint32(hdr[0:]), le.Uint16(hdr[20:])
// Clear any headers bit that could be set.
rl &^= hbit
dlen := int(rl) - msgHdrSize
// Do some quick sanity checks here.
if dlen < 0 || int(slen) > dlen || dlen > int(rl) || rl > 32*1024*1024 || index+rl > lbuf {
return
}
// Only need to process non-deleted messages.
if seq := le.Uint64(hdr[4:]); !isDeleted(seq) {
// Normal message here.
nbuf = append(nbuf, buf[index:index+rl]...)
if !firstSet {
firstSet = true
mb.first.seq = seq
}
mb.last.seq = seq
}
// Advance to next record.
index += rl
}

// Check for encryption.
if mb.bek != nil && len(nbuf) > 0 {
// Recreate to reset counter.
rbek, err := chacha20.NewUnauthenticatedCipher(mb.seed, mb.nonce)
if err != nil {
return
}
rbek.XORKeyStream(nbuf, nbuf)
}

// Close FDs first.
mb.closeFDsLocked()

// We will write to a new file and mv/rename it in case of failure.
mfn := path.Join(path.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
defer os.Remove(mfn)
if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
return
}
os.Rename(mfn, mb.mfn)

// Close cache and open FDs and index file.
derekcollison marked this conversation as resolved.
Show resolved Hide resolved
mb.clearCacheAndOffset()
mb.removeIndexFileLocked()
mb.deleteDmap()
mb.rebuildStateLocked()
}

// Nil out our dmap.
func (mb *msgBlock) deleteDmap() {
mb.dmap = nil
}

// Grab info from a slot.
// Lock should be held.
func (mb *msgBlock) slotInfo(slot int) (uint32, uint32, bool, error) {
Expand Down Expand Up @@ -3290,6 +3398,18 @@ func (fs *fileStore) State() StreamState {
return state
}

func (fs *fileStore) Utilization() (total, reported uint64, err error) {
fs.mu.RLock()
defer fs.mu.RUnlock()
for _, mb := range fs.blks {
mb.mu.RLock()
reported += mb.bytes
total += mb.rbytes
mb.mu.RUnlock()
}
return total, reported, nil
}

const emptyRecordLen = 22 + 8

func fileStoreMsgSize(subj string, hdr, msg []byte) uint64 {
Expand Down Expand Up @@ -3880,6 +4000,10 @@ func (fs *fileStore) numMsgBlocks() int {
func (mb *msgBlock) removeIndexFile() {
mb.mu.RLock()
defer mb.mu.RUnlock()
mb.removeIndexFileLocked()
}

func (mb *msgBlock) removeIndexFileLocked() {
if mb.ifd != nil {
mb.ifd.Close()
mb.ifd = nil
Expand Down
138 changes: 138 additions & 0 deletions server/filestore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package server
import (
"archive/tar"
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"encoding/json"
Expand Down Expand Up @@ -3162,3 +3164,139 @@ func TestFileStoreExpireMsgsOnStart(t *testing.T) {
checkFiltered("orders.5", SimpleState{Msgs: 5, First: 55, Last: 95})
checkBlkState(0)
}

func TestFileStoreSparseCompaction(t *testing.T) {
storeDir := createDir(t, JetStreamStoreDir)
defer removeDir(t, storeDir)

cfg := StreamConfig{Name: "KV", Subjects: []string{"kv.>"}, Storage: FileStorage}
var fs *fileStore

fs, err := newFileStore(FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024}, cfg)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

msg := bytes.Repeat([]byte("ABC"), 33) // ~100bytes
loadMsgs := func(n int) {
t.Helper()
for i := 1; i <= n; i++ {
if _, _, err := fs.StoreMsg(fmt.Sprintf("kv.%d", i%10), nil, msg); err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
}

checkState := func(msgs, first, last uint64) {
t.Helper()
if fs == nil {
t.Fatalf("No fs")
return
}
state := fs.State()
if state.Msgs != msgs {
t.Fatalf("Expected %d msgs, got %d", msgs, state.Msgs)
}
if state.FirstSeq != first {
t.Fatalf("Expected %d as first, got %d", first, state.FirstSeq)
}
if state.LastSeq != last {
t.Fatalf("Expected %d as last, got %d", last, state.LastSeq)
}
}

deleteMsgs := func(seqs ...uint64) {
t.Helper()
for _, seq := range seqs {
removed, err := fs.RemoveMsg(seq)
if err != nil || !removed {
t.Fatalf("Got an error on remove of %d: %v", seq, err)
}
}
}

eraseMsgs := func(seqs ...uint64) {
t.Helper()
for _, seq := range seqs {
removed, err := fs.EraseMsg(seq)
if err != nil || !removed {
t.Fatalf("Got an error on erase of %d: %v", seq, err)
}
}
}

compact := func() {
t.Helper()
var ssb, ssa StreamState
fs.FastState(&ssb)
tb, ub, _ := fs.Utilization()

fs.mu.RLock()
if len(fs.blks) == 0 {
t.Fatalf("No blocks?")
}
mb := fs.blks[0]
fs.mu.RUnlock()

mb.mu.Lock()
mb.compact()
mb.mu.Unlock()
fs.FastState(&ssa)
if !reflect.DeepEqual(ssb, ssa) {
t.Fatalf("States do not match; %+v vs %+v", ssb, ssa)
}
ta, ua, _ := fs.Utilization()
if ub != ua {
t.Fatalf("Expected used to be the same, got %d vs %d", ub, ua)
}
if ta >= tb {
t.Fatalf("Expected total after to be less then before, got %d vs %d", tb, ta)
}
if ta != ua {
t.Fatalf("Expected compact to make total and used same, got %d vs %d", ta, ua)
}
}

// Actual testing here.
loadMsgs(1000)
checkState(1000, 1, 1000)

// Now delete a few messages.
deleteMsgs(1)
compact()

deleteMsgs(1000, 999, 998, 997)
compact()

eraseMsgs(500, 502, 504, 506, 508, 510)
compact()

// Now test encrypted mode.
fs.Delete()

prf := func(context []byte) ([]byte, error) {
h := hmac.New(sha256.New, []byte("dlc22"))
if _, err := h.Write(context); err != nil {
return nil, err
}
return h.Sum(nil), nil
}

fs, err = newFileStoreWithCreated(FileStoreConfig{StoreDir: storeDir, BlockSize: 1024 * 1024}, cfg, time.Now(), prf)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

loadMsgs(1000)
checkState(1000, 1, 1000)

// Now delete a few messages.
deleteMsgs(1)
compact()

deleteMsgs(1000, 999, 998, 997)
compact()

eraseMsgs(500, 502, 504, 506, 508, 510)
compact()
}
6 changes: 6 additions & 0 deletions server/memstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,12 @@ func (ms *memStore) State() StreamState {
return state
}

func (ms *memStore) Utilization() (total, reported uint64, err error) {
ms.mu.RLock()
defer ms.mu.RUnlock()
return ms.state.Bytes, ms.state.Bytes, nil
}

func memStoreMsgSize(subj string, hdr, msg []byte) uint64 {
return uint64(len(subj) + len(hdr) + len(msg) + 16) // 8*2 for seq + age
}
Expand Down
Loading