Skip to content

Commit

Permalink
wip in-memory journal.
Browse files Browse the repository at this point in the history
  • Loading branch information
raulk committed Jul 17, 2020
1 parent 6092854 commit a8fc4a6
Show file tree
Hide file tree
Showing 13 changed files with 643 additions and 168 deletions.
6 changes: 2 additions & 4 deletions chain/events/events_height.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ type heightEvents struct {
}

func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {

ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange")
defer span.End()
span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height())))
Expand Down Expand Up @@ -144,12 +143,11 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error {
}

// ChainAt invokes the specified `HeightHandler` when the chain reaches the
// specified height+confidence threshold. If the chain is rolled-back under the
// specified height, `RevertHandler` will be called.
// specified height+confidence threshold. If the chain is rolled-back under the
// specified height, `RevertHandler` will be called.
//
// ts passed to handlers is the tipset at the specified, or above, if lower tipsets were null
func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error {

e.lk.Lock() // Tricky locking, check your locks if you modify this function!

bestH := e.tsc.best().Height()
Expand Down
17 changes: 10 additions & 7 deletions chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,10 @@ type ChainStore struct {
tsCache *lru.ARCCache

vmcalls runtime.Syscalls
journal journal.Journal
}

func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Syscalls) *ChainStore {
func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Syscalls, journal journal.Journal) *ChainStore {
c, _ := lru.NewARC(2048)
tsc, _ := lru.NewARC(4096)
cs := &ChainStore{
Expand All @@ -98,6 +99,7 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Sys
mmCache: c,
tsCache: tsc,
vmcalls: vmcalls,
journal: journal,
}

ci := NewChainIndex(cs.LoadTipSet)
Expand Down Expand Up @@ -326,12 +328,13 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
continue
}

journal.Add("sync", map[string]interface{}{
"op": "headChange",
"from": r.old.Key(),
"to": r.new.Key(),
"rev": len(revert),
"apply": len(apply),
cs.journal.AddEntry("sync", "head_change", map[string]interface{}{
"from": r.old.Key(),
"from_height": r.old.Height(),
"to": r.new.Key(),
"to_height": r.new.Height(),
"rev_cnt": len(revert),
"apply_cnt": len(apply),
})

// reverse the apply array
Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-seal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
var log = logging.Logger("main")

const FlagWorkerRepo = "worker-repo"

// TODO remove after deprecation period
const FlagWorkerRepoDeprecation = "workerrepo"

Expand Down
1 change: 1 addition & 0 deletions cmd/lotus-storage-miner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
var log = logging.Logger("main")

const FlagMinerRepo = "miner-repo"

// TODO remove after deprecation period
const FlagMinerRepoDeprecation = "storagerepo"

Expand Down
16 changes: 16 additions & 0 deletions journal/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package journal

import (
"github.com/filecoin-project/specs-actors/actors/abi"

"github.com/filecoin-project/lotus/chain/types"
)

type HeadChangeEvt struct {
From types.TipSetKey `json:"from"`
FromHeight abi.ChainEpoch `json:"from_height"`
To types.TipSetKey `json:"to"`
ToHeight abi.ChainEpoch `json:"to_height"`
RevertCount int `json:"rev_cnt"`
ApplyCount int `json:"apply_cnt"`
}
131 changes: 131 additions & 0 deletions journal/filesystem.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package journal

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"

logging "github.com/ipfs/go-log"
"go.uber.org/fx"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/node/repo"
)

var log = logging.Logger("journal")

// fsJournal is a basic journal backed by files on a filesystem.
type fsJournal struct {
disabledTracker

dir string
sizeLimit int64

lk sync.Mutex
fi *os.File
fSize int64

incoming chan *Entry

closing chan struct{}
}

// OpenFSJournal constructs a rolling filesystem journal, with a default
// per-file size limit of 1GiB.
func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled []EventType) (Journal, error) {
dir := filepath.Join(lr.Path(), "journal")
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err)
}

f := &fsJournal{
disabledTracker: newDisabledTracker(disabled),
dir: dir,
sizeLimit: 1 << 30,
incoming: make(chan *Entry, 32),
closing: make(chan struct{}),
}

if err := f.rollJournalFile(); err != nil {
return nil, err
}

lc.Append(fx.Hook{
OnStop: func(_ context.Context) error { return f.Close() },
})

go f.runLoop()

return f, nil
}

func (f *fsJournal) AddEntry(evtType EventType, obj interface{}) {
je := &Entry{
EventType: evtType,
Timestamp: build.Clock.Now(),
Data: obj,
}
select {
case f.incoming <- je:
case <-f.closing:
log.Warnw("journal closed but tried to log event", "entry", je)
}
}

func (f *fsJournal) Close() error {
close(f.closing)
return nil
}

func (f *fsJournal) putEntry(je *Entry) error {
b, err := json.Marshal(je)
if err != nil {
return err
}
n, err := f.fi.Write(append(b, '\n'))
if err != nil {
return err
}

f.fSize += int64(n)

if f.fSize >= f.sizeLimit {
_ = f.rollJournalFile()
}

return nil
}

func (f *fsJournal) rollJournalFile() error {
if f.fi != nil {
_ = f.fi.Close()
}

nfi, err := os.Create(filepath.Join(f.dir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(time.RFC3339))))
if err != nil {
return xerrors.Errorf("failed to open journal file: %w", err)
}

f.fi = nfi
f.fSize = 0
return nil
}

func (f *fsJournal) runLoop() {
for {
select {
case je := <-f.incoming:
if err := f.putEntry(je); err != nil {
log.Errorw("failed to write out journal entry", "entry", je, "err", err)
}
case <-f.closing:
_ = f.fi.Close()
return
}
}
}
148 changes: 0 additions & 148 deletions journal/journal.go

This file was deleted.

Loading

0 comments on commit a8fc4a6

Please sign in to comment.