Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

multi: integrate hash data type. #293

Merged
merged 7 commits into from
Dec 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dcrpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func newPool(db pool.Database, cfg *config) (*miningPool, error) {
FetchLastPaymentInfo: p.hub.FetchLastPaymentInfo,
FetchMinedWork: p.hub.FetchMinedWork,
FetchWorkQuotas: p.hub.FetchWorkQuotas,
FetchClients: p.hub.FetchClients,
FetchHashData: p.hub.FetchHashData,
AccountExists: p.hub.AccountExists,
FetchArchivedPayments: p.hub.FetchArchivedPayments,
FetchPendingPayments: p.hub.FetchPendingPayments,
Expand Down
29 changes: 15 additions & 14 deletions gui/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ type Cache struct {

// InitCache initialises and returns a cache for use in the GUI.
func InitCache(work []*pool.AcceptedWork, quotas []*pool.Quota,
clients []*pool.Client, pendingPmts []*pool.Payment,
hashData map[string][]*pool.HashData, pendingPmts []*pool.Payment,
archivedPmts []*pool.Payment, blockExplorerURL string,
lastPmtHeight uint32, lastPmtPaidOn, lastPmtCreatedOn int64) *Cache {

cache := Cache{blockExplorerURL: blockExplorerURL}
cache.updateMinedWork(work)
cache.updateRewardQuotas(quotas)
cache.updateClients(clients)
cache.updateHashData(hashData)
cache.updatePayments(pendingPmts, archivedPmts)
cache.updateLastPaymentInfo(lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn)
return &cache
Expand Down Expand Up @@ -229,20 +229,21 @@ func (c *Cache) getPoolHash() string {
return c.poolHash
}

// updateClients will refresh the cached list of connected clients, as well as
// recalculating the total hashrate for all connected clients.
func (c *Cache) updateClients(clients []*pool.Client) {
// updateHashData refreshes the cached list of hash data from connected
// clients, as well as recalculating the total hashrate.
func (c *Cache) updateHashData(hashData map[string][]*pool.HashData) {
clientInfo := make(map[string][]*client)
poolHashRate := new(big.Rat).SetInt64(0)
for _, c := range clients {
clientHashRate := c.FetchHashRate()
accountID := c.FetchAccountID()
clientInfo[accountID] = append(clientInfo[accountID], &client{
Miner: c.FetchMinerType(),
IP: c.FetchIPAddr(),
HashRate: hashString(clientHashRate),
})
poolHashRate = poolHashRate.Add(poolHashRate, clientHashRate)
for _, data := range hashData {
for _, entry := range data {
poolHashRate = poolHashRate.Add(poolHashRate, entry.HashRate)
clientInfo[entry.AccountID] = append(clientInfo[entry.AccountID],
&client{
Miner: entry.Miner,
IP: entry.IP,
HashRate: hashString(entry.HashRate),
})
}
}

c.poolHashMtx.Lock()
Expand Down
27 changes: 15 additions & 12 deletions gui/gui.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ type Config struct {
FetchWorkQuotas func() ([]*pool.Quota, error)
// HTTPBackupDB streams a backup of the database over an http response.
HTTPBackupDB func(w http.ResponseWriter) error
// FetchClients returns all connected pool clients.
FetchClients func() []*pool.Client
// FetchHashData returns all hash data from connected pool clients.
FetchHashData func() (map[string][]*pool.HashData, error)
// AccountExists checks if the provided account id references a pool account.
AccountExists func(accountID string) bool
// FetchArchivedPayments fetches all paid payments.
Expand Down Expand Up @@ -333,7 +333,11 @@ func (ui *GUI) Run(ctx context.Context) {
return
}

clients := ui.cfg.FetchClients()
hashData, err := ui.cfg.FetchHashData()
if err != nil {
log.Error(err)
return
}

pendingPayments, err := ui.cfg.FetchPendingPayments()
if err != nil {
Expand All @@ -353,7 +357,7 @@ func (ui *GUI) Run(ctx context.Context) {
return
}

ui.cache = InitCache(work, quotas, clients, pendingPayments, archivedPayments,
ui.cache = InitCache(work, quotas, hashData, pendingPayments, archivedPayments,
ui.cfg.BlockExplorerURL, lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn)

// Use a ticker to periodically update cached data and push updates through
Expand All @@ -366,8 +370,13 @@ func (ui *GUI) Run(ctx context.Context) {
for {
select {
case <-ticker.C:
clients := ui.cfg.FetchClients()
ui.cache.updateClients(clients)
hashData, err := ui.cfg.FetchHashData()
if err != nil {
log.Error(err)
continue
}

ui.cache.updateHashData(hashData)
ui.websocketServer.send(payload{
PoolHashRate: ui.cache.getPoolHash(),
})
Expand All @@ -386,12 +395,6 @@ func (ui *GUI) Run(ctx context.Context) {
LastWorkHeight: ui.cfg.FetchLastWorkHeight(),
})

case pool.ConnectedClient, pool.DisconnectedClient:
// Opting to keep connection updates pushed by the ticker
// to avoid pushing too much too frequently.
clients := ui.cfg.FetchClients()
ui.cache.updateClients(clients)

case pool.ClaimedShare:
quotas, err := ui.cfg.FetchWorkQuotas()
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions pool/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@ import (
"github.com/decred/dcrd/crypto/blake256"
)

var (
// defaultAccountID is the default account id value.
// It is derived from a zero hash.
defaultAccountID = AccountID(zeroHash.String())
jholdstock marked this conversation as resolved.
Show resolved Hide resolved
)

// Account represents a mining pool account.
type Account struct {
UUID string `json:"uuid"`
Expand Down
176 changes: 175 additions & 1 deletion pool/boltdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ var (
// Confirmed processed payments are sourced from the payment bucket and
// archived.
paymentArchiveBkt = []byte("paymentarchivebkt")
// hashDataBkt stores client identification and hashrate information.
hashDataBkt = []byte("hashdatabkt")
// versionK is the key of the current version of the database.
versionK = []byte("version")
// lastPaymentCreatedOn is the key of the last time a payment was
Expand Down Expand Up @@ -130,7 +132,11 @@ func createBuckets(db *BoltDB) error {
if err != nil {
return err
}
return createNestedBucket(pbkt, paymentArchiveBkt)
err = createNestedBucket(pbkt, paymentArchiveBkt)
if err != nil {
return err
}
return createNestedBucket(pbkt, hashDataBkt)
})
return err
}
Expand Down Expand Up @@ -1334,3 +1340,171 @@ func (db *BoltDB) pruneShares(minNano int64) error {
return nil
})
}

// persistHashData saves the provided hash data to the database. It
// returns an errs.ValueFound error when an entry already exists for the
// hash data's UUID; updates to existing entries must go through
// updateHashData instead.
func (db *BoltDB) persistHashData(hashData *HashData) error {
	const funcName = "persistHashData"
	return db.DB.Update(func(tx *bolt.Tx) error {
		bkt, err := fetchBucket(tx, hashDataBkt)
		if err != nil {
			return err
		}

		key := []byte(hashData.UUID)

		// Do not persist already existing hash rate.
		if bkt.Get(key) != nil {
			desc := fmt.Sprintf("%s: hash data %s already exists", funcName,
				hashData.UUID)
			return errs.DBError(errs.ValueFound, desc)
		}

		encoded, err := json.Marshal(hashData)
		if err != nil {
			desc := fmt.Sprintf("%s: unable to marshal hash data bytes: %v",
				funcName, err)
			return errs.DBError(errs.Parse, desc)
		}
		if err := bkt.Put(key, encoded); err != nil {
			desc := fmt.Sprintf("%s: unable to persist hash data entry: %v",
				funcName, err)
			return errs.DBError(errs.PersistEntry, desc)
		}
		return nil
	})
}

// updateHashData persists the updated hash data to the database. It
// returns an errs.ValueNotFound error when no entry exists for the hash
// data's UUID; new entries must be created through persistHashData.
func (db *BoltDB) updateHashData(hashData *HashData) error {
	const funcName = "updateHashData"
	return db.DB.Update(func(tx *bolt.Tx) error {
		bkt, err := fetchBucket(tx, hashDataBkt)
		if err != nil {
			return err
		}

		// Assert the hash data provided exists before updating.
		id := []byte(hashData.UUID)
		v := bkt.Get(id)
		if v == nil {
			desc := fmt.Sprintf("%s: hash data %s not found",
				funcName, hashData.UUID)
			return errs.DBError(errs.ValueNotFound, desc)
		}
		hBytes, err := json.Marshal(hashData)
		if err != nil {
			desc := fmt.Sprintf("%s: unable to marshal hash data bytes: %v",
				funcName, err)
			// Marshalling failures are classified as Parse errors,
			// consistent with persistHashData and fetchHashData.
			return errs.DBError(errs.Parse, desc)
		}
		err = bkt.Put(id, hBytes)
		if err != nil {
			desc := fmt.Sprintf("%s: unable to persist hash data: %v",
				funcName, err)
			return errs.DBError(errs.PersistEntry, desc)
		}
		return nil
	})
}

// fetchHashData fetches the hash data associated with the provided id.
// It returns an errs.ValueNotFound error when no entry exists for the
// id.
func (db *BoltDB) fetchHashData(id string) (*HashData, error) {
	const funcName = "fetchHashData"

	var data HashData
	err := db.DB.View(func(tx *bolt.Tx) error {
		bkt, err := fetchBucket(tx, hashDataBkt)
		if err != nil {
			return err
		}

		raw := bkt.Get([]byte(id))
		if raw == nil {
			desc := fmt.Sprintf("%s: no hash data found for id %s",
				funcName, id)
			return errs.DBError(errs.ValueNotFound, desc)
		}
		if err := json.Unmarshal(raw, &data); err != nil {
			desc := fmt.Sprintf("%s: unable to unmarshal hash data: %v",
				funcName, err)
			return errs.DBError(errs.Parse, desc)
		}
		return nil
	})
	if err != nil {
		return nil, err
	}
	return &data, nil
}

// listHashData fetches all hash data updated after the provided minimum
// time, grouped by account id.
func (db *BoltDB) listHashData(minNano int64) (map[string][]*HashData, error) {
	const funcName = "listHashData"

	data := make(map[string][]*HashData)
	err := db.DB.View(func(tx *bolt.Tx) error {
		bkt, err := fetchBucket(tx, hashDataBkt)
		if err != nil {
			return err
		}

		return bkt.ForEach(func(k, v []byte) error {
			var entry HashData
			if err := json.Unmarshal(v, &entry); err != nil {
				desc := fmt.Sprintf("%s: unable to unmarshal hash data: %v",
					funcName, err)
				return errs.DBError(errs.Parse, desc)
			}

			// Only select hash data updated after the provided minimum time.
			if entry.UpdatedOn > minNano {
				data[entry.AccountID] = append(data[entry.AccountID], &entry)
			}
			return nil
		})
	})
	if err != nil {
		return nil, err
	}
	return data, nil
}

// pruneHashData prunes all hash data that have not been updated since
// the provided minimum time.
func (db *BoltDB) pruneHashData(minNano int64) error {
	// Declared const for consistency with the other hash data functions.
	const funcName = "pruneHashData"

	return db.DB.Update(func(tx *bolt.Tx) error {
		bkt, err := fetchBucket(tx, hashDataBkt)
		if err != nil {
			return err
		}

		// Collect keys first so the bucket is not mutated while the
		// cursor is iterating it.
		toDelete := [][]byte{}
		cursor := bkt.Cursor()
		for k, v := cursor.First(); k != nil; k, v = cursor.Next() {
			var hashData HashData
			err = json.Unmarshal(v, &hashData)
			if err != nil {
				desc := fmt.Sprintf("%s: unable to unmarshal hash data: %v",
					funcName, err)
				return errs.DBError(errs.Parse, desc)
			}

			// Prune hash data that have not been updated since the
			// provided minimum time.
			if minNano > hashData.UpdatedOn {
				toDelete = append(toDelete, k)
			}
		}
		for _, entry := range toDelete {
			err := bkt.Delete(entry)
			if err != nil {
				return err
			}
		}
		return nil
	})
}
Loading