Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scoping out txn fetching #4

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions arbnode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ func DeployOnL1(ctx context.Context, parentChainReader *headerreader.HeaderReade

type Config struct {
Sequencer bool `koanf:"sequencer"`
EspressoSequencer bool `koanf:"espresso-sequencer"`
ParentChainReader headerreader.Config `koanf:"parent-chain-reader" reload:"hot"`
InboxReader InboxReaderConfig `koanf:"inbox-reader" reload:"hot"`
DelayedSequencer DelayedSequencerConfig `koanf:"delayed-sequencer" reload:"hot"`
Expand Down Expand Up @@ -439,6 +440,7 @@ type Node struct {
InboxReader *InboxReader
InboxTracker *InboxTracker
DelayedSequencer *DelayedSequencer
EspressoHandler *EspressoHandler
BatchPoster *BatchPoster
MessagePruner *MessagePruner
BlockValidator *staker.BlockValidator
Expand Down Expand Up @@ -853,6 +855,13 @@ func createNodeImpl(
return nil, err
}

var espresso *EspressoHandler
if config.EspressoSequencer {
// EspressoHandler acts like DelayedSequencer, read txs/messages from L1 and build a block.
// Maybe we can reuse or recreate some components here.
espresso, err = NewEspressoHandler(l1Reader, exec, config)
}

return &Node{
ArbDB: arbDb,
Stack: stack,
Expand All @@ -863,6 +872,7 @@ func createNodeImpl(
InboxReader: inboxReader,
InboxTracker: inboxTracker,
DelayedSequencer: delayedSequencer,
EspressoHandler: espresso,
BatchPoster: batchPoster,
MessagePruner: messagePruner,
BlockValidator: blockValidator,
Expand Down Expand Up @@ -989,6 +999,9 @@ func (n *Node) Start(ctx context.Context) error {
if n.DelayedSequencer != nil {
n.DelayedSequencer.Start(ctx)
}
if n.EspressoHandler != nil {
n.EspressoHandler.Start(ctx)
}
if n.BatchPoster != nil {
n.BatchPoster.Start(ctx)
}
Expand Down Expand Up @@ -1071,6 +1084,9 @@ func (n *Node) StopAndWait() {
if n.DelayedSequencer != nil && n.DelayedSequencer.Started() {
n.DelayedSequencer.StopAndWait()
}
if n.EspressoHandler != nil && n.EspressoHandler.Started() {
n.EspressoHandler.StopAndWait()
}
if n.BatchPoster != nil && n.BatchPoster.Started() {
n.BatchPoster.StopAndWait()
}
Expand Down
1 change: 1 addition & 0 deletions execution/gethexec/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func CreateExecutionNode(
} else if config.forwardingTarget == "" {
txPublisher = NewTxDropper()
} else {
// Make sure this Forwarder can publish the txs to Espresso Sequencer
txPublisher = NewForwarder(config.forwardingTarget, &config.Forwarder)
}
}
Expand Down
122 changes: 122 additions & 0 deletions execution/gethexec/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ type SequencerConfig struct {
MaxTxDataSize int `koanf:"max-tx-data-size" reload:"hot"`
NonceFailureCacheSize int `koanf:"nonce-failure-cache-size" reload:"hot"`
NonceFailureCacheExpiry time.Duration `koanf:"nonce-failure-cache-expiry" reload:"hot"`
Espresso bool
}

func (c *SequencerConfig) Validate() error {
Expand Down Expand Up @@ -268,6 +269,15 @@ func (c nonceFailureCache) Add(err NonceError, queueItem txQueueItem) {
}
}

// HotShotIndex is a cursor into the HotShot (Espresso) block stream,
// identifying a single transaction by its block and its position within
// that block. The sequencer keeps one of these to remember the last
// HotShot transaction it has already sequenced.
type HotShotIndex struct {
	blockIdx uint64 // HotShot block number in the stream
	txnIdx   uint64 // index of the transaction within that block
}

// HotShotTxnFetcherInterface abstracts fetching Arbitrum transactions from
// the HotShot (Espresso) sequencer stream, starting at a given cursor.
//
// NOTE(review): NextArbitrumTxn currently returns nothing, so a caller
// cannot obtain the fetched transaction or an advanced index — presumably
// it should return (tx, nextIndex, error). Confirm the intended contract
// before any type implements this interface.
type HotShotTxnFetcherInterface interface {
	// NextArbitrumTxn fetches the next Arbitrum transaction at or after index.
	NextArbitrumTxn(index HotShotIndex)
}

type Sequencer struct {
stopwaiter.StopWaiter

Expand All @@ -291,6 +301,9 @@ type Sequencer struct {
activeMutex sync.Mutex
pauseChan chan struct{}
forwarder *TxForwarder

// Pointer to the last hotshot transaction sequenced, only used if we are operating in Espresso mode
hotShotIndex *HotShotIndex
}

func NewSequencer(execEngine *ExecutionEngine, l1Reader *headerreader.HeaderReader, configFetcher SequencerConfigFetcher) (*Sequencer, error) {
Expand Down Expand Up @@ -710,7 +723,116 @@ func (s *Sequencer) precheckNonces(queueItems []txQueueItem) []txQueueItem {
return outputQueueItems
}

func (s *Sequencer) createBlockEspresso(ctx context.Context) (returnValue bool) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should move these code to the arbnode to let every validator forward txs and build block. arb sequencer is a centralized thing

Copy link
Member Author

@ImJeremyHe ImJeremyHe Oct 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And it is not easy to update arb sequencer to let every one run it because arbitrum now is not permissionless.

var txns []types.Transaction
var totalBatchSize int

config := s.config()

for {
var txn types.Transaction
txn, err = hotshot.NextArbitrumTransaction()
txBytes, err := txn.MarshalBinary()
if err != nil {
//queueItem.returnResult(err)
continue
}
if len(txBytes) > config.MaxTxDataSize {
// This tx is too large
// queueItem.returnResult(txpool.ErrOversizedData)
continue
}
if totalBatchSize+len(txBytes) > config.MaxTxDataSize {
// This tx would be too large to add to this batch
// End the batch here to put this tx in the next one, update last processed block and txn idx
break
}
totalBatchSize += len(txBytes)
queueItems = append(queueItems, txn)
}

txes := make([]*types.Transaction, len(queueItems))
hooks := s.makeSequencingHooks()
hooks.ConditionalOptionsForTx = make([]*arbitrum_types.ConditionalOptions, len(queueItems))
for i, queueItem := range queueItems {
txes[i] = tx
}

timestamp := time.Now().Unix()
s.L1BlockAndTimeMutex.Lock()
l1Block := s.l1BlockNumber
l1Timestamp := s.l1Timestamp
s.L1BlockAndTimeMutex.Unlock()

if s.l1Reader != nil && (l1Block == 0 || math.Abs(float64(l1Timestamp)-float64(timestamp)) > config.MaxAcceptableTimestampDelta.Seconds()) {
log.Error(
"cannot sequence: unknown L1 block or L1 timestamp too far from local clock time",
"l1Block", l1Block,
"l1Timestamp", time.Unix(int64(l1Timestamp), 0),
"localTimestamp", time.Unix(int64(timestamp), 0),
)
return false
}

header := &arbostypes.L1IncomingMessageHeader{
Kind: arbostypes.L1MessageType_L2Message,
Poster: l1pricing.BatchPosterAddress,
BlockNumber: l1Block,
Timestamp: uint64(timestamp),
RequestId: nil,
L1BaseFee: nil,
}

start := time.Now()
block, err := s.execEngine.SequenceTransactions(header, txes, hooks)
elapsed := time.Since(start)
blockCreationTimer.Update(elapsed)
if elapsed >= time.Second*5 {
var blockNum *big.Int
if block != nil {
blockNum = block.Number()
}
log.Warn("took over 5 seconds to sequence a block", "elapsed", elapsed, "numTxes", len(txes), "success", block != nil, "l2Block", blockNum)
}
if err == nil && len(hooks.TxErrors) != len(txes) {
err = fmt.Errorf("unexpected number of error results: %v vs number of txes %v", len(hooks.TxErrors), len(txes))
}
if err != nil {
if errors.Is(err, context.Canceled) {
// thread closed. We'll later try to forward these messages.
for _, item := range queueItems {
s.txRetryQueue.Push(item)
}
return true // don't return failure to avoid retrying immediately
}
log.Error("error sequencing transactions", "err", err)
for _, queueItem := range queueItems {
queueItem.returnResult(err)
}
return false
}

if block != nil {
successfulBlocksCounter.Inc(1)
s.nonceCache.Finalize(block)
}

return true
}

// usingEspresso reports whether this sequencer is configured to run in
// Espresso mode (sequencing via HotShot instead of the local tx queue).
func (s *Sequencer) usingEspresso() bool {
	cfg := s.config()
	return cfg.Espresso
}

// createBlock produces the next L2 block, dispatching to the Espresso
// (HotShot-driven) path when Espresso mode is enabled and to the default
// queue-driven path otherwise. Returns whatever the chosen path returns.
func (s *Sequencer) createBlock(ctx context.Context) (returnValue bool) {
	// Early return keeps the happy path flat (no else after a terminating if).
	if s.usingEspresso() {
		return s.createBlockEspresso(ctx)
	}
	return s.createBlockDefault(ctx)
}

func (s *Sequencer) createBlockDefault(ctx context.Context) (returnValue bool) {
var queueItems []txQueueItem
var totalBatchSize int

Expand Down
2 changes: 1 addition & 1 deletion go-ethereum
2 changes: 1 addition & 1 deletion nitro-testnode
28 changes: 28 additions & 0 deletions testnode-config/l2_chain_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
{
"chainId": 412346,
"homesteadBlock": 0,
"daoForkSupport": true,
"eip150Block": 0,
"eip150Hash": "0x0000000000000000000000000000000000000000000000000000000000000000",
"eip155Block": 0,
"eip158Block": 0,
"byzantiumBlock": 0,
"constantinopleBlock": 0,
"petersburgBlock": 0,
"istanbulBlock": 0,
"muirGlacierBlock": 0,
"berlinBlock": 0,
"londonBlock": 0,
"clique": {
"period": 0,
"epoch": 0
},
"arbitrum": {
"EnableArbOS": true,
"AllowDebugPrecompiles": true,
"DataAvailabilityCommittee": false,
"InitialArbOSVersion": 11,
"InitialChainOwner": "0x3f1Eae7D46d88F08fc2F8ed27FCb2AB183EB2d0E",
"GenesisBlockNum": 0
}
}
80 changes: 80 additions & 0 deletions testnode-config/l3node_config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
{
"parent-chain": {
"connection": {
"url": "ws://sequencer:8548"
},
"wallet": {
"account": "0x3E6134aAD4C4d422FF2A4391Dc315c4DDf98D1a5",
"password": "passphrase",
"pathname": "/home/user/l1keystore"
}
},
"chain": {
"id": 333333,
"info-files": [
"config/l3_chain_info.json"
]
},
"node": {
"archive": true,
"forwarding-target": "null",
"staker": {
"dangerous": {
"without-block-validator": false
},
"disable-challenge": false,
"enable": true,
"staker-interval": "10s",
"make-assertion-interval": "10s",
"strategy": "MakeNodes",
"use-smart-contract-wallet": true
},
"sequencer": {
"enable": true,
"dangerous": {
"no-coordinator": true
}
},
"delayed-sequencer": {
"enable": true
},
"seq-coordinator": {
"enable": false,
"redis-url": "redis://redis:6379",
"lockout-duration": "30s",
"lockout-spare": "1s",
"my-url": "",
"retry-interval": "0.5s",
"seq-num-duration": "24h0m0s",
"update-interval": "3s"
},
"batch-poster": {
"enable": true,
"redis-url": "",
"max-delay": "30s",
"data-poster": {
"redis-signer": {
"signing-key": "0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef"
},
"wait-for-l1-finality": false
}
},
"block-validator": {
"validation-server": {
"url": "ws://validation_node:8549",
"jwtsecret": "config/val_jwt.hex"
}
}
},
"persistent": {
"chain": "local"
},
"ws": {
"addr": "0.0.0.0"
},
"http": {
"addr": "0.0.0.0",
"vhosts": "*",
"corsdomain": "*"
}
}
Loading
Loading