Skip to content

Commit

Permalink
adding tool to fix missing timestamp in blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
Eduard-Voiculescu committed Aug 12, 2024
1 parent 5a618ba commit df0214c
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 0 deletions.
1 change: 1 addition & 0 deletions cmd/firevara/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func init() {
rootCmd.AddCommand(NewFetchCmd(logger, tracer))
rootCmd.AddCommand(NewDecoderCmd(logger, tracer))
rootCmd.AddCommand(NewToolsFetchMetadataCmd(logger, tracer))
rootCmd.AddCommand(NewToolsFixMissingTimestampBlocks(logger, tracer))
}

func main() {
Expand Down
149 changes: 149 additions & 0 deletions cmd/firevara/tool_fix_missing_timestamp_blocks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package main

import (
"errors"
"fmt"
"io"
"os"
"strconv"

// gogoproto doesn't behave as expected, should we go with google.golang.org/protobuf/proto instead?
// "github.com/cosmos/gogoproto/proto"
"github.com/spf13/cobra"
"github.com/streamingfast/bstream"
"github.com/streamingfast/cli"
"github.com/streamingfast/dstore"
firecore "github.com/streamingfast/firehose-core"
pbgear "github.com/streamingfast/firehose-gear/pb/sf/gear/type/v1"
"github.com/streamingfast/logging"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)

func NewToolsFixMissingTimestampBlocks(logger *zap.Logger, tracer logging.Tracer) *cobra.Command {
cmd := &cobra.Command{
Use: "tools-fix-missing-timestamp-blocks <first-streamable-block> <stop-block> <src-blocks-store> <dst-blocks-store>",
Short: "Vara tool to fix missing timestamp in blocks",
Long: cli.Dedent(`
Firevara tool to fix missing timestamp blocks. This tool will read blocks from a source
and write the fixed block to a destination.
`),
Args: cobra.ExactArgs(4),
RunE: fixMissingTimestampRunE(logger, tracer),
}

return cmd
}

func fixMissingTimestampRunE(zlog *zap.Logger, tracer logging.Tracer) firecore.CommandExecutor {
return func(cmd *cobra.Command, args []string) (err error) {
ctx := cmd.Context()

firstStreamableBlock, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return fmt.Errorf("converting first streamable block to uint64: %w", err)
}

stopBlock, err := strconv.ParseUint(args[1], 10, 64)
if err != nil {
return fmt.Errorf("converting stop block to uint64: %w", err)
}

srcStore, err := dstore.NewDBinStore(args[2])
if err != nil {
return fmt.Errorf("unable to create source store: %w", err)
}

destStore, err := dstore.NewDBinStore(args[3])
if err != nil {
return fmt.Errorf("unable to create destination store: %w", err)
}

zlog.Debug(
"starting fix unknown type blocks",
zap.String("source_blocks_store", srcStore.BaseURL().String()),
zap.String("destination_blocks_store", destStore.BaseURL().String()),
zap.Uint64("first_streamable_block", firstStreamableBlock),
zap.Uint64("stop_block", stopBlock),
)

mergeWriter := &firecore.MergedBlocksWriter{
Store: destStore,
LowBlockNum: firstStreamableBlock,
StopBlockNum: stopBlock,
Logger: zlog,
Cmd: cmd,
}

var lastFilename string
var blockCount = 0
err = srcStore.WalkFrom(ctx, "", fmt.Sprintf("%010d", firstStreamableBlock), func(filename string) error {
var fileReader io.Reader
fileReader, err = srcStore.OpenObject(ctx, filename)
if err != nil {
return fmt.Errorf("creating reader: %w", err)
}

var blockReader *bstream.DBinBlockReader
blockReader, err = bstream.NewDBinBlockReader(fileReader)
if err != nil {
return fmt.Errorf("creating block reader: %w", err)
}

// the source store is a merged file store
for {
currentBlock, err := blockReader.Read()
if err != nil {
if err == io.EOF {
fmt.Fprintf(os.Stderr, "Total blocks: %d\n", blockCount)
break
}
return fmt.Errorf("error receiving blocks: %w", err)
}

blockCount++

gearBlock := &pbgear.Block{}
err = proto.Unmarshal(currentBlock.Payload.Value, gearBlock)
if err != nil {
return fmt.Errorf("unmarshaling block: %w", err)
}

gearBlock.Timestamp = currentBlock.Timestamp

payload, err := anypb.New(gearBlock)
if err != nil {
return fmt.Errorf("creating payload: %w", err)
}

currentBlock.Payload = payload
if err = mergeWriter.ProcessBlock(currentBlock, nil); err != nil {
return fmt.Errorf("processing block: %w", err)
}
}

lastFilename = filename
return nil
})

mergeWriter.Logger = mergeWriter.Logger.With(zap.String("last_filename", lastFilename), zap.Int("block_count", blockCount))
if err != nil {
if errors.Is(err, dstore.StopIteration) {
err = mergeWriter.WriteBundle()
if err != nil {
return fmt.Errorf("writing bundle: %w", err)
}
fmt.Println("done")
return nil
}
if errors.Is(err, io.EOF) {
fmt.Println("done")
return nil
}
return fmt.Errorf("walking source store: %w", err)
}

return nil
}
}

0 comments on commit df0214c

Please sign in to comment.