Skip to content

Commit

Permalink
Add StakeEntry, LockedStakeEntry, and ValidatorEntry. Update Dockerfi…
Browse files Browse the repository at this point in the history
…le for pos
  • Loading branch information
Lazy Nina authored and tholonious committed Feb 9, 2024
1 parent 3e16391 commit cf78aa0
Show file tree
Hide file tree
Showing 13 changed files with 960 additions and 0 deletions.
106 changes: 106 additions & 0 deletions entries/epoch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package entries

import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/pkg/errors"
"github.com/uptrace/bun"
)

// TODO: when to use nullzero vs use_zero?
type EpochEntry struct {
EpochNumber uint64
InitialBlockHeight uint64
InitialView uint64
FinalBlockHeight uint64
CreatedAtBlockTimestampNanoSecs uint64

BadgerKey []byte `pg:",pk,use_zero"`
}

type PGEpochEntry struct {
bun.BaseModel `bun:"table:epoch_entry"`
EpochEntry
}

// TODO: Do I need this?
type PGEpochUtxoOps struct {
bun.BaseModel `bun:"table:epoch_entry_utxo_ops"`
EpochEntry
UtxoOperation
}

// Convert the EpochEntry DeSo encoder to the PGEpochEntry struct used by bun.
func EpochEntryEncoderToPGStruct(epochEntry *lib.EpochEntry, keyBytes []byte, params *lib.DeSoParams) EpochEntry {
return EpochEntry{
EpochNumber: epochEntry.EpochNumber,
InitialBlockHeight: epochEntry.InitialBlockHeight,
InitialView: epochEntry.InitialView,
FinalBlockHeight: epochEntry.FinalBlockHeight,
BadgerKey: keyBytes,
}
}

// EpochEntryBatchOperation is the entry point for processing a batch of Epoch entries.
// It determines the appropriate handler based on the operation type and executes it.
func EpochEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeleteEpochEntry(entries, db, operationType)
} else {
err = bulkInsertEpochEntry(entries, db, operationType, params)
}
if err != nil {
return errors.Wrapf(err, "entries.EpochEntryBatchOperation: Problem with operation type %v", operationType)
}
return nil
}

// bulkInsertEpochEntry inserts a batch of locked stake entries into the database.
func bulkInsertEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
pgEntrySlice := make([]*PGEpochEntry, len(uniqueEntries))

// Loop through the entries and convert them to PGEntry.
for ii, entry := range uniqueEntries {
pgEntrySlice[ii] = &PGEpochEntry{EpochEntry: EpochEntryEncoderToPGStruct(entry.Encoder.(*lib.EpochEntry), entry.KeyBytes, params)}
}

// Execute the insert query.
query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertEpochEntry: Error inserting entries")
}
return nil
}

// bulkDeleteEpochEntry deletes a batch of locked stake entries from the database.
func bulkDeleteEpochEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)

// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGEpochEntry{}).
Where("badger_key IN (?)", bun.In(keysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteEpochEntry: Error deleting entries")
}

return nil
}
117 changes: 117 additions & 0 deletions entries/locked_stake.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package entries

import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"github.com/uptrace/bun/extra/bunbig"
)

// TODO: when to use nullzero vs use_zero?
type LockedStakeEntry struct {
StakerPKID string `bun:",nullzero"`
ValidatorPKID string `bun:",nullzero"`
LockedAmountNanos *bunbig.Int `pg:",use_zero"`
LockedAtEpochNumber uint64

ExtraData map[string]string `bun:"type:jsonb"`
BadgerKey []byte `pg:",pk,use_zero"`
}

type PGLockedStakeEntry struct {
bun.BaseModel `bun:"table:locked_stake_entry"`
LockedStakeEntry
}

// TODO: Do I need this?
type PGLockedStakeEntryUtxoOps struct {
bun.BaseModel `bun:"table:locked_stake_entry_utxo_ops"`
LockedStakeEntry
UtxoOperation
}

// Convert the LockedStakeEntry DeSo encoder to the PGLockedStakeEntry struct used by bun.
func LockedStakeEncoderToPGStruct(lockedStakeEntry *lib.LockedStakeEntry, keyBytes []byte, params *lib.DeSoParams) LockedStakeEntry {
pgLockedStakeEntry := LockedStakeEntry{
ExtraData: consumer.ExtraDataBytesToString(lockedStakeEntry.ExtraData),
BadgerKey: keyBytes,
}

if lockedStakeEntry.StakerPKID != nil {
pgLockedStakeEntry.StakerPKID = consumer.PublicKeyBytesToBase58Check((*lockedStakeEntry.StakerPKID)[:], params)
}

if lockedStakeEntry.ValidatorPKID != nil {
pgLockedStakeEntry.ValidatorPKID = consumer.PublicKeyBytesToBase58Check((*lockedStakeEntry.ValidatorPKID)[:], params)
}

pgLockedStakeEntry.LockedAtEpochNumber = lockedStakeEntry.LockedAtEpochNumber
pgLockedStakeEntry.LockedAmountNanos = bunbig.FromMathBig(lockedStakeEntry.LockedAmountNanos.ToBig())

return pgLockedStakeEntry
}

// LockedStakeBatchOperation is the entry point for processing a batch of LockedStake entries.
// It determines the appropriate handler based on the operation type and executes it.
func LockedStakeBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeleteLockedStakeEntry(entries, db, operationType)
} else {
err = bulkInsertLockedStakeEntry(entries, db, operationType, params)
}
if err != nil {
return errors.Wrapf(err, "entries.LockedStakeBatchOperation: Problem with operation type %v", operationType)
}
return nil
}

