Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Better logging and error handling (#70)
Browse files: browse the repository at this point in the history
* better logging and error handling

* improve logging and timeouts
  • Loading branch information
aarshkshah1992 authored Sep 5, 2022
1 parent ad316c0 commit 3677412
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 26 deletions.
11 changes: 8 additions & 3 deletions carserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package carserver

import (
"context"
"errors"
"fmt"
"io"

Expand Down Expand Up @@ -59,10 +60,14 @@ func (l *CarServer) ServeCARFile(ctx context.Context, dr *types.DagTraversalRequ
}
return nil
}); err != nil {
if err := l.spai.RecordRetrievalServed(ctx, sw.n, 1); err != nil {
l.logger.LogError(dr.RequestId, "failed to record retrieval failure", err)
if errors.Is(err, carstore.ErrNotFound) {
l.logger.Infow(dr.RequestId, "not serving CAR as CAR not found", "err", err)
} else {
if err := l.spai.RecordRetrievalServed(ctx, sw.n, 1); err != nil {
l.logger.LogError(dr.RequestId, "failed to record retrieval failure", err)
}
l.logger.LogError(dr.RequestId, "failed to traverse and serve car", err)
}
l.logger.LogError(dr.RequestId, "failed to traverse and serve car", err)

return err
}
Expand Down
5 changes: 2 additions & 3 deletions carstore/carstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ var (
maxConcurrentReadyFetches = 3
secondMissDuration = 24 * time.Hour
maxRecoverAttempts = uint64(1)
defaultDownloadTimeout = 30 * time.Minute
defaultDownloadTimeout = 20 * time.Minute
)

var (
Expand Down Expand Up @@ -279,7 +279,6 @@ func (cs *CarStore) FetchAndWriteCAR(reqID uuid.UUID, root cid.Cid, writer func(
// we don't have the requested CAR -> apply "cache on second miss" rule
cs.executeCacheMiss(reqID, root)

cs.logger.Infow(reqID, "returning not found for requested root")
return ErrNotFound
}

Expand Down Expand Up @@ -318,7 +317,7 @@ func (cs *CarStore) executeCacheMiss(reqID uuid.UUID, root cid.Cid) {
cs.logger.Infow(reqID, "successfully downloaded and cached given root")
sa.Close()
} else {
cs.logger.LogError(reqID, "failed to register/acquire shard", err)
cs.logger.LogError(reqID, "download failed as failed to register/acquire shard", err)
}

cs.mu.Lock()
Expand Down
2 changes: 1 addition & 1 deletion carstore/carstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestDownloadTimeout(t *testing.T) {
// second try -> not found
csh.fetchAndAssertNotFound(reqID, rootcid)

time.Sleep(1 * time.Second)
time.Sleep(3 * time.Second)

// still errors out
csh.fetchAndAssertNotFound(reqID, rootcid)
Expand Down
2 changes: 1 addition & 1 deletion carstore/gateway_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (g *gatewayAPI) Fetch(ctx context.Context, rootCID cid.Cid) (mount.Reader,

resp, err := g.client.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to execute http request")
return nil, fmt.Errorf("failed to execute http request: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("http req failed: code: %d, status: '%s'", resp.StatusCode, resp.Status)
Expand Down
45 changes: 27 additions & 18 deletions l1interop/l1sseclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (
"sync"
"time"

"github.com/filecoin-project/saturn-l2/carstore"

"go.uber.org/atomic"

"github.com/filecoin-project/saturn-l2/logs"
Expand Down Expand Up @@ -180,42 +182,50 @@ func (l *l1SseClient) Start(nConnectedl1s *atomic.Uint64) error {
continue
}

log.Infow("will try to acquire semaphore for l1 request", "l1", l.l1Addr)
log.Debugw("will try to acquire semaphore for l1 request", "l1", l.l1Addr)
select {
case l.semaphore <- struct{}{}:
log.Infow("successfully acquired semaphore for l1 request", "l1", l.l1Addr)
log.Debugw("successfully acquired semaphore for l1 request", "l1", l.l1Addr)
case <-l.ctx.Done():
return l.ctx.Err()
}

log.Infow("recieved request from L1", "l1", l.l1Addr, "json", reqJSON)
// releaseSem gives back the semaphore slot acquired above for this L1 request.
// It receives one token from l.semaphore and logs the release at debug level;
// if l.ctx is done it returns without logging.
// NOTE(review): when both cases are ready, select picks one at random, so the
// token may be left in the channel once l.ctx is cancelled — presumably
// harmless during shutdown; confirm.
releaseSem := func() {
select {
case <-l.semaphore:
log.Debugw("successfully released semaphore for l1 request", "l1", l.l1Addr)
case <-l.ctx.Done():
// Context cancelled: stop waiting on the semaphore and bail out.
return
}
}

log.Infow("received request from L1", "l1", l.l1Addr, "json", reqJSON)

var carReq types.CARTransferRequest
if err := json.Unmarshal([]byte(reqJSON), &carReq); err != nil {
releaseSem()
return fmt.Errorf("could not unmarshal l1 request: req=%s, err=%w", reqJSON, err)
}

dr, err := carReq.ToDAGRequest()
if err != nil {
releaseSem()
return fmt.Errorf("could not parse car transfer request,err=%w", err)
}

l.logger.Infow(dr.RequestId, "received CAR transfer request from L1", "l1", l.l1Addr, "req", dr)
l.logger.Infow(dr.RequestId, "parsed CAR transfer request received from L1", "l1", l.l1Addr, "req", dr)

l.wg.Add(1)
go func() {
defer l.wg.Done()
defer func() {
select {
case <-l.semaphore:
log.Infow("successfully relased semaphore for l1 request", "l1", l.l1Addr)
case <-l.ctx.Done():
return
}
}()
defer releaseSem()

if err := l.sendCarResponse(l.ctx, l.l1Addr, dr); err != nil {
l.logger.Errorw(dr.RequestId, "failed to send CAR file to L1 using Post", "err", err, "L1", l.l1Addr)
if !errors.Is(err, carstore.ErrNotFound) {
l.logger.Errorw(dr.RequestId, "failed to send CAR file to L1 using Post", "err", err, "l1", l.l1Addr)
} else {
l.logger.Infow(dr.RequestId, "not sending CAR over POST", "err", err, "l1", l.l1Addr)
}
}
}()
}
Expand All @@ -236,7 +246,6 @@ func (l *l1SseClient) Stop() {

func (l *l1SseClient) sendCarResponse(ctx context.Context, l1Addr string, dr *types.DagTraversalRequest) error {
respUrl := fmt.Sprintf(l1PostURL, l1Addr, dr.Root.String(), dr.RequestId.String())
log.Infow("will send back CAR file using HTTP POST to L1", "l1", l1Addr, "url", respUrl)

prd, pw := io.Pipe()
defer prd.Close()
Expand All @@ -252,14 +261,14 @@ func (l *l1SseClient) sendCarResponse(ctx context.Context, l1Addr string, dr *ty
req, err := http.NewRequest(http.MethodPost, respUrl, prd)
if err != nil {
_ = prd.CloseWithError(err)
return fmt.Errorf("failed to create http post request to send back car to L1: %w", err)
return fmt.Errorf("failed to create http post request to send back car to L1; url=%s; err=%w", respUrl, err)
}
req = req.WithContext(ctx)

resp, err := l.client.Do(req)
if err != nil {
_ = prd.CloseWithError(err)
return fmt.Errorf("failed to send http post request with CAR to L1: %w", err)
return fmt.Errorf("failed to send http post request with CAR to L1;url=%s, err=%w", respUrl, err)
}
defer resp.Body.Close()
_ = prd.Close()
Expand All @@ -268,9 +277,9 @@ func (l *l1SseClient) sendCarResponse(ctx context.Context, l1Addr string, dr *ty
_, _ = io.Copy(ioutil.Discard, lr)

if resp.StatusCode != http.StatusOK {
return fmt.Errorf("got status code %d from L1 for POST, expected %d", resp.StatusCode, http.StatusOK)
return fmt.Errorf("got status code %d from L1 for POST url %s , expected %d", resp.StatusCode, respUrl, http.StatusOK)
}

l.logger.Infow(dr.RequestId, "successfully sent CAR file to L1", "l1", l1Addr)
l.logger.Infow(dr.RequestId, "successfully sent CAR file to L1", "l1", l1Addr, "url", respUrl)
return nil
}

0 comments on commit 3677412

Please sign in to comment.