From 43019ad70fa94ee6c493755e4e9752b7da085b60 Mon Sep 17 00:00:00 2001 From: 0xluk Date: Mon, 4 Mar 2024 16:26:59 +0200 Subject: [PATCH] moved logic from main folder to dedicated ones --- factory/factory.go | 77 +++++++++++++++ main.go | 211 +++++++---------------------------------- processor/processor.go | 124 ++++++++++++++++++++++++ validator/validator.go | 132 +------------------------- 4 files changed, 237 insertions(+), 307 deletions(-) create mode 100644 factory/factory.go create mode 100644 processor/processor.go diff --git a/factory/factory.go b/factory/factory.go new file mode 100644 index 0000000..0a924c1 --- /dev/null +++ b/factory/factory.go @@ -0,0 +1,77 @@ +package factory + +import ( + "context" + "encoding/json" + "fmt" + "github.com/pkg/errors" + qubic "github.com/qubic/go-node-connector" + "io" + "math/rand" + "net/http" + "time" +) + +type QubicConnectionFactory struct { + nodeFetcherTimeout time.Duration + nodeFetcherUrl string +} + +func NewQubicConnection(nodeFetcherTimeout time.Duration, nodeFetcherUrl string) *QubicConnectionFactory { + return &QubicConnectionFactory{nodeFetcherTimeout: nodeFetcherTimeout, nodeFetcherUrl: nodeFetcherUrl} +} + +func (cf *QubicConnectionFactory) Connect() (interface{}, error) { + ctx, cancel := context.WithTimeout(context.Background(), cf.nodeFetcherTimeout) + defer cancel() + + peer, err := cf.getNewRandomPeer(ctx) + if err != nil { + return nil, errors.Wrap(err, "getting new random peer") + } + + client, err := qubic.NewClient(ctx, peer, "21841") + if err != nil { + return nil, errors.Wrap(err, "creating qubic client") + } + + fmt.Printf("connected to: %s\n", peer) + return client, nil +} + +func (cf *QubicConnectionFactory) Close(v interface{}) error { return v.(*qubic.Client).Close() } + +type response struct { + Peers []string `json:"peers"` + Length int `json:"length"` + LastUpdated int64 `json:"last_updated"` +} + +func (cf *QubicConnectionFactory) getNewRandomPeer(ctx context.Context) (string, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, cf.nodeFetcherUrl, nil) + if err != nil { + return "", errors.Wrap(err, "creating new request") + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return "", errors.Wrap(err, "getting peers from node fetcher") + } + + var resp response + body, err := io.ReadAll(res.Body) + if err != nil { + return "", errors.Wrap(err, "reading response body") + } + + err = json.Unmarshal(body, &resp) + if err != nil { + return "", errors.Wrap(err, "unmarshalling response") + } + + peer := resp.Peers[rand.Intn(len(resp.Peers))] + + fmt.Printf("Got %d new peers. Selected random %s\n", len(resp.Peers), peer) + + return peer, nil +} diff --git a/main.go b/main.go index 4375d4e..66f01b1 100644 --- a/main.go +++ b/main.go @@ -1,26 +1,20 @@ package main import ( - "context" - "encoding/json" "fmt" "github.com/ardanlabs/conf" "github.com/cockroachdb/pebble" "github.com/pkg/errors" + "github.com/qubic/go-archiver/factory" + "github.com/qubic/go-archiver/processor" "github.com/qubic/go-archiver/rpc" "github.com/qubic/go-archiver/store" - "github.com/qubic/go-archiver/validator" "github.com/silenceper/pool" - "io" "log" - "math/rand" - "net/http" "os" "os/signal" "syscall" "time" - - qubic "github.com/qubic/go-node-connector" ) const prefix = "QUBIC_ARCHIVER" @@ -37,16 +31,22 @@ func run() error { ReadTimeout time.Duration `conf:"default:5s"` WriteTimeout time.Duration `conf:"default:5s"` ShutdownTimeout time.Duration `conf:"default:5s"` - HttpHost string `conf:"0.0.0.0:8000"` - GrpcHost string `conf:"0.0.0.0:8001"` + HttpHost string `conf:"default:0.0.0.0:8000"` + GrpcHost string `conf:"default:0.0.0.0:8001"` + } + Pool struct { + NodeFetcherUrl string `conf:"default:http://127.0.0.1:8080/peers"` + NodeFetcherTimeout time.Duration `conf:"default:2s"` + InitialCap int `conf:"default:5"` + MaxIdle int `conf:"default:20"` + MaxCap int `conf:"default:30"` + IdleTimeout time.Duration `conf:"default:15s"` } Qubic struct { - NodeIp string `conf:"default:212.51.150.253"` - NodePort string `conf:"default:21841"` - FallbackTick uint64 `conf:"default:12710000"` - BatchSize uint64 `conf:"default:500"` - NodeFetcherEndpoint string `conf:"default:http://127.0.0.1:8080/peers"` - StorageFolder string `conf:"default:store"` + NodePort string `conf:"default:21841"` + FallbackTick uint64 `conf:"default:12769858"` + StorageFolder string `conf:"default:store"` + ProcessTickTimeout time.Duration `conf:"default:5s"` } } @@ -90,176 +90,35 @@ func run() error { shutdown := make(chan os.Signal, 1) signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM) - ticker := time.NewTicker(5 * time.Second) - - cf := &connectionFactory{nodeFetcherHost: cfg.Qubic.NodeFetcherEndpoint} - //p, err := connpool.NewChannelPool(5, 30, cf.connect) - //if err != nil { - // return errors.Wrap(err, "creating new connection pool") - //} - + fact := factory.NewQubicConnection(cfg.Pool.NodeFetcherTimeout, cfg.Pool.NodeFetcherUrl) poolConfig := pool.Config{ - InitialCap: 5, - MaxIdle: 20, - MaxCap: 30, - Factory: cf.connect, - Close: cf.close, - //Ping: ping, + InitialCap: cfg.Pool.InitialCap, + MaxIdle: cfg.Pool.MaxIdle, + MaxCap: cfg.Pool.MaxCap, + Factory: fact.Connect, + Close: fact.Close, //The maximum idle time of the connection, the connection exceeding this time will be closed, which can avoid the problem of automatic failure when connecting to EOF when idle - IdleTimeout: 15 * time.Second, + IdleTimeout: cfg.Pool.IdleTimeout, } - p, err := pool.NewChannelPool(&poolConfig) + chPool, err := pool.NewChannelPool(&poolConfig) if err != nil { return errors.Wrap(err, "creating new connection pool") } + p := processor.NewProcessor(chPool, ps, cfg.Qubic.FallbackTick, cfg.Qubic.ProcessTickTimeout) + archiveErrors := make(chan error, 1) + + // Start the service listening for requests. + go func() { + archiveErrors <- p.Start() + }() + for { select { case <-shutdown: - log.Fatalf("Shutting down") - case <-ticker.C: - err := do(p, cfg.Qubic.FallbackTick, cfg.Qubic.BatchSize, ps) - if err != nil { - log.Printf("do err: %s. pool len: %d\n", err.Error(), p.Len()) - } - } - } -} - -func do(pool pool.Pool, fallbackTick, batchSize uint64, ps *store.PebbleStore) error { - //ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) - //defer cancel() - - v, err := pool.Get() - if err != nil { - return errors.Wrap(err, "getting initial connection") - } - - client := v.(*qubic.Client) - err = validateMultiple(client, fallbackTick, batchSize, ps) - if err != nil { - cErr := pool.Close(v) - if cErr != nil { - log.Printf("Closing conn failed: %s", cErr.Error()) - } - return errors.Wrap(err, "validating multiple") - } else { - err = pool.Put(client) - if err != nil { - log.Printf("Putting conn back to pool failed: %s", err.Error()) - } - } - - log.Println("Batch completed") - - return nil -} - -func validateMultiple(client *qubic.Client, fallbackStartTick uint64, batchSize uint64, ps *store.PebbleStore) error { - ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) - defer cancel() - - val := validator.NewValidator(client, ps) - tickInfo, err := client.GetTickInfo(ctx) - if err != nil { - return errors.Wrap(err, "getting tick info") - } - targetTick := uint64(tickInfo.Tick) - startTick, err := getNextProcessingTick(ctx, fallbackStartTick, ps) - if err != nil { - return errors.Wrap(err, "getting next processing tick") - } - - if targetTick-startTick > batchSize { - targetTick = startTick + batchSize - } - - if targetTick <= startTick { - return errors.Errorf("Target processing tick %d is not greater than last processing tick %d", targetTick, startTick) - } - - log.Printf("Current batch starting from tick: %d until tick: %d", startTick, targetTick) - start := time.Now().Unix() - for t := startTick; t <= targetTick; t++ { - stepIn := time.Now().Unix() - if err := val.ValidateTick(context.Background(), t); err != nil { - return errors.Wrapf(err, "validating tick %d", t) - } - err := ps.SetLastProcessedTick(ctx, t) - if err != nil { - return errors.Wrapf(err, "setting last processed tick %d", t) + return errors.New("shutting down") + case err := <-archiveErrors: + return errors.Wrap(err, "archiver error") } - stepOut := time.Now().Unix() - log.Printf("Tick %d validated in %d seconds", t, stepOut-stepIn) } - end := time.Now().Unix() - log.Printf("%d ticks validated in %d seconds", targetTick-startTick, end-start) - return nil -} - -func getNextProcessingTick(ctx context.Context, fallBackTick uint64, ps *store.PebbleStore) (uint64, error) { - lastTick, err := ps.GetLastProcessedTick(ctx) - if err != nil { - if errors.Cause(err) == store.ErrNotFound { - return fallBackTick, nil - } - - return 0, errors.Wrap(err, "getting last processed tick") - } - - return lastTick + 1, nil -} - -type connectionFactory struct { - nodeFetcherHost string -} - -func (cf *connectionFactory) connect() (interface{}, error) { - peer, err := getNewRandomPeer(cf.nodeFetcherHost) - if err != nil { - return nil, errors.Wrap(err, "getting new random peer") - } - - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - client, err := qubic.NewClient(ctx, peer, "21841") - if err != nil { - return nil, errors.Wrap(err, "creating qubic client") - } - - fmt.Printf("connected to: %s\n", peer) - return client, nil -} - -func (cf *connectionFactory) close(v interface{}) error { return v.(*qubic.Client).Close() } - -type response struct { - Peers []string `json:"peers"` - Length int `json:"length"` - LastUpdated int64 `json:"last_updated"` -} - -func getNewRandomPeer(host string) (string, error) { - res, err := http.Get(host) - if err != nil { - return "", errors.Wrap(err, "getting peers from node fetcher") - } - - var resp response - body, err := io.ReadAll(res.Body) - if err != nil { - return "", errors.Wrap(err, "reading response body") - } - - err = json.Unmarshal(body, &resp) - if err != nil { - return "", errors.Wrap(err, "unmarshalling response") - } - - peer := resp.Peers[rand.Intn(len(resp.Peers))] - - fmt.Printf("Got %d new peers. Selected random %s\n", len(resp.Peers), peer) - - return peer, nil } diff --git a/processor/processor.go b/processor/processor.go new file mode 100644 index 0000000..57b44c3 --- /dev/null +++ b/processor/processor.go @@ -0,0 +1,124 @@ +package processor + +import ( + "context" + "fmt" + "github.com/pkg/errors" + "github.com/qubic/go-archiver/store" + "github.com/qubic/go-archiver/validator" + qubic "github.com/qubic/go-node-connector" + "github.com/silenceper/pool" + "log" + "time" +) + +func newTickInTheFutureError(requestedTick uint64, latestTick uint64) *TickInTheFutureError { + return &TickInTheFutureError{requestedTick: requestedTick, latestTick: latestTick} +} + +type TickInTheFutureError struct { + requestedTick uint64 + latestTick uint64 +} + +func (e *TickInTheFutureError) Error() string { + return errors.Errorf("Requested tick %d is in the future. Latest tick is: %d", e.requestedTick, e.latestTick).Error() +} + +type Processor struct { + pool pool.Pool + ps *store.PebbleStore + processTickTimeout time.Duration + fallbackNextProcessingTick uint64 +} + +func NewProcessor(pool pool.Pool, ps *store.PebbleStore, fallbackNextProcessingTick uint64, processTickTimeout time.Duration) *Processor { + return &Processor{ + pool: pool, + ps: ps, + fallbackNextProcessingTick: fallbackNextProcessingTick, + processTickTimeout: processTickTimeout, + } +} + +func (p *Processor) Start() error { + var titfError *TickInTheFutureError + for { + err := p.processOneByOne() + if err != nil { + if errors.As(err, &titfError) { + log.Printf("Processing failed: %s", err.Error()) + time.Sleep(1 * time.Second) + continue + } + log.Printf("Processing failed: %s", err.Error()) + } + } +} + +func (p *Processor) processOneByOne() error { + ctx, cancel := context.WithTimeout(context.Background(), p.processTickTimeout) + defer cancel() + + var err error + qcv, err := p.pool.Get() + if err != nil { + return errors.Wrap(err, "getting qubic pooled client connection") + } + client := qcv.(*qubic.Client) + defer func() { + if err == nil { + log.Printf("Putting conn back to pool") + pErr := p.pool.Put(client) + if pErr != nil { + log.Printf("Putting conn back to pool failed: %s", pErr.Error()) + } + } else { + log.Printf("Closing conn") + cErr := p.pool.Close(client) + if cErr != nil { + log.Printf("Closing conn failed: %s", cErr.Error()) + } + } + }() + + nextTick, err := p.getNextProcessingTick(ctx) + if err != nil { + return errors.Wrap(err, "getting next processing tick") + } + fmt.Printf("Next tick to process: %d\n", nextTick) + tickInfo, err := client.GetTickInfo(ctx) + if err != nil { + return errors.Wrap(err, "getting tick info") + } + if uint64(tickInfo.Tick) < nextTick { + err = newTickInTheFutureError(nextTick, uint64(tickInfo.Tick)) + return err + } + + val := validator.New(client, p.ps) + err = val.ValidateTick(ctx, nextTick) + if err != nil { + return errors.Wrapf(err, "validating tick %d", nextTick) + } + + err = p.ps.SetLastProcessedTick(ctx, nextTick) + if err != nil { + return errors.Wrapf(err, "setting last processed tick %d", nextTick) + } + + return nil +} + +func (p *Processor) getNextProcessingTick(ctx context.Context) (uint64, error) { + lastTick, err := p.ps.GetLastProcessedTick(ctx) + if err == nil { + return lastTick + 1, nil + } + + if errors.Cause(err) == store.ErrNotFound { + return p.fallbackNextProcessingTick, nil + } + + return 0, errors.Wrap(err, "getting last processed tick") +} diff --git a/validator/validator.go b/validator/validator.go index 6dddfca..e10531c 100644 --- a/validator/validator.go +++ b/validator/validator.go @@ -11,7 +11,6 @@ import ( qubic "github.com/qubic/go-node-connector" "github.com/qubic/go-node-connector/types" "log" - "time" ) type Validator struct { @@ -19,14 +18,11 @@ type Validator struct { store *store.PebbleStore } -func NewValidator(qu *qubic.Client, store *store.PebbleStore) *Validator { +func New(qu *qubic.Client, store *store.PebbleStore) *Validator { return &Validator{qu: qu, store: store} } func (v *Validator) ValidateTick(ctx context.Context, tickNumber uint64) error { - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - quorumVotes, err := v.qu.GetQuorumVotes(ctx, uint32(tickNumber)) if err != nil { return errors.Wrap(err, "getting quorum tick data") @@ -55,7 +51,6 @@ func (v *Validator) ValidateTick(ctx context.Context, tickNumber uint64) error { if err != nil { return errors.Wrap(err, "validating comps") } - //log.Println("Computors validated") err = computors.Store(ctx, v.store, comps) if err != nil { return errors.Wrap(err, "storing computors") @@ -125,128 +120,3 @@ func (v *Validator) ValidateTick(ctx context.Context, tickNumber uint64) error { return nil } - -func getComputorsAndValidate(ctx context.Context, qu *qubic.Client) (types.Computors, error) { - comps, err := qu.GetComputors(ctx) - if err != nil { - return types.Computors{}, errors.Wrap(err, "getting comps") - } - - err = computors.Validate(ctx, comps) - if err != nil { - return types.Computors{}, errors.Wrap(err, "validating comps") - } - log.Println("Computors validated") - - return comps, nil -} - -//func (v *Validator) ValidateTickParallel(ctx context.Context, nodeIP string, tickNumber uint64) error { -// comps, err := getComputorsAndValidate(ctx, v.qu) -// -// var quorumVotes []types.QuorumTickData -// var tickData types.TickData -// var transactions []types.Transaction -// var wg sync.WaitGroup -// var errChan = make(chan error, 3) -// -// wg.Add(3) -// -// go func() { -// defer wg.Done() -// client, err := qubic.NewClient(context.Background(), nodeIP, "21841") -// if err != nil { -// errChan <- errors.Wrap(err, "creating qubic client") -// return -// } -// defer client.Close() -// -// log.Println("Fetching Quorum votes") -// data, err := client.GetQuorumTickData(ctx, uint32(tickNumber)) -// if err != nil { -// log.Println("err quorum") -// errChan <- errors.Wrap(err, "getting quorum tick data") -// return -// } -// -// quorumVotes = data.QuorumData -// log.Println("Quorum Tick data fetched") -// }() -// -// go func() { -// defer wg.Done() -// client, err := qubic.NewClient(context.Background(), nodeIP, "21841") -// if err != nil { -// errChan <- errors.Wrap(err, "creating qubic client") -// return -// } -// defer client.Close() -// -// log.Println("Fetching tick data") -// data, err := client.GetTickData(ctx, uint32(tickNumber)) -// if err != nil { -// log.Println("err tick") -// errChan <- errors.Wrap(err, "getting tick data") -// return -// } -// -// tickData = data -// log.Println("Tick data fetched") -// }() -// -// go func() { -// defer wg.Done() -// client, err := qubic.NewClient(context.Background(), nodeIP, "21841") -// if err != nil { -// errChan <- errors.Wrap(err, "creating qubic client") -// return -// } -// defer client.Close() -// -// log.Println("Fetching tick transaction") -// data, err := client.GetTickTransactions(ctx, uint32(tickNumber)) -// if err != nil { -// log.Println("err tx") -// errChan <- errors.Wrap(err, "getting tick transactions") -// return -// } -// -// transactions = data -// log.Println("Tick transaction data fetched") -// }() -// -// go func() { -// wg.Wait() -// log.Println("Work done") -// close(errChan) // Close channel after all goroutines report back -// }() -// -// for err := range errChan { -// if err != nil { -// fmt.Println("Error received:", err) -// return err // Exit the loop on the first error -// } -// } -// -// err = quorum.Validate(ctx, quorumVotes, comps) -// if err != nil { -// return errors.Wrap(err, "validating quorum") -// } -// -// log.Println("Quorum validated") -// -// err = tick.Validate(ctx, tickData, quorumVotes[0], comps) -// if err != nil { -// return errors.Wrap(err, "validating tick data") -// } -// -// log.Println("Tick validated") -// -// -// err = tx.Validate(ctx, transactions, tickData) -// if err != nil { -// return errors.Wrap(err, "validating transactions") -// } -// -// return nil -//}