Skip to content

Commit

Permalink
moved logic from main folder to dedicated ones
Browse files Browse the repository at this point in the history
  • Loading branch information
0xluk committed Mar 4, 2024
1 parent 467c207 commit 43019ad
Show file tree
Hide file tree
Showing 4 changed files with 237 additions and 307 deletions.
77 changes: 77 additions & 0 deletions factory/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package factory

import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
qubic "github.com/qubic/go-node-connector"
"io"
"math/rand"
"net/http"
"time"
)

type QubicConnectionFactory struct {
nodeFetcherTimeout time.Duration
nodeFetcherUrl string
}

func NewQubicConnection(nodeFetcherTimeout time.Duration, nodeFetcherUrl string) *QubicConnectionFactory {
return &QubicConnectionFactory{nodeFetcherTimeout: nodeFetcherTimeout, nodeFetcherUrl: nodeFetcherUrl}
}

func (cf *QubicConnectionFactory) Connect() (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), cf.nodeFetcherTimeout)
defer cancel()

peer, err := cf.getNewRandomPeer(ctx)
if err != nil {
return nil, errors.Wrap(err, "getting new random peer")
}

client, err := qubic.NewClient(ctx, peer, "21841")
if err != nil {
return nil, errors.Wrap(err, "creating qubic client")
}

fmt.Printf("connected to: %s\n", peer)
return client, nil
}

func (cf *QubicConnectionFactory) Close(v interface{}) error { return v.(*qubic.Client).Close() }

type response struct {
Peers []string `json:"peers"`
Length int `json:"length"`
LastUpdated int64 `json:"last_updated"`
}

func (cf *QubicConnectionFactory) getNewRandomPeer(ctx context.Context) (string, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, cf.nodeFetcherUrl, nil)
if err != nil {
return "", errors.Wrap(err, "creating new request")
}

res, err := http.DefaultClient.Do(req)
if err != nil {
return "", errors.Wrap(err, "getting peers from node fetcher")
}

var resp response
body, err := io.ReadAll(res.Body)
if err != nil {
return "", errors.Wrap(err, "reading response body")
}

err = json.Unmarshal(body, &resp)
if err != nil {
return "", errors.Wrap(err, "unmarshalling response")
}

peer := resp.Peers[rand.Intn(len(resp.Peers))]

fmt.Printf("Got %d new peers. Selected random %s\n", len(resp.Peers), peer)

return peer, nil
}
211 changes: 35 additions & 176 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
package main

import (
"context"
"encoding/json"
"fmt"
"github.com/ardanlabs/conf"
"github.com/cockroachdb/pebble"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/factory"
"github.com/qubic/go-archiver/processor"
"github.com/qubic/go-archiver/rpc"
"github.com/qubic/go-archiver/store"
"github.com/qubic/go-archiver/validator"
"github.com/silenceper/pool"
"io"
"log"
"math/rand"
"net/http"
"os"
"os/signal"
"syscall"
"time"

qubic "github.com/qubic/go-node-connector"
)

const prefix = "QUBIC_ARCHIVER"
Expand All @@ -37,16 +31,22 @@ func run() error {
ReadTimeout time.Duration `conf:"default:5s"`
WriteTimeout time.Duration `conf:"default:5s"`
ShutdownTimeout time.Duration `conf:"default:5s"`
HttpHost string `conf:"0.0.0.0:8000"`
GrpcHost string `conf:"0.0.0.0:8001"`
HttpHost string `conf:"default:0.0.0.0:8000"`
GrpcHost string `conf:"default:0.0.0.0:8001"`
}
Pool struct {
NodeFetcherUrl string `conf:"default:http://127.0.0.1:8080/peers"`
NodeFetcherTimeout time.Duration `conf:"default:2s"`
InitialCap int `conf:"default:5"`
MaxIdle int `conf:"default:20"`
MaxCap int `conf:"default:30"`
IdleTimeout time.Duration `conf:"default:15s"`
}
Qubic struct {
NodeIp string `conf:"default:212.51.150.253"`
NodePort string `conf:"default:21841"`
FallbackTick uint64 `conf:"default:12710000"`
BatchSize uint64 `conf:"default:500"`
NodeFetcherEndpoint string `conf:"default:http://127.0.0.1:8080/peers"`
StorageFolder string `conf:"default:store"`
NodePort string `conf:"default:21841"`
FallbackTick uint64 `conf:"default:12769858"`
StorageFolder string `conf:"default:store"`
ProcessTickTimeout time.Duration `conf:"default:5s"`
}
}

