Skip to content

Commit

Permalink
Merge branch 'rollup-contracts-port-tests-cont' of https://github.com…
Browse files Browse the repository at this point in the history
…/OffchainLabs/nitro into rollup-contracts-port-tests-cont
  • Loading branch information
yahgwai committed Feb 28, 2022
2 parents 395c367 + 7749099 commit 979fc2c
Show file tree
Hide file tree
Showing 42 changed files with 796 additions and 446 deletions.
1 change: 1 addition & 0 deletions arbnode/arb_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type TransactionPublisher interface {
PublishTransaction(ctx context.Context, tx *types.Transaction) error
Initialize(context.Context) error
Start(context.Context) error
StopAndWait()
}

type ArbInterface struct {
Expand Down
128 changes: 57 additions & 71 deletions arbnode/batch_poster.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,40 @@ import (
"github.com/offchainlabs/arbstate/arbstate"
"github.com/offchainlabs/arbstate/arbutil"
"github.com/offchainlabs/arbstate/solgen/go/bridgegen"
"github.com/offchainlabs/arbstate/util"
)

type BatchPoster struct {
client arbutil.L1Interface
inbox *InboxTracker
streamer *TransactionStreamer
config *BatchPosterConfig
inboxContract *bridgegen.SequencerInbox
sequencesPosted uint64
gasRefunder common.Address
transactOpts *bind.TransactOpts
util.StopWaiter

client arbutil.L1Interface
inbox *InboxTracker
streamer *TransactionStreamer
config *BatchPosterConfig
inboxContract *bridgegen.SequencerInbox
gasRefunder common.Address
transactOpts *bind.TransactOpts
}

type BatchPosterConfig struct {
MaxBatchSize int
BatchPollDelay time.Duration
SubmissionSyncDelay time.Duration
CompressionLevel int
MaxBatchSize int
BatchPollDelay time.Duration
PostingErrorDelay time.Duration
CompressionLevel int
}

var DefaultBatchPosterConfig = BatchPosterConfig{
MaxBatchSize: 500,
BatchPollDelay: time.Second / 10,
SubmissionSyncDelay: time.Second,
CompressionLevel: brotli.DefaultCompression,
MaxBatchSize: 500,
BatchPollDelay: time.Second / 10,
PostingErrorDelay: time.Second * 5,
CompressionLevel: brotli.DefaultCompression,
}

var TestBatchPosterConfig = BatchPosterConfig{
MaxBatchSize: 10000,
BatchPollDelay: time.Millisecond * 10,
SubmissionSyncDelay: time.Millisecond * 10,
CompressionLevel: 2,
MaxBatchSize: 10000,
BatchPollDelay: time.Millisecond * 10,
PostingErrorDelay: time.Millisecond * 10,
CompressionLevel: 2,
}

func NewBatchPoster(client arbutil.L1Interface, inbox *InboxTracker, streamer *TransactionStreamer, config *BatchPosterConfig, contractAddress common.Address, refunder common.Address, transactOpts *bind.TransactOpts) (*BatchPoster, error) {
Expand All @@ -58,14 +60,13 @@ func NewBatchPoster(client arbutil.L1Interface, inbox *InboxTracker, streamer *T
return nil, err
}
return &BatchPoster{
client: client,
inbox: inbox,
streamer: streamer,
config: config,
inboxContract: inboxContract,
sequencesPosted: 1,
transactOpts: transactOpts,
gasRefunder: refunder,
client: client,
inbox: inbox,
streamer: streamer,
config: config,
inboxContract: inboxContract,
transactOpts: transactOpts,
gasRefunder: refunder,
}, nil
}

Expand Down Expand Up @@ -269,34 +270,22 @@ func (s *batchSegments) CloseAndGetBytes() ([]byte, error) {
return fullMsg, nil
}

func (b *BatchPoster) lastSubmissionIsSynced() bool {
batchcount, err := b.inbox.GetBatchCount()
func (b *BatchPoster) postSequencerBatch(ctx context.Context) (*types.Transaction, error) {
batchSeqNum, err := b.inbox.GetBatchCount()
if err != nil {
log.Warn("BatchPoster: batchcount failed", "err", err)
return false
return nil, err
}
if batchcount < b.sequencesPosted {
b.sequencesPosted = batchcount
return false
inboxContractCount, err := b.inboxContract.BatchCount(&bind.CallOpts{Context: ctx, Pending: true})
if err != nil {
return nil, err
}
if batchcount > b.sequencesPosted {
log.Warn("detected unexpected sequences posted", "actual", batchcount, "expected", b.sequencesPosted)
b.sequencesPosted = batchcount
return true
}
return true
}

// TODO make sure we detect end of block!
func (b *BatchPoster) postSequencerBatch() (*types.Transaction, error) {
for !b.lastSubmissionIsSynced() {
log.Warn("BatchPoster: not in sync", "sequencedPosted", b.sequencesPosted)
<-time.After(b.config.SubmissionSyncDelay)
if inboxContractCount.Cmp(new(big.Int).SetUint64(batchSeqNum)) != 0 {
return nil, fmt.Errorf("inbox tracker not synced: contract has %v batches but inbox tracker has %v", inboxContractCount, batchSeqNum)
}
var prevBatchMeta BatchMetadata
if b.sequencesPosted > 0 {
if batchSeqNum > 0 {
var err error
prevBatchMeta, err = b.inbox.GetBatchMetadata(b.sequencesPosted - 1)
prevBatchMeta, err = b.inbox.GetBatchMetadata(batchSeqNum - 1)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -328,35 +317,32 @@ func (b *BatchPoster) postSequencerBatch() (*types.Transaction, error) {
return nil, err
}
if sequencerMsg == nil {
log.Debug("BatchPoster: batch nil", "sequence nr.", b.sequencesPosted, "from", prevBatchMeta.MessageCount, "prev delayed", prevBatchMeta.DelayedMessageCount)
log.Debug("BatchPoster: batch nil", "sequence nr.", batchSeqNum, "from", prevBatchMeta.MessageCount, "prev delayed", prevBatchMeta.DelayedMessageCount)
return nil, nil
}
tx, err := b.inboxContract.AddSequencerL2BatchFromOrigin(b.transactOpts, new(big.Int).SetUint64(b.sequencesPosted), sequencerMsg, new(big.Int).SetUint64(segments.delayedMsg), b.gasRefunder)
txOpts := *b.transactOpts
txOpts.Context = ctx
tx, err := b.inboxContract.AddSequencerL2BatchFromOrigin(&txOpts, new(big.Int).SetUint64(batchSeqNum), sequencerMsg, new(big.Int).SetUint64(segments.delayedMsg), b.gasRefunder)
if err == nil {
b.sequencesPosted++
log.Info("BatchPoster: batch sent", "sequence nr.", b.sequencesPosted, "from", prevBatchMeta.MessageCount, "to", msgToPost, "prev delayed", prevBatchMeta.DelayedMessageCount, "current delayed", segments.delayedMsg, "total segments", len(segments.rawSegments))
log.Info("BatchPoster: batch sent", "sequence nr.", batchSeqNum, "from", prevBatchMeta.MessageCount, "to", msgToPost, "prev delayed", prevBatchMeta.DelayedMessageCount, "current delayed", segments.delayedMsg, "total segments", len(segments.rawSegments))
}
return tx, err
}

func (b *BatchPoster) Start(ctx context.Context) {
go (func() {
for {
tx, err := b.postSequencerBatch()
func (b *BatchPoster) Start(ctxIn context.Context) {
b.StopWaiter.Start(ctxIn)
b.CallIteratively(func(ctx context.Context) time.Duration {
tx, err := b.postSequencerBatch(ctx)
if err != nil {
log.Error("error posting batch", "err", err)
return b.config.PostingErrorDelay
}
if tx != nil {
_, err = arbutil.EnsureTxSucceededWithTimeout(ctx, b.client, tx, time.Minute)
if err != nil {
log.Error("error posting batch", "err", err)
}
if tx != nil {
_, err = arbutil.EnsureTxSucceededWithTimeout(ctx, b.client, tx, time.Minute)
if err != nil {
log.Error("failed ensuring batch tx succeeded", "err", err)
}
}
select {
case <-ctx.Done():
return
case <-time.After(b.config.BatchPollDelay):
log.Error("failed ensuring batch tx succeeded", "err", err)
}
}
})()
return b.config.BatchPollDelay
})
}
23 changes: 10 additions & 13 deletions arbnode/delayed_sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (

"github.com/offchainlabs/arbstate/arbos"
"github.com/offchainlabs/arbstate/arbutil"
"github.com/offchainlabs/arbstate/util"
)

type DelayedSequencer struct {
util.StopWaiter
client arbutil.L1Interface
bridge *DelayedBridge
inbox *InboxTracker
Expand Down Expand Up @@ -168,18 +170,13 @@ func (d *DelayedSequencer) run(ctx context.Context) error {
}
}

func (d *DelayedSequencer) Start(ctx context.Context) {
go (func() {
for {
err := d.run(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
log.Error("error reading inbox", "err", err)
}
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
}
func (d *DelayedSequencer) Start(ctxIn context.Context) {
d.StopWaiter.Start(ctxIn)
d.CallIteratively(func(ctx context.Context) time.Duration {
err := d.run(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
log.Error("error reading inbox", "err", err)
}
})()
return time.Second
})
}
2 changes: 2 additions & 0 deletions arbnode/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,5 @@ func (f *TxForwarder) Initialize(ctx context.Context) error {
func (f *TxForwarder) Start(ctx context.Context) error {
return nil
}

func (f *TxForwarder) StopAndWait() {}
24 changes: 11 additions & 13 deletions arbnode/inbox_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/ethereum/go-ethereum/log"
"github.com/offchainlabs/arbstate/arbutil"
"github.com/offchainlabs/arbstate/util"
)

type InboxReaderConfig struct {
Expand All @@ -33,6 +34,8 @@ var TestInboxReaderConfig = InboxReaderConfig{
}

type InboxReader struct {
util.StopWaiter

// Only in run thread
caughtUp bool
firstMessageBlock *big.Int
Expand All @@ -58,20 +61,15 @@ func NewInboxReader(tracker *InboxTracker, client arbutil.L1Interface, firstMess
}, nil
}

func (r *InboxReader) Start(ctx context.Context) error {
go (func() {
for {
err := r.run(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
log.Error("error reading inbox", "err", err)
}
select {
case <-ctx.Done():
return
case <-time.After(time.Second):
}
func (r *InboxReader) Start(ctxIn context.Context) error {
r.StopWaiter.Start(ctxIn)
r.CallIteratively(func(ctx context.Context) time.Duration {
err := r.run(ctx)
if err != nil && !errors.Is(err, context.Canceled) {
log.Error("error reading inbox", "err", err)
}
})()
return time.Second
})

// Ensure we read the init message before other things start up
for i := 0; ; i++ {
Expand Down
Loading

0 comments on commit 979fc2c

Please sign in to comment.