// bulkInsertLockedStakeEntry inserts a batch of locked stake entries into the database.
func bulkInsertLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
pgEntrySlice := make([]*PGLockedStakeEntry, len(uniqueEntries))

// Loop through the entries and convert them to PGEntry.
for ii, entry := range uniqueEntries {
pgEntrySlice[ii] = &PGLockedStakeEntry{LockedStakeEntry: LockedStakeEncoderToPGStruct(entry.Encoder.(*lib.LockedStakeEntry), entry.KeyBytes, params)}
}

// Execute the insert query.
query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertLockedStakeEntry: Error inserting entries")
}
return nil
}

// bulkDeleteLockedStakeEntry deletes a batch of locked stake entries from the database.
func bulkDeleteLockedStakeEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)

// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGLockedStakeEntry{}).
Where("badger_key IN (?)", bun.In(keysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteLockedStakeEntry: Error deleting entries")
}

return nil
}
117 changes: 117 additions & 0 deletions entries/lockup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package entries

import (
"context"
"github.com/deso-protocol/core/lib"
"github.com/deso-protocol/state-consumer/consumer"
"github.com/pkg/errors"
"github.com/uptrace/bun"
"github.com/uptrace/bun/extra/bunbig"
)

// TODO: when to use nullzero vs use_zero?
type LockedBalanceEntry struct {
HODLerPKID string `bun:",nullzero"`
ProfilePKID string `bun:",nullzero"`
UnlockTimestampNanoSecs int64
VestingEndTimestampNanoSecs int64
BalanceBaseUnits *bunbig.Int `pg:",use_zero"`

BadgerKey []byte `pg:",pk,use_zero"`
}

type PGLockedBalanceEntry struct {
bun.BaseModel `bun:"table:locked_balance_entry"`
LockedBalanceEntry
}

// TODO: Do I need this?
type PGLockedBalanceEntryUtxoOps struct {
bun.BaseModel `bun:"table:locked_balance_entry_utxo_ops"`
LockedBalanceEntry
UtxoOperation
}

// Convert the LockedBalanceEntry DeSo encoder to the PGLockedBalnceEntry struct used by bun.
func LockedBalanceEntryEncoderToPGStruct(lockedBalanceEntry *lib.LockedBalanceEntry, keyBytes []byte, params *lib.DeSoParams) LockedBalanceEntry {
pgLockedBalanceEntry := LockedBalanceEntry{
BadgerKey: keyBytes,
}

if lockedBalanceEntry.HODLerPKID != nil {
pgLockedBalanceEntry.HODLerPKID = consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.HODLerPKID)[:], params)
}

if lockedBalanceEntry.ProfilePKID != nil {
pgLockedBalanceEntry.ProfilePKID = consumer.PublicKeyBytesToBase58Check((*lockedBalanceEntry.ProfilePKID)[:], params)
}

pgLockedBalanceEntry.UnlockTimestampNanoSecs = lockedBalanceEntry.UnlockTimestampNanoSecs
pgLockedBalanceEntry.VestingEndTimestampNanoSecs = lockedBalanceEntry.VestingEndTimestampNanoSecs
pgLockedBalanceEntry.BalanceBaseUnits = bunbig.FromMathBig(lockedBalanceEntry.BalanceBaseUnits.ToBig())

return pgLockedBalanceEntry
}

// LockedBalanceEntryBatchOperation is the entry point for processing a batch of LockedBalance entries.
// It determines the appropriate handler based on the operation type and executes it.
func LockedBalanceEntryBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
var err error
if operationType == lib.DbOperationTypeDelete {
err = bulkDeleteLockedBalanceEntry(entries, db, operationType)
} else {
err = bulkInsertLockedBalanceEntry(entries, db, operationType, params)
}
if err != nil {
return errors.Wrapf(err, "entries.LockedBalanceEntryBatchOperation: Problem with operation type %v", operationType)
}
return nil
}

// bulkInsertLockedBalanceEntry inserts a batch of locked stake entries into the database.
func bulkInsertLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
pgEntrySlice := make([]*PGLockedBalanceEntry, len(uniqueEntries))

// Loop through the entries and convert them to PGEntry.
for ii, entry := range uniqueEntries {
pgEntrySlice[ii] = &PGLockedBalanceEntry{LockedBalanceEntry: LockedBalanceEntryEncoderToPGStruct(entry.Encoder.(*lib.LockedBalanceEntry), entry.KeyBytes, params)}
}

// Execute the insert query.
query := db.NewInsert().Model(&pgEntrySlice)

if operationType == lib.DbOperationTypeUpsert {
query = query.On("CONFLICT (badger_key) DO UPDATE")
}

if _, err := query.Returning("").Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkInsertLockedBalanceEntry: Error inserting entries")
}
return nil
}

// bulkDeleteLockedBalanceEntry deletes a batch of locked stake entries from the database.
func bulkDeleteLockedBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

// Transform the entries into a list of keys to delete.
keysToDelete := consumer.KeysToDelete(uniqueEntries)

// Execute the delete query.
if _, err := db.NewDelete().
Model(&PGLockedBalanceEntry{}).
Where("badger_key IN (?)", bun.In(keysToDelete)).
Returning("").
Exec(context.Background()); err != nil {
return errors.Wrapf(err, "entries.bulkDeleteLockedBalanceEntry: Error deleting entries")
}

return nil
}
Loading

0 comments on commit cf78aa0

Please sign in to comment.