Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

古い署名を削除 & Streamで同期しない #32

Merged
merged 42 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
8315d93
introduce formatting of proto
tak1827 Jun 25, 2024
fce1f2c
submitter: clean sig
tak1827 Jun 25, 2024
2f382f6
p2p: no open stream when received published sig
tak1827 Jun 25, 2024
8f074ee
during startup, no start submitter, just start when new verse discroverd
tak1827 Jun 25, 2024
a42cd9a
verse: get nextIndex with confirmation
tak1827 Jun 25, 2024
a597114
verifier: clearn up old sigs
tak1827 Jun 26, 2024
af58440
update config
tak1827 Jun 26, 2024
c6e1c5b
fix config test error
tak1827 Jun 26, 2024
9e434ac
correctly handle http close error on msvr and psvr
tak1827 Jul 2, 2024
8c7f4b8
verse discovery: immediate discorver at first tick, improve loging, v…
tak1827 Jul 2, 2024
c2a7984
verse discovery: add inital endpoint reachable check
tak1827 Jul 2, 2024
6e61aed
fix disk optimiser ticker mis config
tak1827 Jul 2, 2024
123fe1b
verse discovery: synchronously try the first discovery
tak1827 Jul 2, 2024
2b84d3d
logging verifer/submitter added verse
tak1827 Jul 2, 2024
ad8dc8f
handle already verified index
tak1827 Jul 2, 2024
8afb839
fix config test by optinally validating application level validation
tak1827 Jul 2, 2024
45aadec
log when already index is verified
tak1827 Jul 3, 2024
11b3733
replace ioutil to io
tak1827 Jul 3, 2024
993a389
disable block / event collector
tak1827 Jul 5, 2024
0cb3f17
verifier: directly feth events, no from database as the collecters ar…
tak1827 Jul 5, 2024
d19bc8d
p2p: stop latest sig publish loop
tak1827 Jul 5, 2024
9898da3
submitter: include chainid to log
tak1827 Jul 5, 2024
cc991f6
verifier: fix publishing issue, missing contract, p2p node is the cor…
tak1827 Jul 6, 2024
a02d22f
update start to follow up verifier submitter update
tak1827 Jul 6, 2024
1d54141
accept published old sigs to receive
tak1827 Jul 6, 2024
c20a882
submitter: improve log
tak1827 Jul 6, 2024
2f18ebd
config: add start block offset
tak1827 Jul 7, 2024
a42693a
verifier: fix timeout error at the first log fetch
tak1827 Jul 7, 2024
a6aacde
verifier: filter log by contract address
tak1827 Jul 7, 2024
134ac1f
verifier: fix missing by contract address filtering when fetch log
tak1827 Jul 7, 2024
496d6e2
Merge branch 'feat/reduce-goroutine' into feat/no-stream-and-clean-sig
tak1827 Jul 16, 2024
a6054fe
increment version
tak1827 Jul 16, 2024
6221369
verifier: log improve
tak1827 Jul 16, 2024
f6592c7
apply feedback fro ironbeer
tak1827 Jul 16, 2024
d3d234f
respond feedback from ironbeer #32
tak1827 Jul 17, 2024
82478a7
delete unused code and mark deplicated
tak1827 Jul 17, 2024
2d634de
node: delete publishLoop and meterLoop
tak1827 Jul 17, 2024
993f216
fix feadback from ironbeer #32
tak1827 Jul 17, 2024
e3b28b6
verifier: order by batch_index when publish unverifierd logs
tak1827 Jul 17, 2024
be044cc
improve log
tak1827 Jul 18, 2024
1401447
verifier: fix skip iter block
tak1827 Jul 18, 2024
1a6e7dc
verifier: retry failed log 1h later
tak1827 Jul 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
build:
go build -o bin/oasvlfy .

.PHONY: proto
proto:
protoc --go_out=./proto/p2p/v1 \
./proto/p2p/v1/*.proto

fmt:
go fmt ./...

fmtproto:
clang-format -i ./proto/p2p/**/*.proto

test:
go test -v ./...
10 changes: 5 additions & 5 deletions cmd/config_loader.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package cmd

