diff --git a/entries/epoch.go b/entries/epoch.go
new file mode 100644
index 0000000..f7faae3
--- /dev/null
+++ b/entries/epoch.go
@@ -0,0 +1,106 @@
+package entries
+
+import (
+	"context"
+	"github.com/deso-protocol/core/lib"
+	"github.com/deso-protocol/state-consumer/consumer"
+	"github.com/pkg/errors"
+	"github.com/uptrace/bun"
+)
+
+// TODO: when to use nullzero vs use_zero?
+type EpochEntry struct {
+	EpochNumber                     uint64
+	InitialBlockHeight              uint64
+	InitialView                     uint64
+	FinalBlockHeight                uint64
+	CreatedAtBlockTimestampNanoSecs uint64
+
+	BadgerKey []byte `pg:",pk,use_zero"`
+}
+
+type PGEpochEntry struct {
+	bun.BaseModel `bun:"table:epoch_entry"`
+	EpochEntry
+}
+
+// TODO: Do I need this?
+type PGEpochUtxoOps struct {
+	bun.BaseModel `bun:"table:epoch_entry_utxo_ops"`
+	EpochEntry
+	UtxoOperation
+}
+
+// Convert the EpochEntry DeSo encoder to the PGEpochEntry struct used by bun.
+func EpochEntryEncoderToPGStruct(epochEntry *lib.EpochEntry, keyBytes []byte, params *lib.DeSoParams) EpochEntry {
+	return EpochEntry{
+		EpochNumber:        epochEntry.EpochNumber,
+		InitialBlockHeight: epochEntry.InitialBlockHeight,
+		InitialView:        epochEntry.InitialView,
+		FinalBlockHeight:   epochEntry.FinalBlockHeight,
+		// This assumes lib.EpochEntry carries CreatedAtBlockTimestampNanoSecs (the
+		// struct above mirrors it); without this assignment the column would always
+		// be written as zero.
+		CreatedAtBlockTimestampNanoSecs: uint64(epochEntry.CreatedAtBlockTimestampNanoSecs),
+		BadgerKey:                       keyBytes,
+	}
+}
+
+// EpochEntryBatchOperation is the entry point for processing a batch of Epoch entries.
+// It determines the appropriate handler based on the operation type and executes it.
+func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
+	// We check before we call this function that there is at least one operation type.
+	// We also ensure before this that all entries have the same operation type.
+	operationType := entries[0].OperationType
+	var err error
+	if operationType == lib.DbOperationTypeDelete {
+		err = bulkDeleteEpochEntry(entries, db, operationType)
+	} else {
+		err = bulkInsertEpochEntry(entries, db, operationType, params)
+	}
+	if err != nil {
+		return errors.Wrapf(err, "entries.EpochEntryBatchOperation: Problem with operation type %v", operationType)
+	}
+	return nil
+}
+
+// bulkInsertEpochEntry inserts a batch of epoch entries into the database.
+func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
+	// Track the unique entries we've inserted so we don't insert the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+	// Create a new slice to hold the bun structs.
+	pgEntrySlice := make([]*PGEpochEntry, len(uniqueEntries))
+
+	// Loop through the entries and convert them to PGEntry.
+	for ii, entry := range uniqueEntries {
+		pgEntrySlice[ii] = &PGEpochEntry{EpochEntry: EpochEntryEncoderToPGStruct(entry.Encoder.(*lib.EpochEntry), entry.KeyBytes, params)}
+	}
+
+	// Execute the insert query.
+	query := db.NewInsert().Model(&pgEntrySlice)
+
+	if operationType == lib.DbOperationTypeUpsert {
+		query = query.On("CONFLICT (badger_key) DO UPDATE")
+	}
+
+	if _, err := query.Returning("").Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkInsertEpochEntry: Error inserting entries")
+	}
+	return nil
+}
+
+// bulkDeleteEpochEntry deletes a batch of epoch entries from the database.
+func bulkDeleteEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
+	// Track the unique entries so we don't issue a delete for the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+
+	// Transform the entries into a list of keys to delete.
+	keysToDelete := consumer.KeysToDelete(uniqueEntries)
+
+	// Execute the delete query.
+	if _, err := db.NewDelete().
+		Model(&PGEpochEntry{}).
+		Where("badger_key IN (?)", bun.In(keysToDelete)).
+		Returning("").
+		Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkDeleteEpochEntry: Error deleting entries")
+	}
+
+	return nil
+}
diff --git a/entries/locked_stake.go b/entries/locked_stake.go
new file mode 100644
index 0000000..d8adf88
--- /dev/null
+++ b/entries/locked_stake.go
@@ -0,0 +1,117 @@
+package entries
+
+import (
+	"context"
+	"github.com/deso-protocol/core/lib"
+	"github.com/deso-protocol/state-consumer/consumer"
+	"github.com/pkg/errors"
+	"github.com/uptrace/bun"
+	"github.com/uptrace/bun/extra/bunbig"
+)
+
+// TODO: when to use nullzero vs use_zero?
+type LockedStakeEntry struct {
+	StakerPKID          string      `bun:",nullzero"`
+	ValidatorPKID       string      `bun:",nullzero"`
+	LockedAmountNanos   *bunbig.Int `pg:",use_zero"`
+	LockedAtEpochNumber uint64
+
+	ExtraData map[string]string `bun:"type:jsonb"`
+	BadgerKey []byte            `pg:",pk,use_zero"`
+}
+
+type PGLockedStakeEntry struct {
+	bun.BaseModel `bun:"table:locked_stake_entry"`
+	LockedStakeEntry
+}
+
+// TODO: Do I need this?
+type PGLockedStakeEntryUtxoOps struct {
+	bun.BaseModel `bun:"table:locked_stake_entry_utxo_ops"`
+	LockedStakeEntry
+	UtxoOperation
+}
+
+// Convert the LockedStakeEntry DeSo encoder to the PGLockedStakeEntry struct used by bun.
+func LockedStakeEncoderToPGStruct(lockedStakeEntry *lib.LockedStakeEntry, keyBytes []byte, params *lib.DeSoParams) LockedStakeEntry {
+	pgLockedStakeEntry := LockedStakeEntry{
+		ExtraData: consumer.ExtraDataBytesToString(lockedStakeEntry.ExtraData),
+		BadgerKey: keyBytes,
+	}
+
+	if lockedStakeEntry.StakerPKID != nil {
+		pgLockedStakeEntry.StakerPKID = consumer.PublicKeyBytesToBase58Check((*lockedStakeEntry.StakerPKID)[:], params)
+	}
+
+	if lockedStakeEntry.ValidatorPKID != nil {
+		pgLockedStakeEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*lockedStakeEntry.ValidatorPKID)[:], params)
+	}
+
+	pgLockedStakeEntry.LockedAtEpochNumber = lockedStakeEntry.LockedAtEpochNumber
+	pgLockedStakeEntry.LockedAmountNanos = bunbig.FromMathBig(lockedStakeEntry.LockedAmountNanos.ToBig())
+
+	return pgLockedStakeEntry
+}
+
+// LockedStakeBatchOperation is the entry point for processing a batch of LockedStake entries.
+// It determines the appropriate handler based on the operation type and executes it.
+func LockedStakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
+	// We check before we call this function that there is at least one operation type.
+	// We also ensure before this that all entries have the same operation type.
+	operationType := entries[0].OperationType
+	var err error
+	if operationType == lib.DbOperationTypeDelete {
+		err = bulkDeleteLockedStakeEntry(entries, db, operationType)
+	} else {
+		err = bulkInsertLockedStakeEntry(entries, db, operationType, params)
+	}
+	if err != nil {
+		return errors.Wrapf(err, "entries.LockedStakeBatchOperation: Problem with operation type %v", operationType)
+	}
+	return nil
+}
+
+// bulkInsertLockedStakeEntry inserts a batch of locked stake entries into the database.
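+// As a rough sketch (and assuming bun's default DO UPDATE behavior of setting every
+// non-PK column from EXCLUDED when no explicit Set clause is given), the upsert path
+// below produces SQL along the lines of:
+//
+//	INSERT INTO locked_stake_entry (staker_pkid, validator_pkid, ...)
+//	VALUES (...), (...)
+//	ON CONFLICT (badger_key) DO UPDATE
+//	SET staker_pkid = EXCLUDED.staker_pkid, validator_pkid = EXCLUDED.validator_pkid, ...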
+func bulkInsertLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
+	// Track the unique entries we've inserted so we don't insert the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+	// Create a new slice to hold the bun structs.
+	pgEntrySlice := make([]*PGLockedStakeEntry, len(uniqueEntries))
+
+	// Loop through the entries and convert them to PGEntry.
+	for ii, entry := range uniqueEntries {
+		pgEntrySlice[ii] = &PGLockedStakeEntry{LockedStakeEntry: LockedStakeEncoderToPGStruct(entry.Encoder.(*lib.LockedStakeEntry), entry.KeyBytes, params)}
+	}
+
+	// Execute the insert query.
+	query := db.NewInsert().Model(&pgEntrySlice)
+
+	if operationType == lib.DbOperationTypeUpsert {
+		query = query.On("CONFLICT (badger_key) DO UPDATE")
+	}
+
+	if _, err := query.Returning("").Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkInsertLockedStakeEntry: Error inserting entries")
+	}
+	return nil
+}
+
+// bulkDeleteLockedStakeEntry deletes a batch of locked stake entries from the database.
+func bulkDeleteLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
+	// Track the unique entries so we don't issue a delete for the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+
+	// Transform the entries into a list of keys to delete.
+	keysToDelete := consumer.KeysToDelete(uniqueEntries)
+
+	// Execute the delete query.
+	if _, err := db.NewDelete().
+		Model(&PGLockedStakeEntry{}).
+		Where("badger_key IN (?)", bun.In(keysToDelete)).
+		Returning("").
+		Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkDeleteLockedStakeEntry: Error deleting entries")
+	}
+
+	return nil
+}
diff --git a/entries/lockup.go b/entries/lockup.go
new file mode 100644
index 0000000..1531f70
--- /dev/null
+++ b/entries/lockup.go
@@ -0,0 +1,117 @@
+package entries
+
+import (
+	"context"
+	"github.com/deso-protocol/core/lib"
+	"github.com/deso-protocol/state-consumer/consumer"
+	"github.com/pkg/errors"
+	"github.com/uptrace/bun"
+	"github.com/uptrace/bun/extra/bunbig"
+)
+
+// TODO: when to use nullzero vs use_zero?
+type LockedBalanceEntry struct {
+	HODLerPKID                  string `bun:",nullzero"`
+	ProfilePKID                 string `bun:",nullzero"`
+	UnlockTimestampNanoSecs     int64
+	VestingEndTimestampNanoSecs int64
+	BalanceBaseUnits            *bunbig.Int `pg:",use_zero"`
+
+	BadgerKey []byte `pg:",pk,use_zero"`
+}
+
+type PGLockedBalanceEntry struct {
+	bun.BaseModel `bun:"table:locked_balance_entry"`
+	LockedBalanceEntry
+}
+
+// TODO: Do I need this?
+type PGLockedBalanceEntryUtxoOps struct {
+	bun.BaseModel `bun:"table:locked_balance_entry_utxo_ops"`
+	LockedBalanceEntry
+	UtxoOperation
+}
+
+// Convert the LockedBalanceEntry DeSo encoder to the PGLockedBalanceEntry struct used by bun.
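+// A note on the nullzero/use_zero question above: with `bun:",nullzero"` a zero Go
+// value is stored as SQL NULL, while use_zero forces the zero value itself to be
+// written. For example, a nil HODLerPKID leaves the string field empty, which
+// nullzero turns into hodler_pkid = NULL (the migration declares that column
+// NOT NULL, so such a row would be rejected), whereas a BalanceBaseUnits of 0 is
+// stored as 0 rather than NULL.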
+func LockedBalanceEntryEncoderToPGStruct(lockedBalanceEntry *lib.LockedBalanceEntry, keyBytes []byte, params *lib.DeSoParams) LockedBalanceEntry {
+	pgLockedBalanceEntry := LockedBalanceEntry{
+		BadgerKey: keyBytes,
+	}
+
+	if lockedBalanceEntry.HODLerPKID != nil {
+		pgLockedBalanceEntry.HODLerPKID = consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.HODLerPKID)[:], params)
+	}
+
+	if lockedBalanceEntry.ProfilePKID != nil {
+		pgLockedBalanceEntry.ProfilePKID = consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.ProfilePKID)[:], params)
+	}
+
+	pgLockedBalanceEntry.UnlockTimestampNanoSecs = lockedBalanceEntry.UnlockTimestampNanoSecs
+	pgLockedBalanceEntry.VestingEndTimestampNanoSecs = lockedBalanceEntry.VestingEndTimestampNanoSecs
+	pgLockedBalanceEntry.BalanceBaseUnits = bunbig.FromMathBig(lockedBalanceEntry.BalanceBaseUnits.ToBig())
+
+	return pgLockedBalanceEntry
+}
+
+// LockedBalanceEntryBatchOperation is the entry point for processing a batch of LockedBalance entries.
+// It determines the appropriate handler based on the operation type and executes it.
+func LockedBalanceEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
+	// We check before we call this function that there is at least one operation type.
+	// We also ensure before this that all entries have the same operation type.
+	operationType := entries[0].OperationType
+	var err error
+	if operationType == lib.DbOperationTypeDelete {
+		err = bulkDeleteLockedBalanceEntry(entries, db, operationType)
+	} else {
+		err = bulkInsertLockedBalanceEntry(entries, db, operationType, params)
+	}
+	if err != nil {
+		return errors.Wrapf(err, "entries.LockedBalanceEntryBatchOperation: Problem with operation type %v", operationType)
+	}
+	return nil
+}
+
+// bulkInsertLockedBalanceEntry inserts a batch of locked balance entries into the database.
+func bulkInsertLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
+	// Track the unique entries we've inserted so we don't insert the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+	// Create a new slice to hold the bun structs.
+	pgEntrySlice := make([]*PGLockedBalanceEntry, len(uniqueEntries))
+
+	// Loop through the entries and convert them to PGEntry.
+	for ii, entry := range uniqueEntries {
+		pgEntrySlice[ii] = &PGLockedBalanceEntry{LockedBalanceEntry: LockedBalanceEntryEncoderToPGStruct(entry.Encoder.(*lib.LockedBalanceEntry), entry.KeyBytes, params)}
+	}
+
+	// Execute the insert query.
+	query := db.NewInsert().Model(&pgEntrySlice)
+
+	if operationType == lib.DbOperationTypeUpsert {
+		query = query.On("CONFLICT (badger_key) DO UPDATE")
+	}
+
+	if _, err := query.Returning("").Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkInsertLockedBalanceEntry: Error inserting entries")
+	}
+	return nil
+}
+
+// bulkDeleteLockedBalanceEntry deletes a batch of locked balance entries from the database.
+func bulkDeleteLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
+	// Track the unique entries so we don't issue a delete for the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+
+	// Transform the entries into a list of keys to delete.
+	keysToDelete := consumer.KeysToDelete(uniqueEntries)
+
+	// Execute the delete query.
+	if _, err := db.NewDelete().
+		Model(&PGLockedBalanceEntry{}).
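+		// bun.In expands keysToDelete into a parenthesized placeholder list, so the
+		// statement takes the form WHERE badger_key IN (?, ?, ...), one bind per key.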
+		Where("badger_key IN (?)", bun.In(keysToDelete)).
+		Returning("").
+		Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkDeleteLockedBalanceEntry: Error deleting entries")
+	}
+
+	return nil
+}
diff --git a/entries/stake.go b/entries/stake.go
new file mode 100644
index 0000000..2c47fed
--- /dev/null
+++ b/entries/stake.go
@@ -0,0 +1,117 @@
+package entries
+
+import (
+	"context"
+	"github.com/deso-protocol/core/lib"
+	"github.com/deso-protocol/state-consumer/consumer"
+	"github.com/pkg/errors"
+	"github.com/uptrace/bun"
+	"github.com/uptrace/bun/extra/bunbig"
+)
+
+// TODO: when to use nullzero vs use_zero?
+type StakeEntry struct {
+	StakerPKID       string                  `bun:",nullzero"`
+	ValidatorPKID    string                  `bun:",nullzero"`
+	RewardMethod     lib.StakingRewardMethod // TODO: we probably want this to be human readable?
+	StakeAmountNanos *bunbig.Int             `pg:",use_zero"`
+
+	ExtraData map[string]string `bun:"type:jsonb"`
+	BadgerKey []byte            `pg:",pk,use_zero"`
+}
+
+type PGStakeEntry struct {
+	bun.BaseModel `bun:"table:stake_entry"`
+	StakeEntry
+}
+
+// TODO: Do I need this?
+type PGStakeEntryUtxoOps struct {
+	bun.BaseModel `bun:"table:stake_entry_utxo_ops"`
+	StakeEntry
+	UtxoOperation
+}
+
+// Convert the StakeEntry DeSo encoder to the PGStakeEntry struct used by bun.
+func StakeEncoderToPGStruct(stakeEntry *lib.StakeEntry, keyBytes []byte, params *lib.DeSoParams) StakeEntry {
+	pgStakeEntry := StakeEntry{
+		ExtraData: consumer.ExtraDataBytesToString(stakeEntry.ExtraData),
+		BadgerKey: keyBytes,
+	}
+
+	if stakeEntry.StakerPKID != nil {
+		pgStakeEntry.StakerPKID = consumer.PublicKeyBytesToBase58Check((*stakeEntry.StakerPKID)[:], params)
+	}
+
+	if stakeEntry.ValidatorPKID != nil {
+		pgStakeEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*stakeEntry.ValidatorPKID)[:], params)
+	}
+
+	pgStakeEntry.RewardMethod = stakeEntry.RewardMethod
+	pgStakeEntry.StakeAmountNanos = bunbig.FromMathBig(stakeEntry.StakeAmountNanos.ToBig())
+
+	return pgStakeEntry
+}
+
+// StakeBatchOperation is the entry point for processing a batch of Stake entries.
+// It determines the appropriate handler based on the operation type and executes it.
+func StakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
+	// We check before we call this function that there is at least one operation type.
+	// We also ensure before this that all entries have the same operation type.
+	operationType := entries[0].OperationType
+	var err error
+	if operationType == lib.DbOperationTypeDelete {
+		err = bulkDeleteStakeEntry(entries, db, operationType)
+	} else {
+		err = bulkInsertStakeEntry(entries, db, operationType, params)
+	}
+	if err != nil {
+		return errors.Wrapf(err, "entries.StakeBatchOperation: Problem with operation type %v", operationType)
+	}
+	return nil
+}
+
+// bulkInsertStakeEntry inserts a batch of stake entries into the database.
+func bulkInsertStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
+	// Track the unique entries we've inserted so we don't insert the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+	// Create a new slice to hold the bun structs.
+	pgEntrySlice := make([]*PGStakeEntry, len(uniqueEntries))
+
+	// Loop through the entries and convert them to PGEntry.
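+	// Note: entry.Encoder is expected to hold a *lib.StakeEntry here; the type
+	// assertion below will panic if a different encoder type is ever routed to
+	// this handler.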
+	for ii, entry := range uniqueEntries {
+		pgEntrySlice[ii] = &PGStakeEntry{StakeEntry: StakeEncoderToPGStruct(entry.Encoder.(*lib.StakeEntry), entry.KeyBytes, params)}
+	}
+
+	// Execute the insert query.
+	query := db.NewInsert().Model(&pgEntrySlice)
+
+	if operationType == lib.DbOperationTypeUpsert {
+		query = query.On("CONFLICT (badger_key) DO UPDATE")
+	}
+
+	if _, err := query.Returning("").Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkInsertStakeEntry: Error inserting entries")
+	}
+	return nil
+}
+
+// bulkDeleteStakeEntry deletes a batch of stake entries from the database.
+func bulkDeleteStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
+	// Track the unique entries so we don't issue a delete for the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+
+	// Transform the entries into a list of keys to delete.
+	keysToDelete := consumer.KeysToDelete(uniqueEntries)
+
+	// Execute the delete query.
+	if _, err := db.NewDelete().
+		Model(&PGStakeEntry{}).
+		Where("badger_key IN (?)", bun.In(keysToDelete)).
+		Returning("").
+		Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkDeleteStakeEntry: Error deleting entries")
+	}
+
+	return nil
+}
diff --git a/entries/validator.go b/entries/validator.go
new file mode 100644
index 0000000..30e598e
--- /dev/null
+++ b/entries/validator.go
@@ -0,0 +1,138 @@
+package entries
+
+import (
+	"context"
+	"github.com/deso-protocol/core/lib"
+	"github.com/deso-protocol/state-consumer/consumer"
+	"github.com/pkg/errors"
+	"github.com/uptrace/bun"
+	"github.com/uptrace/bun/extra/bunbig"
+)
+
+// TODO: when to use nullzero vs use_zero?
+type ValidatorEntry struct {
+	ValidatorPKID                       string   `bun:",nullzero"`
+	Domains                             []string `bun:",nullzero"`
+	DisableDelegatedStake               bool
+	DelegatedStakeCommissionBasisPoints uint64
+	VotingPublicKey                     string `bun:",nullzero"`
+	VotingAuthorization                 string `bun:",nullzero"`
+	// Use bunbig.Int to store the balance as a numeric in the pg database.
+	TotalStakeAmountNanos   *bunbig.Int `pg:",use_zero"`
+	LastActiveAtEpochNumber uint64
+	JailedAtEpochNumber     uint64
+
+	ExtraData map[string]string `bun:"type:jsonb"`
+	BadgerKey []byte            `pg:",pk,use_zero"`
+}
+
+type PGValidatorEntry struct {
+	bun.BaseModel `bun:"table:validator_entry"`
+	ValidatorEntry
+}
+
+// TODO: Do I need this?
+type PGValidatorEntryUtxoOps struct {
+	bun.BaseModel `bun:"table:validator_entry_utxo_ops"`
+	ValidatorEntry
+	UtxoOperation
+}
+
+// Convert the ValidatorEntry DeSo encoder to the PGValidatorEntry struct used by bun.
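+// For illustration: a lib.ValidatorEntry whose Domains field is
+// [][]byte{[]byte("example.com:17000")} converts to []string{"example.com:17000"},
+// which bun maps onto the VARCHAR[] column declared in the validator_entry migration.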
+func ValidatorEncoderToPGStruct(validatorEntry *lib.ValidatorEntry, keyBytes []byte, params *lib.DeSoParams) ValidatorEntry {
+	pgValidatorEntry := ValidatorEntry{
+		ExtraData: consumer.ExtraDataBytesToString(validatorEntry.ExtraData),
+		BadgerKey: keyBytes,
+	}
+
+	if validatorEntry.ValidatorPKID != nil {
+		pgValidatorEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*validatorEntry.ValidatorPKID)[:], params)
+	}
+
+	if validatorEntry.Domains != nil {
+		pgValidatorEntry.Domains = make([]string, len(validatorEntry.Domains))
+		for ii, domain := range validatorEntry.Domains {
+			pgValidatorEntry.Domains[ii] = string(domain)
+		}
+	}
+
+	pgValidatorEntry.DisableDelegatedStake = validatorEntry.DisableDelegatedStake
+	pgValidatorEntry.DelegatedStakeCommissionBasisPoints = validatorEntry.DelegatedStakeCommissionBasisPoints
+
+	if validatorEntry.VotingPublicKey != nil {
+		pgValidatorEntry.VotingPublicKey = validatorEntry.VotingPublicKey.ToString()
+	}
+
+	if validatorEntry.VotingAuthorization != nil {
+		pgValidatorEntry.VotingAuthorization = validatorEntry.VotingAuthorization.ToString()
+	}
+
+	pgValidatorEntry.TotalStakeAmountNanos = bunbig.FromMathBig(validatorEntry.TotalStakeAmountNanos.ToBig())
+	pgValidatorEntry.LastActiveAtEpochNumber = validatorEntry.LastActiveAtEpochNumber
+	pgValidatorEntry.JailedAtEpochNumber = validatorEntry.JailedAtEpochNumber
+
+	return pgValidatorEntry
+}
+
+// ValidatorBatchOperation is the entry point for processing a batch of Validator entries.
+// It determines the appropriate handler based on the operation type and executes it.
+func ValidatorBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
+	// We check before we call this function that there is at least one operation type.
+	// We also ensure before this that all entries have the same operation type.
+	operationType := entries[0].OperationType
+	var err error
+	if operationType == lib.DbOperationTypeDelete {
+		err = bulkDeleteValidatorEntry(entries, db, operationType)
+	} else {
+		err = bulkInsertValidatorEntry(entries, db, operationType, params)
+	}
+	if err != nil {
+		return errors.Wrapf(err, "entries.ValidatorBatchOperation: Problem with operation type %v", operationType)
+	}
+	return nil
+}
+
+// bulkInsertValidatorEntry inserts a batch of validator entries into the database.
+func bulkInsertValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
+	// Track the unique entries we've inserted so we don't insert the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+	// Create a new slice to hold the bun structs.
+	pgEntrySlice := make([]*PGValidatorEntry, len(uniqueEntries))
+
+	// Loop through the entries and convert them to PGEntry.
+	for ii, entry := range uniqueEntries {
+		pgEntrySlice[ii] = &PGValidatorEntry{ValidatorEntry: ValidatorEncoderToPGStruct(entry.Encoder.(*lib.ValidatorEntry), entry.KeyBytes, params)}
+	}
+
+	// Execute the insert query.
+	query := db.NewInsert().Model(&pgEntrySlice)
+
+	if operationType == lib.DbOperationTypeUpsert {
+		query = query.On("CONFLICT (badger_key) DO UPDATE")
+	}
+
+	if _, err := query.Returning("").Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkInsertValidatorEntry: Error inserting entries")
+	}
+	return nil
+}
+
+// bulkDeleteValidatorEntry deletes a batch of validator entries from the database.
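+// As a rough sketch, the bulk delete below issues a single statement of the form
+//
+//	DELETE FROM validator_entry WHERE badger_key IN (?, ?, ...)
+//
+// rather than one DELETE per entry.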
+func bulkDeleteValidatorEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
+	// Track the unique entries so we don't issue a delete for the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+
+	// Transform the entries into a list of keys to delete.
+	keysToDelete := consumer.KeysToDelete(uniqueEntries)
+
+	// Execute the delete query.
+	if _, err := db.NewDelete().
+		Model(&PGValidatorEntry{}).
+		Where("badger_key IN (?)", bun.In(keysToDelete)).
+		Returning("").
+		Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkDeleteValidatorEntry: Error deleting entries")
+	}
+
+	return nil
+}
diff --git a/entries/yield_curve_point.go b/entries/yield_curve_point.go
new file mode 100644
index 0000000..bd81314
--- /dev/null
+++ b/entries/yield_curve_point.go
@@ -0,0 +1,109 @@
+package entries
+
+import (
+	"context"
+	"github.com/deso-protocol/core/lib"
+	"github.com/deso-protocol/state-consumer/consumer"
+	"github.com/pkg/errors"
+	"github.com/uptrace/bun"
+)
+
+// TODO: when to use nullzero vs use_zero?
+type LockupYieldCurvePoint struct {
+	ProfilePKID               string `bun:",nullzero"`
+	LockupDurationNanoSecs    int64
+	LockupYieldAPYBasisPoints uint64
+
+	BadgerKey []byte `pg:",pk,use_zero"`
+}
+
+type PGLockupYieldCurvePoint struct {
+	bun.BaseModel `bun:"table:yield_curve_point"`
+	LockupYieldCurvePoint
+}
+
+// TODO: Do I need this?
+type PGLockupYieldCurvePointUtxoOps struct {
+	bun.BaseModel `bun:"table:yield_curve_point_utxo_ops"`
+	LockupYieldCurvePoint
+	UtxoOperation
+}
+
+// Convert the LockupYieldCurvePoint DeSo encoder to the PGLockupYieldCurvePoint struct used by bun.
+func LockupYieldCurvePointEncoderToPGStruct(lockupYieldCurvePoint *lib.LockupYieldCurvePoint, keyBytes []byte, params *lib.DeSoParams) LockupYieldCurvePoint {
+	pgLockupYieldCurvePoint := LockupYieldCurvePoint{
+		BadgerKey: keyBytes,
+	}
+
+	if lockupYieldCurvePoint.ProfilePKID != nil {
+		pgLockupYieldCurvePoint.ProfilePKID = consumer.PublicKeyBytesToBase58Check((*lockupYieldCurvePoint.ProfilePKID)[:], params)
+	}
+
+	pgLockupYieldCurvePoint.LockupDurationNanoSecs = lockupYieldCurvePoint.LockupDurationNanoSecs
+	pgLockupYieldCurvePoint.LockupYieldAPYBasisPoints = lockupYieldCurvePoint.LockupYieldAPYBasisPoints
+
+	return pgLockupYieldCurvePoint
+}
+
+// LockupYieldCurvePointBatchOperation is the entry point for processing a batch of LockupYieldCurvePoint entries.
+// It determines the appropriate handler based on the operation type and executes it.
+func LockupYieldCurvePointBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
+	// We check before we call this function that there is at least one operation type.
+	// We also ensure before this that all entries have the same operation type.
+	operationType := entries[0].OperationType
+	var err error
+	if operationType == lib.DbOperationTypeDelete {
+		err = bulkDeleteLockupYieldCurvePoint(entries, db, operationType)
+	} else {
+		err = bulkInsertLockupYieldCurvePoint(entries, db, operationType, params)
+	}
+	if err != nil {
+		return errors.Wrapf(err, "entries.LockupYieldCurvePointBatchOperation: Problem with operation type %v", operationType)
+	}
+	return nil
+}
+
+// bulkInsertLockupYieldCurvePoint inserts a batch of lockup yield curve points into the database.
+func bulkInsertLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
+	// Track the unique entries we've inserted so we don't insert the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+	// Create a new slice to hold the bun structs.
+	pgEntrySlice := make([]*PGLockupYieldCurvePoint, len(uniqueEntries))
+
+	// Loop through the entries and convert them to PGEntry.
+	for ii, entry := range uniqueEntries {
+		pgEntrySlice[ii] = &PGLockupYieldCurvePoint{LockupYieldCurvePoint: LockupYieldCurvePointEncoderToPGStruct(entry.Encoder.(*lib.LockupYieldCurvePoint), entry.KeyBytes, params)}
+	}
+
+	// Execute the insert query.
+	query := db.NewInsert().Model(&pgEntrySlice)
+
+	if operationType == lib.DbOperationTypeUpsert {
+		query = query.On("CONFLICT (badger_key) DO UPDATE")
+	}
+
+	if _, err := query.Returning("").Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkInsertLockupYieldCurvePoint: Error inserting entries")
+	}
+	return nil
+}
+
+// bulkDeleteLockupYieldCurvePoint deletes a batch of lockup yield curve points from the database.
+func bulkDeleteLockupYieldCurvePoint(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
+	// Track the unique entries so we don't issue a delete for the same entry twice.
+	uniqueEntries := consumer.UniqueEntries(entries)
+
+	// Transform the entries into a list of keys to delete.
+	keysToDelete := consumer.KeysToDelete(uniqueEntries)
+
+	// Execute the delete query.
+	if _, err := db.NewDelete().
+		Model(&PGLockupYieldCurvePoint{}).
+		Where("badger_key IN (?)", bun.In(keysToDelete)).
+		Returning("").
+		Exec(context.Background()); err != nil {
+		return errors.Wrapf(err, "entries.bulkDeleteLockupYieldCurvePoint: Error deleting entries")
+	}
+
+	return nil
+}
diff --git a/handler/data_handler.go b/handler/data_handler.go
index b630db5..2e7c2b5 100644
--- a/handler/data_handler.go
+++ b/handler/data_handler.go
@@ -74,6 +74,18 @@ func (postgresDataHandler *PostgresDataHandler) HandleEntryBatch(batchedEntries
 		err = entries.BlockBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
 	case lib.EncoderTypeTxn:
 		err = entries.TransactionBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
+	case lib.EncoderTypeStakeEntry:
+		err = entries.StakeBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
+	case lib.EncoderTypeValidatorEntry:
+		err = entries.ValidatorBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
+	case lib.EncoderTypeLockedStakeEntry:
+		err = entries.LockedStakeBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
+	case lib.EncoderTypeLockedBalanceEntry:
+		err = entries.LockedBalanceEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
+	case lib.EncoderTypeLockupYieldCurvePoint:
+		err = entries.LockupYieldCurvePointBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
+	case lib.EncoderTypeEpochEntry:
+		err = entries.EpochEntryBatchOperation(batchedEntries, postgresDataHandler.DB, postgresDataHandler.Params)
 	}
 
 	if err != nil {
diff --git a/migrations/initial_migrations/20231213000000_create_locked_stake_entry_table.go b/migrations/initial_migrations/20231213000000_create_locked_stake_entry_table.go
new file mode 100644
index 0000000..4d13124
--- /dev/null
+++ b/migrations/initial_migrations/20231213000000_create_locked_stake_entry_table.go
@@ -0,0 +1,41 @@
+package initial_migrations
+
+import (
+	"context"
+	"strings"
+
+	"github.com/uptrace/bun"
+)
+
+// TODO: Not nullable fields
+func createLockedStakeEntryTable(db *bun.DB, tableName string) error {
+	_, err := db.Exec(strings.Replace(`
+		CREATE TABLE {tableName} (
+			staker_pkid VARCHAR NOT NULL,
+			validator_pkid VARCHAR NOT NULL,
+			locked_amount_nanos NUMERIC(78, 0) NOT NULL,
+			locked_at_epoch_number BIGINT NOT NULL,
+
+			extra_data JSONB,
+			badger_key BYTEA PRIMARY KEY
+		);
+		CREATE INDEX {tableName}_validator_pkid_idx ON {tableName} (validator_pkid);
+		CREATE INDEX {tableName}_staker_pkid_idx ON {tableName} (staker_pkid);
+	`, "{tableName}", tableName, -1))
+	// TODO: What other fields do we need indexed?
+	return err
+}
+
+func init() {
+	Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
+		return createLockedStakeEntryTable(db, "locked_stake_entry")
+	}, func(ctx context.Context, db *bun.DB) error {
+		_, err := db.Exec(`
+			DROP TABLE IF EXISTS locked_stake_entry;
+		`)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+}
diff --git a/migrations/initial_migrations/20231213000000_create_stake_entry_table.go b/migrations/initial_migrations/20231213000000_create_stake_entry_table.go
new file mode 100644
index 0000000..ae37317
--- /dev/null
+++ b/migrations/initial_migrations/20231213000000_create_stake_entry_table.go
@@ -0,0 +1,41 @@
+package initial_migrations
+
+import (
+	"context"
+	"strings"
+
+	"github.com/uptrace/bun"
+)
+
+// TODO: Not nullable fields
+func createStakeEntryTable(db *bun.DB, tableName string) error {
+	_, err := db.Exec(strings.Replace(`
+		CREATE TABLE {tableName} (
+			staker_pkid VARCHAR NOT NULL,
+			validator_pkid VARCHAR NOT NULL,
+			reward_method SMALLINT NOT NULL,
+			stake_amount_nanos NUMERIC(78, 0) NOT NULL,
+
+			extra_data JSONB,
+			badger_key BYTEA PRIMARY KEY
+		);
+		CREATE INDEX {tableName}_validator_pkid_idx ON {tableName} (validator_pkid);
+		CREATE INDEX {tableName}_staker_pkid_idx ON {tableName} (staker_pkid);
+	`, "{tableName}", tableName, -1))
+	// TODO: What other fields do we need indexed?
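+	// NUMERIC(78, 0) is wide enough for any uint256 value (2^256 - 1 has 78
+	// decimal digits), which is why it's used for stake_amount_nanos and the
+	// other big-integer balance columns in these migrations.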
+	return err
+}
+
+func init() {
+	Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
+		return createStakeEntryTable(db, "stake_entry")
+	}, func(ctx context.Context, db *bun.DB) error {
+		_, err := db.Exec(`
+			DROP TABLE IF EXISTS stake_entry;
+		`)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+}
diff --git a/migrations/initial_migrations/20231213000000_create_validator_entry_table.go b/migrations/initial_migrations/20231213000000_create_validator_entry_table.go
new file mode 100644
index 0000000..2f34b57
--- /dev/null
+++ b/migrations/initial_migrations/20231213000000_create_validator_entry_table.go
@@ -0,0 +1,44 @@
+package initial_migrations
+
+import (
+	"context"
+	"strings"
+
+	"github.com/uptrace/bun"
+)
+
+// TODO: Not nullable fields
+func createValidatorEntryTable(db *bun.DB, tableName string) error {
+	_, err := db.Exec(strings.Replace(`
+		CREATE TABLE {tableName} (
+			validator_pkid VARCHAR NOT NULL,
+			domains VARCHAR[],
+			disable_delegated_stake BOOLEAN,
+			delegated_stake_commission_basis_points BIGINT,
+			voting_public_key VARCHAR,
+			voting_authorization VARCHAR,
+			total_stake_amount_nanos NUMERIC(78, 0) NOT NULL,
+			last_active_at_epoch_number BIGINT,
+			jailed_at_epoch_number BIGINT,
+			extra_data JSONB,
+			badger_key BYTEA PRIMARY KEY
+		);
+		CREATE INDEX {tableName}_validator_pkid_idx ON {tableName} (validator_pkid);
+	`, "{tableName}", tableName, -1))
+	// TODO: What other fields do we need indexed?
+	return err
+}
+
+func init() {
+	Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
+		return createValidatorEntryTable(db, "validator_entry")
+	}, func(ctx context.Context, db *bun.DB) error {
+		_, err := db.Exec(`
+			DROP TABLE IF EXISTS validator_entry;
+		`)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+}
diff --git a/migrations/initial_migrations/20240129000000_create_epoch_entry_table.go b/migrations/initial_migrations/20240129000000_create_epoch_entry_table.go
new file mode 100644
index 0000000..1dd2ed7
--- /dev/null
+++ b/migrations/initial_migrations/20240129000000_create_epoch_entry_table.go
@@ -0,0 +1,39 @@
+package initial_migrations
+
+import (
+	"context"
+	"strings"
+
+	"github.com/uptrace/bun"
+)
+
+// TODO: Not nullable fields
+// TODO: indexes
+func createEpochEntryTable(db *bun.DB, tableName string) error {
+	_, err := db.Exec(strings.Replace(`
+		CREATE TABLE {tableName} (
+			epoch_number BIGINT NOT NULL,
+			initial_block_height BIGINT NOT NULL,
+			initial_view BIGINT NOT NULL,
+			final_block_height BIGINT NOT NULL,
+			created_at_block_timestamp_nano_secs BIGINT NOT NULL,
+
+			badger_key BYTEA PRIMARY KEY
+		);
+	`, "{tableName}", tableName, -1))
+	// TODO: What other fields do we need indexed?
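+	// Note: BIGINT is a signed 64-bit column while the corresponding Go struct
+	// fields are uint64; values above 2^63 - 1 would overflow, though epoch
+	// numbers, heights, and views stay far below that bound in practice.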
+	return err
+}
+
+func init() {
+	Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
+		return createEpochEntryTable(db, "epoch_entry")
+	}, func(ctx context.Context, db *bun.DB) error {
+		_, err := db.Exec(`
+			DROP TABLE IF EXISTS epoch_entry;
+		`)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+}
diff --git a/migrations/initial_migrations/20240129000001_create_locked_balance_entry_table.go b/migrations/initial_migrations/20240129000001_create_locked_balance_entry_table.go
new file mode 100644
index 0000000..678f156
--- /dev/null
+++ b/migrations/initial_migrations/20240129000001_create_locked_balance_entry_table.go
@@ -0,0 +1,41 @@
+package initial_migrations
+
+import (
+	"context"
+	"strings"
+
+	"github.com/uptrace/bun"
+)
+
+// TODO: Not nullable fields
+func createLockedBalanceEntryTable(db *bun.DB, tableName string) error {
+	_, err := db.Exec(strings.Replace(`
+		CREATE TABLE {tableName} (
+			hodler_pkid VARCHAR NOT NULL,
+			profile_pkid VARCHAR NOT NULL,
+			unlock_timestamp_nano_secs BIGINT NOT NULL,
+			vesting_end_timestamp_nano_secs BIGINT NOT NULL,
+			balance_base_units NUMERIC(78, 0) NOT NULL,
+
+			badger_key BYTEA PRIMARY KEY
+		);
+		CREATE INDEX {tableName}_hodler_pkid_idx ON {tableName} (hodler_pkid);
+		CREATE INDEX {tableName}_profile_pkid_idx ON {tableName} (profile_pkid);
+	`, "{tableName}", tableName, -1))
+	// TODO: What other fields do we need indexed?
+	return err
+}
+
+func init() {
+	Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
+		return createLockedBalanceEntryTable(db, "locked_balance_entry")
+	}, func(ctx context.Context, db *bun.DB) error {
+		_, err := db.Exec(`
+			DROP TABLE IF EXISTS locked_balance_entry;
+		`)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+}
diff --git a/migrations/initial_migrations/20240129000002_create_lockup_yield_curve_point_table.go b/migrations/initial_migrations/20240129000002_create_lockup_yield_curve_point_table.go
new file mode 100644
index 0000000..d957f6a
--- /dev/null
+++ b/migrations/initial_migrations/20240129000002_create_lockup_yield_curve_point_table.go
@@ -0,0 +1,38 @@
+package initial_migrations
+
+import (
+	"context"
+	"strings"
+
+	"github.com/uptrace/bun"
+)
+
+// TODO: Not nullable fields
+func createYieldCurvePointTable(db *bun.DB, tableName string) error {
+	_, err := db.Exec(strings.Replace(`
+		CREATE TABLE {tableName} (
+			profile_pkid VARCHAR NOT NULL,
+			lockup_duration_nano_secs BIGINT NOT NULL,
+			lockup_yield_apy_basis_points BIGINT NOT NULL,
+
+			badger_key BYTEA PRIMARY KEY
+		);
+		CREATE INDEX {tableName}_profile_pkid_idx ON {tableName} (profile_pkid);
+	`, "{tableName}", tableName, -1))
+	// TODO: What other fields do we need indexed?
+	return err
+}
+
+func init() {
+	Migrations.MustRegister(func(ctx context.Context, db *bun.DB) error {
+		return createYieldCurvePointTable(db, "yield_curve_point")
+	}, func(ctx context.Context, db *bun.DB) error {
+		_, err := db.Exec(`
+			DROP TABLE IF EXISTS yield_curve_point;
+		`)
+		if err != nil {
+			return err
+		}
+		return nil
+	})
+}