Skip to content

Commit

Permalink
pkarr record validation (#24) (#115)
Browse files Browse the repository at this point in the history
* reduce the many PkarrRecord structs to one central struct, with validation and conversions to/from other types

Fixes #24
  • Loading branch information
finn-block authored Feb 7, 2024
1 parent 5899ac4 commit 86820d3
Show file tree
Hide file tree
Showing 12 changed files with 196 additions and 224 deletions.
98 changes: 98 additions & 0 deletions impl/pkg/pkarr/record.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package pkarr

import (
"encoding/base64"
"errors"
"fmt"

"github.com/TBD54566975/ssi-sdk/util"
"github.com/anacrolix/dht/v2/bep44"
"github.com/anacrolix/torrent/bencode"
)

type Record struct {
Value []byte `json:"v" validate:"required"`
Key [32]byte `json:"k" validate:"required"`
Signature [64]byte `json:"sig" validate:"required"`
SequenceNumber int64 `json:"seq" validate:"required"`
}

type Response struct {
V []byte `validate:"required"`
Seq int64 `validate:"required"`
Sig [64]byte `validate:"required"`
}

func NewRecord(k []byte, v []byte, sig []byte, seq int64) (*Record, error) {
record := Record{SequenceNumber: seq}

if len(k) != 32 {
return nil, errors.New("incorrect key length for pkarr record")
}
record.Key = [32]byte(k)

if len(v) > 1000 {
return nil, errors.New("pkarr record value too long")
}
record.Value = v

if len(sig) != 64 {
return nil, errors.New("incorrect sig length for pkarr record")
}
record.Signature = [64]byte(sig)

if err := record.IsValid(); err != nil {
return nil, err
}

return &record, nil
}

// IsValid returns an error if the request is invalid; also validates the signature
func (r Record) IsValid() error {
if err := util.IsValidStruct(r); err != nil {
return err
}

// validate the signature
bv, err := bencode.Marshal(r.Value)
if err != nil {
return fmt.Errorf("error bencoding pkarr record: %v", err)
}

if !bep44.Verify(r.Key[:], nil, r.SequenceNumber, bv, r.Signature[:]) {
return errors.New("signature is invalid")
}
return nil
}

func (r Record) Response() Response {
return Response{
V: r.Value,
Seq: r.SequenceNumber,
Sig: r.Signature,
}
}

func (r Record) BEP44() bep44.Put {
return bep44.Put{
V: r.Value,
K: &r.Key,
Sig: r.Signature,
Seq: r.SequenceNumber,
}
}

func (r Record) String() string {
e := base64.RawURLEncoding
return fmt.Sprintf("pkarr.Record{K=%s V=%s Sig=%s Seq=%d}", e.EncodeToString(r.Key[:]), e.EncodeToString(r.Value), e.EncodeToString(r.Signature[:]), r.SequenceNumber)
}

func RecordFromBEP44(putMsg *bep44.Put) Record {
return Record{
Key: *putMsg.K,
Value: putMsg.V.([]byte),
Signature: putMsg.Sig,
SequenceNumber: putMsg.Seq,
}
}
21 changes: 10 additions & 11 deletions impl/pkg/server/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/gin-gonic/gin"

"github.com/TBD54566975/did-dht-method/internal/util"
"github.com/TBD54566975/did-dht-method/pkg/pkarr"
"github.com/TBD54566975/did-dht-method/pkg/service"
)

Expand Down Expand Up @@ -57,7 +58,7 @@ func (r *PkarrRouter) GetRecord(c *gin.Context) {
var seqBuf [8]byte
binary.BigEndian.PutUint64(seqBuf[:], uint64(resp.Seq))
// sig:seq:v
res := append(resp.Sig[:], append(seqBuf[:], resp.V...)...)
res := append(resp.Sig[:], append(seqBuf[:], resp.V[:]...)...)
RespondBytes(c, res, http.StatusOK)
}

