From a8fc4a66e7718add60d18fee285098cdcc0c15f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 17 Jul 2020 14:22:37 +0100 Subject: [PATCH] wip in-memory journal. --- chain/events/events_height.go | 6 +- chain/store/store.go | 17 ++- cmd/lotus-seal-worker/main.go | 1 + cmd/lotus-storage-miner/main.go | 1 + journal/events.go | 16 +++ journal/filesystem.go | 131 +++++++++++++++++ journal/journal.go | 148 -------------------- journal/memory.go | 239 ++++++++++++++++++++++++++++++++ journal/memory_test.go | 178 ++++++++++++++++++++++++ journal/types.go | 60 ++++++++ node/builder.go | 6 +- node/modules/core.go | 6 - storage/wdpost_run.go | 2 +- 13 files changed, 643 insertions(+), 168 deletions(-) create mode 100644 journal/events.go create mode 100644 journal/filesystem.go delete mode 100644 journal/journal.go create mode 100644 journal/memory.go create mode 100644 journal/memory_test.go create mode 100644 journal/types.go diff --git a/chain/events/events_height.go b/chain/events/events_height.go index 24d758a3156..b419f06d279 100644 --- a/chain/events/events_height.go +++ b/chain/events/events_height.go @@ -26,7 +26,6 @@ type heightEvents struct { } func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { - ctx, span := trace.StartSpan(e.ctx, "events.HeightHeadChange") defer span.End() span.AddAttributes(trace.Int64Attribute("endHeight", int64(app[0].Height()))) @@ -144,12 +143,11 @@ func (e *heightEvents) headChangeAt(rev, app []*types.TipSet) error { } // ChainAt invokes the specified `HeightHandler` when the chain reaches the -// specified height+confidence threshold. If the chain is rolled-back under the -// specified height, `RevertHandler` will be called. +// specified height+confidence threshold. If the chain is rolled-back under the +// specified height, `RevertHandler` will be called. // // ts passed to handlers is the tipset at the specified, or above, if lower tipsets were null func (e *heightEvents) ChainAt(hnd HeightHandler, rev RevertHandler, confidence int, h abi.ChainEpoch) error { - e.lk.Lock() // Tricky locking, check your locks if you modify this function! bestH := e.tsc.best().Height() diff --git a/chain/store/store.go b/chain/store/store.go index fa31f6b081e..43e1b295c85 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -85,9 +85,10 @@ type ChainStore struct { tsCache *lru.ARCCache vmcalls runtime.Syscalls + journal journal.Journal } -func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Syscalls) *ChainStore { +func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Syscalls, journal journal.Journal) *ChainStore { c, _ := lru.NewARC(2048) tsc, _ := lru.NewARC(4096) cs := &ChainStore{ @@ -98,6 +99,7 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls runtime.Sys mmCache: c, tsCache: tsc, vmcalls: vmcalls, + journal: journal, } ci := NewChainIndex(cs.LoadTipSet) @@ -326,12 +328,13 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo continue } - journal.Add("sync", map[string]interface{}{ - "op": "headChange", - "from": r.old.Key(), - "to": r.new.Key(), - "rev": len(revert), - "apply": len(apply), + cs.journal.AddEntry("sync", "head_change", map[string]interface{}{ + "from": r.old.Key(), + "from_height": r.old.Height(), + "to": r.new.Key(), + "to_height": r.new.Height(), + "rev_cnt": len(revert), + "apply_cnt": len(apply), }) // reverse the apply array diff --git a/cmd/lotus-seal-worker/main.go b/cmd/lotus-seal-worker/main.go index bf0b420f0c7..805777e7a45 100644 --- a/cmd/lotus-seal-worker/main.go +++ b/cmd/lotus-seal-worker/main.go @@ -37,6 +37,7 @@ import ( var log = logging.Logger("main") const FlagWorkerRepo = "worker-repo" + // TODO remove after deprecation period const FlagWorkerRepoDeprecation = "workerrepo" diff --git a/cmd/lotus-storage-miner/main.go b/cmd/lotus-storage-miner/main.go index 9576aae0b4f..5fec6ad225c 100644 --- a/cmd/lotus-storage-miner/main.go +++ b/cmd/lotus-storage-miner/main.go @@ -22,6 +22,7 @@ import ( var log = logging.Logger("main") const FlagMinerRepo = "miner-repo" + // TODO remove after deprecation period const FlagMinerRepoDeprecation = "storagerepo" diff --git a/journal/events.go b/journal/events.go new file mode 100644 index 00000000000..b6b5a15d403 --- /dev/null +++ b/journal/events.go @@ -0,0 +1,16 @@ +package journal + +import ( + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/lotus/chain/types" +) + +type HeadChangeEvt struct { + From types.TipSetKey `json:"from"` + FromHeight abi.ChainEpoch `json:"from_height"` + To types.TipSetKey `json:"to"` + ToHeight abi.ChainEpoch `json:"to_height"` + RevertCount int `json:"rev_cnt"` + ApplyCount int `json:"apply_cnt"` +} diff --git a/journal/filesystem.go b/journal/filesystem.go new file mode 100644 index 00000000000..d49f77ff97a --- /dev/null +++ b/journal/filesystem.go @@ -0,0 +1,131 @@ +package journal + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" + + logging "github.com/ipfs/go-log" + "go.uber.org/fx" + "golang.org/x/xerrors" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/node/repo" +) + +var log = logging.Logger("journal") + +// fsJournal is a basic journal backed by files on a filesystem. +type fsJournal struct { + disabledTracker + + dir string + sizeLimit int64 + + lk sync.Mutex + fi *os.File + fSize int64 + + incoming chan *Entry + + closing chan struct{} +} + +// OpenFSJournal constructs a rolling filesystem journal, with a default +// per-file size limit of 1GiB. +func OpenFSJournal(lr repo.LockedRepo, lc fx.Lifecycle, disabled []EventType) (Journal, error) { + dir := filepath.Join(lr.Path(), "journal") + if err := os.MkdirAll(dir, 0755); err != nil { + return nil, fmt.Errorf("failed to mk directory %s for file journal: %w", dir, err) + } + + f := &fsJournal{ + disabledTracker: newDisabledTracker(disabled), + dir: dir, + sizeLimit: 1 << 30, + incoming: make(chan *Entry, 32), + closing: make(chan struct{}), + } + + if err := f.rollJournalFile(); err != nil { + return nil, err + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { return f.Close() }, + }) + + go f.runLoop() + + return f, nil +} + +func (f *fsJournal) AddEntry(evtType EventType, obj interface{}) { + je := &Entry{ + EventType: evtType, + Timestamp: build.Clock.Now(), + Data: obj, + } + select { + case f.incoming <- je: + case <-f.closing: + log.Warnw("journal closed but tried to log event", "entry", je) + } +} + +func (f *fsJournal) Close() error { + close(f.closing) + return nil +} + +func (f *fsJournal) putEntry(je *Entry) error { + b, err := json.Marshal(je) + if err != nil { + return err + } + n, err := f.fi.Write(append(b, '\n')) + if err != nil { + return err + } + + f.fSize += int64(n) + + if f.fSize >= f.sizeLimit { + _ = f.rollJournalFile() + } + + return nil +} + +func (f *fsJournal) rollJournalFile() error { + if f.fi != nil { + _ = f.fi.Close() + } + + nfi, err := os.Create(filepath.Join(f.dir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(time.RFC3339)))) + if err != nil { + return xerrors.Errorf("failed to open journal file: %w", err) + } + + f.fi = nfi + f.fSize = 0 + return nil +} + +func (f *fsJournal) runLoop() { + for { + select { + case je := <-f.incoming: + if err := f.putEntry(je); err != nil { + log.Errorw("failed to write out journal entry", "entry", je, "err", err) + } + case <-f.closing: + _ = f.fi.Close() + return + } + } +} diff --git a/journal/journal.go b/journal/journal.go deleted file mode 100644 index 8d509d51ccd..00000000000 --- a/journal/journal.go +++ /dev/null @@ -1,148 +0,0 @@ -package journal - -import ( - "encoding/json" - "fmt" - "os" - "path/filepath" - "sync" - "time" - - logging "github.com/ipfs/go-log" - "golang.org/x/xerrors" - - "github.com/filecoin-project/lotus/build" -) - -func InitializeSystemJournal(dir string) error { - if err := os.MkdirAll(dir, 0755); err != nil { - return err - } - j, err := OpenFSJournal(dir) - if err != nil { - return err - } - currentJournal = j - return nil -} - -func Add(sys string, val interface{}) { - if currentJournal == nil { - log.Warn("no journal configured") - return - } - currentJournal.AddEntry(sys, val) -} - -var log = logging.Logger("journal") - -var currentJournal Journal - -type Journal interface { - AddEntry(system string, obj interface{}) - Close() error -} - -// fsJournal is a basic journal backed by files on a filesystem -type fsJournal struct { - fi *os.File - fSize int64 - - lk sync.Mutex - - journalDir string - - incoming chan *JournalEntry - journalSizeLimit int64 - - closing chan struct{} -} - -func OpenFSJournal(dir string) (*fsJournal, error) { - fsj := &fsJournal{ - journalDir: dir, - incoming: make(chan *JournalEntry, 32), - journalSizeLimit: 1 << 30, - closing: make(chan struct{}), - } - - if err := fsj.rollJournalFile(); err != nil { - return nil, err - } - - go fsj.runLoop() - - return fsj, nil -} - -type JournalEntry struct { - System string - Timestamp time.Time - Val interface{} -} - -func (fsj *fsJournal) putEntry(je *JournalEntry) error { - b, err := json.Marshal(je) - if err != nil { - return err - } - n, err := fsj.fi.Write(append(b, '\n')) - if err != nil { - return err - } - - fsj.fSize += int64(n) - - if fsj.fSize >= fsj.journalSizeLimit { - fsj.rollJournalFile() - } - - return nil -} - -func (fsj *fsJournal) rollJournalFile() error { - if fsj.fi != nil { - fsj.fi.Close() - } - - nfi, err := os.Create(filepath.Join(fsj.journalDir, fmt.Sprintf("lotus-journal-%s.ndjson", build.Clock.Now().Format(time.RFC3339)))) - if err != nil { - return xerrors.Errorf("failed to open journal file: %w", err) - } - - fsj.fi = nfi - fsj.fSize = 0 - return nil -} - -func (fsj *fsJournal) runLoop() { - for { - select { - case je := <-fsj.incoming: - if err := fsj.putEntry(je); err != nil { - log.Errorw("failed to write out journal entry", "entry", je, "err", err) - } - case <-fsj.closing: - fsj.fi.Close() - return - } - } -} - -func (fsj *fsJournal) AddEntry(system string, obj interface{}) { - je := &JournalEntry{ - System: system, - Timestamp: build.Clock.Now(), - Val: obj, - } - select { - case fsj.incoming <- je: - case <-fsj.closing: - log.Warnw("journal closed but tried to log event", "entry", je) - } -} - -func (fsj *fsJournal) Close() error { - close(fsj.closing) - return nil -} diff --git a/journal/memory.go b/journal/memory.go new file mode 100644 index 00000000000..48372263553 --- /dev/null +++ b/journal/memory.go @@ -0,0 +1,239 @@ +package journal + +import ( + "context" + "sync/atomic" + + "go.uber.org/fx" + + "github.com/filecoin-project/lotus/build" +) + +// Control messages. +type ( + clearCtrl struct{} + addObserverCtrl struct { + observer *observer + replay bool + } + rmObserverCtrl *observer + getEntriesCtrl chan []*Entry +) + +type MemJournal struct { + disabledTracker + + entries []*Entry + index map[string]map[string][]*Entry + observers []observer + + incomingCh chan *Entry + controlCh chan interface{} + + state int32 // guarded by atomic; 0=closed, 1=running. + closed chan struct{} +} + +var _ Journal = (*MemJournal)(nil) + +type observer struct { + accept map[EventType]struct{} + ch chan *Entry +} + +func (o *observer) dispatch(entry *Entry) { + if o.accept == nil { + o.ch <- entry + } + if _, ok := o.accept[entry.EventType]; ok { + o.ch <- entry + } +} + +func NewMemoryJournal(lc fx.Lifecycle, disabled []EventType) *MemJournal { + m := &MemJournal{ + disabledTracker: newDisabledTracker(disabled), + + index: make(map[string]map[string][]*Entry, 16), + observers: make([]observer, 0, 16), + incomingCh: make(chan *Entry, 256), + controlCh: make(chan interface{}, 16), + state: 1, + closed: make(chan struct{}), + } + + lc.Append(fx.Hook{ + OnStop: func(_ context.Context) error { return m.Close() }, + }) + + go m.process() + + return m +} + +func (m *MemJournal) AddEntry(evtType EventType, obj interface{}) { + entry := &Entry{ + EventType: evtType, + Timestamp: build.Clock.Now(), + Data: obj, + } + + select { + case m.incomingCh <- entry: + case <-m.closed: + } +} + +func (m *MemJournal) Close() error { + if !atomic.CompareAndSwapInt32(&m.state, 1, 0) { + // already closed. + return nil + } + close(m.closed) + return nil +} + +func (m *MemJournal) Clear() { + select { + case m.controlCh <- clearCtrl{}: + case <-m.closed: + } +} + +// Observe starts observing events that are recorded in the MemJournal, and +// returns a channel where new events will be sent. When replay is true, all +// entries that have been recorded prior to the observer being registered will +// be replayed. To restrict the event types this observer will sent, use the +// include argument. If no include set is passed, the observer will receive all +// events types. +func (m *MemJournal) Observe(ctx context.Context, replay bool, include ...EventType) <-chan *Entry { + var acc map[EventType]struct{} + if include != nil { + acc = make(map[EventType]struct{}, 16) + for _, et := range include { + acc[et] = struct{}{} + } + } + + ch := make(chan *Entry, 256) + o := &observer{ + accept: acc, + ch: ch, + } + + // watch the context, and fire the "remove observer" control message upon + // cancellation. + go func() { + <-ctx.Done() + select { + case m.controlCh <- rmObserverCtrl(o): + case <-m.closed: + } + }() + + select { + case m.controlCh <- addObserverCtrl{o, replay}: + case <-m.closed: + // we are already stopped. + close(ch) + } + + return ch +} + +// Entries gets a snapshot of stored entries. +func (m *MemJournal) Entries() []*Entry { + ch := make(chan []*Entry) + m.controlCh <- getEntriesCtrl(ch) + return <-ch +} + +func (m *MemJournal) process() { + processCtrlMsg := func(message interface{}) { + switch msg := message.(type) { + case addObserverCtrl: + // adding an observer. + m.observers = append(m.observers, *msg.observer) + + if msg.replay { + // replay all existing entries. + for _, e := range m.entries { + msg.observer.dispatch(e) + } + } + case rmObserverCtrl: + // removing an observer; find the observer, close its channel. + // then discard it from our list by replacing it with the last + // observer and reslicing. + for i, o := range m.observers { + if o.ch == msg.ch { + close(o.ch) + m.observers[i] = m.observers[len(m.observers)-1] + m.observers = m.observers[:len(m.observers)-1] + } + } + case clearCtrl: + m.entries = m.entries[0:0] + // carry over system and event names; there are unlikely to change; + // just reslice the entry slices, so we are not thrashing memory. + for _, events := range m.index { + for ev := range events { + events[ev] = events[ev][0:0] + } + } + case getEntriesCtrl: + cpy := make([]*Entry, len(m.entries)) + copy(cpy, m.entries) + msg <- cpy + close(msg) + } + } + + processClose := func() { + m.entries = nil + m.index = make(map[string]map[string][]*Entry, 16) + for _, o := range m.observers { + close(o.ch) + } + m.observers = nil + } + + for { + // Drain all control messages first! + select { + case msg := <-m.controlCh: + processCtrlMsg(msg) + continue + case <-m.closed: + processClose() + return + default: + } + + // Now consume and pipe messages. + select { + case entry := <-m.incomingCh: + m.entries = append(m.entries, entry) + events := m.index[entry.System] + if events == nil { + events = make(map[string][]*Entry, 16) + m.index[entry.System] = events + } + + entries := events[entry.Event] + events[entry.Event] = append(entries, entry) + + for _, o := range m.observers { + o.dispatch(entry) + } + + case msg := <-m.controlCh: + processCtrlMsg(msg) + continue + + case <-m.closed: + processClose() + return + } + } +} diff --git a/journal/memory_test.go b/journal/memory_test.go new file mode 100644 index 00000000000..10b3a90cbe6 --- /dev/null +++ b/journal/memory_test.go @@ -0,0 +1,178 @@ +package journal + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/types" + + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/raulk/clock" + "github.com/stretchr/testify/require" + "go.uber.org/fx/fxtest" +) + +func TestMemJournal_AddEntry(t *testing.T) { + lc := fxtest.NewLifecycle(t) + defer lc.RequireStop() + + clk := clock.NewMock() + build.Clock = clk + + journal := NewMemoryJournal(lc, nil) + addEntries(journal, 100) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) + + entries := journal.Entries() + cnt := make(map[string]int, 10) + for i, e := range entries { + require.EqualValues(t, "spaceship", e.System) + require.Equal(t, HeadChangeEvt{ + From: types.TipSetKey{}, + FromHeight: abi.ChainEpoch(i), + To: types.TipSetKey{}, + ToHeight: abi.ChainEpoch(i), + RevertCount: i, + ApplyCount: i, + }, e.Data) + require.Equal(t, build.Clock.Now(), e.Timestamp) + cnt[e.Event]++ + } + + // we received 10 entries of each event type. + for _, c := range cnt { + require.Equal(t, 10, c) + } +} + +func TestMemJournal_Close(t *testing.T) { + lc := fxtest.NewLifecycle(t) + defer lc.RequireStop() + + journal := NewMemoryJournal(lc, nil) + addEntries(journal, 100) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) + + o1 := journal.Observe(context.TODO(), false) + o2 := journal.Observe(context.TODO(), false) + o3 := journal.Observe(context.TODO(), false) + + time.Sleep(500 * time.Millisecond) + + // Close the journal. + require.NoError(t, journal.Close()) + + time.Sleep(500 * time.Millisecond) + +NextChannel: + for _, ch := range []<-chan *Entry{o1, o2, o3} { + for { + select { + case _, more := <-ch: + if more { + // keep consuming + } else { + continue NextChannel + } + default: + t.Fatal("nothing more to consume, and channel is not closed") + } + } + } +} + +func TestMemJournal_Clear(t *testing.T) { + lc := fxtest.NewLifecycle(t) + defer lc.RequireStop() + + journal := NewMemoryJournal(lc, nil) + addEntries(journal, 100) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) + + journal.Clear() + require.Empty(t, journal.Entries()) + require.Empty(t, journal.Entries()) + require.Empty(t, journal.Entries()) +} + +func TestMemJournal_Observe(t *testing.T) { + lc := fxtest.NewLifecycle(t) + defer lc.RequireStop() + + journal := NewMemoryJournal(lc, nil) + addEntries(journal, 100) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) + + o1 := journal.Observe(context.TODO(), false, EventType{"spaceship", "wheezing-1"}) + o2 := journal.Observe(context.TODO(), true, EventType{"spaceship", "wheezing-1"}, EventType{"spaceship", "wheezing-2"}) + o3 := journal.Observe(context.TODO(), true) + + time.Sleep(1 * time.Second) + + require.Len(t, o1, 0) // no replay + require.Len(t, o2, 20) // replay with include set + require.Len(t, o3, 100) // replay with no include set (all entries) + + // add another 100 entries and assert what the observers have seen. + addEntries(journal, 100) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 200 }, 1*time.Second, 100*time.Millisecond) + + // note: we're able to queue items because the observer channel buffer size is 256. + require.Len(t, o1, 10) // should have 0 old entries + 10 new entries + require.Len(t, o2, 40) // should have 20 old entries + 20 new entries + require.Len(t, o3, 200) // should have 100 old entries + 100 new entries +} + +func TestMemJournal_ObserverCancellation(t *testing.T) { + lc := fxtest.NewLifecycle(t) + defer lc.RequireStop() + + journal := NewMemoryJournal(lc, nil) + + ctx, cancel := context.WithCancel(context.TODO()) + o1 := journal.Observe(ctx, false) + o2 := journal.Observe(context.TODO(), false) + addEntries(journal, 100) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 100 }, 1*time.Second, 100*time.Millisecond) + + // all observers have received the 100 entries. + require.Len(t, o1, 100) + require.Len(t, o2, 100) + + // cancel o1's context. + cancel() + time.Sleep(500 * time.Millisecond) + + // add 50 new entries + addEntries(journal, 50) + + require.Eventually(t, func() bool { return len(journal.Entries()) == 150 }, 1*time.Second, 100*time.Millisecond) + + require.Len(t, o1, 100) // has not moved. + require.Len(t, o2, 150) // should have 100 old entries + 50 new entries + +} + +func addEntries(journal *MemJournal, count int) { + for i := 0; i < count; i++ { + eventIdx := i % 10 + journal.AddEntry(EventType{"spaceship", fmt.Sprintf("wheezing-%d", eventIdx)}, HeadChangeEvt{ + From: types.TipSetKey{}, + FromHeight: abi.ChainEpoch(i), + To: types.TipSetKey{}, + ToHeight: abi.ChainEpoch(i), + RevertCount: i, + ApplyCount: i, + }) + } +} diff --git a/journal/types.go b/journal/types.go new file mode 100644 index 00000000000..d8c897cfada --- /dev/null +++ b/journal/types.go @@ -0,0 +1,60 @@ +package journal + +import "time" + +// EventType represents the signature of an event. +type EventType struct { + System string + Event string +} + +// Journal represents an audit trail of system actions. +// +// Every entry is tagged with a timestamp, a system name, and an event name. +// The supplied data can be any type, as long as it is JSON serializable, +// including structs, map[string]interface{}, or primitive types. +// +// For cleanliness and type safety, we recommend to use typed events. See the +// *Evt struct types in this package for more info. +type Journal interface { + // IsEnabled allows components to check if a given event type is enabled. + // All event types are enabled by default, and specific event types can only + // be disabled at construction type. Components are advised to check if the + // journal event types they record are enabled as soon as possible, and hold + // on to the answer all throughout the lifetime of the process. + IsEnabled(evtType EventType) bool + + // AddEntry adds an entry to this journal. See godocs on the Journal type + // for more info. + AddEntry(evtType EventType, data interface{}) + + // Close closes this journal for further writing. + Close() error +} + +// Entry represents a journal entry. +// +// See godocs on Journal for more information. +type Entry struct { + EventType + + Timestamp time.Time + Data interface{} +} + +// disabledTracker is an embeddable mixin that takes care of tracking disabled +// event types. +type disabledTracker map[EventType]struct{} + +func newDisabledTracker(disabled []EventType) disabledTracker { + dis := make(map[EventType]struct{}, len(disabled)) + for _, et := range disabled { + dis[et] = struct{}{} + } + return dis +} + +func (d disabledTracker) IsEnabled(evtType EventType) bool { + _, ok := d[evtType] + return !ok +} diff --git a/node/builder.go b/node/builder.go index 780497dba7c..90b150a8a47 100644 --- a/node/builder.go +++ b/node/builder.go @@ -41,6 +41,7 @@ import ( "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/vm" "github.com/filecoin-project/lotus/chain/wallet" + "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/peermgr" _ "github.com/filecoin-project/lotus/lib/sigs/bls" _ "github.com/filecoin-project/lotus/lib/sigs/secp" @@ -119,7 +120,7 @@ const ( ExtractApiKey HeadMetricsKey RunPeerTaggerKey - JournalKey + InitJournalKey SetApiEndpointKey @@ -151,8 +152,9 @@ func defaults() []Option { Override(new(record.Validator), modules.RecordValidator), Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(false)), Override(new(dtypes.ShutdownChan), make(chan struct{})), - Override(JournalKey, modules.SetupJournal), + Override(new(journal.Journal), journal.OpenFSJournal), + Override(InitJournalKey, func(j *journal.Journal) { /* forces the creation of the journal at startup */ }), // Filecoin modules } diff --git a/node/modules/core.go b/node/modules/core.go index 84179bd637c..ca9872d90cb 100644 --- a/node/modules/core.go +++ b/node/modules/core.go @@ -6,7 +6,6 @@ import ( "errors" "io" "io/ioutil" - "path/filepath" "github.com/gbrlsnchs/jwt/v3" logging "github.com/ipfs/go-log/v2" @@ -19,7 +18,6 @@ import ( "github.com/filecoin-project/lotus/api/apistruct" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/addrutil" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/repo" @@ -95,7 +93,3 @@ func BuiltinBootstrap() (dtypes.BootstrapPeers, error) { func DrandBootstrap() (dtypes.DrandBootstrap, error) { return build.DrandBootstrap() } - -func SetupJournal(lr repo.LockedRepo) error { - return journal.InitializeSystemJournal(filepath.Join(lr.Path(), "journal")) -} diff --git a/storage/wdpost_run.go b/storage/wdpost_run.go index 8e404610c16..66bb0d53e95 100644 --- a/storage/wdpost_run.go +++ b/storage/wdpost_run.go @@ -256,7 +256,7 @@ func (s *WindowPoStScheduler) checkNextFaults(ctx context.Context, deadline uint Method: builtin.MethodsMiner.DeclareFaults, Params: enc, Value: types.NewInt(0), // TODO: Is there a fee? - GasLimit: 100_000_000, // i dont know help + GasLimit: 100_000_000, // i dont know help GasPrice: types.NewInt(2), }