Skip to content

Commit

Permalink
feat: add logger, context; fix: download failure (#16)
Browse files Browse the repository at this point in the history
* feat: put logger in option

* feat: add context in async functions

* feat: add log option in batcher initializer

* fix: download

* feat: segment length check
  • Loading branch information
MiniFrenchBread authored Jul 5, 2024
1 parent 07307f1 commit 98b2184
Show file tree
Hide file tree
Showing 28 changed files with 362 additions and 278 deletions.
7 changes: 5 additions & 2 deletions cmd/download.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package cmd

import (
"context"

"github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/node"
"github.com/0glabs/0g-storage-client/transfer"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -37,12 +40,12 @@ func init() {
func download(*cobra.Command, []string) {
nodes := node.MustNewClients(downloadArgs.nodes)

downloader, err := transfer.NewDownloader(nodes)
downloader, err := transfer.NewDownloader(nodes, common.LogOption{Logger: logrus.StandardLogger()})
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize downloader")
}

if err := downloader.Download(downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
if err := downloader.Download(context.Background(), downloadArgs.root, downloadArgs.file, downloadArgs.proof); err != nil {
logrus.WithError(err).Fatal("Failed to download file")
}
}
7 changes: 5 additions & 2 deletions cmd/upload.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package cmd

import (
"context"

zg_common "github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/blockchain"
"github.com/0glabs/0g-storage-client/contract"
"github.com/0glabs/0g-storage-client/core"
Expand Down Expand Up @@ -68,7 +71,7 @@ func upload(*cobra.Command, []string) {
defer client.Close()
}

uploader, err := transfer.NewUploader(flow, clients)
uploader, err := transfer.NewUploader(flow, clients, zg_common.LogOption{Logger: logrus.StandardLogger()})
if err != nil {
logrus.WithError(err).Fatal("Failed to initialize uploader")
}
Expand All @@ -84,7 +87,7 @@ func upload(*cobra.Command, []string) {
}
defer file.Close()

if err := uploader.Upload(file, opt); err != nil {
if err := uploader.Upload(context.Background(), file, opt); err != nil {
logrus.WithError(err).Fatal("Failed to upload file")
}
}
3 changes: 0 additions & 3 deletions common/blockchain/contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/openweb3/web3go"
"github.com/openweb3/web3go/types"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)

var CustomGasPrice uint64
Expand Down Expand Up @@ -50,8 +49,6 @@ func Deploy(clientWithSigner *web3go.Client, dataOrFile string) (common.Address,
return common.Address{}, errors.WithMessage(err, "Failed to send transaction")
}

logrus.WithField("hash", txHash).Info("Transaction sent to blockchain")

receipt, err := WaitForReceipt(clientWithSigner, txHash, true)
if err != nil {
return common.Address{}, errors.WithMessage(err, "Failed to wait for receipt")
Expand Down
7 changes: 4 additions & 3 deletions common/blockchain/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ var Web3LogEnabled bool
type RetryOption struct {
Rounds uint
Interval time.Duration
logger *logrus.Logger
}

func MustNewWeb3(url, key string) *web3go.Client {
Expand Down Expand Up @@ -63,13 +64,13 @@ func WaitForReceipt(client *web3go.Client, txHash common.Hash, successRequired b
if len(opts) > 0 {
opt = opts[0]
} else {
// default infinite wait
opt.Rounds = 0
// default 10 rounds
opt.Rounds = 10
opt.Interval = time.Second * 3
}

var tries uint
reminder := util.NewReminder(logrus.TraceLevel, time.Minute)
reminder := util.NewReminder(opt.logger, time.Minute)
for receipt == nil {
if tries > opt.Rounds+1 && opt.Rounds != 0 {
return nil, errors.New("no receipt after max retries")
Expand Down
25 changes: 25 additions & 0 deletions common/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package common

import (
"io"

"github.com/sirupsen/logrus"
)

type LogOption struct {
LogLevel logrus.Level
Logger *logrus.Logger
}

func NewLogger(opt ...LogOption) *logrus.Logger {
logger := logrus.New()
if len(opt) == 0 {
logger.Out = io.Discard
return logger
}
if opt[0].Logger != nil {
return opt[0].Logger
}
logger.SetLevel(opt[0].LogLevel)
return logger
}
4 changes: 3 additions & 1 deletion common/parallel/interface.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package parallel

import "context"

type Result struct {
Routine int
Task int
Expand All @@ -8,6 +10,6 @@ type Result struct {
}

type Interface interface {
ParallelDo(routine, task int) (interface{}, error)
ParallelDo(ctx context.Context, routine, task int) (interface{}, error)
ParallelCollect(result *Result) error
}
6 changes: 3 additions & 3 deletions common/parallel/serial.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"sync"
)

func Serial(parallelizable Interface, tasks, routines, window int) error {
func Serial(ctx context.Context, parallelizable Interface, tasks, routines, window int) error {
if tasks == 0 {
return nil
}
Expand All @@ -29,7 +29,7 @@ func Serial(parallelizable Interface, tasks, routines, window int) error {
defer close(resultCh)

var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(ctx)

// start routines to do tasks
for i := 0; i < routines; i++ {
Expand All @@ -56,7 +56,7 @@ func work(ctx context.Context, routine int, parallelizable Interface, taskCh <-c
case <-ctx.Done():
return
case task := <-taskCh:
val, err := parallelizable.ParallelDo(routine, task)
val, err := parallelizable.ParallelDo(ctx, routine, task)
resultCh <- &Result{routine, task, val, err}
if err != nil {
return
Expand Down
5 changes: 3 additions & 2 deletions common/parallel/serial_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package parallel

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -11,7 +12,7 @@ type foo struct {
result []int
}

func (f *foo) ParallelDo(routine, task int) (interface{}, error) {
func (f *foo) ParallelDo(ctx context.Context, routine, task int) (interface{}, error) {
return task * task, nil
}

Expand All @@ -30,7 +31,7 @@ func TestSerial(t *testing.T) {

tasks := 100

err := Serial(&f, tasks, 4, 16)
err := Serial(context.Background(), &f, tasks, 4, 16)
assert.Nil(t, err)
assert.Equal(t, tasks, len(f.result))

Expand Down
24 changes: 13 additions & 11 deletions common/util/reminder.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
package util

import (
"io"
"time"

"github.com/sirupsen/logrus"
)

// Reminder is used for time consuming operations to remind user about progress.
type Reminder struct {
start time.Time // start time since last warn
interval time.Duration // interval to warn once
level logrus.Level // log level to remind in general
start time.Time // start time since last warn
interval time.Duration // interval to warn once
logger *logrus.Logger // log level to remind in general
}

// NewReminder returns a new Reminder instance.
//
// `level`: log level to remind in general.
//
// `interval`: interval to remind in warning level.
func NewReminder(level logrus.Level, interval time.Duration) *Reminder {
if level < logrus.InfoLevel {
panic("invalid log level to remind in general")
func NewReminder(logger *logrus.Logger, interval time.Duration) *Reminder {
if logger == nil {
logger = logrus.New()
logger.Out = io.Discard
}

return &Reminder{
start: time.Now(),
interval: interval,
level: level,
logger: logger,
}
}

Expand All @@ -40,15 +42,15 @@ func (reminder *Reminder) Remind(message string, fields ...logrus.Fields) {
if time.Since(reminder.start) > reminder.interval {
reminder.remind(logrus.WarnLevel, message, fields...)
reminder.start = time.Now()
} else if logrus.IsLevelEnabled(reminder.level) {
reminder.remind(reminder.level, message, fields...)
} else {
reminder.remind(reminder.logger.Level, message, fields...)
}
}

func (reminder *Reminder) remind(level logrus.Level, message string, fields ...logrus.Fields) {
if len(fields) > 0 {
logrus.WithFields(fields[0]).Log(level, message)
reminder.logger.WithFields(fields[0]).Log(level, message)
} else {
logrus.StandardLogger().Log(level, message)
reminder.logger.Log(level, message)
}
}
8 changes: 2 additions & 6 deletions core/dataflow.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package core

import (
"context"
"errors"
"runtime"
"time"

"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/core/merkle"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/sirupsen/logrus"
)

const (
Expand Down Expand Up @@ -38,7 +37,6 @@ type IterableData interface {
}

func MerkleTree(data IterableData) (*merkle.Tree, error) {
stageTimer := time.Now()
var builder merkle.TreeBuilder
initializer := &TreeBuilderInitializer{
data: data,
Expand All @@ -47,13 +45,11 @@ func MerkleTree(data IterableData) (*merkle.Tree, error) {
builder: &builder,
}

err := parallel.Serial(initializer, NumSegmentsPadded(data), runtime.GOMAXPROCS(0), 0)
err := parallel.Serial(context.Background(), initializer, NumSegmentsPadded(data), runtime.GOMAXPROCS(0), 0)
if err != nil {
return nil, err
}

logrus.WithField("duration", time.Since(stageTimer)).Info("create segment root took")

return builder.Build(), nil
}

Expand Down
15 changes: 8 additions & 7 deletions core/flow.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package core

import (
"context"
"math"
"math/big"
"runtime"
"time"

"github.com/0glabs/0g-storage-client/common"
"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/contract"
"github.com/0glabs/0g-storage-client/core/merkle"
Expand All @@ -15,10 +16,12 @@ import (
type Flow struct {
data IterableData
tags []byte

logger *logrus.Logger
}

func NewFlow(data IterableData, tags []byte) *Flow {
return &Flow{data, tags}
func NewFlow(data IterableData, tags []byte, opts ...common.LogOption) *Flow {
return &Flow{data: data, tags: tags, logger: common.NewLogger(opts...)}
}

func (flow *Flow) CreateSubmission() (*contract.Submission, error) {
Expand All @@ -28,7 +31,6 @@ func (flow *Flow) CreateSubmission() (*contract.Submission, error) {
Tags: flow.tags,
}

stageTimer := time.Now()
var offset int64
for _, chunks := range flow.splitNodes() {
node, err := flow.createNode(offset, chunks)
Expand All @@ -38,7 +40,6 @@ func (flow *Flow) CreateSubmission() (*contract.Submission, error) {
submission.Nodes = append(submission.Nodes, *node)
offset += chunks * DefaultChunkSize
}
logrus.WithField("duration", time.Since(stageTimer)).Info("create submission nodes took")

return &submission, nil
}
Expand Down Expand Up @@ -88,7 +89,7 @@ func (flow *Flow) splitNodes() []int64 {
}
nextChunkSize /= 2
}
logrus.WithFields(logrus.Fields{
flow.logger.WithFields(logrus.Fields{
"chunks": chunks,
"nodeSize": nodes,
}).Debug("SplitNodes")
Expand All @@ -114,7 +115,7 @@ func (flow *Flow) createSegmentNode(offset, batch, size int64) (*contract.Submis
builder: &builder,
}

err := parallel.Serial(initializer, int((size-1)/batch+1), runtime.GOMAXPROCS(0), 0)
err := parallel.Serial(context.Background(), initializer, int((size-1)/batch+1), runtime.GOMAXPROCS(0), 0)
if err != nil {
return nil, err
}
Expand Down
4 changes: 3 additions & 1 deletion core/tree_builder_initializer.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package core

import (
"context"

"github.com/0glabs/0g-storage-client/common/parallel"
"github.com/0glabs/0g-storage-client/core/merkle"
"github.com/ethereum/go-ethereum/common"
Expand All @@ -22,7 +24,7 @@ func (t *TreeBuilderInitializer) ParallelCollect(result *parallel.Result) error
}

// ParallelDo implements parallel.Interface.
func (t *TreeBuilderInitializer) ParallelDo(routine int, task int) (interface{}, error) {
func (t *TreeBuilderInitializer) ParallelDo(ctx context.Context, routine int, task int) (interface{}, error) {
offset := t.offset + int64(task)*t.batch
buf, err := ReadAt(t.data, int(t.batch), offset, t.data.PaddedSize())
if err != nil {
Expand Down
Loading

0 comments on commit 98b2184

Please sign in to comment.