Skip to content

Commit

Permalink
Implement most of the synchronization feature.
Browse files Browse the repository at this point in the history
  • Loading branch information
LINCKODE committed Nov 14, 2024
1 parent f8cbd5f commit 81f8530
Show file tree
Hide file tree
Showing 14 changed files with 2,032 additions and 1,208 deletions.
28 changes: 25 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,22 @@ func run() error {
}
Qubic struct {
NodePort string `conf:"default:21841"`
StorageFolder string `conf:"default:store"`
StorageFolder string `conf:"default:storage"`
ProcessTickTimeout time.Duration `conf:"default:5s"`
}
Store struct {
ResetEmptyTickKeys bool `conf:"default:false"`
}
Sync struct {
Enable bool `conf:"default:false"`
Source string `conf:"default:localhost:8001"`
ResponseTimeout time.Duration `conf:"default:5s"`
}
Bootstrap struct {
Enable bool `conf:"default:true"`
MaxRequestedItems int `conf:"default:100"`
BatchSize int `conf:"default:10"`
}
}

if err := conf.Parse(os.Args[1:], prefix, &cfg); err != nil {
Expand Down Expand Up @@ -129,7 +139,13 @@ func run() error {
return errors.Wrap(err, "creating qubic pool")
}

rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, cfg.Server.NodeSyncThreshold, cfg.Server.ChainTickFetchUrl, ps, p)
bootstrapConfiguration := rpc.BootstrapConfiguration{
Enable: cfg.Bootstrap.Enable,
MaximumRequestedItems: cfg.Bootstrap.MaxRequestedItems,
BatchSize: cfg.Bootstrap.BatchSize,
}