Expand Down Expand Up @@ -90,176 +90,35 @@ func run() error {
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

ticker := time.NewTicker(5 * time.Second)

cf := &connectionFactory{nodeFetcherHost: cfg.Qubic.NodeFetcherEndpoint}
//p, err := connpool.NewChannelPool(5, 30, cf.connect)
//if err != nil {
// return errors.Wrap(err, "creating new connection pool")
//}

fact := factory.NewQubicConnection(cfg.Pool.NodeFetcherTimeout, cfg.Pool.NodeFetcherUrl)
poolConfig := pool.Config{
InitialCap: 5,
MaxIdle: 20,
MaxCap: 30,
Factory: cf.connect,
Close: cf.close,
//Ping: ping,
InitialCap: cfg.Pool.InitialCap,
MaxIdle: cfg.Pool.MaxIdle,
MaxCap: cfg.Pool.MaxCap,
Factory: fact.Connect,
Close: fact.Close,
//The maximum idle time of the connection, the connection exceeding this time will be closed, which can avoid the problem of automatic failure when connecting to EOF when idle
IdleTimeout: 15 * time.Second,
IdleTimeout: cfg.Pool.IdleTimeout,
}
p, err := pool.NewChannelPool(&poolConfig)
chPool, err := pool.NewChannelPool(&poolConfig)
if err != nil {
return errors.Wrap(err, "creating new connection pool")
}

p := processor.NewProcessor(chPool, ps, cfg.Qubic.FallbackTick, cfg.Qubic.ProcessTickTimeout)
archiveErrors := make(chan error, 1)

// Start the service listening for requests.
go func() {
archiveErrors <- p.Start()
}()

for {
select {
case <-shutdown:
log.Fatalf("Shutting down")
case <-ticker.C:
err := do(p, cfg.Qubic.FallbackTick, cfg.Qubic.BatchSize, ps)
if err != nil {
log.Printf("do err: %s. pool len: %d\n", err.Error(), p.Len())
}
}
}
}

func do(pool pool.Pool, fallbackTick, batchSize uint64, ps *store.PebbleStore) error {
//ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
//defer cancel()

v, err := pool.Get()
if err != nil {
return errors.Wrap(err, "getting initial connection")
}

client := v.(*qubic.Client)
err = validateMultiple(client, fallbackTick, batchSize, ps)
if err != nil {
cErr := pool.Close(v)
if cErr != nil {
log.Printf("Closing conn failed: %s", cErr.Error())
}
return errors.Wrap(err, "validating multiple")
} else {
err = pool.Put(client)
if err != nil {
log.Printf("Putting conn back to pool failed: %s", err.Error())
}
}

log.Println("Batch completed")

return nil
}

func validateMultiple(client *qubic.Client, fallbackStartTick uint64, batchSize uint64, ps *store.PebbleStore) error {
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()

val := validator.NewValidator(client, ps)
tickInfo, err := client.GetTickInfo(ctx)
if err != nil {
return errors.Wrap(err, "getting tick info")
}
targetTick := uint64(tickInfo.Tick)
startTick, err := getNextProcessingTick(ctx, fallbackStartTick, ps)
if err != nil {
return errors.Wrap(err, "getting next processing tick")
}

if targetTick-startTick > batchSize {
targetTick = startTick + batchSize
}

if targetTick <= startTick {
return errors.Errorf("Target processing tick %d is not greater than last processing tick %d", targetTick, startTick)
}

log.Printf("Current batch starting from tick: %d until tick: %d", startTick, targetTick)
start := time.Now().Unix()
for t := startTick; t <= targetTick; t++ {
stepIn := time.Now().Unix()
if err := val.ValidateTick(context.Background(), t); err != nil {
return errors.Wrapf(err, "validating tick %d", t)
}
err := ps.SetLastProcessedTick(ctx, t)
if err != nil {
return errors.Wrapf(err, "setting last processed tick %d", t)
return errors.New("shutting down")
case err := <-archiveErrors:
return errors.Wrap(err, "archiver error")
}
stepOut := time.Now().Unix()
log.Printf("Tick %d validated in %d seconds", t, stepOut-stepIn)
}
end := time.Now().Unix()
log.Printf("%d ticks validated in %d seconds", targetTick-startTick, end-start)
return nil
}

func getNextProcessingTick(ctx context.Context, fallBackTick uint64, ps *store.PebbleStore) (uint64, error) {
lastTick, err := ps.GetLastProcessedTick(ctx)
if err != nil {
if errors.Cause(err) == store.ErrNotFound {
return fallBackTick, nil
}

return 0, errors.Wrap(err, "getting last processed tick")
}

return lastTick + 1, nil
}

type connectionFactory struct {
nodeFetcherHost string
}

func (cf *connectionFactory) connect() (interface{}, error) {
peer, err := getNewRandomPeer(cf.nodeFetcherHost)
if err != nil {
return nil, errors.Wrap(err, "getting new random peer")
}

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

client, err := qubic.NewClient(ctx, peer, "21841")
if err != nil {
return nil, errors.Wrap(err, "creating qubic client")
}

fmt.Printf("connected to: %s\n", peer)
return client, nil
}

func (cf *connectionFactory) close(v interface{}) error { return v.(*qubic.Client).Close() }

type response struct {
Peers []string `json:"peers"`
Length int `json:"length"`
LastUpdated int64 `json:"last_updated"`
}

func getNewRandomPeer(host string) (string, error) {
res, err := http.Get(host)
if err != nil {
return "", errors.Wrap(err, "getting peers from node fetcher")
}

var resp response
body, err := io.ReadAll(res.Body)
if err != nil {
return "", errors.Wrap(err, "reading response body")
}

err = json.Unmarshal(body, &resp)
if err != nil {
return "", errors.Wrap(err, "unmarshalling response")
}

peer := resp.Peers[rand.Intn(len(resp.Peers))]

fmt.Printf("Got %d new peers. Selected random %s\n", len(resp.Peers), peer)

return peer, nil
}
Loading

0 comments on commit 43019ad

Please sign in to comment.