Skip to content

Commit

Permalink
Merge branch 'master' into update-audit-reports-url
Browse files Browse the repository at this point in the history
  • Loading branch information
tsahee authored Aug 17, 2024
2 parents c978660 + 0639404 commit 8dcf687
Show file tree
Hide file tree
Showing 23 changed files with 1,308 additions and 106 deletions.
2 changes: 2 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,8 @@ COPY --from=node-builder /workspace/target/bin/relay /usr/local/bin/
COPY --from=node-builder /workspace/target/bin/nitro-val /usr/local/bin/
COPY --from=node-builder /workspace/target/bin/seq-coordinator-manager /usr/local/bin/
COPY --from=node-builder /workspace/target/bin/prover /usr/local/bin/
COPY --from=node-builder /workspace/target/bin/dbconv /usr/local/bin/
COPY ./scripts/convert-databases.bash /usr/local/bin/
COPY --from=machine-versions /workspace/machines /home/user/target/machines
COPY ./scripts/validate-wasm-module-root.sh .
RUN ./validate-wasm-module-root.sh /home/user/target/machines /usr/local/bin/prover
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ all: build build-replay-env test-gen-proofs
@touch .make/all

.PHONY: build
build: $(patsubst %,$(output_root)/bin/%, nitro deploy relay daserver datool seq-coordinator-invalidate nitro-val seq-coordinator-manager)
build: $(patsubst %,$(output_root)/bin/%, nitro deploy relay daserver datool seq-coordinator-invalidate nitro-val seq-coordinator-manager dbconv)
@printf $(done)

.PHONY: build-node-deps
Expand Down Expand Up @@ -310,6 +310,9 @@ $(output_root)/bin/nitro-val: $(DEP_PREDICATE) build-node-deps
$(output_root)/bin/seq-coordinator-manager: $(DEP_PREDICATE) build-node-deps
go build $(GOLANG_PARAMS) -o $@ "$(CURDIR)/cmd/seq-coordinator-manager"

$(output_root)/bin/dbconv: $(DEP_PREDICATE) build-node-deps
go build $(GOLANG_PARAMS) -o $@ "$(CURDIR)/cmd/dbconv"

