Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Jul 18, 2022
1 parent bb6c08d commit dc0bcb4
Show file tree
Hide file tree
Showing 12 changed files with 474 additions and 37 deletions.
32 changes: 29 additions & 3 deletions carserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"net"
"net/http"

"github.com/filecoin-project/saturn-l2/station"

"github.com/filecoin-project/saturn-l2/logs"

"github.com/filecoin-project/saturn-l2/carstore"
Expand Down Expand Up @@ -50,13 +53,16 @@ type Libp2pHttpCARServer struct {

cs *carstore.CarStore
logger *logs.SaturnLogger

spai station.StationAPI
}

func New(h host.Host, cs *carstore.CarStore, logger *logs.SaturnLogger) *Libp2pHttpCARServer {
func New(h host.Host, cs *carstore.CarStore, logger *logs.SaturnLogger, sapi station.StationAPI) *Libp2pHttpCARServer {
return &Libp2pHttpCARServer{
h: h,
cs: cs,
logger: logger,
spai: sapi,
}
}

Expand Down Expand Up @@ -129,16 +135,25 @@ func (l *Libp2pHttpCARServer) serveCARFile(w http.ResponseWriter, r *http.Reques
// we have parsed the request successfully -> start logging and serving it
l.logger.Infow(dr.reqId, "got car transfer request")

sw := &statWriter{w: w}

if err := l.cs.FetchAndWriteCAR(dr.reqId, dr.root, func(ro bstore.Blockstore) error {
ls := cidlink.DefaultLinkSystem()
bsa := bsadapter.Adapter{Wrapped: ro}
ls.SetReadStorage(&bsa)

_, err = car.TraverseV1(l.ctx, &ls, dr.root, dr.selector, w, car.WithSkipOffset(dr.skip))
_, err = car.TraverseV1(l.ctx, &ls, dr.root, dr.selector, sw, car.WithSkipOffset(dr.skip))
if err != nil {
if err := l.spai.RecordRetrievalServed(l.ctx, sw.n, 1); err != nil {
l.logger.LogError(dr.reqId, "failed to record retrieval failure", err)
}

l.logger.LogError(dr.reqId, "car transfer failed", err)
return fmt.Errorf("car traversal failed: %w", err)
}
if err := l.spai.RecordRetrievalServed(l.ctx, sw.n, 0); err != nil {
l.logger.LogError(dr.reqId, "failed to record successful retrieval", err)
}
return nil
}); err != nil {
if errors.Is(err, carstore.ErrNotFound) {
Expand All @@ -152,5 +167,16 @@ func (l *Libp2pHttpCARServer) serveCARFile(w http.ResponseWriter, r *http.Reques
}

l.logger.Infow(dr.reqId, "car transfer successful")
// TODO record sent bytes and talk to log injestor
// TODO: Talk to Log injestor here
}

type statWriter struct {
w io.Writer
n uint64
}

func (sw *statWriter) Write(p []byte) (n int, err error) {
n, err = sw.w.Write(p)
sw.n += uint64(n)
return
}
34 changes: 32 additions & 2 deletions carserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import (
"testing"
"time"

datastore "github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"

"github.com/filecoin-project/saturn-l2/logs"

cid "github.com/ipfs/go-cid"
Expand Down Expand Up @@ -41,23 +44,35 @@ func TestSimpleTransfer(t *testing.T) {
temp := t.TempDir()
ctx := context.Background()

mds := dss.MutexWrap(datastore.NewMapDatastore())
sapi := NewStationAPIImpl(mds, nil)

// create the getway api with a test http server
root, contents, svc := testutils.GetTestServerFor(t, path)
defer svc.Close()
gwAPI := carstore.NewGatewayAPI(svc.URL)
gwAPI := carstore.NewGatewayAPI(svc.URL, sapi)
lg := logs.NewSaturnLogger()
cfg := carstore.Config{}
cs, err := carstore.New(temp, gwAPI, cfg, lg)
require.NoError(t, err)
sapi.SetStorageStatsFetcher(cs)
require.NoError(t, cs.Start(ctx))

// create a mock libp2p network, two peers and a connection between them
p1, p2 := buildPeers(t, ctx)

// create and start the car server
carserver := New(p2, cs, lg)
carserver := New(p2, cs, lg, sapi)
require.NoError(t, carserver.Start(ctx))

as, err := sapi.AllStats(ctx)
require.NoError(t, err)
require.EqualValues(t, 0, as.Upload)
require.EqualValues(t, 0, as.ContentRequests)
require.EqualValues(t, 0, as.ContentReqErrors)
require.EqualValues(t, 0, as.Download)
require.EqualValues(t, 0, as.StorageStats.Bytes)

// send the request
client := libp2pHTTPClient(p1)
reqBz := mkRequest(t, root, 0)
Expand All @@ -83,6 +98,13 @@ func TestSimpleTransfer(t *testing.T) {
bz := readHTTPResponse(t, resp)
// ensure contents match
require.EqualValues(t, contents, bz)
as, err = sapi.AllStats(ctx)
require.NoError(t, err)
require.EqualValues(t, len(contents), as.Upload)
require.EqualValues(t, 1, as.ContentRequests)
require.EqualValues(t, 0, as.ContentReqErrors)
require.EqualValues(t, len(contents), as.Download)
require.EqualValues(t, len(contents), as.StorageStats.Bytes)

// send request with the skip param
reqBz = mkRequest(t, root, 101)
Expand All @@ -91,6 +113,14 @@ func TestSimpleTransfer(t *testing.T) {

bz = readHTTPResponse(t, resp)
require.EqualValues(t, contents[101:], bz)

as, err = sapi.AllStats(ctx)
require.NoError(t, err)
require.EqualValues(t, len(contents)+len(contents)-101, as.Upload)
require.EqualValues(t, 2, as.ContentRequests)
require.EqualValues(t, 0, as.ContentReqErrors)
require.EqualValues(t, len(contents), as.Download)
require.EqualValues(t, len(contents), as.StorageStats.Bytes)
}

// TODO -> Test Parallel Transfers
Expand Down
135 changes: 135 additions & 0 deletions carserver/station_api_impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package carserver

import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/ipfs/go-datastore/namespace"

datastore "github.com/ipfs/go-datastore"

"github.com/filecoin-project/saturn-l2/station"
)

var Version = "v0.0.0"
var contentReqKey = datastore.NewKey("/content-req")
var storeNameSpace = "station"

type StationAPIImpl struct {
ss station.StorageStatsFetcher

mu sync.RWMutex
ds datastore.Batching
}

func NewStationAPIImpl(ds datastore.Batching, ss station.StorageStatsFetcher) *StationAPIImpl {
nds := namespace.Wrap(ds, datastore.NewKey(storeNameSpace))
return &StationAPIImpl{
ss: ss,
ds: nds,
}
}

func (s *StationAPIImpl) SetStorageStatsFetcher(ss station.StorageStatsFetcher) {
s.ss = ss
}

func (s *StationAPIImpl) RecordRetrievalServed(ctx context.Context, bytesServed, nErrors uint64) error {
s.mu.Lock()
defer s.mu.Unlock()

return s.createOrUpdateReqStatsUnlocked(ctx, func(r *station.ReqStats) {
r.Upload = bytesServed
r.ContentRequests = 1
r.ContentReqErrors = nErrors
}, func(r *station.ReqStats) {
r.Upload += bytesServed
r.ContentRequests += 1
r.ContentReqErrors += nErrors
})
}

func (s *StationAPIImpl) RecordDataDownloaded(ctx context.Context, bytesDownloaded uint64) error {
s.mu.Lock()
defer s.mu.Unlock()

return s.createOrUpdateReqStatsUnlocked(ctx, func(r *station.ReqStats) {
r.Download = bytesDownloaded
}, func(r *station.ReqStats) {
r.Download += bytesDownloaded
})
}

func (s *StationAPIImpl) createOrUpdateReqStatsUnlocked(ctx context.Context, createFn func(s *station.ReqStats),
updateFn func(s *station.ReqStats)) error {

bz, err := s.ds.Get(ctx, contentReqKey)
if err != nil && err != datastore.ErrNotFound {
return fmt.Errorf("failed to get retrieval stats from datastore: %w", err)
}
if err == datastore.ErrNotFound {
stats := station.ReqStats{}
createFn(&stats)
bz, err := json.Marshal(stats)
if err != nil {
return fmt.Errorf("failed to marshal retrieval stats to json: %w", err)
}

return s.ds.Put(ctx, contentReqKey, bz)
}
var stats station.ReqStats
if err := json.Unmarshal(bz, &stats); err != nil {
return fmt.Errorf("failed to unmarshal existing retrieval stats: %w", err)
}

updateFn(&stats)

bz, err = json.Marshal(stats)
if err != nil {
return fmt.Errorf("failed to marshal retrieval stats to json: %w", err)
}
return s.ds.Put(ctx, contentReqKey, bz)
}

func (s *StationAPIImpl) AllStats(ctx context.Context) (station.StationStats, error) {
s.mu.RLock()
defer s.mu.RUnlock()

// storage stats
storage, err := s.ss.Stat()
if err != nil {
return station.StationStats{}, fmt.Errorf("failed to fetch storage stats: %w", err)
}

// info
info := station.RPInfo{
Version: Version,
}

// fetch retrieval stats
bz, err := s.ds.Get(ctx, contentReqKey)
if err != nil && err != datastore.ErrNotFound {
return station.StationStats{}, fmt.Errorf("failed to fetch retrieval stats: %w", err)
}
if err == datastore.ErrNotFound {
return station.StationStats{
RPInfo: info,
StorageStats: storage,
}, nil
}

var rs station.ReqStats
if err := json.Unmarshal(bz, &rs); err != nil {
return station.StationStats{}, fmt.Errorf("failed to unmarshal retrieval stats from json: %w", err)
}

return station.StationStats{
RPInfo: info,
StorageStats: storage,
ReqStats: rs,
}, nil
}

var _ station.StationAPI = &StationAPIImpl{}
85 changes: 85 additions & 0 deletions carserver/station_api_impl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package carserver

import (
"context"
"testing"

"github.com/filecoin-project/saturn-l2/station"
datastore "github.com/ipfs/go-datastore"
dss "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/require"
)

func TestStationAPIImpl(t *testing.T) {
ctx := context.Background()
ds := dss.MutexWrap(datastore.NewMapDatastore())
sapi := NewStationAPIImpl(ds, &mockStorageStatsFetcher{
out: 790,
})

as, err := sapi.AllStats(ctx)
require.NoError(t, err)
require.Equal(t, station.StationStats{RPInfo: station.RPInfo{Version: Version},
StorageStats: station.StorageStats{
Bytes: 790,
}}, as)

require.NoError(t, sapi.RecordDataDownloaded(ctx, 100))
as, err = sapi.AllStats(ctx)
require.NoError(t, err)
require.Equal(t, station.StationStats{RPInfo: station.RPInfo{Version: Version},
StorageStats: station.StorageStats{
Bytes: 790,
},
ReqStats: station.ReqStats{
Download: 100,
}}, as)

require.NoError(t, sapi.RecordDataDownloaded(ctx, 200))
as, err = sapi.AllStats(ctx)
require.NoError(t, err)
require.Equal(t, station.StationStats{RPInfo: station.RPInfo{Version: Version},
StorageStats: station.StorageStats{
Bytes: 790,
},
ReqStats: station.ReqStats{
Download: 300,
}}, as)

require.NoError(t, sapi.RecordRetrievalServed(ctx, 100, 0))
as, err = sapi.AllStats(ctx)
require.NoError(t, err)
require.Equal(t, station.StationStats{RPInfo: station.RPInfo{Version: Version},
StorageStats: station.StorageStats{
Bytes: 790,
},
ReqStats: station.ReqStats{
Upload: 100,
ContentRequests: 1,
Download: 300,
}}, as)

require.NoError(t, sapi.RecordRetrievalServed(ctx, 500, 2))
as, err = sapi.AllStats(ctx)
require.NoError(t, err)
require.Equal(t, station.StationStats{RPInfo: station.RPInfo{Version: Version},
StorageStats: station.StorageStats{
Bytes: 790,
},
ReqStats: station.ReqStats{
Upload: 600,
ContentRequests: 2,
ContentReqErrors: 2,
Download: 300,
}}, as)
}

type mockStorageStatsFetcher struct {
out uint64
}

func (ms *mockStorageStatsFetcher) Stat() (station.StorageStats, error) {
return station.StorageStats{
Bytes: ms.out,
}, nil
}
Loading

0 comments on commit dc0bcb4

Please sign in to comment.