Expand Down Expand Up @@ -104,18 +105,16 @@ func (r *PkarrRouter) PutRecord(c *gin.Context) {

// transform the request into a service request by extracting the fields
// according to https://github.com/Nuhvi/pkarr/blob/main/design/relays.md#put
vBytes := body[72:]
keyBytes := [32]byte(key[:])
bytes := body[:64]
sigBytes := [64]byte(bytes)
value := body[72:]
sig := body[:64]
seq := int64(binary.BigEndian.Uint64(body[64:72]))
request := service.PublishPkarrRequest{
V: vBytes,
K: keyBytes,
Sig: sigBytes,
Seq: seq,
request, err := pkarr.NewRecord(key, value, sig, seq)
if err != nil {
LoggingRespondErrWithMsg(c, err, "error parsing request", http.StatusBadRequest)
return
}
if err = r.service.PublishPkarr(c, *id, request); err != nil {

if err = r.service.PublishPkarr(c, *id, *request); err != nil {
LoggingRespondErrWithMsg(c, err, "failed to publish pkarr record", http.StatusInternalServerError)
return
}
Expand Down
6 changes: 3 additions & 3 deletions impl/pkg/server/server_pkarr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestPKARRRouter(t *testing.T) {
c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix})

pkarrRouter.PutRecord(c)
assert.True(t, is2xxResponse(w.Code))
assert.True(t, is2xxResponse(w.Code), "unexpected %s", w.Result().Status)
})