import (
"io/ioutil"
"os"
"strings"

"github.com/oasysgames/oasys-optimism-verifier/config"
Expand Down Expand Up @@ -139,20 +139,20 @@ func mustNewConfigLoader(cmd *cobra.Command) *configLoader {
return opts
}

func (opts *configLoader) load() (*config.Config, error) {
func (opts *configLoader) load(enableStrictValidation bool) (*config.Config, error) {
// load config from the file
if !opts.fromCli {
path, err := opts.cmd.Flags().GetString(fileConfigFlag)
if err != nil {
return nil, err
}

input, err := ioutil.ReadFile(path)
input, err := os.ReadFile(path)
if err != nil {
return nil, err
}

conf, err := config.NewConfig(input)
conf, err := config.NewConfig(input, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func (opts *configLoader) load() (*config.Config, error) {
}
}

if err := config.Validate(opts.cfg); err != nil {
if err := config.Validate(opts.cfg, enableStrictValidation); err != nil {
return nil, err
}

Expand Down
15 changes: 9 additions & 6 deletions cmd/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ func (s *ConfigLoaderTestSuite) TestLoadConfigFromYAML() {
address: '0xD244F03CA3e99C6093f6cBEFBD2f4508244C59D4'
password: %s
plain: '0xebf3a7f5f805e02c0bbbd599acd5c881f40db22caa95127d4bf48e2dde5fd7bb'

hub_layer:
chain_id: 1
rpc: https://rpc.hub.example.com/

verse_layer:
discovery:
endpoint: https://discovery.example.com/
Expand All @@ -71,7 +71,7 @@ func (s *ConfigLoaderTestSuite) TestLoadConfigFromYAML() {
l1_contracts:
StateCommitmentChain: '0x01E901F3c65fA7CBd4505F5eF3A88e4ce432e4B5'
L2OutputOracle: '0x2489317FA6e003550111D5D196302Ba0879354e2'

p2p:
listens:
- listen0
Expand All @@ -92,7 +92,7 @@ func (s *ConfigLoaderTestSuite) TestLoadConfigFromYAML() {
verifier:
enable: true
wallet: verifier

submitter:
enable: true
confirmations: 10
Expand Down Expand Up @@ -123,7 +123,7 @@ func (s *ConfigLoaderTestSuite) TestLoadConfigFromYAML() {
})
rootCmd.Execute()

got, _ := globalConfigLoader.load()
got, _ := globalConfigLoader.load(false)
s.Equal(want, got)
}

Expand Down Expand Up @@ -227,7 +227,8 @@ func (s *ConfigLoaderTestSuite) executeWithCliArgs(appendArgs []string) *config.
}, appendArgs...))
cmd.Execute()

conf, _ := opts.load()
conf, err := opts.load(false)
s.Require().NoError(err)
return conf
}

Expand Down Expand Up @@ -336,6 +337,8 @@ func (s *ConfigLoaderTestSuite) configWithMinCliArgs() *config.Config {
StateCollectLimit: defaults["verifier.state_collect_limit"].(int),
StateCollectTimeout: defaults["verifier.state_collect_timeout"].(time.Duration),
OptimizeInterval: defaults["verifier.db_optimize_interval"].(time.Duration),
Confirmations: defaults["verifier.confirmations"].(int),
StartBlockOffset: defaults["verifier.start_block_offset"].(uint64),
},
Submitter: config.Submitter{
Enable: false,
Expand Down
2 changes: 1 addition & 1 deletion cmd/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ var pingCmd = &cobra.Command{
Short: "Send ping via P2P to specified peer",
Long: "Send ping via P2P to specified peer",
Run: func(cmd *cobra.Command, args []string) {
conf, err := globalConfigLoader.load()
conf, err := globalConfigLoader.load(true)
if err != nil {
util.Exit(1, "Failed to load configuration: %s\n", err)
}
Expand Down
75 changes: 52 additions & 23 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,9 @@ func runStartCmd(cmd *cobra.Command, args []string) {
s.smcache.RefreshLoop(ctx, time.Hour)
}()

// start workers
s.startCollector(ctx)
s.startVerifier(ctx)
s.startSubmitter(ctx)
// s.startCollector(ctx) -> Necessary event logs are directly fetched from the chain during verification
// s.startVerifier(ctx)-> Verificatin starts whenever the verse is discovered
// s.startSubmitter(ctx) -> Submission starts whenever the verse is discovered
s.startVerseDiscovery(ctx)
s.startBeacon(ctx)
log.Info("All workers started")
Expand Down Expand Up @@ -173,7 +172,7 @@ func mustNewServer(ctx context.Context) *server {
signers: map[string]ethutil.Signer{},
}

if s.conf, err = globalConfigLoader.load(); err != nil {
if s.conf, err = globalConfigLoader.load(true); err != nil {
log.Crit("Failed to load configuration", "err", err)
}

Expand Down Expand Up @@ -219,7 +218,10 @@ func (s *server) mustStartMetrics(ctx context.Context) {
go func() {
// NOTE: Don't add wait group, as no need to guarantee the completion
if err := metrics.ListenAndServe(ctx, s.msvr); err != nil {
log.Crit("Failed to start metrics server", "err", err)
// `ErrServerClosed` is thrown when `Shutdown` is intentionally called
if !errors.Is(err, http.ErrServerClosed) {
log.Crit("Failed to start metrics server", "err", err)
}
}
log.Info("Metrics server have exited listening", "addr", s.conf.Metrics.Listen)
}()
Expand All @@ -236,7 +238,10 @@ func (s *server) mustStartPprof(ctx context.Context) {
go func() {
// NOTE: Don't add wait group, as no need to guarantee the completion
if err := ps.ListenAndServe(ctx, s.psvr); err != nil {
log.Crit("Failed to start pprof server", "err", err)
// `ErrServerClosed` is thrown when `Shutdown` is intentionally called
if !errors.Is(err, http.ErrServerClosed) {
log.Crit("Failed to start pprof server", "err", err)
}
}
log.Info("pprof server have exited listening", "addr", s.conf.Debug.Pprof.Listen)
}()
Expand Down Expand Up @@ -297,8 +302,8 @@ func (s *server) mustStartP2P(ctx context.Context, ipc *ipc.IPCServer) {
go func() {
defer s.wg.Done()

s.p2p.Start(ctx)
log.Info("P2P node has stopped, decrement wait group")
enableSubscriber := s.conf.Submitter.Enable
s.p2p.Start(ctx, enableSubscriber)
}()
}

Expand Down Expand Up @@ -358,7 +363,7 @@ func (s *server) mustSetupVerifier() {
}

l1Signer := ethutil.NewSignableClient(new(big.Int).SetUint64(s.conf.HubLayer.ChainID), s.hub, signer)
s.verifier = verifier.NewVerifier(&s.conf.Verifier, s.db, l1Signer)
s.verifier = verifier.NewVerifier(&s.conf.Verifier, s.db, s.p2p, l1Signer)
}

func (s *server) startVerifier(ctx context.Context) {
Expand Down Expand Up @@ -444,42 +449,56 @@ func (s *server) startSubmitter(ctx context.Context) {
}

func (s *server) startVerseDiscovery(ctx context.Context) {
if len(s.conf.VerseLayer.Directs) != 0 {
// read verses from the configuration
s.verseDiscoveryHandler(ctx, s.conf.VerseLayer.Directs)
}

if s.conf.VerseLayer.Discovery.Endpoint == "" {
// read verses from the configuration only, if the discovery endpoint is not set
s.verseDiscoveryHandler(s.conf.VerseLayer.Directs)
// Disable dinamically discovered verses, if the endpoint is not set
return
}

// dinamically discovered verses
disc := config.NewVerseDiscovery(
disc, err := config.NewVerseDiscovery(
ctx,
http.DefaultClient,
s.conf.VerseLayer.Discovery.Endpoint,
s.conf.VerseLayer.Discovery.RefreshInterval,
)
if err != nil {
log.Crit("Failed to construct verse discovery", "err", err)
}

// Subscribed verses to verifier and submitter
discSub := disc.Subscribe(ctx)

// synchronously try the first discovery
if err := disc.Work(ctx); err != nil {
tak1827 marked this conversation as resolved.
Show resolved Hide resolved
// exit if the first discovery faild, because the following discovery highly likely fail
log.Crit("Failed to work verse discovery", "err", err)
}

s.wg.Add(1)
go func() {
defer func() {
defer s.wg.Done()
discSub.Cancel()
log.Info("Verse discovery has stopped, decrement wait group")
}()

discTick := time.NewTicker(s.conf.VerseLayer.Discovery.RefreshInterval)
defer discTick.Stop()

// Subscribed verses to verifier and submitter
sub := disc.Subscribe(ctx)
defer sub.Cancel()

log.Info("Verse discovery started", "endpoint", s.conf.VerseLayer.Discovery.Endpoint, "interval", s.conf.VerseLayer.Discovery.RefreshInterval)

for {
select {
case <-ctx.Done():
log.Info("Verse discovery stopped")
return
case verses := <-sub.Next():
s.verseDiscoveryHandler(verses)
case verses := <-discSub.Next():
s.verseDiscoveryHandler(ctx, verses)
case <-discTick.C:
if err := disc.Work(ctx); err != nil {
log.Error("Failed to work verse discovery", "err", err)
Expand All @@ -489,7 +508,7 @@ func (s *server) startVerseDiscovery(ctx context.Context) {
}()
}

func (s *server) verseDiscoveryHandler(discovers []*config.Verse) {
func (s *server) verseDiscoveryHandler(ctx context.Context, discovers []*config.Verse) {
if s.verifier == nil && s.submitter == nil {
log.Warn("Both Verifier and Submitter are disabled")
return
Expand All @@ -509,7 +528,10 @@ func (s *server) verseDiscoveryHandler(discovers []*config.Verse) {
verse verse.Verse
verify common.Address
}
var verses []*verse_
var (
verses []*verse_
verseChainIDs []uint64
)
for _, cfg := range discovers {
for name, addr := range cfg.L1Contracts {
if factory, ok := verseFactories[name]; ok {
Expand All @@ -518,18 +540,23 @@ func (s *server) verseDiscoveryHandler(discovers []*config.Verse) {
verse: factory(s.db, s.hub, common.HexToAddress(addr)),
verify: verifyContracts[name],
})
verseChainIDs = append(verseChainIDs, cfg.ChainID)
}
}
}

log.Info("Discovered verses", "count", len(verses), "chain-ids", verseChainIDs)

for _, x := range verses {
// add verse to Verifier
if s.verifier != nil && !s.verifier.HasTask(x.verse.RollupContract(), x.cfg.RPC) {
l2Client, err := ethutil.NewClient(x.cfg.RPC)
if err != nil {
log.Error("Failed to construct verse-layer client", "err", err)
} else {
s.verifier.AddTask(x.verse.WithVerifiable(l2Client))
log.Info("Add verse to Verifier", "chain-id", x.cfg.ChainID, "contract", x.verse.RollupContract())
// s.verifier.AddTask(x.verse.WithVerifiable(l2Client))
s.verifier.AddVerse(ctx, x.verse.WithVerifiable(l2Client), x.cfg.ChainID)
tak1827 marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -546,8 +573,10 @@ func (s *server) verseDiscoveryHandler(discovers []*config.Verse) {
continue
}

log.Info("Add verse to Submitter", "chain-id", x.cfg.ChainID, "contract", x.verse.RollupContract())
l1Signer := ethutil.NewSignableClient(new(big.Int).SetUint64(s.conf.HubLayer.ChainID), s.hub, signer)
s.submitter.AddTask(x.verse.WithTransactable(l1Signer, x.verify))
// s.submitter.AddTask(x.verse.WithTransactable(l1Signer, x.verify))
s.submitter.AddVerse(ctx, x.verse.WithTransactable(l1Signer, x.verify), x.cfg.ChainID)
tak1827 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ var statusCmd = &cobra.Command{
Short: "Show status",
Long: "Show status",
Run: func(cmd *cobra.Command, args []string) {
conf, err := globalConfigLoader.load()
conf, err := globalConfigLoader.load(true)
if err != nil {
util.Exit(1, "Failed to load configuration: %s\n", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/unlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func init() {
}

func runUnlockCmd(cmd *cobra.Command, args []string) {
conf, err := globalConfigLoader.load()
conf, err := globalConfigLoader.load(true)
if err != nil {
util.Exit(1, "Failed to load configuration: %s\n", err)
}
Expand Down
2 changes: 1 addition & 1 deletion collector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (w *EventCollector) Work(ctx context.Context) {

// collect event logs from hub-layer
start, end := blocks[0], blocks[len(blocks)-1]
logs, err := w.hub.FilterLogs(ctx, verse.NewEventLogFilter(start.Number, end.Number))
logs, err := w.hub.FilterLogs(ctx, verse.NewEventLogFilter(start.Number, end.Number, nil))
if err != nil {
w.log.Error("Failed to fetch event logs from hub-layer",
"start", start, "end", end, "err", err)
Expand Down
Loading
Loading