# recompile wasm, but don't change timestamp unless files differ
$(replay_wasm): $(DEP_PREDICATE) $(go_source) .make/solgen
mkdir -p `dirname $(replay_wasm)`
Expand Down
130 changes: 108 additions & 22 deletions arbnode/seq_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type SeqCoordinator struct {

redisutil.RedisCoordinator

sync *SyncMonitor
streamer *TransactionStreamer
sequencer execution.ExecutionSequencer
delayedSequencer *DelayedSequencer
Expand Down Expand Up @@ -69,9 +70,10 @@ type SeqCoordinatorConfig struct {
SafeShutdownDelay time.Duration `koanf:"safe-shutdown-delay"`
ReleaseRetries int `koanf:"release-retries"`
// Max message per poll.
MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"`
MyUrl string `koanf:"my-url"`
Signer signature.SignVerifyConfig `koanf:"signer"`
MsgPerPoll arbutil.MessageIndex `koanf:"msg-per-poll"`
MyUrl string `koanf:"my-url"`
DeleteFinalizedMsgs bool `koanf:"delete-finalized-msgs"`
Signer signature.SignVerifyConfig `koanf:"signer"`
}

func (c *SeqCoordinatorConfig) Url() string {
Expand All @@ -95,6 +97,7 @@ func SeqCoordinatorConfigAddOptions(prefix string, f *flag.FlagSet) {
f.Int(prefix+".release-retries", DefaultSeqCoordinatorConfig.ReleaseRetries, "the number of times to retry releasing the wants lockout and chosen one status on shutdown")
f.Uint64(prefix+".msg-per-poll", uint64(DefaultSeqCoordinatorConfig.MsgPerPoll), "will only be marked as wanting the lockout if not too far behind")
f.String(prefix+".my-url", DefaultSeqCoordinatorConfig.MyUrl, "url for this sequencer if it is the chosen")
f.Bool(prefix+".delete-finalized-msgs", DefaultSeqCoordinatorConfig.DeleteFinalizedMsgs, "enable deleting of finalized messages from redis")
signature.SignVerifyConfigAddOptions(prefix+".signer", f)
}

Expand All @@ -104,31 +107,33 @@ var DefaultSeqCoordinatorConfig = SeqCoordinatorConfig{
RedisUrl: "",
LockoutDuration: time.Minute,
LockoutSpare: 30 * time.Second,
SeqNumDuration: 24 * time.Hour,
SeqNumDuration: 10 * 24 * time.Hour,
UpdateInterval: 250 * time.Millisecond,
HandoffTimeout: 30 * time.Second,
SafeShutdownDelay: 5 * time.Second,
ReleaseRetries: 4,
RetryInterval: 50 * time.Millisecond,
MsgPerPoll: 2000,
MyUrl: redisutil.INVALID_URL,
DeleteFinalizedMsgs: true,
Signer: signature.DefaultSignVerifyConfig,
}

var TestSeqCoordinatorConfig = SeqCoordinatorConfig{
Enable: false,
RedisUrl: "",
LockoutDuration: time.Second * 2,
LockoutSpare: time.Millisecond * 10,
SeqNumDuration: time.Minute * 10,
UpdateInterval: time.Millisecond * 10,
HandoffTimeout: time.Millisecond * 200,
SafeShutdownDelay: time.Millisecond * 100,
ReleaseRetries: 4,
RetryInterval: time.Millisecond * 3,
MsgPerPoll: 20,
MyUrl: redisutil.INVALID_URL,
Signer: signature.DefaultSignVerifyConfig,
Enable: false,
RedisUrl: "",
LockoutDuration: time.Second * 2,
LockoutSpare: time.Millisecond * 10,
SeqNumDuration: time.Minute * 10,
UpdateInterval: time.Millisecond * 10,
HandoffTimeout: time.Millisecond * 200,
SafeShutdownDelay: time.Millisecond * 100,
ReleaseRetries: 4,
RetryInterval: time.Millisecond * 3,
MsgPerPoll: 20,
MyUrl: redisutil.INVALID_URL,
DeleteFinalizedMsgs: true,
Signer: signature.DefaultSignVerifyConfig,
}

func NewSeqCoordinator(
Expand All @@ -149,6 +154,7 @@ func NewSeqCoordinator(
}
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
sync: sync,
streamer: streamer,
sequencer: sequencer,
config: config,
Expand Down Expand Up @@ -338,6 +344,14 @@ func (c *SeqCoordinator) acquireLockoutAndWriteMessage(ctx context.Context, msgC
return nil
}

func (c *SeqCoordinator) getRemoteFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
resStr, err := c.Client.Get(ctx, redisutil.FINALIZED_MSG_COUNT_KEY).Result()
if err != nil {
return 0, err
}
return c.signedBytesToMsgCount(ctx, []byte(resStr))
}

func (c *SeqCoordinator) getRemoteMsgCountImpl(ctx context.Context, r redis.Cmdable) (arbutil.MessageIndex, error) {
resStr, err := r.Get(ctx, redisutil.MSG_COUNT_KEY).Result()
if errors.Is(err, redis.Nil) {
Expand Down Expand Up @@ -473,6 +487,17 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}
// Was, and still is, the active sequencer
if c.config.DeleteFinalizedMsgs {
// Before proceeding, first try deleting finalized messages from redis and setting the finalizedMsgCount key
finalized, err := c.sync.GetFinalizedMsgCount(ctx)
if err != nil {
log.Warn("Error getting finalizedMessageCount from syncMonitor: %w", err)
} else if finalized == 0 {
log.Warn("SyncMonitor returned zero finalizedMessageCount")
} else if err := c.deleteFinalizedMsgsFromRedis(ctx, finalized); err != nil {
log.Warn("Coordinator failed to delete finalized messages from redis", "err", err)
}
}
// We leave a margin of error of either a five times the update interval or a fifth of the lockout duration, whichever is greater.
marginOfError := arbmath.MaxInt(c.config.LockoutDuration/5, c.config.UpdateInterval*5)
if time.Now().Add(marginOfError).Before(atomicTimeRead(&c.lockoutUntil)) {
Expand All @@ -492,6 +517,62 @@ func (c *SeqCoordinator) updateWithLockout(ctx context.Context, nextChosen strin
return c.noRedisError()
}

func (c *SeqCoordinator) deleteFinalizedMsgsFromRedis(ctx context.Context, finalized arbutil.MessageIndex) error {
deleteMsgsAndUpdateFinalizedMsgCount := func(keys []string) error {
if len(keys) > 0 {
// To support cases during init we delete keys from reverse (i.e lowest seq num first), so that even if deletion fails in one of the iterations
// next time deleteFinalizedMsgsFromRedis is called we dont miss undeleted messages, as exists is checked from higher seqnum to lower.
// In non-init cases it doesn't matter how we delete as we always try to delete from prevFinalized to finalized
batchDeleteCount := 1000
for i := len(keys); i > 0; i -= batchDeleteCount {
if err := c.Client.Del(ctx, keys[max(0, i-batchDeleteCount):i]...).Err(); err != nil {
return fmt.Errorf("error deleting finalized messages and their signatures from redis: %w", err)
}
}
}
finalizedBytes, err := c.msgCountToSignedBytes(finalized)
if err != nil {
return err
}
if err = c.Client.Set(ctx, redisutil.FINALIZED_MSG_COUNT_KEY, finalizedBytes, c.config.SeqNumDuration).Err(); err != nil {
return fmt.Errorf("couldn't set %s key to current finalizedMsgCount in redis: %w", redisutil.FINALIZED_MSG_COUNT_KEY, err)
}
return nil
}
prevFinalized, err := c.getRemoteFinalizedMsgCount(ctx)
if errors.Is(err, redis.Nil) {
var keys []string
for msg := finalized - 1; msg > 0; msg-- {
exists, err := c.Client.Exists(ctx, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg)).Result()
if err != nil {
// If there is an error deleting finalized messages during init, we retry later either from this sequencer or from another
return err
}
if exists == 0 {
break
}
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
log.Info("Initializing finalizedMsgCount and deleting finalized messages from redis", "finalizedMsgCount", finalized)
return deleteMsgsAndUpdateFinalizedMsgCount(keys)
} else if err != nil {
return fmt.Errorf("error getting finalizedMsgCount value from redis: %w", err)
}
remoteMsgCount, err := c.getRemoteMsgCountImpl(ctx, c.Client)
if err != nil {
return fmt.Errorf("cannot get remote message count: %w", err)
}
msgToDelete := min(finalized, remoteMsgCount)
if prevFinalized < msgToDelete {
var keys []string
for msg := prevFinalized; msg < msgToDelete; msg++ {
keys = append(keys, redisutil.MessageKeyFor(msg), redisutil.MessageSigKeyFor(msg))
}
return deleteMsgsAndUpdateFinalizedMsgCount(keys)
}
return nil
}

func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
chosenSeq, err := c.RecommendSequencerWantingLockout(ctx)
if err != nil {
Expand Down Expand Up @@ -522,19 +603,24 @@ func (c *SeqCoordinator) update(ctx context.Context) time.Duration {
log.Error("cannot read message count", "err", err)
return c.config.UpdateInterval
}
remoteFinalizedMsgCount, err := c.getRemoteFinalizedMsgCount(ctx)
if err != nil {
loglevel := log.Error
if errors.Is(err, redis.Nil) {
loglevel = log.Debug
}
loglevel("Cannot get remote finalized message count, might encounter failed to read message warnings later", "err", err)
}
remoteMsgCount, err := c.GetRemoteMsgCount()
if err != nil {
log.Warn("cannot get remote message count", "err", err)
return c.retryAfterRedisError()
}
readUntil := remoteMsgCount
if readUntil > localMsgCount+c.config.MsgPerPoll {
readUntil = localMsgCount + c.config.MsgPerPoll
}
readUntil := min(localMsgCount+c.config.MsgPerPoll, remoteMsgCount)
var messages []arbostypes.MessageWithMetadata
msgToRead := localMsgCount
var msgReadErr error
for msgToRead < readUntil {
for msgToRead < readUntil && localMsgCount >= remoteFinalizedMsgCount {
var resString string
resString, msgReadErr = c.Client.Get(ctx, redisutil.MessageKeyFor(msgToRead)).Result()
if msgReadErr != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,3 +156,94 @@ func TestRedisSeqCoordinatorAtomic(t *testing.T) {
}

}

func TestSeqCoordinatorDeletesFinalizedMessages(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

coordConfig := TestSeqCoordinatorConfig
coordConfig.LockoutDuration = time.Millisecond * 100
coordConfig.LockoutSpare = time.Millisecond * 10
coordConfig.Signer.ECDSA.AcceptSequencer = false
coordConfig.Signer.SymmetricFallback = true
coordConfig.Signer.SymmetricSign = true
coordConfig.Signer.Symmetric.Dangerous.DisableSignatureVerification = true
coordConfig.Signer.Symmetric.SigningKey = ""

nullSigner, err := signature.NewSignVerify(&coordConfig.Signer, nil, nil)
Require(t, err)

redisUrl := redisutil.CreateTestRedis(ctx, t)
coordConfig.RedisUrl = redisUrl

config := coordConfig
config.MyUrl = "test"
redisCoordinator, err := redisutil.NewRedisCoordinator(config.RedisUrl)
Require(t, err)
coordinator := &SeqCoordinator{
RedisCoordinator: *redisCoordinator,
config: config,
signer: nullSigner,
}

// Add messages to redis
var keys []string
msgBytes, err := coordinator.msgCountToSignedBytes(0)
Require(t, err)
for i := arbutil.MessageIndex(1); i <= 10; i++ {
err = coordinator.Client.Set(ctx, redisutil.MessageKeyFor(i), msgBytes, time.Hour).Err()
Require(t, err)
err = coordinator.Client.Set(ctx, redisutil.MessageSigKeyFor(i), msgBytes, time.Hour).Err()
Require(t, err)
keys = append(keys, redisutil.MessageKeyFor(i), redisutil.MessageSigKeyFor(i))
}
// Set msgCount key
msgCountBytes, err := coordinator.msgCountToSignedBytes(11)
Require(t, err)
err = coordinator.Client.Set(ctx, redisutil.MSG_COUNT_KEY, msgCountBytes, time.Hour).Err()
Require(t, err)
exists, err := coordinator.Client.Exists(ctx, keys...).Result()
Require(t, err)
if exists != 20 {
t.Fatal("couldn't find all messages and signatures in redis")
}

// Set finalizedMsgCount and delete finalized messages
err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 5)
Require(t, err)

// Check if messages and signatures were deleted successfully
exists, err = coordinator.Client.Exists(ctx, keys[:8]...).Result()
Require(t, err)
if exists != 0 {
t.Fatal("finalized messages and signatures in range 1 to 4 were not deleted")
}

// Check if finalizedMsgCount was set to correct value
finalized, err := coordinator.getRemoteFinalizedMsgCount(ctx)
Require(t, err)
if finalized != 5 {
t.Fatalf("incorrect finalizedMsgCount, want: 5, have: %d", finalized)
}

// Try deleting finalized messages when theres already a finalizedMsgCount
err = coordinator.deleteFinalizedMsgsFromRedis(ctx, 7)
Require(t, err)
exists, err = coordinator.Client.Exists(ctx, keys[8:12]...).Result()
Require(t, err)
if exists != 0 {
t.Fatal("finalized messages and signatures in range 5 to 6 were not deleted")
}
finalized, err = coordinator.getRemoteFinalizedMsgCount(ctx)
Require(t, err)
if finalized != 7 {
t.Fatalf("incorrect finalizedMsgCount, want: 7, have: %d", finalized)
}

// Check that non-finalized messages are still available in redis
exists, err = coordinator.Client.Exists(ctx, keys[12:]...).Result()
Require(t, err)
if exists != 8 {
t.Fatal("non-finalized messages and signatures in range 7 to 10 are not fully available")
}
}
7 changes: 7 additions & 0 deletions arbnode/sync_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,13 @@ func (s *SyncMonitor) SyncTargetMessageCount() arbutil.MessageIndex {
return s.syncTarget
}

func (s *SyncMonitor) GetFinalizedMsgCount(ctx context.Context) (arbutil.MessageIndex, error) {
if s.inboxReader != nil && s.inboxReader.l1Reader != nil {
return s.inboxReader.GetFinalizedMsgCount(ctx)
}
return 0, nil
}

func (s *SyncMonitor) maxMessageCount() (arbutil.MessageIndex, error) {
msgCount, err := s.txStreamer.GetMessageCount()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/chaininfo/arbitrum_chain_info.json
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@
"EnableArbOS": true,
"AllowDebugPrecompiles": true,
"DataAvailabilityCommittee": false,
"InitialArbOSVersion": 11,
"InitialArbOSVersion": 31,
"InitialChainOwner": "0x0000000000000000000000000000000000000000",
"GenesisBlockNum": 0
}
Expand Down Expand Up @@ -196,7 +196,7 @@
"EnableArbOS": true,
"AllowDebugPrecompiles": true,
"DataAvailabilityCommittee": true,
"InitialArbOSVersion": 11,
"InitialArbOSVersion": 31,
"InitialChainOwner": "0x0000000000000000000000000000000000000000",
"GenesisBlockNum": 0
}
Expand Down
Loading

0 comments on commit 8dcf687

Please sign in to comment.