t.Run("test get record", func(t *testing.T) {
Expand All @@ -48,14 +48,14 @@ func TestPKARRRouter(t *testing.T) {
c := newRequestContextWithParams(w, req, map[string]string{IDParam: suffix})

pkarrRouter.PutRecord(c)
assert.True(t, is2xxResponse(w.Code))
assert.True(t, is2xxResponse(w.Code), "unexpected %s", w.Result().Status)

w = httptest.NewRecorder()
req = httptest.NewRequest(http.MethodGet, fmt.Sprintf("%s/%s", testServerURL, suffix), nil)
c = newRequestContextWithParams(w, req, map[string]string{IDParam: suffix})

pkarrRouter.GetRecord(c)
assert.True(t, is2xxResponse(w.Code))
assert.True(t, is2xxResponse(w.Code), "unexpected %s", w.Result().Status)

resp, err := io.ReadAll(w.Body)
assert.NoError(t, err)
Expand Down
110 changes: 16 additions & 94 deletions impl/pkg/service/pkarr.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,21 @@ package service

import (
"context"
"errors"
"time"

"github.com/goccy/go-json"

"github.com/TBD54566975/did-dht-method/internal/util"
ssiutil "github.com/TBD54566975/ssi-sdk/util"
"github.com/allegro/bigcache/v3"
"github.com/anacrolix/dht/v2/bep44"
"github.com/anacrolix/torrent/bencode"
"github.com/sirupsen/logrus"

"github.com/TBD54566975/did-dht-method/config"
dhtint "github.com/TBD54566975/did-dht-method/internal/dht"
"github.com/TBD54566975/did-dht-method/pkg/dht"
"github.com/TBD54566975/did-dht-method/pkg/pkarr"
"github.com/TBD54566975/did-dht-method/pkg/storage"
"github.com/TBD54566975/did-dht-method/pkg/storage/pkarr"
)

const recordSizeLimit = 1000
Expand Down Expand Up @@ -67,55 +65,18 @@ func NewPkarrService(cfg *config.Config, db storage.Storage) (*PkarrService, err
return &service, nil
}

// PublishPkarrRequest is the request to publish a Pkarr record
type PublishPkarrRequest struct {
V []byte `validate:"required"`
K [32]byte `validate:"required"`
Sig [64]byte `validate:"required"`
Seq int64 `validate:"required"`
}

// isValid returns an error if the request is invalid; also validates the signature
func (p PublishPkarrRequest) isValid() error {
if err := ssiutil.IsValidStruct(p); err != nil {
return err
}
// validate the signature
bv, err := bencode.Marshal(p.V)
if err != nil {
return err
}
if !bep44.Verify(p.K[:], nil, p.Seq, bv, p.Sig[:]) {
return errors.New("signature is invalid")
}
return nil
}

func (p PublishPkarrRequest) toRecord() pkarr.Record {
return pkarr.Record{
V: p.V,
K: p.K[:],
Sig: p.Sig[:],
Seq: p.Seq,
}
}

// PublishPkarr stores the record in the db, publishes the given Pkarr record to the DHT, and returns the z-base-32 encoded ID
func (s *PkarrService) PublishPkarr(ctx context.Context, id string, request PublishPkarrRequest) error {
if err := request.isValid(); err != nil {
func (s *PkarrService) PublishPkarr(ctx context.Context, id string, record pkarr.Record) error {
if err := record.IsValid(); err != nil {
return err
}

// write to db and cache
record := request.toRecord()
if err := s.db.WriteRecord(ctx, record); err != nil {
return err
}
recordBytes, err := json.Marshal(GetPkarrResponse{
V: request.V,
Seq: request.Seq,
Sig: request.Sig,
})

recordBytes, err := json.Marshal(record.Response())
if err != nil {
return err
}
Expand All @@ -127,12 +88,7 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, request Publ
// return here and put it in the DHT asynchronously
// TODO(gabe): consider a background process to monitor failures
go func() {
_, err := s.dht.Put(ctx, bep44.Put{
V: request.V,
K: &request.K,
Sig: request.Sig,
Seq: request.Seq,
})
_, err := s.dht.Put(ctx, record.BEP44())
if err != nil {
logrus.WithError(err).Error("error from dht.Put")
}
Expand All @@ -141,26 +97,11 @@ func (s *PkarrService) PublishPkarr(ctx context.Context, id string, request Publ
return nil
}

// GetPkarrResponse is the response to a get Pkarr request
type GetPkarrResponse struct {
V []byte `validate:"required"`
Seq int64 `validate:"required"`
Sig [64]byte `validate:"required"`
}

func fromPkarrRecord(record pkarr.Record) (*GetPkarrResponse, error) {
return &GetPkarrResponse{
V: record.V,
Seq: record.Seq,
Sig: [64]byte(record.Sig),
}, nil
}

// GetPkarr returns the full Pkarr record (including sig data) for the given z-base-32 encoded ID
func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*GetPkarrResponse, error) {
func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*pkarr.Response, error) {
// first do a cache lookup
if got, err := s.cache.Get(id); err == nil {
var resp GetPkarrResponse
var resp pkarr.Response
if err = json.Unmarshal(got, &resp); err != nil {
return nil, err
}
Expand All @@ -186,14 +127,12 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*GetPkarrRespon
}

logrus.WithField("record", id).Debug("resolved pkarr record from storage")
resp, err := fromPkarrRecord(*record)
if err == nil {
if err = s.addRecordToCache(id, *resp); err != nil {
logrus.WithError(err).WithField("record", id).Error("failed to set pkarr record in cache")
}
resp := record.Response()
if err = s.addRecordToCache(id, record.Response()); err != nil {
logrus.WithError(err).WithField("record", id).Error("failed to set pkarr record in cache")
}

return resp, err
return &resp, err
}

// prepare the record for return
Expand All @@ -205,7 +144,7 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*GetPkarrRespon
if err = bencode.Unmarshal(bBytes, &payload); err != nil {
return nil, err
}
resp := GetPkarrResponse{
resp := pkarr.Response{
V: []byte(payload),
Seq: got.Seq,
Sig: got.Sig,
Expand All @@ -219,7 +158,7 @@ func (s *PkarrService) GetPkarr(ctx context.Context, id string) (*GetPkarrRespon
return &resp, nil
}

func (s *PkarrService) addRecordToCache(id string, resp GetPkarrResponse) error {
func (s *PkarrService) addRecordToCache(id string, resp pkarr.Response) error {
recordBytes, err := json.Marshal(resp)
if err != nil {
return err
Expand Down Expand Up @@ -252,14 +191,7 @@ func (s *PkarrService) republish() {
logrus.WithField("record_count", len(allRecords)).Info("Republishing record")

for _, record := range allRecords {
put, err := recordToBEP44Put(record)
if err != nil {
logrus.WithError(err).Error("failed to convert record to bep44 put")
errCnt++
continue
}

if _, err = s.dht.Put(context.Background(), *put); err != nil {
if _, err = s.dht.Put(context.Background(), record.BEP44()); err != nil {
logrus.WithError(err).Error("failed to republish record")
errCnt++
continue
Expand All @@ -272,15 +204,5 @@ func (s *PkarrService) republish() {
break
}
}

logrus.WithField("success", successCnt).WithField("errors", errCnt).Info("Republishing complete")
}

func recordToBEP44Put(record pkarr.Record) (*bep44.Put, error) {
return &bep44.Put{
V: record.V,
K: (*[32]byte)(record.K),
Sig: [64]byte(record.Sig),
Seq: record.Seq,
}, nil
logrus.Infof("Republishing complete. Successfully republished %d out of %d record(s)", len(allRecords)-errCnt, len(allRecords))
}
Loading

0 comments on commit 86820d3

Please sign in to comment.