Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Fix connectivity events for UX (#75)
Browse files Browse the repository at this point in the history
* fix connectivity events

* fix level
  • Loading branch information
aarshkshah1992 authored Sep 8, 2022
1 parent 94dd61e commit 3fd5fe8
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 21 deletions.
21 changes: 20 additions & 1 deletion cmd/saturn-l2/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,8 @@ type config struct {
}

func main() {
logging.SetAllLoggers(logging.LevelInfo)
logging.SetLogLevel("dagstore", "ERROR")
// build app context
cleanup := func() {

Expand All @@ -152,7 +154,6 @@ func main() {
cleanup()
}()

logging.SetAllLoggers(logging.LevelInfo)
ctx, cancel := context.WithCancel(context.Background())
cleanup = updateCleanup(cleanup, func() {
log.Info("shutting down all threads")
Expand Down Expand Up @@ -198,6 +199,7 @@ func main() {
}
}
log.Infow("discovered L1s", "l1 IP Addrs", strings.Join(l1IPAddrs, ", "))
fmt.Println("INFO: Saturn Node was able to connect to the Orchestrator and will now start connecting to the Saturn network...")

// build the saturn logger
logger := logs.NewSaturnLogger()
Expand Down Expand Up @@ -328,6 +330,20 @@ func logL1Connectivity(ctx context.Context, nConnectedL1s *atomic.Uint64) {

lastNConnected := uint64(0)

// get to the first connectivity event as fast as possible
go func() {
for {
if ctx.Err() != nil {
return
}
nConnected := nConnectedL1s.Load()
if nConnected != 0 {
fmt.Printf("INFO: Saturn Node is online and connected to %d peers\n", nConnected)
return
}
}
}()

for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -472,6 +488,8 @@ func getNearestL1sWithRetry(ctx context.Context, cfg config, maxL1DiscoveryAttem
Jitter: true,
}

fmt.Println("INFO: Saturn Node will try to connect to the Saturn Orchestrator...")

for {
l1Addrs, err := getNearestL1s(ctx, cfg)
if err == nil {
Expand All @@ -485,6 +503,7 @@ func getNearestL1sWithRetry(ctx context.Context, cfg config, maxL1DiscoveryAttem
}

log.Errorw("failed to get L1s from orchestrator; will retry", "err", err)
fmt.Println("INFO: Saturn Node is unable to connect to the Orchestrator, retrying....")

// backoff and wait before making a new request to the orchestrator.
duration := backoff.Duration()
Expand Down
39 changes: 19 additions & 20 deletions l1interop/l1sseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,43 +182,42 @@ func (l *l1SseClient) Start(nConnectedl1s *atomic.Uint64) error {
continue
}

log.Debugw("will try to acquire semaphore for l1 request", "l1", l.l1Addr)
select {
case l.semaphore <- struct{}{}:
log.Debugw("successfully acquired semaphore for l1 request", "l1", l.l1Addr)
case <-l.ctx.Done():
return l.ctx.Err()
}

releaseSem := func() {
select {
case <-l.semaphore:
log.Debugw("successfully released semaphore for l1 request", "l1", l.l1Addr)
case <-l.ctx.Done():
return
}
}

log.Infow("received request from L1", "l1", l.l1Addr, "json", reqJSON)

var carReq types.CARTransferRequest
if err := json.Unmarshal([]byte(reqJSON), &carReq); err != nil {
releaseSem()
nConnectedl1s.Dec()
return fmt.Errorf("could not unmarshal l1 request: req=%s, err=%w", reqJSON, err)
}

dr, err := carReq.ToDAGRequest()
if err != nil {
releaseSem()
nConnectedl1s.Dec()
return fmt.Errorf("could not parse car transfer request,err=%w", err)
}

l.logger.Infow(dr.RequestId, "parsed CAR transfer request received from L1", "l1", l.l1Addr, "req", dr)

log.Debugw("will try to acquire semaphore for l1 request", "l1", l.l1Addr)
select {
case l.semaphore <- struct{}{}:
log.Debugw("successfully acquired semaphore for l1 request", "l1", l.l1Addr)
case <-l.ctx.Done():
nConnectedl1s.Dec()
return l.ctx.Err()
}

l.wg.Add(1)
go func() {
defer l.wg.Done()
defer releaseSem()
defer func() {
select {
case <-l.semaphore:
log.Debugw("successfully released semaphore for l1 request", "l1", l.l1Addr)
case <-l.ctx.Done():
return
}
}()

if err := l.sendCarResponse(l.ctx, l.l1Addr, dr); err != nil {
if !errors.Is(err, carstore.ErrNotFound) {
Expand Down

0 comments on commit 3fd5fe8

Please sign in to comment.