rpcServer := rpc.NewServer(cfg.Server.GrpcHost, cfg.Server.HttpHost, cfg.Server.NodeSyncThreshold, cfg.Server.ChainTickFetchUrl, ps, p, bootstrapConfiguration)
err = rpcServer.Start()
if err != nil {
return errors.Wrap(err, "starting rpc server")
Expand All @@ -138,7 +154,13 @@ func run() error {
shutdown := make(chan os.Signal, 1)
signal.Notify(shutdown, os.Interrupt, syscall.SIGTERM)

proc := processor.NewProcessor(p, ps, cfg.Qubic.ProcessTickTimeout)
syncConfiguration := processor.SyncConfiguration{
Enable: cfg.Sync.Enable,
Source: cfg.Sync.Source,
ResponseTimeout: cfg.Sync.ResponseTimeout,
}

proc := processor.NewProcessor(p, ps, cfg.Qubic.ProcessTickTimeout, syncConfiguration)
procErrors := make(chan error, 1)

// Start the service listening for requests.
Expand Down
19 changes: 18 additions & 1 deletion processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,38 @@ func (e *TickInTheFutureError) Error() string {
return errors.Errorf("Requested tick %d is in the future. Latest tick is: %d", e.requestedTick, e.latestTick).Error()
}

type SyncConfiguration struct {
Enable bool
Source string
ResponseTimeout time.Duration
}

type Processor struct {
pool *qubic.Pool
ps *store.PebbleStore
processTickTimeout time.Duration
SyncConfiguration SyncConfiguration
}

func NewProcessor(p *qubic.Pool, ps *store.PebbleStore, processTickTimeout time.Duration) *Processor {
func NewProcessor(p *qubic.Pool, ps *store.PebbleStore, processTickTimeout time.Duration, syncConfiguration SyncConfiguration) *Processor {
return &Processor{
pool: p,
ps: ps,
processTickTimeout: processTickTimeout,
SyncConfiguration: syncConfiguration,
}
}

func (p *Processor) Start() error {

if p.SyncConfiguration.Enable {
syncProcessor := NewSyncProcessor(p.SyncConfiguration, p.ps, p.processTickTimeout)
err := syncProcessor.Start()
if err != nil {
return errors.Wrap(err, "performing synchronization")
}
}

for {
err := p.processOneByOne()
if err != nil {
Expand Down
242 changes: 242 additions & 0 deletions processor/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
package processor

import (
"context"
"fmt"
"github.com/pkg/errors"
"github.com/qubic/go-archiver/protobuff"
"github.com/qubic/go-archiver/store"
"github.com/qubic/go-archiver/sync"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"io"
"log"
"time"
)

type SyncProcessor struct {
syncConfiguration SyncConfiguration
syncServiceClient protobuff.SyncServiceClient
pebbleStore *store.PebbleStore
syncDelta SyncDelta
processTickTimeout time.Duration
maxObjectRequest uint32
}

func NewSyncProcessor(syncConfiguration SyncConfiguration, pebbleStore *store.PebbleStore, processTickTimeout time.Duration) *SyncProcessor {
return &SyncProcessor{
syncConfiguration: syncConfiguration,
pebbleStore: pebbleStore,
processTickTimeout: processTickTimeout,
}
}

func (sp *SyncProcessor) Start() error {

log.Printf("Connecting to bootstrap node %s...", sp.syncConfiguration.Source)

grpcConnection, err := grpc.NewClient(sp.syncConfiguration.Source, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return errors.Wrap(err, "creating grpc connection to bootstrap")
}
defer grpcConnection.Close()

syncServiceClient := protobuff.NewSyncServiceClient(grpcConnection)
sp.syncServiceClient = syncServiceClient

log.Println("Fetching bootstrap metadata...")
bootstrapMetadata, err := sp.getBootstrapMetadata()
if err != nil {
return err
}

sp.maxObjectRequest = uint32(bootstrapMetadata.MaxObjectRequest)

clientMetadata, err := sp.getClientMetadata()
if err != nil {
return errors.Wrap(err, "getting client metadata")
}

log.Println("Calculating synchronization delta...")
syncDelta, err := sp.calculateSyncDelta(bootstrapMetadata, clientMetadata)
if err != nil {
return errors.Wrap(err, "calculating sync delta")
}

if len(syncDelta) == 0 {
log.Println("Nothing to synchronize, resuming to processing network ticks.")
return nil
}

log.Println("Synchronizing missing epoch information...")
err = sp.syncEpochInfo(syncDelta)
if err != nil {
return errors.Wrap(err, "syncing epoch info")
}

sp.syncDelta = syncDelta

log.Println("Starting tick synchronization")
sp.sync()

return nil
}

func (sp *SyncProcessor) getBootstrapMetadata() (*protobuff.SyncMetadataResponse, error) {
ctx, cancel := context.WithTimeout(context.Background(), sp.syncConfiguration.ResponseTimeout)
defer cancel()

metadata, err := sp.syncServiceClient.SyncGetBootstrapMetadata(ctx, nil)
if err != nil {
return nil, errors.Wrap(err, "getting bootstrap metadata")
}

return metadata, nil
}

func (sp *SyncProcessor) getClientMetadata() (*protobuff.SyncMetadataResponse, error) {

processedTickIntervals, err := sp.pebbleStore.GetProcessedTickIntervals(nil)
if err != nil {
return nil, errors.Wrap(err, "getting processed tick intervals")
}

return &protobuff.SyncMetadataResponse{
ArchiverVersion: sync.ArchiverVersion,
ProcessedTickIntervals: processedTickIntervals,
}, nil
}

type EpochDelta struct {
Epoch uint32
ProcessedIntervals []*protobuff.ProcessedTickInterval
}

type SyncDelta []EpochDelta

func areIntervalsEqual(a, b []*protobuff.ProcessedTickInterval) bool {
if len(a) != len(b) {
return false
}

for index := 0; index < len(a); index++ {
if a[index] != b[index] {
return false
}
}
return true
}

func (sp *SyncProcessor) calculateSyncDelta(bootstrapMetadata, clientMetadata *protobuff.SyncMetadataResponse) (SyncDelta, error) {

if bootstrapMetadata.ArchiverVersion != clientMetadata.ArchiverVersion {
return nil, errors.New(fmt.Sprintf("client version (%s) does not match bootstrap version (%s)", clientMetadata.ArchiverVersion, bootstrapMetadata.ArchiverVersion))
}

bootstrapProcessedTicks := make(map[uint32][]*protobuff.ProcessedTickInterval)
clientProcessedTicks := make(map[uint32][]*protobuff.ProcessedTickInterval)

for _, epochIntervals := range bootstrapMetadata.ProcessedTickIntervals {
bootstrapProcessedTicks[epochIntervals.Epoch] = epochIntervals.Intervals
}

for _, epochIntervals := range clientMetadata.ProcessedTickIntervals {
clientProcessedTicks[epochIntervals.Epoch] = epochIntervals.Intervals
}

var syncDelta SyncDelta

for epoch, processedIntervals := range bootstrapProcessedTicks {

clientProcessedIntervals, exists := clientProcessedTicks[epoch]
if !exists || !areIntervalsEqual(processedIntervals, clientProcessedIntervals) {
epochDelta := EpochDelta{
Epoch: epoch,
ProcessedIntervals: processedIntervals,
}
syncDelta = append(syncDelta, epochDelta)
}
}

return syncDelta, nil
}

func (sp *SyncProcessor) storeEpochInfo(response *protobuff.SyncEpochInfoResponse) error {

for _, epoch := range response.Epochs {
err := sp.pebbleStore.SetComputors(context.Background(), epoch.ComputorList.Epoch, epoch.ComputorList)
if err != nil {
return errors.Wrapf(err, "storing computor list for epoch %d", epoch.ComputorList.Epoch)
}

err = sp.pebbleStore.SetLastTickQuorumDataPerEpoch(epoch.LastTickQuorumData, epoch.LastTickQuorumData.QuorumTickStructure.Epoch)
if err != nil {
return errors.Wrapf(err, "storing last tick quorum data for epoch %d", epoch.LastTickQuorumData.QuorumTickStructure.Epoch)
}
}

return nil
}

func (sp *SyncProcessor) syncEpochInfo(delta SyncDelta) error {

var epochs []uint32

for _, epochDelta := range delta {
epochs = append(epochs, epochDelta.Epoch)
}

ctx, cancel := context.WithTimeout(context.Background(), sp.syncConfiguration.ResponseTimeout)

defer cancel()

stream, err := sp.syncServiceClient.SyncGetEpochInformation(ctx, &protobuff.SyncEpochInfoRequest{Epochs: epochs})
if err != nil {
return errors.Wrap(err, "fetching epoch info")
}

for {
data, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return errors.Wrap(err, "reading stream")
}

err = sp.storeEpochInfo(data)
if err != nil {
return errors.Wrap(err, "storing epoch data")
}
}

return nil
}

func (sp *SyncProcessor) sync() error {
for _, epochDelta := range sp.syncDelta {

log.Printf("Synchronizing ticks for epoch %d...\n", epochDelta.Epoch)

for _, interval := range epochDelta.ProcessedIntervals {

for tickNumber := interval.InitialProcessedTick; tickNumber < interval.LastProcessedTick; tickNumber += sp.maxObjectRequest {

startTick := tickNumber
endTick := startTick + sp.maxObjectRequest

Check failure on line 226 in processor/sync.go

View workflow job for this annotation

GitHub Actions / test-nocache (1.22.x, ubuntu-latest)

endTick declared and not used

err := sp.processTick(tickNumber, tickNumber+20)

Check failure on line 228 in processor/sync.go

View workflow job for this annotation

GitHub Actions / test-nocache (1.22.x, ubuntu-latest)

sp.processTick undefined (type *SyncProcessor has no field or method processTick)
if err != nil {
return errors.Wrapf(err, "synchronizing tick %d", tickNumber)
}

}

}
}
return nil
}

func (sp *SyncProcessor) processTicks(startTick, endTick uint32) error {

}

Check failure on line 242 in processor/sync.go

View workflow job for this annotation

GitHub Actions / test-nocache (1.22.x, ubuntu-latest)

missing return
Loading

0 comments on commit 81f8530

Please sign in to comment.