diff --git a/Makefile b/Makefile index 8652e76..97e6401 100644 --- a/Makefile +++ b/Makefile @@ -6,6 +6,12 @@ export GO111MODULE = on all: lint test-unit +install: + @go install + +.PHONY: install + + ############################################################################### ### Tools & Dependencies ### ############################################################################### diff --git a/api/data.go b/api/data.go index 27d4db1..3975cb6 100644 --- a/api/data.go +++ b/api/data.go @@ -1,18 +1,16 @@ package api import ( - "encoding/json" "net/http" "github.com/JackalLabs/sequoia/api/types" "github.com/JackalLabs/sequoia/file_system" - "github.com/dgraph-io/badger/v4" "github.com/rs/zerolog/log" ) -func DumpDBHandler(db *badger.DB) func(http.ResponseWriter, *http.Request) { +func DumpDBHandler(f *file_system.FileSystem) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { - dump, err := file_system.Dump(db) + dump, err := f.Dump() if err != nil { v := types.ErrorResponse{ Error: err.Error(), diff --git a/api/file_handler.go b/api/file_handler.go index d32afcc..0cc180b 100644 --- a/api/file_handler.go +++ b/api/file_handler.go @@ -1,115 +1,145 @@ package api import ( - "encoding/json" + "context" + "encoding/hex" "fmt" "net/http" + "strconv" + "time" + + "github.com/JackalLabs/sequoia/proofs" + + "github.com/desmos-labs/cosmos-go-wallet/wallet" "github.com/JackalLabs/sequoia/api/types" "github.com/JackalLabs/sequoia/file_system" - "github.com/JackalLabs/sequoia/queue" - "github.com/dgraph-io/badger/v4" "github.com/gorilla/mux" storageTypes "github.com/jackalLabs/canine-chain/v3/x/storage/types" "github.com/rs/zerolog/log" ) -const MaxFileSize = 32 << 30 +// const MaxFileSize = 32 << 30 +const MaxFileSize = 0 + +func handleErr(err error, w http.ResponseWriter, code int) { + v := types.ErrorResponse{ + Error: err.Error(), + } + w.WriteHeader(code) + err = json.NewEncoder(w).Encode(v) + if err != nil { + log.Error().Err(err) + } +} -func PostFileHandler(db *badger.DB, q *queue.Queue, address string) func(http.ResponseWriter, *http.Request) { +func PostFileHandler(fio *file_system.FileSystem, prover *proofs.Prover, wl *wallet.Wallet, chunkSize int64) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { err := req.ParseMultipartForm(MaxFileSize) // MAX file size lives here if err != nil { - v := types.ErrorResponse{ - Error: err.Error(), - } - w.WriteHeader(http.StatusInternalServerError) - err = json.NewEncoder(w).Encode(v) - if err != nil { - log.Error().Err(err) - } + handleErr(fmt.Errorf("cannot parse form %w", err), w, http.StatusBadRequest) return } sender := req.Form.Get("sender") + merkleString := req.Form.Get("merkle") + merkle, err := hex.DecodeString(merkleString) + if err != nil { + handleErr(fmt.Errorf("cannot parse merkle: %w", err), w, http.StatusBadRequest) + return + } - file, _, err := req.FormFile("file") // Retrieve the file from form data + startBlockString := req.Form.Get("start") + startBlock, err := strconv.ParseInt(startBlockString, 10, 64) if err != nil { - v := types.ErrorResponse{ - Error: err.Error(), - } - w.WriteHeader(http.StatusInternalServerError) - err = json.NewEncoder(w).Encode(v) - if err != nil { - log.Error().Err(err) - } + handleErr(fmt.Errorf("cannot parse start block: %w", err), w, http.StatusBadRequest) return } - merkle, fid, cid, size, err := file_system.WriteFile(db, file, sender, address, "") + file, fh, err := req.FormFile("file") // Retrieve the file from form data if err != nil { - v := types.ErrorResponse{ - Error: err.Error(), - } - w.WriteHeader(http.StatusInternalServerError) - err = json.NewEncoder(w).Encode(v) - if err != nil { - log.Error().Err(err) - } + handleErr(fmt.Errorf("cannot get file from form: %w", err), w, http.StatusBadRequest) + return } - msg := storageTypes.NewMsgPostContract( - address, - sender, - fmt.Sprintf("%d", size), - fid, - merkle, - ) + readSize := fh.Size + if readSize == 0 { + handleErr(fmt.Errorf("file cannot be empty"), w, http.StatusBadRequest) + return + } - if err := msg.ValidateBasic(); err != nil { - v := types.ErrorResponse{ - Error: err.Error(), - } - w.WriteHeader(http.StatusInternalServerError) - err = json.NewEncoder(w).Encode(v) - if err != nil { - log.Error().Err(err) - } + cl := storageTypes.NewQueryClient(wl.Client.GRPCConn) + queryParams := storageTypes.QueryFile{ + Merkle: merkle, + Owner: sender, + Start: startBlock, + } + res, err := cl.File(context.Background(), &queryParams) + if err != nil { + handleErr(fmt.Errorf("failed to find file on chain: %w", err), w, http.StatusInternalServerError) + return } - m, wg := q.Add(msg) + f := res.File - wg.Wait() // wait for queue to process + if readSize != f.FileSize { + handleErr(fmt.Errorf("cannot accept form file that doesn't match the chain data %d != %d", readSize, f.FileSize), w, http.StatusInternalServerError) + return + } - if m.Error() != nil { - v := types.ErrorResponse{ - Error: m.Error().Error(), - } - w.WriteHeader(http.StatusInternalServerError) - err = json.NewEncoder(w).Encode(v) - if err != nil { - log.Error().Err(err) + if hex.EncodeToString(f.Merkle) != merkleString { + handleErr(fmt.Errorf("cannot accept file that doesn't match the chain data %x != %x", f.Merkle, merkle), w, http.StatusInternalServerError) + return + } + + if len(f.Proofs) == int(f.MaxProofs) { + if !f.ContainsProver(wl.AccAddress()) { + handleErr(fmt.Errorf("cannot accept file that I cannot claim"), w, http.StatusInternalServerError) + return } } + size, err := fio.WriteFile(file, merkle, sender, startBlock, wl.AccAddress(), chunkSize) + if err != nil { + handleErr(fmt.Errorf("failed to write file to disk: %w", err), w, http.StatusInternalServerError) + return + } + + if int64(size) != f.FileSize { + handleErr(fmt.Errorf("cannot accept file that doesn't match the chain data %d != %d", int64(size), f.FileSize), w, http.StatusInternalServerError) + return + } + resp := types.UploadResponse{ - CID: cid, - FID: fid, + Merkle: merkle, + Owner: sender, + Start: startBlock, } err = json.NewEncoder(w).Encode(resp) if err != nil { - log.Error().Err(err) + log.Error().Err(fmt.Errorf("can't encode json : %w", err)) } + + _ = prover.PostProof(merkle, sender, startBlock, startBlock, time.Now()) } } -func DownloadFileHandler(db *badger.DB) func(http.ResponseWriter, *http.Request) { +func DownloadFileHandler(f *file_system.FileSystem) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) - fid := vars["fid"] + merkleString := vars["merkle"] + merkle, err := hex.DecodeString(merkleString) + if err != nil { + v := types.ErrorResponse{ + Error: err.Error(), + } + w.WriteHeader(http.StatusInternalServerError) + _ = json.NewEncoder(w).Encode(v) + + } - file, err := file_system.GetFileDataByFID(db, fid) + file, err := f.GetFileDataByMerkle(merkle) if err != nil { v := types.ErrorResponse{ Error: err.Error(), diff --git a/api/files.go b/api/files.go index a05cc95..5eb6c50 100644 --- a/api/files.go +++ b/api/files.go @@ -1,18 +1,17 @@ package api import ( - "encoding/json" + "fmt" "net/http" "github.com/JackalLabs/sequoia/api/types" "github.com/JackalLabs/sequoia/file_system" - "github.com/dgraph-io/badger/v4" "github.com/rs/zerolog/log" ) -func ListFilesHandler(db *badger.DB) func(http.ResponseWriter, *http.Request) { +func ListFilesHandler(f *file_system.FileSystem) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { - files, err := file_system.ListFiles(db) + merkles, _, _, err := f.ListFiles() if err != nil { v := types.ErrorResponse{ Error: err.Error(), @@ -25,9 +24,14 @@ func ListFilesHandler(db *badger.DB) func(http.ResponseWriter, *http.Request) { return } + mm := make([]string, len(merkles)) + for i, merkle := range merkles { + mm[i] = fmt.Sprintf("%x", merkle) + } + f := types.ListResponse{ - Files: files, - Count: len(files), + Files: mm, + Count: len(mm), } err = json.NewEncoder(w).Encode(f) @@ -37,9 +41,9 @@ func ListFilesHandler(db *badger.DB) func(http.ResponseWriter, *http.Request) { } } -func LegacyListFilesHandler(db *badger.DB) func(http.ResponseWriter, *http.Request) { +func LegacyListFilesHandler(f *file_system.FileSystem) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { - files, err := file_system.ListFiles(db) + merkles, owners, _, err := f.ListFiles() if err != nil { v := types.ErrorResponse{ Error: err.Error(), @@ -52,12 +56,12 @@ func LegacyListFilesHandler(db *badger.DB) func(http.ResponseWriter, *http.Reque return } - cids := make([]types.LegacyAPIListValue, len(files)) + cids := make([]types.LegacyAPIListValue, len(merkles)) - for i, file := range files { + for i, m := range merkles { cids[i] = types.LegacyAPIListValue{ - CID: file, - FID: "", + CID: fmt.Sprintf("%x", m), + FID: owners[i], } } diff --git a/api/index.go b/api/index.go index e6ed0e8..d952aba 100644 --- a/api/index.go +++ b/api/index.go @@ -1,7 +1,6 @@ package api import ( - "encoding/json" "net/http" "github.com/JackalLabs/sequoia/api/types" diff --git a/api/server.go b/api/server.go index 67b943c..234c5e6 100644 --- a/api/server.go +++ b/api/server.go @@ -1,18 +1,28 @@ package api import ( + "errors" "fmt" "net/http" "time" - "github.com/JackalLabs/sequoia/queue" + "github.com/JackalLabs/sequoia/file_system" + + "github.com/prometheus/client_golang/prometheus/promhttp" + + "github.com/JackalLabs/sequoia/proofs" + "github.com/rs/zerolog/log" + "github.com/desmos-labs/cosmos-go-wallet/wallet" - "github.com/dgraph-io/badger/v4" "github.com/gorilla/mux" ) +import jsoniter "github.com/json-iterator/go" + +var json = jsoniter.ConfigCompatibleWithStandardLibrary type API struct { port int64 + srv *http.Server } func NewAPI(port int64) *API { @@ -21,20 +31,26 @@ func NewAPI(port int64) *API { } } -func (a *API) Serve(db *badger.DB, q *queue.Queue, wallet *wallet.Wallet) { +func (a *API) Close() error { + return a.srv.Close() +} + +func (a *API) Serve(f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64) { r := mux.NewRouter() r.HandleFunc("/", IndexHandler(wallet.AccAddress())) - r.HandleFunc("/upload", PostFileHandler(db, q, wallet.AccAddress())) - r.HandleFunc("/download/{fid}", DownloadFileHandler(db)) + r.HandleFunc("/upload", PostFileHandler(f, p, wallet, chunkSize)) + r.HandleFunc("/download/{fid}", DownloadFileHandler(f)) - r.HandleFunc("/list", ListFilesHandler(db)) - r.HandleFunc("/api/data/fids", LegacyListFilesHandler(db)) + r.HandleFunc("/list", ListFilesHandler(f)) + r.HandleFunc("/api/data/fids", LegacyListFilesHandler(f)) - r.HandleFunc("/dump", DumpDBHandler(db)) + r.HandleFunc("/dump", DumpDBHandler(f)) r.HandleFunc("/version", VersionHandler(wallet)) - srv := &http.Server{ + r.Handle("/metrics", promhttp.Handler()) + + a.srv = &http.Server{ Handler: r, Addr: fmt.Sprintf("0.0.0.0:%d", a.port), // Good practice: enforce timeouts for servers you create! @@ -42,8 +58,11 @@ func (a *API) Serve(db *badger.DB, q *queue.Queue, wallet *wallet.Wallet) { ReadTimeout: 15 * time.Second, } - err := srv.ListenAndServe() + log.Logger.Info().Msg(fmt.Sprintf("Sequoia API now listening on %s", a.srv.Addr)) + err := a.srv.ListenAndServe() if err != nil { - panic(err) + if !errors.Is(err, http.ErrServerClosed) { + panic(err) + } } } diff --git a/api/types/responses.go b/api/types/responses.go index 7765f33..99c4ff2 100644 --- a/api/types/responses.go +++ b/api/types/responses.go @@ -1,8 +1,9 @@ package types type UploadResponse struct { - CID string `json:"cid"` - FID string `json:"fid"` + Merkle []byte `json:"merkle"` + Owner string `json:"owner"` + Start int64 `json:"start"` } type ErrorResponse struct { diff --git a/cmd/root.go b/cmd/root.go index 072f71a..6a31c5f 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,6 +1,7 @@ package cmd import ( + "fmt" "os" "github.com/JackalLabs/sequoia/cmd/types" @@ -13,6 +14,8 @@ import ( func init() { log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) + log.Logger = log.With().Caller().Logger() + log.Logger = log.Level(zerolog.InfoLevel) } func InitCmd() *cobra.Command { @@ -29,8 +32,12 @@ func InitCmd() *cobra.Command { if err != nil { return err } + _, err = config.InitWallet(home) + if err != nil { + return err + } - log.Logger.Info().Msg("done!") + fmt.Println("done!") return nil }, diff --git a/cmd/wallet/wallet.go b/cmd/wallet/wallet.go index c4e34e5..08b7586 100644 --- a/cmd/wallet/wallet.go +++ b/cmd/wallet/wallet.go @@ -5,7 +5,6 @@ import ( "github.com/JackalLabs/sequoia/cmd/types" "github.com/JackalLabs/sequoia/config" - "github.com/rs/zerolog/log" "github.com/spf13/cobra" ) @@ -40,7 +39,7 @@ func addressCmd() *cobra.Command { panic(err) } - log.Info().Msg(fmt.Sprintf("Provider Address: %s", wallet.AccAddress())) + fmt.Printf("Provider Address: %s\n", wallet.AccAddress()) }, } } diff --git a/config/types.go b/config/types.go index 697d27a..b1d3416 100644 --- a/config/types.go +++ b/config/types.go @@ -15,6 +15,7 @@ type Config struct { TotalSpace int64 `yaml:"total_bytes_offered"` DataDirectory string `yaml:"data_directory"` APICfg APIConfig `yaml:"api_config"` + ProofThreads int64 `yaml:"proof_threads"` } type StrayManagerConfig struct { @@ -49,5 +50,6 @@ func DefaultConfig() *Config { APICfg: APIConfig{ Port: 3333, }, + ProofThreads: 1000, } } diff --git a/config/wallet.go b/config/wallet.go index f8c8935..0f0701c 100644 --- a/config/wallet.go +++ b/config/wallet.go @@ -1,7 +1,6 @@ package config import ( - "encoding/json" "fmt" "os" "path" @@ -10,6 +9,9 @@ import ( "github.com/cosmos/go-bip39" "github.com/desmos-labs/cosmos-go-wallet/wallet" ) +import jsoniter "github.com/json-iterator/go" + +var json = jsoniter.ConfigCompatibleWithStandardLibrary const SeedFileName = "provider_wallet.json" @@ -66,7 +68,7 @@ func createWallet(directory string) error { filePath := path.Join(directory, SeedFileName) if newWallet { - fmt.Printf("A new wallet was just created with a random seed phrase, if you wish to use an existing seed phrase, edit %s", filePath) + fmt.Printf("A new wallet was just created with a random seed phrase, if you wish to use an existing seed phrase, edit %s\n", filePath) os.Exit(0) } diff --git a/core/app.go b/core/app.go index 952ff5d..9079948 100644 --- a/core/app.go +++ b/core/app.go @@ -9,6 +9,12 @@ import ( "syscall" "time" + "github.com/JackalLabs/sequoia/file_system" + + "github.com/JackalLabs/sequoia/monitoring" + + "github.com/cosmos/gogoproto/grpc" + "github.com/JackalLabs/sequoia/api" "github.com/JackalLabs/sequoia/config" "github.com/JackalLabs/sequoia/logger" @@ -23,12 +29,13 @@ import ( ) type App struct { - db *badger.DB api *api.API q *queue.Queue prover *proofs.Prover strayManager *strays.StrayManager home string + monitor *monitoring.Monitor + fileSystem *file_system.FileSystem } func NewApp(home string) *App { @@ -56,15 +63,17 @@ func NewApp(home string) *App { apiServer := api.NewAPI(cfg.APICfg.Port) + f := file_system.NewFileSystem(db) + return &App{ - db: db, - api: apiServer, - home: home, + fileSystem: f, + api: apiServer, + home: home, } } func initProviderOnChain(wallet *wallet.Wallet, ip string, totalSpace int64) error { - init := storageTypes.NewMsgInitProvider(wallet.AccAddress(), ip, fmt.Sprintf("%d", totalSpace), "") + init := storageTypes.NewMsgInitProvider(wallet.AccAddress(), ip, totalSpace, "") data := walletTypes.NewTransactionData( init, @@ -86,7 +95,7 @@ func initProviderOnChain(wallet *wallet.Wallet, ip string, totalSpace int64) err } func updateSpace(wallet *wallet.Wallet, totalSpace int64) error { - init := storageTypes.NewMsgSetProviderTotalspace(wallet.AccAddress(), fmt.Sprintf("%d", totalSpace)) + init := storageTypes.NewMsgSetProviderTotalSpace(wallet.AccAddress(), totalSpace) data := walletTypes.NewTransactionData( init, @@ -129,6 +138,19 @@ func updateIp(wallet *wallet.Wallet, ip string) error { return nil } +func (a *App) GetStorageParams(client grpc.ClientConn) (storageTypes.Params, error) { + queryParams := &storageTypes.QueryParams{} + + cl := storageTypes.NewQueryClient(client) + + res, err := cl.Params(context.Background(), queryParams) + if err != nil { + return storageTypes.Params{}, err + } + + return res.Params, nil +} + func (a *App) Start() { cfg, err := config.Init(a.home) if err != nil { @@ -142,64 +164,78 @@ func (a *App) Start() { myAddress := w.AccAddress() - queryParams := &storageTypes.QueryProviderRequest{ + queryParams := &storageTypes.QueryProvider{ Address: myAddress, } cl := storageTypes.NewQueryClient(w.Client.GRPCConn) - res, err := cl.Providers(context.Background(), queryParams) + claimers := make([]string, 0) + + res, err := cl.Provider(context.Background(), queryParams) if err != nil { log.Info().Msg("Provider does not exist on network or is not connected...") err := initProviderOnChain(w, cfg.Ip, cfg.TotalSpace) if err != nil { panic(err) } - } - totalSpace, err := strconv.ParseInt(res.Providers.Totalspace, 10, 64) - if err != nil { + } else { + claimers = res.Provider.AuthClaimers + + totalSpace, err := strconv.ParseInt(res.Provider.Totalspace, 10, 64) if err != nil { - panic(err) + if err != nil { + panic(err) + } } - } - if totalSpace != cfg.TotalSpace { - err := updateSpace(w, cfg.TotalSpace) - if err != nil { - panic(err) + if totalSpace != cfg.TotalSpace { + err := updateSpace(w, cfg.TotalSpace) + if err != nil { + panic(err) + } } - } - if res.Providers.Ip != cfg.Ip { - err := updateIp(w, cfg.Ip) - if err != nil { - panic(err) + if res.Provider.Ip != cfg.Ip { + err := updateIp(w, cfg.Ip) + if err != nil { + panic(err) + } } } - myUrl := res.Providers.Ip + params, err := a.GetStorageParams(w.Client.GRPCConn) + if err != nil { + panic(err) + } + + myUrl := cfg.Ip log.Info().Msg(fmt.Sprintf("Provider started as: %s", myAddress)) a.q = queue.NewQueue(w, cfg.QueueInterval) - a.prover = proofs.NewProver(w, a.db, a.q, cfg.ProofInterval) - - go a.api.Serve(a.db, a.q, w) - go a.prover.Start() go a.q.Listen() - a.strayManager = strays.NewStrayManager(w, a.q, cfg.StrayManagerCfg.CheckInterval, cfg.StrayManagerCfg.RefreshInterval, cfg.StrayManagerCfg.HandCount, res.Providers.AuthClaimers) + a.prover = proofs.NewProver(w, a.q, a.fileSystem, cfg.ProofInterval, cfg.ProofThreads) + a.strayManager = strays.NewStrayManager(w, a.q, cfg.StrayManagerCfg.CheckInterval, cfg.StrayManagerCfg.RefreshInterval, cfg.StrayManagerCfg.HandCount, claimers) + a.monitor = monitoring.NewMonitor(w) - go a.strayManager.Start(a.db, myUrl) + // Starting the 4 concurrent services + go a.api.Serve(a.fileSystem, a.prover, w, params.ChunkSize) + go a.prover.Start() + go a.strayManager.Start(a.fileSystem, myUrl, params.ChunkSize) + go a.monitor.Start() done := make(chan os.Signal, 1) signal.Notify(done, syscall.SIGINT, syscall.SIGTERM) <-done // Will block here until user hits ctrl+c - fmt.Println("Shutting down safely...") + fmt.Println("Shutting Sequoia down safely...") + _ = a.api.Close() a.q.Stop() a.prover.Stop() a.strayManager.Stop() + a.monitor.Stop() time.Sleep(time.Second * 30) // give the program some time to shut down - a.db.Close() + a.fileSystem.Close() } diff --git a/file_system/file_system.go b/file_system/file_system.go index 9460bda..fa02a07 100644 --- a/file_system/file_system.go +++ b/file_system/file_system.go @@ -1,20 +1,22 @@ package file_system import ( - "bytes" "crypto/sha256" "encoding/hex" + "errors" "fmt" "io" - "github.com/JackalLabs/sequoia/utils" "github.com/dgraph-io/badger/v4" "github.com/rs/zerolog/log" - "github.com/wealdtech/go-merkletree" - "github.com/wealdtech/go-merkletree/sha3" + "github.com/wealdtech/go-merkletree/v2" + "github.com/wealdtech/go-merkletree/v2/sha3" ) +import jsoniter "github.com/json-iterator/go" -func buildTree(buf io.Reader, chunkSize int64) ([]byte, []byte, [][]byte, int, error) { +var json = jsoniter.ConfigCompatibleWithStandardLibrary + +func BuildTree(buf io.Reader, chunkSize int64) ([]byte, []byte, [][]byte, int, error) { size := 0 data := make([][]byte, 0) @@ -36,10 +38,8 @@ func buildTree(buf io.Reader, chunkSize int64) ([]byte, []byte, [][]byte, int, e chunks = append(chunks, b) - hexedData := hex.EncodeToString(b) - hash := sha256.New() - hash.Write([]byte(fmt.Sprintf("%d%s", index, hexedData))) // appending the index and the data + hash.Write([]byte(fmt.Sprintf("%d%x", index, b))) // appending the index and the data hashName := hash.Sum(nil) data = append(data, hashName) @@ -47,88 +47,89 @@ func buildTree(buf io.Reader, chunkSize int64) ([]byte, []byte, [][]byte, int, e index++ } - tree, err := merkletree.NewUsing(data, sha3.New512(), false) + tree, err := merkletree.NewTree( + merkletree.WithData(data), + merkletree.WithHashType(sha3.New512()), + merkletree.WithSalt(false), + ) if err != nil { return nil, nil, nil, 0, err } r := tree.Root() - exportedTree, err := tree.Export() + exportedTree, err := json.Marshal(tree) if err != nil { return nil, nil, nil, 0, err } - tree = nil // for GC - return r, exportedTree, chunks, size, nil } -func WriteFile(db *badger.DB, reader io.Reader, signee string, address string, cidOverride string) (merkle string, fid string, cid string, size int, err error) { - var buf bytes.Buffer - tee := io.TeeReader(reader, &buf) - fid, err = utils.MakeFid(tee) +func (f *FileSystem) WriteFile(reader io.Reader, merkle []byte, owner string, start int64, address string, chunkSize int64) (size int, err error) { + log.Info().Msg(fmt.Sprintf("Writing %x to disk", merkle)) + root, exportedTree, chunks, s, err := BuildTree(reader, chunkSize) if err != nil { - return + log.Error().Err(fmt.Errorf("cannot build tree | %w", err)) + return 0, err + } + size = s + if hex.EncodeToString(merkle) != hex.EncodeToString(root) { + return 0, fmt.Errorf("merkle does not match %x != %x", merkle, root) } - cid, err = utils.MakeCid(signee, address, fid) + err = f.db.Update(func(txn *badger.Txn) error { + err = txn.Set(treeKey(merkle, owner, start), exportedTree) + if err != nil { + e := fmt.Errorf("cannot set tree %x | %w", merkle, err) + log.Error().Err(e) + return e + } + + return nil + }) if err != nil { - return + return 0, err } - if cidOverride != "" { - cid = cidOverride - } + k := len(chunks) + for len(chunks) > 0 { - err = db.Update(func(txn *badger.Txn) error { - var chunkSize int64 = 1024 + i := k - len(chunks) - root, exportedTree, chunks, s, err := buildTree(&buf, chunkSize) - if err != nil { - log.Info().Msg(fmt.Sprintf("Cannot build tree | %e", err)) - return err - } - size = s - merkle = hex.EncodeToString(root) + chunk := chunks[0] + chunks = chunks[1:] - err = txn.Set(treeKey(cid), exportedTree) - if err != nil { - log.Info().Msg(fmt.Sprintf("Cannot set tree %s | %e", cid, err)) - } - - for i, chunk := range chunks { - err := txn.Set(chunkKey(cid, i), chunk) + err = f.db.Update(func(txn *badger.Txn) error { + err := txn.Set(chunkKey(merkle, owner, start, i), chunk) if err != nil { - log.Info().Msg(fmt.Sprintf("Cannot set chunk %d | %e", i, err)) + e := fmt.Errorf("cannot set chunk %d | %w", i, err) + log.Error().Err(e) + return e } - } - - err = txn.Set(fileKey(cid), []byte(fid)) + return nil + }) if err != nil { - log.Info().Msg(fmt.Sprintf("Cannot set cid %s | %e", cid, err)) + return 0, err } - return nil - }) + } - return + fileCount.Inc() + return size, nil } -func DeleteFile(db *badger.DB, cid string) error { - log.Info().Msg(fmt.Sprintf("Removing %s from disk...", cid)) - return db.Update(func(txn *badger.Txn) error { - err := txn.Delete(treeKey(cid)) - if err != nil { - return err - } - err = txn.Delete(fileKey(cid)) +func (f *FileSystem) DeleteFile(merkle []byte, owner string, start int64) error { + log.Info().Msg(fmt.Sprintf("Removing %x from disk...", merkle)) + fileCount.Dec() + return f.db.Update(func(txn *badger.Txn) error { + err := txn.Delete(treeKey(merkle, owner, start)) if err != nil { return err } it := txn.NewIterator(badger.DefaultIteratorOptions) defer it.Close() - prefix := majorChunkKey(cid) + prefix := majorChunkKey(merkle, owner, start) for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { item := it.Item() k := item.Key() @@ -142,18 +143,29 @@ func DeleteFile(db *badger.DB, cid string) error { }) } -func ListFiles(db *badger.DB) ([]string, error) { - files := make([]string, 0) +func (f *FileSystem) ListFiles() ([][]byte, []string, []int64, error) { + merkles := make([][]byte, 0) + owners := make([]string, 0) + starts := make([]int64, 0) - err := db.View(func(txn *badger.Txn) error { + err := f.db.View(func(txn *badger.Txn) error { it := txn.NewIterator(badger.DefaultIteratorOptions) defer it.Close() - prefix := []byte("files/") + prefix := []byte("tree/") for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { item := it.Item() k := item.Key() err := item.Value(func(v []byte) error { - files = append(files, string(k[len(prefix):])) + newValue := k[len(prefix):] + merkle, owner, start, err := SplitMerkle(newValue) + if err != nil { + return err + } + + merkles = append(merkles, merkle) + owners = append(owners, owner) + starts = append(starts, start) + return nil }) if err != nil { @@ -163,13 +175,42 @@ func ListFiles(db *badger.DB) ([]string, error) { return nil }) - return files, err + return merkles, owners, starts, err +} + +func (f *FileSystem) ProcessFiles(fn func(merkle []byte, owner string, start int64)) error { + err := f.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + prefix := []byte("tree/") + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + k := item.Key() + err := item.Value(func(v []byte) error { + newValue := k[len(prefix):] + merkle, owner, start, err := SplitMerkle(newValue) + if err != nil { + return err + } + + fn(merkle, owner, start) + + return nil + }) + if err != nil { + return err + } + } + return nil + }) + + return err } -func Dump(db *badger.DB) (map[string]string, error) { +func (f *FileSystem) Dump() (map[string]string, error) { files := make(map[string]string) - err := db.View(func(txn *badger.Txn) error { + err := f.db.View(func(txn *badger.Txn) error { it := txn.NewIterator(badger.DefaultIteratorOptions) defer it.Close() for it.Rewind(); it.Valid(); it.Next() { @@ -196,98 +237,111 @@ func Dump(db *badger.DB) (map[string]string, error) { return files, err } -func GetFileChunk(db *badger.DB, cid string, chunkToLoad int) (newTree *merkletree.MerkleTree, chunkOut []byte, err error) { - tree := treeKey(cid) - chunk := chunkKey(cid, chunkToLoad) +func (f *FileSystem) GetFileTreeByChunk(merkle []byte, owner string, start int64, chunkToLoad int) (*merkletree.MerkleTree, []byte, error) { + tree := treeKey(merkle, owner, start) + chunk := chunkKey(merkle, owner, start, chunkToLoad) + + var chunkOut []byte + var newTree merkletree.MerkleTree - err = db.View(func(txn *badger.Txn) error { + err := f.db.View(func(txn *badger.Txn) error { t, err := txn.Get(tree) if err != nil { return err } err = t.Value(func(val []byte) error { - newTree, err = merkletree.ImportMerkleTree(val, sha3.New512()) + err := json.Unmarshal(val, &newTree) if err != nil { return err } return nil }) + if err != nil { + return err + } c, err := txn.Get(chunk) if err != nil { return err } - err = c.Value(func(val []byte) error { + _ = c.Value(func(val []byte) error { // doesn't need checked chunkOut = val return nil }) return nil }) - - return -} - -func GetCIDFromFID(txn *badger.Txn, fid string) (cid string, err error) { - found := false - - it := txn.NewIterator(badger.DefaultIteratorOptions) - defer it.Close() - for it.Rewind(); it.Valid(); it.Next() { - if found { - break - } - - item := it.Item() - - _ = item.Value(func(val []byte) error { - if string(val) == fid { - cid = string(item.Key()[len("files/"):]) - - found = true - } - - return nil - }) - + if err != nil { + return nil, nil, err } - if !found { - err = fmt.Errorf("no fid found") + if chunkOut == nil { + return nil, nil, errors.New("chunk is nil, something is wrong") } - return + return &newTree, chunkOut, nil } -func GetFileDataByFID(db *badger.DB, fid string) (file []byte, err error) { - err = db.View(func(txn *badger.Txn) error { - cid, err := GetCIDFromFID(txn, fid) - if err != nil { - return err - } +func (f *FileSystem) GetFileData(merkle []byte, owner string, start int64) ([]byte, error) { + fileData := make([]byte, 0) - file, err = GetFileData(db, cid) - if err != nil { - return err + err := f.db.View(func(txn *badger.Txn) error { + it := txn.NewIterator(badger.DefaultIteratorOptions) + defer it.Close() + prefix := majorChunkKey(merkle, owner, start) + for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { + item := it.Item() + + err := item.Value(func(val []byte) error { + fileData = append(fileData, val...) + return nil + }) + if err != nil { + return err + } } + return nil }) - return + return fileData, err } -func GetFileData(db *badger.DB, cid string) ([]byte, error) { +func (f *FileSystem) GetFileDataByMerkle(merkle []byte) ([]byte, error) { fileData := make([]byte, 0) - - err := db.View(func(txn *badger.Txn) error { + o := "" + var s int64 + err := f.db.View(func(txn *badger.Txn) error { it := txn.NewIterator(badger.DefaultIteratorOptions) defer it.Close() - prefix := majorChunkKey(cid) + prefix := majorChunkMerkleKey(merkle) for it.Seek(prefix); it.ValidForPrefix(prefix); it.Next() { item := it.Item() - err := item.Value(func(val []byte) error { + k := item.Key()[len("chunks/"):] + + _, owner, start, err := SplitMerkle(k) + if err != nil { + return err + } + + if len(o) == 0 { + o = owner + } else { + if owner != o { + return nil + } + } + if s == 0 { + s = start + } else { + if s != start { + return nil + } + } + + err = item.Value(func(val []byte) error { fileData = append(fileData, val...) return nil }) diff --git a/file_system/file_system_test.go b/file_system/file_system_test.go index caae771..1def316 100644 --- a/file_system/file_system_test.go +++ b/file_system/file_system_test.go @@ -1,25 +1,90 @@ package file_system +//nolint:all import ( "bytes" - "crypto/rand" + "crypto/sha256" "encoding/hex" "fmt" - "strings" + "io" + "io/ioutil" + "math/rand" + "net/http" "testing" + "github.com/JackalLabs/sequoia/proofs" + "github.com/wealdtech/go-merkletree/v2" + "github.com/wealdtech/go-merkletree/v2/sha3" + + "github.com/JackalLabs/sequoia/logger" + "github.com/rs/zerolog" + "github.com/dgraph-io/badger/v4" "github.com/rs/zerolog/log" "github.com/stretchr/testify/require" ) +import _ "net/http/pprof" + +var table = []struct { + input int +}{ + {input: 1024}, // 1kib + {input: 1024 * 10}, // 10kib + {input: 1024 * 1024}, // 1mib + {input: 1024 * 1024 * 1024}, // 1gib + {input: 1024 * 1024 * 1024 * 10}, // 10gib +} + +func BenchmarkFileWrites(b *testing.B) { + log.Logger = log.Output(zerolog.ConsoleWriter{Out: ioutil.Discard}) + log.Logger = log.With().Caller().Logger() + + options := badger.DefaultOptions("/tmp/badger") + options.Logger = &logger.SequoiaLogger{} + + db, err := badger.Open(options) + require.NoError(b, err) + + err = db.DropAll() + require.NoError(b, err) + + f := NewFileSystem(db) + + defer db.Close() + + go func() { + _ = http.ListenAndServe("localhost:6060", nil) + }() + + for _, v := range table { + b.Run(fmt.Sprintf("input_size_%d", v.input), func(b *testing.B) { + for i := 0; i < 1; i++ { + token := make([]byte, v.input) // 1 kb size + //nolint:all + rand.Read(token) + + buf := bytes.NewBuffer(token) + buf2 := bytes.NewBuffer(token) + + root, _, _, _, err := BuildTree(buf2, 10240) + require.NoError(b, err) + + _, err = f.WriteFile(buf, root, "file_owner", 0, "myself", 10240) + require.NoError(b, err) + + } + }) + } +} + func TestWriteFile(t *testing.T) { db, err := badger.Open(badger.DefaultOptions("/tmp/badger")) require.NoError(t, err) err = db.DropAll() require.NoError(t, err) - + f := NewFileSystem(db) defer db.Close() data, err := hex.DecodeString("303030303030383597631df147918b77139b132d44798cef96879280a4b1e1309f699875c6bf57798d17bbbbe75273ba4343da20d25bbca6729ccf9b1456d0b25a08f9616a7bf414de0e15ed29f0a74378789bc7510a7d1f76348aadd93030303030383032976304f845b5c40413ec580e446491ee9bd7c780e4f2e52cb774995dcd9f10278d5ea5c5b00c2eac37039b7a844fa4a82780d9a4061a99dd1d06e130696afd07dd0e59ec275af66319a71dd53dd89f3bd6381aef3262b1bab5f8115522dbbe67411c87e827fd93d220c9d5bc60f0d55ba12df0ee3ff46ee63ecb1edf540c2aedf9c3fcf42c0310e5f7a5e69df89a0e7961e371c9f1499ccc520e283513b1e5eace184dde615078996ea67d0566b102b6f72baa9c9c76a4cc920d667f82cb55aab33c593538d636a8f1c59aa609f50eb6c20bb52c5885a7cb15cb8a3ada30a53f45ba2a3ad5c321114ffdcb8974eca8f56af3d70956af556165659b9427e078015a4fc55d6ed50a00b3aba89cd00dfdd360b5a82f631eab1be3b7c1d7eceb312733c4b21baa6640e8e5ef683a569625d8f6815858bd24a5e39f2c716862ad3cb77503e131d015f5cb615deb1974b787f85f78e85e14c92b7c8ee217a1cc997ebbb0ed3690d57a01a796692d32bb2c3c6f80af3fb104b1b506e52f94826ed6faed82df260710bb9971d1368724a7fa48c394be60d7435080dc76981c789e458a42dce0f6fe29f4e956768e0eddfff6f512a1a2e64689f82132094249df464c5286014b1835ace7b83dddea38e65e55f818ebc53d929ed38fc0997afb145c036bb1fdc7f1a2813840c69ddc1dc284d18e25b3c9b22619f0a97bcf1f36864ff0ed551e7a7249001b1f909a45b132e6de3585537240dd25941de1e4b66065626f0a2297b5c4328e6b672004e4f16aa4d742bb5b7548c4cc6756d7f2bc0de8df4fe1a21921233dd76785eb319db7bc567f2dbce5be42fdbe853edbdcf36dfbc0996874e096ea4954e4b5afb9751b0bf055778863231b4eb7a0f0839190e26db5cdd2c10f5841edc4cc85b6edf328909886d18b75e4e06210e1020fbb73b51bafdef5cd9a1bd70f52388b00a2bb555bc5e6a06bc88eeb35094a2851f3460305a83b893be857a5452b0728dae28dcd09e8e25a714cf014b557107e911fa16fa1dc6c36e4b1399cd96eca0685dc3746fa19ede15f9c0a14c5b00500b95fba05b8cb29d9c5ee6d2e164ac430e9fe56e59e10681a6f2a647c7ddf0f30ae1308035282c615c8368e") @@ -28,16 +93,14 @@ func TestWriteFile(t *testing.T) { buf := bytes.NewBuffer(data) merkle := "1688dc719d1a41ff567fd54e66953f5c518044f6fed6ce814ba777b7dead4ab7d1c193448dc1c04eac05e6708dfd7a8999e9afdf6ba5c525ab7fb9c7f1e2bd4c" - - fileMerkle, fid, _, size, err := WriteFile(db, buf, "jkl123", "myself", "") + m, err := hex.DecodeString(merkle) require.NoError(t, err) - require.Equal(t, len(data), size) + size, err := f.WriteFile(buf, m, "file_owner", 0, "myself", 1024) - realFid := "jklf1p5cm3z47rrcyaskqge3yc33xm7hdq7lken99ahluvuz67ugctleqmwv43a" - require.Equal(t, realFid, fid) + require.NoError(t, err) - require.Equal(t, merkle, fileMerkle) + require.Equal(t, len(data), size) err = db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions @@ -53,78 +116,71 @@ func TestWriteFile(t *testing.T) { }) require.NoError(t, err) - s, err := ListFiles(db) + ms, _, _, err := f.ListFiles() require.NoError(t, err) - log.Info().Msg(strings.Join(s, ",")) + require.Equal(t, 1, len(ms)) } -func TestFIDCID(t *testing.T) { +func TestWriteAndProveFiles(t *testing.T) { db, err := badger.Open(badger.DefaultOptions("/tmp/badger")) require.NoError(t, err) - - err = db.DropAll() - require.NoError(t, err) - defer db.Close() - data, err := hex.DecodeString("303030303030383597631df147918b77139b132d44798cef96879280a4b1e1309f699875c6bf57798d17bbbbe75273ba4343da20d25bbca6729ccf9b1456d0b25a08f9616a7bf414de0e15ed29f0a74378789bc7510a7d1f76348aadd93030303030383032976304f845b5c40413ec580e446491ee9bd7c780e4f2e52cb774995dcd9f10278d5ea5c5b00c2eac37039b7a844fa4a82780d9a4061a99dd1d06e130696afd07dd0e59ec275af66319a71dd53dd89f3bd6381aef3262b1bab5f8115522dbbe67411c87e827fd93d220c9d5bc60f0d55ba12df0ee3ff46ee63ecb1edf540c2aedf9c3fcf42c0310e5f7a5e69df89a0e7961e371c9f1499ccc520e283513b1e5eace184dde615078996ea67d0566b102b6f72baa9c9c76a4cc920d667f82cb55aab33c593538d636a8f1c59aa609f50eb6c20bb52c5885a7cb15cb8a3ada30a53f45ba2a3ad5c321114ffdcb8974eca8f56af3d70956af556165659b9427e078015a4fc55d6ed50a00b3aba89cd00dfdd360b5a82f631eab1be3b7c1d7eceb312733c4b21baa6640e8e5ef683a569625d8f6815858bd24a5e39f2c716862ad3cb77503e131d015f5cb615deb1974b787f85f78e85e14c92b7c8ee217a1cc997ebbb0ed3690d57a01a796692d32bb2c3c6f80af3fb104b1b506e52f94826ed6faed82df260710bb9971d1368724a7fa48c394be60d7435080dc76981c789e458a42dce0f6fe29f4e956768e0eddfff6f512a1a2e64689f82132094249df464c5286014b1835ace7b83dddea38e65e55f818ebc53d929ed38fc0997afb145c036bb1fdc7f1a2813840c69ddc1dc284d18e25b3c9b22619f0a97bcf1f36864ff0ed551e7a7249001b1f909a45b132e6de3585537240dd25941de1e4b66065626f0a2297b5c4328e6b672004e4f16aa4d742bb5b7548c4cc6756d7f2bc0de8df4fe1a21921233dd76785eb319db7bc567f2dbce5be42fdbe853edbdcf36dfbc0996874e096ea4954e4b5afb9751b0bf055778863231b4eb7a0f0839190e26db5cdd2c10f5841edc4cc85b6edf328909886d18b75e4e06210e1020fbb73b51bafdef5cd9a1bd70f52388b00a2bb555bc5e6a06bc88eeb35094a2851f3460305a83b893be857a5452b0728dae28dcd09e8e25a714cf014b557107e911fa16fa1dc6c36e4b1399cd96eca0685dc3746fa19ede15f9c0a14c5b00500b95fba05b8cb29d9c5ee6d2e164ac430e9fe56e59e10681a6f2a647c7ddf0f30ae1308035282c615c8368e") + err = db.DropAll() require.NoError(t, err) + f := NewFileSystem(db) - buf := bytes.NewBuffer(data) - - merkle := "1688dc719d1a41ff567fd54e66953f5c518044f6fed6ce814ba777b7dead4ab7d1c193448dc1c04eac05e6708dfd7a8999e9afdf6ba5c525ab7fb9c7f1e2bd4c" + size := 1024 * 1024 * 10 + var chunkSize int64 = 1024 - fileMerkle, fid, _, size, err := WriteFile(db, buf, "jkl123", "myself", "") - require.NoError(t, err) + token := make([]byte, size) // 1 kb size + newToken := make([]byte, size) // 1 kb size - require.Equal(t, len(data), size) + //nolint:all + rand.Read(token) + copy(newToken, token) - realFid := "jklf1p5cm3z47rrcyaskqge3yc33xm7hdq7lken99ahluvuz67ugctleqmwv43a" - require.Equal(t, realFid, fid) + require.Equal(t, token, newToken) - require.Equal(t, merkle, fileMerkle) + b := bytes.NewBuffer(token) + b2 := bytes.NewBuffer(newToken) - s, err := ListFiles(db) + root, _, _, _, err := BuildTree(b, chunkSize) require.NoError(t, err) - fileData, err := GetFileData(db, "jklc16mz45jjsem93ycv9g0nug82rag2a3ydtpy7zj8eh9wdfgr9dh0cszn6ggx") - require.NoError(t, err) + owner := "file_owner" + var start int64 = 0 - fileDataFromFid, err := GetFileDataByFID(db, "jklf1p5cm3z47rrcyaskqge3yc33xm7hdq7lken99ahluvuz67ugctleqmwv43a") + _, err = f.WriteFile(b2, root, owner, start, "myself", chunkSize) require.NoError(t, err) - require.Equal(t, fileData, fileDataFromFid) - - log.Info().Msg(strings.Join(s, ",")) -} - -func TestLargeFile(t *testing.T) { - db, err := badger.Open(badger.DefaultOptions("/tmp/badger")) + ms, _, _, err := f.ListFiles() require.NoError(t, err) - err = db.DropAll() - require.NoError(t, err) + require.Equal(t, 1, len(ms)) - defer db.Close() + totalBlocks := size / int(chunkSize) + for i := 0; i < totalBlocks; i++ { - for i := 1; i < 1024*20; i++ { - bi := make([]byte, i) - // then we can call rand.Read. - _, err = rand.Read(bi) + p, c, err := proofs.GenProof(f, root, owner, start, i) require.NoError(t, err) - buf := bytes.NewBuffer(bi) - - _, _, cid, size, err := WriteFile(db, buf, "jkl123", "myself", "") + h := sha256.New() + _, err = io.WriteString(h, fmt.Sprintf("%d%x", i, c)) require.NoError(t, err) - require.Equal(t, len(bi), size) + hashName := h.Sum(nil) - fileData, err := GetFileData(db, cid) + var proof merkletree.Proof // unmarshal proof + err = json.Unmarshal(p, &proof) require.NoError(t, err) - require.Equal(t, bi, fileData) + verified, err := merkletree.VerifyProofUsing(hashName, false, &proof, [][]byte{root}, sha3.New512()) + require.NoError(t, err) + + require.Equal(t, true, verified) } + } diff --git a/file_system/keys.go b/file_system/keys.go index cbe8622..3b1103d 100644 --- a/file_system/keys.go +++ b/file_system/keys.go @@ -1,19 +1,40 @@ package file_system -import "fmt" +import ( + "encoding/hex" + "fmt" + "strconv" + "strings" +) -func chunkKey(cid string, index int) []byte { - return []byte(fmt.Sprintf("chunks/%s/%010d", cid, index)) +func chunkKey(merkle []byte, owner string, start int64, index int) []byte { + return []byte(fmt.Sprintf("chunks/%x/%s/%d/%010d", merkle, owner, start, index)) } -func majorChunkKey(cid string) []byte { - return []byte(fmt.Sprintf("chunks/%s/", cid)) +func majorChunkKey(merkle []byte, owner string, start int64) []byte { + return []byte(fmt.Sprintf("chunks/%x/%s/%d/", merkle, owner, start)) } -func treeKey(cid string) []byte { - return []byte(fmt.Sprintf("tree/%s", cid)) +func majorChunkMerkleKey(merkle []byte) []byte { + return []byte(fmt.Sprintf("chunks/%x", merkle)) } -func fileKey(cid string) []byte { - return []byte(fmt.Sprintf("files/%s", cid)) +func SplitMerkle(key []byte) (merkle []byte, owner string, start int64, err error) { + its := strings.Split(string(key), "/") + merkle, err = hex.DecodeString(its[0]) + if err != nil { + return + } + + start, err = strconv.ParseInt(its[2], 10, 64) + if err != nil { + return + } + + owner = its[1] + return +} + +func treeKey(merkle []byte, owner string, start int64) []byte { + return []byte(fmt.Sprintf("tree/%x/%s/%d", merkle, owner, start)) } diff --git a/file_system/monitoring.go b/file_system/monitoring.go new file mode 100644 index 0000000..efbd59f --- /dev/null +++ b/file_system/monitoring.go @@ -0,0 +1,11 @@ +package file_system + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var fileCount = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "sequoia_file_count", + Help: "The number of files on disk", +}) diff --git a/file_system/types.go b/file_system/types.go new file mode 100644 index 0000000..8ce633e --- /dev/null +++ b/file_system/types.go @@ -0,0 +1,15 @@ +package file_system + +import "github.com/dgraph-io/badger/v4" + +type FileSystem struct { + db *badger.DB +} + +func NewFileSystem(db *badger.DB) *FileSystem { + return &FileSystem{db: db} +} + +func (f *FileSystem) Close() { + f.db.Close() +} diff --git a/go.mod b/go.mod index a5ec974..bf4b77b 100644 --- a/go.mod +++ b/go.mod @@ -5,20 +5,22 @@ go 1.21 require ( github.com/cosmos/cosmos-sdk v0.45.17 github.com/cosmos/go-bip39 v1.0.0 + github.com/cosmos/gogoproto v1.4.10 github.com/desmos-labs/cosmos-go-wallet v0.7.2 github.com/dgraph-io/badger/v4 v4.2.0 github.com/gorilla/mux v1.8.1 - github.com/jackalLabs/canine-chain/v3 v3.0.1 + github.com/jackalLabs/canine-chain/v3 v3.0.2 + github.com/json-iterator/go v1.1.12 + github.com/prometheus/client_golang v1.16.0 github.com/rs/zerolog v1.31.0 github.com/spf13/cobra v1.8.0 github.com/stretchr/testify v1.8.4 - github.com/wealdtech/go-merkletree v1.0.0 + github.com/wealdtech/go-merkletree/v2 v2.5.1-0.20231106114422-6769f4468d71 gopkg.in/yaml.v3 v3.0.1 - ) require ( - cosmossdk.io/api v0.3.1 // indirect + cosmossdk.io/api v0.4.1 // indirect cosmossdk.io/core v0.5.1 // indirect cosmossdk.io/depinject v1.0.0-alpha.3 // indirect filippo.io/edwards25519 v1.0.0 // indirect @@ -46,7 +48,6 @@ require ( github.com/cosmos/btcutil v1.0.5 // indirect github.com/cosmos/cosmos-db v0.0.0-20221226095112-f3c38ecb5e32 // indirect github.com/cosmos/cosmos-proto v1.0.0-beta.3 // indirect - github.com/cosmos/gogoproto v1.4.10 // indirect github.com/cosmos/gorocksdb v1.2.0 // indirect github.com/cosmos/iavl v0.19.5 // indirect github.com/cosmos/ibc-go/v4 v4.4.2 // indirect @@ -98,7 +99,6 @@ require ( github.com/improbable-eng/grpc-web v0.15.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmhodges/levigo v1.0.0 // indirect - github.com/josharian/intern v1.0.0 // indirect github.com/klauspost/compress v1.16.3 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect @@ -106,13 +106,14 @@ require ( github.com/libp2p/go-buffer-pool v0.1.0 // indirect github.com/linxGnu/grocksdb v1.7.10 // indirect github.com/magiconair/properties v1.8.6 // indirect - github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mimoo/StrobeGo v0.0.0-20210601165009-122bf33a46e0 // indirect github.com/minio/highwayhash v1.0.2 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.2 // indirect github.com/mtibben/percent v0.2.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/pelletier/go-toml v1.9.5 // indirect @@ -120,7 +121,6 @@ require ( github.com/petermattis/goid v0.0.0-20230317030725-371a4b8eda08 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_golang v1.16.0 // indirect github.com/prometheus/client_model v0.3.0 // indirect github.com/prometheus/common v0.42.0 // indirect github.com/prometheus/procfs v0.10.1 // indirect @@ -187,8 +187,9 @@ replace ( // use cosmos-flavored protobufs github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 - github.com/tendermint/tendermint => github.com/cometbft/cometbft v0.34.27 + //github.com/jackalLabs/canine-chain/v3 => ../canine-chain + github.com/jackalLabs/canine-chain/v3 => github.com/jackalLabs/canine-chain/v3 v3.0.2-0.20231120052238-6b1aa68fa906 // using the master branch for now before v4 releases - github.com/wealdtech/go-merkletree => github.com/TheMarstonConnell/go-merkletree v0.0.0-20230328183338-b5d590ab1aaf + github.com/tendermint/tendermint => github.com/cometbft/cometbft v0.34.27 ) diff --git a/go.sum b/go.sum index dd4b30c..18fc1db 100644 --- a/go.sum +++ b/go.sum @@ -44,8 +44,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= -cosmossdk.io/api v0.3.1 h1:NNiOclKRR0AOlO4KIqeaG6PS6kswOMhHD0ir0SscNXE= -cosmossdk.io/api v0.3.1/go.mod h1:DfHfMkiNA2Uhy8fj0JJlOCYOBp4eWUUJ1te5zBGNyIw= +cosmossdk.io/api v0.4.1 h1:0ikaYM6GyxTYYcfBiyR8YnLCfhNnhKpEFnaSepCTmqg= +cosmossdk.io/api v0.4.1/go.mod h1:jR7k5ok90LxW2lFUXvd8Vpo/dr4PpiyVegxdm7b1ZdE= cosmossdk.io/core v0.5.1 h1:vQVtFrIYOQJDV3f7rw4pjjVqc1id4+mE0L9hHP66pyI= cosmossdk.io/core v0.5.1/go.mod h1:KZtwHCLjcFuo0nmDc24Xy6CRNEL9Vl/MeimQ2aC7NLE= cosmossdk.io/depinject v1.0.0-alpha.3 h1:6evFIgj//Y3w09bqOUOzEpFj5tsxBqdc5CfkO7z+zfw= @@ -104,8 +104,6 @@ github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMx github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/TheMarstonConnell/cosmos-go-wallet v0.7.3-0.20231017193534-7de44d189e2a h1:7TBxGLnjrwT7JD1mLGPhkES/Dm2GJN0/wB6w1rR6we8= github.com/TheMarstonConnell/cosmos-go-wallet v0.7.3-0.20231017193534-7de44d189e2a/go.mod h1:1Qv2CAWgjKx/h/yA3xP/3losGFRxxKbxFDjGIqwhvvc= -github.com/TheMarstonConnell/go-merkletree v0.0.0-20230328183338-b5d590ab1aaf h1:K8hLTtjPxDCIjb/YNncNWHTWDpubfPFbXWGE8vAQC4g= -github.com/TheMarstonConnell/go-merkletree v0.0.0-20230328183338-b5d590ab1aaf/go.mod h1:Dpt5BLOsKmHAA6gAs3lyo9EKaLq2glBsEoHj3RqfN+Q= github.com/VictoriaMetrics/fastcache v1.6.0/go.mod h1:0qHz5QP0GMX4pfmMA/zt5RgfNuXJrTP0zS7DqpHGGTw= github.com/VividCortex/gohistogram v1.0.0 h1:6+hBz+qvs0JOrrNhhmR7lFxo5sINxBCGXrdtl/UvroE= github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/4+TcAqDqk/vUH7g= @@ -718,8 +716,8 @@ github.com/iris-contrib/jade v1.1.4/go.mod h1:EDqR+ur9piDl6DUgs6qRrlfzmlx/D5Uybo github.com/iris-contrib/pongo2 v0.0.1/go.mod h1:Ssh+00+3GAZqSQb30AvBRNxBx7rf0GqwkjqxNd0u65g= github.com/iris-contrib/schema v0.0.1/go.mod h1:urYA3uvUNG1TIIjOSCzHr9/LmbQo8LrOcOqfqxa4hXw= github.com/iris-contrib/schema v0.0.6/go.mod h1:iYszG0IOsuIsfzjymw1kMzTL8YQcCWlm65f3wX8J5iA= -github.com/jackalLabs/canine-chain/v3 v3.0.1 h1:l6ZD5qSaB8RUuSo08kD1yt1vLxx+k4Mc80RozAPmws0= -github.com/jackalLabs/canine-chain/v3 v3.0.1/go.mod h1:9q17bBVcNA69GxPPUePCnZDg0RZecjJ7ef9vZGq7ogU= +github.com/jackalLabs/canine-chain/v3 v3.0.2-0.20231120052238-6b1aa68fa906 h1:f2WRWAYyx7zs02Crbb4rzBFfVtNZhkzIEwh2YIxouNE= +github.com/jackalLabs/canine-chain/v3 v3.0.2-0.20231120052238-6b1aa68fa906/go.mod h1:x8EHziV+N+pKqYztDFpmh57uO0TpPhF0AF13tZ735MM= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jedisct1/go-minisign v0.0.0-20190909160543-45766022959e/go.mod h1:G1CVv03EnqU1wYL2dFwXxW2An0az9JTl/ZsqXQeBlkU= github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= @@ -732,7 +730,6 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC github.com/jmhodges/levigo v1.0.0 h1:q5EC36kV79HWeTBWsod3mG11EgStG3qArTKcvlksN1U= github.com/jmhodges/levigo v1.0.0/go.mod h1:Q6Qx+uH3RAqyK4rFQroq9RL7mdkABMcfhEI+nNuzMJQ= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= -github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8HmY= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= @@ -840,7 +837,6 @@ github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPK github.com/mailgun/raymond/v2 v2.0.46/go.mod h1:lsgvL50kgt1ylcFJYZiULi5fjPBkkhNfj4KA0W54Z18= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= -github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0= github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc= github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ= github.com/matryer/try v0.0.0-20161228173917-9ac251b645a2/go.mod h1:0KeJpeMD6o+O4hW7qJOT7vyQPKrWmj26uf5wMc/IiIs= @@ -1254,6 +1250,8 @@ github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/vmihailenco/msgpack/v5 v5.3.5/go.mod h1:7xyJ9e+0+9SaZT0Wt1RGleJXzli6Q/V5KbhBonMG9jc= github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds= +github.com/wealdtech/go-merkletree/v2 v2.5.1-0.20231106114422-6769f4468d71 h1:6CdfLVLgeh6MjVbn3dyNFUU3QJXhVBriv9QTg7Hy7vk= +github.com/wealdtech/go-merkletree/v2 v2.5.1-0.20231106114422-6769f4468d71/go.mod h1:NL3IVC+cwXMegkCY4SyPPMee3sYIBM+msyFOEhQS+qM= github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU= github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ= diff --git a/monitoring/monitor.go b/monitoring/monitor.go new file mode 100644 index 0000000..bef723b --- /dev/null +++ b/monitoring/monitor.go @@ -0,0 +1,63 @@ +package monitoring + +import ( + "context" + "strconv" + "time" + + bankTypes "github.com/cosmos/cosmos-sdk/x/bank/types" + "github.com/jackalLabs/canine-chain/v3/x/storage/types" +) + +func (m *Monitor) updateBurns() { + cl := types.NewQueryClient(m.wallet.Client.GRPCConn) + provRes, err := cl.Provider(context.Background(), &types.QueryProvider{Address: m.wallet.AccAddress()}) + if err != nil { + return + } + + bString := provRes.Provider.BurnedContracts + burns, err := strconv.ParseInt(bString, 10, 64) + if err != nil { + return + } + + fileBurnCount.Set(float64(burns)) +} + +func (m *Monitor) updateHeight() { + abciInfo, err := m.wallet.Client.RPCClient.ABCIInfo(context.Background()) + if err != nil { + return + } + height := abciInfo.Response.LastBlockHeight + + blockHeight.Set(float64(height)) +} + +func (m *Monitor) updateBalance() { + cl := bankTypes.NewQueryClient(m.wallet.Client.GRPCConn) + provRes, err := cl.Balance(context.Background(), &bankTypes.QueryBalanceRequest{Address: m.wallet.AccAddress(), Denom: "ujkl"}) + if err != nil { + return + } + + amt := provRes.Balance.Amount + + tokenBalance.Set(float64(amt.QuoRaw(1_000_000).Int64())) +} + +func (m *Monitor) Start() { + m.running = true + + for m.running { + time.Sleep(time.Second * 5) + m.updateBurns() + m.updateHeight() + m.updateBalance() + } +} + +func (m *Monitor) Stop() { + m.running = false +} diff --git a/monitoring/types.go b/monitoring/types.go new file mode 100644 index 0000000..ad68f9a --- /dev/null +++ b/monitoring/types.go @@ -0,0 +1,41 @@ +package monitoring + +import ( + "github.com/desmos-labs/cosmos-go-wallet/wallet" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var fileBurnCount = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "sequoia_burn_count", + Help: "The number of files burned by provider", +}) + +var blockHeight = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "sequoia_block_height", + Help: "The height of the chain at a given time", +}) + +var catchingUp = promauto.NewSummary(prometheus.SummaryOpts{ + Name: "sequoia_catching_up", + Help: "If the node is catching up", +}) + +var tokenBalance = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "sequoia_balance", + Help: "Provider token balance", +}) + +var _ = catchingUp + +type Monitor struct { + running bool + wallet *wallet.Wallet +} + +func NewMonitor(wallet *wallet.Wallet) *Monitor { + return &Monitor{ + running: false, + wallet: wallet, + } +} diff --git a/network/downloads.go b/network/downloads.go index e3f7aff..a53123d 100644 --- a/network/downloads.go +++ b/network/downloads.go @@ -3,21 +3,22 @@ package network import ( "bytes" "context" - "encoding/json" "fmt" "io" "net/http" "github.com/JackalLabs/sequoia/file_system" "github.com/desmos-labs/cosmos-go-wallet/wallet" - "github.com/dgraph-io/badger/v4" "github.com/jackalLabs/canine-chain/v3/x/storage/types" "github.com/rs/zerolog/log" ) +import jsoniter "github.com/json-iterator/go" -func DownloadFile(db *badger.DB, cid string, fid string, wallet *wallet.Wallet, signee string, fileSize int64, myUrl string) error { - queryParams := &types.QueryFindFileRequest{ - Fid: fid, +var json = jsoniter.ConfigCompatibleWithStandardLibrary + +func DownloadFile(f *file_system.FileSystem, merkle []byte, owner string, start int64, wallet *wallet.Wallet, fileSize int64, myUrl string, chunkSize int64) error { + queryParams := &types.QueryFindFile{ + Merkle: merkle, } cl := types.NewQueryClient(wallet.Client.GRPCConn) @@ -34,7 +35,7 @@ func DownloadFile(db *badger.DB, cid string, fid string, wallet *wallet.Wallet, } if len(arr) == 0 { - return fmt.Errorf("%s not found on provider network", fid) + return fmt.Errorf("%x not found on provider network", merkle) } foundFile := false @@ -43,9 +44,9 @@ func DownloadFile(db *badger.DB, cid string, fid string, wallet *wallet.Wallet, continue } - size, err := DownloadFileFromURL(db, url, cid, fid, signee, wallet.AccAddress()) + size, err := DownloadFileFromURL(f, url, merkle, owner, start, wallet.AccAddress(), chunkSize) if err != nil { - log.Info().Msg(fmt.Sprintf("Couldn't get %s from %s, trying again...", fid, url)) + log.Info().Msg(fmt.Sprintf("Couldn't get %x from %s, trying again...", merkle, url)) continue } if fileSize != int64(size) { @@ -59,15 +60,15 @@ func DownloadFile(db *badger.DB, cid string, fid string, wallet *wallet.Wallet, return fmt.Errorf("failed to find file on network") } - log.Debug().Msg(fmt.Sprintf("Done downloading %s", fid)) + log.Debug().Msg(fmt.Sprintf("Done downloading %x", merkle)) return nil } -func DownloadFileFromURL(db *badger.DB, url string, cid string, fid string, signee string, address string) (int, error) { - log.Info().Msg(fmt.Sprintf("Downloading %s from %s...", fid, url)) +func DownloadFileFromURL(f *file_system.FileSystem, url string, merkle []byte, owner string, start int64, address string, chunkSize int64) (int, error) { + log.Info().Msg(fmt.Sprintf("Downloading %x from %s...", merkle, url)) cli := http.Client{} - req, err := http.NewRequest("GET", fmt.Sprintf("%s/download/%s", url, fid), nil) + req, err := http.NewRequest("GET", fmt.Sprintf("%s/download/%x", url, merkle), nil) if err != nil { return 0, err } @@ -99,7 +100,7 @@ func DownloadFileFromURL(db *badger.DB, url string, cid string, fid string, sign reader := bytes.NewReader(buff.Bytes()) - _, _, _, size, err := file_system.WriteFile(db, reader, signee, address, cid) + size, err := f.WriteFile(reader, merkle, owner, start, address, chunkSize) if err != nil { return 0, err } diff --git a/proofs/monitoring.go b/proofs/monitoring.go new file mode 100644 index 0000000..3636269 --- /dev/null +++ b/proofs/monitoring.go @@ -0,0 +1,11 @@ +package proofs + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var filesProving = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "sequoia_current_proofs_processing", + Help: "The number of files currently being proven", +}) diff --git a/proofs/proofs.go b/proofs/proofs.go index a0581a9..53e6c0c 100644 --- a/proofs/proofs.go +++ b/proofs/proofs.go @@ -3,120 +3,203 @@ package proofs import ( "context" "crypto/sha256" - "encoding/json" + "encoding/hex" + "errors" "fmt" - "io" - "strconv" "time" - "github.com/JackalLabs/sequoia/file_system" + sdk "github.com/cosmos/cosmos-sdk/types" + canine "github.com/jackalLabs/canine-chain/v3/app" + "github.com/JackalLabs/sequoia/queue" "github.com/desmos-labs/cosmos-go-wallet/wallet" - "github.com/dgraph-io/badger/v4" "github.com/jackalLabs/canine-chain/v3/x/storage/types" "github.com/rs/zerolog/log" - "github.com/wealdtech/go-merkletree" - "github.com/wealdtech/go-merkletree/sha3" + "github.com/wealdtech/go-merkletree/v2" + "github.com/wealdtech/go-merkletree/v2/sha3" ) +import jsoniter "github.com/json-iterator/go" + +var json = jsoniter.ConfigCompatibleWithStandardLibrary const ( ErrNotOurs = "not our deal" ErrNotReady = "not ready yet" ) -func GenerateMerkleProof(tree *merkletree.MerkleTree, index int, item []byte) (valid bool, proof *merkletree.Proof, err error) { +func GenerateMerkleProof(tree *merkletree.MerkleTree, index int, item []byte) (bool, *merkletree.Proof, error) { + log.Debug().Msg(fmt.Sprintf("Generating Merkle proof for %d", index)) + h := sha256.New() - _, err = io.WriteString(h, fmt.Sprintf("%d%x", index, item)) + _, err := h.Write([]byte(fmt.Sprintf("%d%x", index, item))) + if err != nil { + return false, nil, err + } + + proof, err := tree.GenerateProof(h.Sum(nil), 0) if err != nil { - return + return false, nil, err + } + + valid, err := merkletree.VerifyProofUsing(h.Sum(nil), false, proof, [][]byte{tree.Root()}, sha3.New512()) + if err != nil { + return false, nil, err + } + return valid, proof, nil +} + +// GenProof generates a proof from an arbitrary file on the file system +// +// returns proof, item and error +func GenProof(io FileSystem, merkle []byte, owner string, start int64, block int) ([]byte, []byte, error) { + tree, chunk, err := io.GetFileTreeByChunk(merkle, owner, start, block) + if err != nil { + log.Error().Err(fmt.Errorf("failed to get filetree by chunk for %x %w", merkle, err)) + return nil, nil, err + } + + log.Debug().Msg(fmt.Sprintf("About to generate merkle proof for %x", merkle)) + + valid, proof, err := GenerateMerkleProof(tree, block, chunk) + if err != nil { + return nil, nil, err + } + if !valid { + e := errors.New("tree not valid") + log.Error().Err(fmt.Errorf("tree not valid for %x %w", merkle, e)) + return nil, nil, e } - proof, err = tree.GenerateProof(h.Sum(nil), 0) + jproof, err := json.Marshal(*proof) if err != nil { - return + return nil, nil, err } - valid, err = merkletree.VerifyProofUsing(h.Sum(nil), false, proof, [][]byte{tree.Root()}, sha3.New512()) - return + log.Debug().Msg(fmt.Sprintf("Done making proof for %x", merkle)) + + return jproof, chunk, nil } -func (p *Prover) GenerateProof(cid string) ([]byte, []byte, error) { - queryParams := &types.QueryActiveDealRequest{ - Cid: cid, +func (p *Prover) GenerateProof(merkle []byte, owner string, start int64, blockHeight int64, startedAt time.Time) ([]byte, []byte, int64, error) { + log.Debug().Msg(fmt.Sprintf("Generating proof for %x", merkle)) + queryParams := &types.QueryFile{ + Merkle: merkle, + Owner: owner, + Start: start, } cl := types.NewQueryClient(p.wallet.Client.GRPCConn) - res, err := cl.ActiveDeals(context.Background(), queryParams) - if err != nil { // if the deal doesn't exist we check strays & contracts, then remove it - contractParams := &types.QueryContractRequest{ - Cid: cid, - } - _, e := cl.Contracts(context.Background(), contractParams) - if e != nil { // if we can't find the contract, check strays, then remove it - strayParams := &types.QueryStrayRequest{ - Cid: cid, - } - _, e := cl.Strays(context.Background(), strayParams) - if e != nil { // if we can't find the stray, remove it - return nil, nil, err - } + res, err := cl.File(context.Background(), queryParams) + if err != nil { + return nil, nil, 0, err + } - return nil, nil, fmt.Errorf(ErrNotReady) - } + file := res.File - return nil, nil, fmt.Errorf(ErrNotReady) + proofQuery := &types.QueryProof{ + ProviderAddress: p.wallet.AccAddress(), + Merkle: file.Merkle, + Owner: file.Owner, + Start: file.Start, } - if res.ActiveDeals.Provider != p.wallet.AccAddress() { - return nil, nil, fmt.Errorf(ErrNotOurs) + newProof := types.FileProof{ // defining a new proof model + Prover: p.wallet.AccAddress(), + Merkle: file.Merkle, + Owner: file.Owner, + Start: file.Start, + LastProven: 0, + ChunkToProve: 0, } - if res.ActiveDeals.Proofverified == "true" { - return nil, nil, nil + if file.ContainsProver(p.wallet.AccAddress()) { + // file is ours + proofRes, err := cl.Proof(context.Background(), proofQuery) + if err == nil { + newProof = proofRes.Proof // found the proof, we're good to go + } + } else { + // file is not ours, we need to figure out what to do with it + if len(file.Proofs) == int(file.MaxProofs) { + return nil, nil, 0, errors.New(ErrNotOurs) // there is no more room on this file anyway, ignore it + } } - blockToProveString := res.ActiveDeals.Blocktoprove - blockToProve, err := strconv.ParseInt(blockToProveString, 10, 64) - if err != nil { - return nil, nil, err + log.Debug().Msg(fmt.Sprintf("Querying proof of %x", merkle)) + + t := time.Since(startedAt) + + proven := file.ProvenThisBlock(blockHeight+int64(t.Seconds()/5.0), newProof.LastProven) + if proven { + log.Info().Msg(fmt.Sprintf("%x was proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight)) + log.Debug().Msg("File was already proven") + return nil, nil, 0, nil } - block := int(blockToProve) + log.Info().Msg(fmt.Sprintf("%x was not yet proven at %d, height is now %d", file.Merkle, newProof.LastProven, blockHeight)) + + block := int(newProof.ChunkToProve) + + log.Debug().Msg(fmt.Sprintf("Getting file tree by chunk for %x", merkle)) + + proof, item, err := GenProof(p.io, merkle, owner, start, block) + + return proof, item, newProof.ChunkToProve, err +} - tree, chunk, err := file_system.GetFileChunk(p.db, cid, block) +func (p *Prover) PostProof(merkle []byte, owner string, start int64, blockHeight int64, startedAt time.Time) error { + proof, item, index, err := p.GenerateProof(merkle, owner, start, blockHeight, startedAt) if err != nil { - return nil, nil, err + return err } - valid, proof, err := GenerateMerkleProof(tree, block, chunk) - if err != nil { - return nil, nil, err + if proof == nil || item == nil { + log.Debug().Msg("generated proof was nil but no error was thrown") + return nil } - if !valid { - return nil, nil, fmt.Errorf("tree not valid") + + log.Debug().Msg("Successfully generated proof") + + msg := types.NewMsgPostProof(p.wallet.AccAddress(), merkle, owner, start, item, proof, index) + + m, wg := p.q.Add(msg) + + wg.Wait() + + if m.Error() != nil { + log.Error().Err(m.Error()) + return m.Error() } - jproof, err := json.Marshal(*proof) + var postRes types.MsgPostProofResponse + data, err := hex.DecodeString(m.Res().Data) if err != nil { - return nil, nil, err + return nil } - return jproof, chunk, nil -} - -func (p *Prover) PostProof(cid string) error { - proof, item, err := p.GenerateProof(cid) + encodingCfg := canine.MakeEncodingConfig() + var txMsgData sdk.TxMsgData + err = encodingCfg.Marshaler.Unmarshal(data, &txMsgData) if err != nil { - return err + return nil } - if proof == nil { + if len(txMsgData.Data) == 0 { return nil } - msg := types.NewMsgPostproof(p.wallet.AccAddress(), fmt.Sprintf("%x", item), string(proof), cid) + err = postRes.Unmarshal(txMsgData.Data[m.Index()].Data) + if err != nil { + return nil + } + + if !postRes.Success { + log.Error().Msg(postRes.ErrorMessage) + } + + log.Info().Msg(fmt.Sprintf("%x was successfully proven", merkle)) - _, _ = p.q.Add(msg) + log.Debug().Msg(fmt.Sprintf("TX Hash: %s", m.Hash())) return nil } @@ -128,43 +211,57 @@ func (p *Prover) Start() { return } - time.Sleep(time.Millisecond * 333) // pauses for one third of a second - if !p.processed.Add(time.Second * time.Duration(p.interval)).Before(time.Now()) { // check every 2 minutes + time.Sleep(time.Millisecond * 1000) // pauses for one third of a second + if !p.processed.Add(time.Second * time.Duration(p.interval)).Before(time.Now()) { continue } - if p.locked { + log.Debug().Msg("Starting proof cycle...") + + abciInfo, err := p.wallet.Client.RPCClient.ABCIInfo(context.Background()) + if err != nil { + log.Error().Err(err) continue } + height := abciInfo.Response.LastBlockHeight + 10 - p.locked = true + t := time.Now() - files, err := file_system.ListFiles(p.db) + err = p.io.ProcessFiles(func(merkle []byte, owner string, start int64) { + for p.Full() { + time.Sleep(time.Second) + } + log.Debug().Msg(fmt.Sprintf("proving: %x", merkle)) + filesProving.Inc() + p.Inc() + go p.wrapPostProof(merkle, owner, start, height, t) + }) if err != nil { log.Error().Err(err) } - for _, cid := range files { - err := p.PostProof(cid) + p.processed = time.Now() + } +} + +func (p *Prover) wrapPostProof(merkle []byte, owner string, start int64, height int64, startedAt time.Time) { + defer filesProving.Dec() + defer p.Dec() + err := p.PostProof(merkle, owner, start, height, startedAt) + if err != nil { + log.Warn().Err(err) + if err.Error() == "rpc error: code = NotFound desc = not found" { // if the file is not found on the network, delete it + err := p.io.DeleteFile(merkle, owner, start) if err != nil { - if err.Error() == "rpc error: code = NotFound desc = not found" { // if the file is not found on the network, delete it - err := file_system.DeleteFile(p.db, cid) - if err != nil { - log.Error().Err(err) - } - } - if err.Error() == ErrNotOurs { // if the file is not ours, delete it - err := file_system.DeleteFile(p.db, cid) - if err != nil { - log.Error().Err(err) - } - } + log.Error().Err(err) + } + } + if err.Error() == ErrNotOurs { // if the file is not ours, delete it + err := p.io.DeleteFile(merkle, owner, start) + if err != nil { + log.Error().Err(err) } } - - p.locked = false - - p.processed = time.Now() } } @@ -172,14 +269,15 @@ func (p *Prover) Stop() { p.running = false } -func NewProver(wallet *wallet.Wallet, db *badger.DB, q *queue.Queue, interval int64) *Prover { +func NewProver(wallet *wallet.Wallet, q *queue.Queue, io FileSystem, interval int64, threads int64) *Prover { p := Prover{ running: false, wallet: wallet, - db: db, q: q, processed: time.Time{}, interval: interval, + io: io, + threads: threads, } return &p diff --git a/proofs/types.go b/proofs/types.go index 6d3f09a..c6634dc 100644 --- a/proofs/types.go +++ b/proofs/types.go @@ -3,17 +3,37 @@ package proofs import ( "time" + "github.com/wealdtech/go-merkletree/v2" + "github.com/JackalLabs/sequoia/queue" "github.com/desmos-labs/cosmos-go-wallet/wallet" - "github.com/dgraph-io/badger/v4" ) type Prover struct { - running bool - wallet *wallet.Wallet - db *badger.DB - q *queue.Queue - processed time.Time - locked bool - interval int64 + running bool + wallet *wallet.Wallet + q *queue.Queue + processed time.Time + interval int64 + io FileSystem + threads int64 + currentThreads int64 +} + +type FileSystem interface { + DeleteFile([]byte, string, int64) error + ProcessFiles(func([]byte, string, int64)) error + GetFileTreeByChunk([]byte, string, int64, int) (*merkletree.MerkleTree, []byte, error) +} + +func (p *Prover) Inc() { + p.currentThreads++ +} + +func (p *Prover) Dec() { + p.currentThreads-- +} + +func (p *Prover) Full() bool { + return p.threads <= p.currentThreads } diff --git a/queue/monitoring.go b/queue/monitoring.go new file mode 100644 index 0000000..be5cd54 --- /dev/null +++ b/queue/monitoring.go @@ -0,0 +1,11 @@ +package queue + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var queueSize = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "sequoia_queue_size", + Help: "The number of messages currently in the queue", +}) diff --git a/queue/queue.go b/queue/queue.go index fc10df2..561ed55 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -8,7 +8,6 @@ import ( "github.com/cosmos/cosmos-sdk/types" walletTypes "github.com/desmos-labs/cosmos-go-wallet/types" "github.com/desmos-labs/cosmos-go-wallet/wallet" - storageTypes "github.com/jackalLabs/canine-chain/v3/x/storage/types" "github.com/rs/zerolog/log" ) @@ -63,26 +62,14 @@ func (q *Queue) Listen() { log.Info().Msg(fmt.Sprintf("Queue: %d messages waiting to be put on-chain...", lmsg)) - maxSize := 1024 * 1024 // 1mb + // maxSize := 1024 * 1024 // 1mb + maxSize := 45 total := len(q.messages) + queueSize.Set(float64(total)) - var size int - for s, message := range q.messages { - - var k interface{} = message - - // nolint:all - switch k.(type) { - case storageTypes.MsgPostproof: - mpp := k.(storageTypes.MsgPostproof) - size += mpp.Size() - } - - if size > maxSize { - total = s - } - + if total > maxSize { + total = maxSize } log.Info().Msg(fmt.Sprintf("Queue: Posting %d messages to chain...", total)) @@ -104,9 +91,11 @@ func (q *Queue) Listen() { if err != nil { log.Info().Msg(fmt.Sprintf("Failed to post from queue: %s", err.Error())) } - for _, process := range toProcess { + + for i, process := range toProcess { process.err = err process.res = res + process.msgIndex = i process.Done() } diff --git a/queue/types.go b/queue/types.go index 5ba153d..0534857 100644 --- a/queue/types.go +++ b/queue/types.go @@ -17,10 +17,11 @@ type Queue struct { } type Message struct { - msg types.Msg - wg *sync.WaitGroup - err error - res *types.TxResponse + msg types.Msg + wg *sync.WaitGroup + err error + res *types.TxResponse + msgIndex int } func (m *Message) Error() error { @@ -30,3 +31,15 @@ func (m *Message) Error() error { func (m *Message) Log() string { return m.res.RawLog } + +func (m *Message) Res() *types.TxResponse { + return m.res +} + +func (m *Message) Hash() string { + return m.res.TxHash +} + +func (m *Message) Index() int { + return m.msgIndex +} diff --git a/strays/hands.go b/strays/hands.go index dcdc0cb..a7dbc9f 100644 --- a/strays/hands.go +++ b/strays/hands.go @@ -1,23 +1,27 @@ package strays import ( - "strconv" "time" + "github.com/JackalLabs/sequoia/file_system" + "github.com/JackalLabs/sequoia/proofs" + "github.com/JackalLabs/sequoia/network" "github.com/JackalLabs/sequoia/queue" walletTypes "github.com/desmos-labs/cosmos-go-wallet/types" "github.com/desmos-labs/cosmos-go-wallet/wallet" - "github.com/dgraph-io/badger/v4" "github.com/jackalLabs/canine-chain/v3/x/storage/types" "github.com/rs/zerolog/log" ) +import jsoniter "github.com/json-iterator/go" + +var json = jsoniter.ConfigCompatibleWithStandardLibrary func (h *Hand) Stop() { h.running = false } -func (h *Hand) Start(db *badger.DB, wallet *wallet.Wallet, myUrl string) { +func (h *Hand) Start(f *file_system.FileSystem, wallet *wallet.Wallet, myUrl string, chunkSize int64) { h.running = true for h.running { @@ -26,27 +30,45 @@ func (h *Hand) Start(db *badger.DB, wallet *wallet.Wallet, myUrl string) { continue } - signee := h.stray.Signee - fid := h.stray.Fid - cid := h.stray.Cid - size, err := strconv.ParseInt(h.stray.Filesize, 10, 64) + signee := h.stray.Owner + merkle := h.stray.Merkle + start := h.stray.Start + + err := network.DownloadFile(f, merkle, signee, start, wallet, h.stray.FileSize, myUrl, chunkSize) + if err != nil { + log.Error().Err(err) + h.stray = nil + continue + } + + tree, chunk, err := f.GetFileTreeByChunk(merkle, signee, start, 0) + if err != nil { + log.Error().Err(err) + h.stray = nil + continue + } + + _, proof, err := proofs.GenerateMerkleProof(tree, 0, chunk) if err != nil { log.Error().Err(err) h.stray = nil continue } - err = network.DownloadFile(db, cid, fid, wallet, signee, size, myUrl) + jproof, err := json.Marshal(*proof) if err != nil { log.Error().Err(err) h.stray = nil continue } - msg := &types.MsgClaimStray{ - Creator: h.Address(), - Cid: cid, - ForAddress: wallet.AccAddress(), + msg := &types.MsgPostProof{ + Creator: h.Address(), + Item: chunk, + HashList: jproof, + Merkle: merkle, + Owner: signee, + Start: start, } data := walletTypes.NewTransactionData( @@ -79,7 +101,7 @@ func (h *Hand) Busy() bool { return h.stray != nil } -func (h *Hand) Take(stray *types.Strays) { +func (h *Hand) Take(stray *types.UnifiedFile) { h.stray = stray } diff --git a/strays/manager.go b/strays/manager.go index ce7d062..965f0c9 100644 --- a/strays/manager.go +++ b/strays/manager.go @@ -6,12 +6,13 @@ import ( "math/rand" "time" + "github.com/JackalLabs/sequoia/file_system" + "github.com/JackalLabs/sequoia/queue" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/query" "github.com/cosmos/cosmos-sdk/x/feegrant" "github.com/desmos-labs/cosmos-go-wallet/wallet" - "github.com/dgraph-io/badger/v4" "github.com/jackalLabs/canine-chain/v3/x/storage/types" "github.com/rs/zerolog/log" ) @@ -87,11 +88,11 @@ func NewStrayManager(w *wallet.Wallet, q *queue.Queue, interval int64, refreshIn return s } -func (s *StrayManager) Start(db *badger.DB, myUrl string) { +func (s *StrayManager) Start(f *file_system.FileSystem, myUrl string, chunkSize int64) { s.running = true for _, hand := range s.hands { - go hand.Start(db, s.wallet, myUrl) + go hand.Start(f, s.wallet, myUrl, chunkSize) } for s.running { @@ -99,6 +100,8 @@ func (s *StrayManager) Start(db *badger.DB, myUrl string) { if s.refreshed.Add(time.Second * s.refreshInterval).Before(time.Now()) { err := s.RefreshList() if err != nil { + log.Error().Err(err) + log.Info().Msg("failed refresh") } s.refreshed = time.Now() @@ -121,7 +124,7 @@ func (s *StrayManager) Start(db *badger.DB, myUrl string) { } } -func (s *StrayManager) Pop() *types.Strays { +func (s *StrayManager) Pop() *types.UnifiedFile { if len(s.strays) == 0 { return nil } @@ -143,7 +146,7 @@ func (s *StrayManager) Stop() { func (s *StrayManager) RefreshList() error { log.Info().Msg("Refreshing stray list...") - s.strays = make([]*types.Strays, 0) + s.strays = make([]*types.UnifiedFile, 0) var val uint64 if s.lastSize > 300 { @@ -157,18 +160,19 @@ func (s *StrayManager) RefreshList() error { CountTotal: true, } - queryParams := &types.QueryAllStraysRequest{ - Pagination: page, + queryParams := &types.QueryOpenFiles{ + ProviderAddress: s.wallet.AccAddress(), + Pagination: page, } cl := types.NewQueryClient(s.wallet.Client.GRPCConn) - res, err := cl.StraysAll(context.Background(), queryParams) + res, err := cl.OpenFiles(context.Background(), queryParams) if err != nil { return err } - for _, stray := range res.Strays { + for _, stray := range res.Files { newStray := stray s.strays = append(s.strays, &newStray) } diff --git a/strays/types.go b/strays/types.go index c5cbc37..a97a398 100644 --- a/strays/types.go +++ b/strays/types.go @@ -11,12 +11,12 @@ import ( type Hand struct { wallet *wallet.Wallet offset byte - stray *types.Strays + stray *types.UnifiedFile running bool } type StrayManager struct { - strays []*types.Strays + strays []*types.UnifiedFile wallet *wallet.Wallet lastSize int rand *rand.Rand