Skip to content
This repository has been archived by the owner on Aug 8, 2024. It is now read-only.

Commit

Permalink
Merge pull request #91 from JackalLabs/small-refactor
Browse files Browse the repository at this point in the history
Small refactor
  • Loading branch information
TheMarstonConnell authored Jun 26, 2023
2 parents 53dae82 + fc83070 commit 041b1c7
Show file tree
Hide file tree
Showing 10 changed files with 516 additions and 124 deletions.
105 changes: 58 additions & 47 deletions jprov/queue/queue.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package queue

import (
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -35,6 +36,58 @@ func (q *UploadQueue) Append(upload *types.Upload) {
q.Queue = append(q.Queue, upload)
}

// Return a list of messages of the upload queue up to maxMessageSize in FCFS order.
// Returns nil in these conditions:
// 1. maxMessageSize is too small
// 2. the UploadQueue is locked
// 3. the upload queue is empty
func (q *UploadQueue) PrepareMessage(maxMessageSize int) (messages []cosmosTypes.Msg) {
if maxMessageSize < 1 || !q.Locked || len(q.Queue) == 0 {
return nil
}

var netMsgSize int
for _, q := range q.Queue {
msgSize := len(q.Message.String())

if netMsgSize+msgSize > maxMessageSize {
break
} else {
netMsgSize += msgSize
messages = append(messages, q.Message)
}
}

return
}

// Update the upload queue with the parameter fields of count
func (q *UploadQueue) UpdateQueue(count int, err error, res *cosmosTypes.TxResponse) {
if !q.Locked || len(q.Queue) == 0 || len(q.Queue) < count {
return
}

for i := 0; i < count; i++ {
q := q.Queue[i]

if err != nil {
q.Err = err
} else {
if res != nil {
if res.Code != 0 {
q.Err = errors.New(res.RawLog)
} else {
q.Response = res
}
}
}

if q.Callback != nil {
q.Callback.Done()
}
}
}

func (q *UploadQueue) listenOnce(cmd *cobra.Command, providerName string) {
if q.Locked {
return
Expand All @@ -46,9 +99,7 @@ func (q *UploadQueue) listenOnce(cmd *cobra.Command, providerName string) {

ctx := utils.GetServerContextFromCmd(cmd)

l := len(q.Queue)

if l == 0 {
if len(q.Queue) == 0 {
return
}

Expand All @@ -57,56 +108,16 @@ func (q *UploadQueue) listenOnce(cmd *cobra.Command, providerName string) {
ctx.Logger.Error(err.Error())
}

var totalSizeOfMsgs int
msgs := make([]cosmosTypes.Msg, 0)
uploads := make([]*types.Upload, 0)

for i := 0; i < l; i++ { // loop through entire queue

upload := q.Queue[i]

uploadSize := len(upload.Message.String())

// if the size of the upload would put us past our cap, we cut off the queue and send only what fits
if totalSizeOfMsgs+uploadSize > maxSize {
msgs = msgs[:len(msgs)-1]
uploads = uploads[:len(uploads)-1]
l = i

break
} else {
uploads = append(uploads, upload)
msgs = append(msgs, upload.Message)
totalSizeOfMsgs += len(upload.Message.String())
}

}
msgs := q.PrepareMessage(maxSize)

clientCtx := client.GetClientContextFromCmd(cmd)
ctx.Logger.Debug(fmt.Sprintf("total no. of msgs in proof transaction is: %d", len(msgs)))

res, err := utils.SendTx(clientCtx, cmd.Flags(), fmt.Sprintf("Storage Provided by %s", providerName), msgs...)
for _, v := range uploads {
if v == nil {
continue
}
if err != nil {
v.Err = err
} else {
if res != nil {
if res.Code != 0 {
v.Err = fmt.Errorf(res.RawLog)
} else {
v.Response = res
}
}
}
if v.Callback != nil {
v.Callback.Done()
}
}

q.Queue = q.Queue[l:] // pop every upload that fit off the queue
q.UpdateQueue(len(msgs), err, res)

q.Queue = q.Queue[len(msgs):] // pop every upload that fit off the queue
}

func (q *UploadQueue) StartListener(cmd *cobra.Command, providerName string) {
Expand Down
61 changes: 61 additions & 0 deletions jprov/queue/queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ func setupQueue(t *testing.T) (queue.UploadQueue, *require.Assertions) {
return q, require
}

func setupUpload(count int) (upload []*types.Upload) {
for i := 0; i < count; i++ {
msg := storagetypes.NewMsgInitProvider(
"test-address",
"localhost:3333",
"1000",
"test-key",
)
upload = append(upload, &types.Upload{Message: msg})
}

return
}

func TestAppend(t *testing.T) {
q, require := setupQueue(t)

Expand Down Expand Up @@ -56,3 +70,50 @@ func TestAppend(t *testing.T) {

require.Equal(stringQueue, string(data))
}

func TestPrepareMessage(t *testing.T) {
cases := map[string]struct {
uq queue.UploadQueue
maxMsgSize int
resultSize int
}{
"empty_queue": {
uq: queue.UploadQueue{
Locked: true,
},
maxMsgSize: 10,
resultSize: 0,
},
"queue_exceed_max": {
uq: queue.UploadQueue{
Locked: true,
Queue: setupUpload(10),
},
maxMsgSize: 1,
resultSize: 0,
},
"queue_msg_length": {
uq: queue.UploadQueue{
Locked: true,
Queue: setupUpload(1),
},
maxMsgSize: 500,
resultSize: len(setupUpload(1)[0].Message.String()),
},
}

for name, c := range cases {
t.Run(name, func(t *testing.T) {
msgs := c.uq.PrepareMessage(c.maxMsgSize)
var msgSize int
for _, m := range msgs {
msgSize += len(m.String())
}

if c.resultSize != msgSize {
t.Log("Expected size: ", c.resultSize, " Result size: ", msgSize)
t.Fail()
}
})
}
}
73 changes: 44 additions & 29 deletions jprov/server/attestation.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
Expand All @@ -18,6 +19,40 @@ import (
"github.com/spf13/cobra"
)

func verifyAttest(deal storageTypes.ActiveDeals, attest types.AttestRequest) (verified bool, err error) {
merkle := deal.Merkle
block := deal.Blocktoprove
blockNum, err := strconv.ParseInt(block, 10, 64)
if err != nil {
return false, err
}

verified = storageKeeper.VerifyDeal(merkle, attest.HashList, blockNum, attest.Item)

return
}

func addMsgAttest(address string, cid string, q *queue.UploadQueue) (upload types.Upload, err error) {
msg := storageTypes.NewMsgAttest(address, cid)

if err := msg.ValidateBasic(); err != nil {
return upload, err
}

var wg sync.WaitGroup
wg.Add(1)

upload = types.Upload{
Message: msg,
Err: nil,
Callback: &wg,
Response: nil,
}

q.Append(&upload)
return
}

func attest(w *http.ResponseWriter, r *http.Request, cmd *cobra.Command, q *queue.UploadQueue) {
clientCtx, qerr := client.GetClientTxContext(cmd)
if qerr != nil {
Expand Down Expand Up @@ -45,22 +80,16 @@ func attest(w *http.ResponseWriter, r *http.Request, cmd *cobra.Command, q *queu
deal, err := queryClient.ActiveDeals(context.Background(), dealReq)
if err != nil {
http.Error(*w, err.Error(), http.StatusBadRequest)
return
}

merkle := deal.ActiveDeals.Merkle
block := deal.ActiveDeals.Blocktoprove
blockNum, err := strconv.ParseInt(block, 10, 64)
verified, err := verifyAttest(deal.ActiveDeals, attest)
if err != nil {
http.Error(*w, err.Error(), http.StatusBadRequest)
return
}

verified := storageKeeper.VerifyDeal(merkle, attest.HashList, blockNum, attest.Item)

if !verified {
http.Error(*w, err.Error(), http.StatusBadRequest)
return
http.Error(*w, errors.New("failed to verify attest").Error(), http.StatusBadRequest)
}

address, err := crypto.GetAddress(clientCtx)
Expand All @@ -69,35 +98,21 @@ func attest(w *http.ResponseWriter, r *http.Request, cmd *cobra.Command, q *queu
return
}

msg := storageTypes.NewMsgAttest( // create new attest
address,
attest.Cid,
)
if err := msg.ValidateBasic(); err != nil {
upload, err := addMsgAttest(address, attest.Cid, q)
if err != nil {
http.Error(*w, err.Error(), http.StatusBadRequest)
return
}

var wg sync.WaitGroup
wg.Add(1)

u := types.Upload{
Message: msg,
Err: nil,
Callback: &wg,
Response: nil,
}

q.Append(&u)
wg.Wait()
upload.Callback.Wait()

if u.Err != nil {
http.Error(*w, u.Err.Error(), http.StatusBadRequest)
if upload.Err != nil {
http.Error(*w, upload.Err.Error(), http.StatusBadRequest)
return
}

if u.Response.Code != 0 {
http.Error(*w, fmt.Errorf(u.Response.RawLog).Error(), http.StatusBadRequest)
if upload.Response.Code != 0 {
http.Error(*w, fmt.Errorf(upload.Response.RawLog).Error(), http.StatusBadRequest)
return
}

Expand Down
Loading

0 comments on commit 041b1c7

Please sign in to comment.