diff --git a/dcrpool.go b/dcrpool.go index b1757165..c8fa9724 100644 --- a/dcrpool.go +++ b/dcrpool.go @@ -172,7 +172,7 @@ func newPool(db pool.Database, cfg *config) (*miningPool, error) { FetchLastPaymentInfo: p.hub.FetchLastPaymentInfo, FetchMinedWork: p.hub.FetchMinedWork, FetchWorkQuotas: p.hub.FetchWorkQuotas, - FetchClients: p.hub.FetchClients, + FetchHashData: p.hub.FetchHashData, AccountExists: p.hub.AccountExists, FetchArchivedPayments: p.hub.FetchArchivedPayments, FetchPendingPayments: p.hub.FetchPendingPayments, diff --git a/gui/cache.go b/gui/cache.go index aed9bce7..595f0af4 100644 --- a/gui/cache.go +++ b/gui/cache.go @@ -91,14 +91,14 @@ type Cache struct { // InitCache initialises and returns a cache for use in the GUI. func InitCache(work []*pool.AcceptedWork, quotas []*pool.Quota, - clients []*pool.Client, pendingPmts []*pool.Payment, + hashData map[string][]*pool.HashData, pendingPmts []*pool.Payment, archivedPmts []*pool.Payment, blockExplorerURL string, lastPmtHeight uint32, lastPmtPaidOn, lastPmtCreatedOn int64) *Cache { cache := Cache{blockExplorerURL: blockExplorerURL} cache.updateMinedWork(work) cache.updateRewardQuotas(quotas) - cache.updateClients(clients) + cache.updateHashData(hashData) cache.updatePayments(pendingPmts, archivedPmts) cache.updateLastPaymentInfo(lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn) return &cache @@ -229,20 +229,21 @@ func (c *Cache) getPoolHash() string { return c.poolHash } -// updateClients will refresh the cached list of connected clients, as well as -// recalculating the total hashrate for all connected clients. -func (c *Cache) updateClients(clients []*pool.Client) { +// updateHashData refreshes the cached list of hash data from connected +// clients, as well as recalculating the total hashrate. 
+func (c *Cache) updateHashData(hashData map[string][]*pool.HashData) { clientInfo := make(map[string][]*client) poolHashRate := new(big.Rat).SetInt64(0) - for _, c := range clients { - clientHashRate := c.FetchHashRate() - accountID := c.FetchAccountID() - clientInfo[accountID] = append(clientInfo[accountID], &client{ - Miner: c.FetchMinerType(), - IP: c.FetchIPAddr(), - HashRate: hashString(clientHashRate), - }) - poolHashRate = poolHashRate.Add(poolHashRate, clientHashRate) + for _, data := range hashData { + for _, entry := range data { + poolHashRate = poolHashRate.Add(poolHashRate, entry.HashRate) + clientInfo[entry.AccountID] = append(clientInfo[entry.AccountID], + &client{ + Miner: entry.Miner, + IP: entry.IP, + HashRate: hashString(entry.HashRate), + }) + } } c.poolHashMtx.Lock() diff --git a/gui/gui.go b/gui/gui.go index d784955d..9f05c4f3 100644 --- a/gui/gui.go +++ b/gui/gui.go @@ -75,8 +75,8 @@ type Config struct { FetchWorkQuotas func() ([]*pool.Quota, error) // HTTPBackupDB streams a backup of the database over an http response. HTTPBackupDB func(w http.ResponseWriter) error - // FetchClients returns all connected pool clients. - FetchClients func() []*pool.Client + // FetchHashData returns all hash data from connected pool clients. + FetchHashData func() (map[string][]*pool.HashData, error) // AccountExists checks if the provided account id references a pool account. AccountExists func(accountID string) bool // FetchArchivedPayments fetches all paid payments. 
@@ -333,7 +333,11 @@ func (ui *GUI) Run(ctx context.Context) { return } - clients := ui.cfg.FetchClients() + hashData, err := ui.cfg.FetchHashData() + if err != nil { + log.Error(err) + return + } pendingPayments, err := ui.cfg.FetchPendingPayments() if err != nil { @@ -353,7 +357,7 @@ return } - ui.cache = InitCache(work, quotas, clients, pendingPayments, archivedPayments, + ui.cache = InitCache(work, quotas, hashData, pendingPayments, archivedPayments, ui.cfg.BlockExplorerURL, lastPmtHeight, lastPmtPaidOn, lastPmtCreatedOn) // Use a ticker to periodically update cached data and push updates through @@ -366,8 +370,13 @@ for { select { case <-ticker.C: - clients := ui.cfg.FetchClients() - ui.cache.updateClients(clients) + hashData, err := ui.cfg.FetchHashData() + if err != nil { + log.Error(err) + continue + } + + ui.cache.updateHashData(hashData) ui.websocketServer.send(payload{ PoolHashRate: ui.cache.getPoolHash(), }) @@ -386,12 +395,6 @@ LastWorkHeight: ui.cfg.FetchLastWorkHeight(), }) - case pool.ConnectedClient, pool.DisconnectedClient: - // Opting to keep connection updates pushed by the ticker - // to avoid pushing too much too frequently. - clients := ui.cfg.FetchClients() - ui.cache.updateClients(clients) - case pool.ClaimedShare: quotas, err := ui.cfg.FetchWorkQuotas() if err != nil { diff --git a/pool/account.go b/pool/account.go index ecb68fa6..00bde3ac 100644 --- a/pool/account.go +++ b/pool/account.go @@ -10,6 +10,12 @@ import ( "github.com/decred/dcrd/crypto/blake256" ) +var ( + // defaultAccountID is the default account id value. + // It is derived from a zero hash. + defaultAccountID = AccountID(zeroHash.String()) +) + // Account represents a mining pool account. 
type Account struct { UUID string `json:"uuid"` diff --git a/pool/boltdb.go b/pool/boltdb.go index 37c04096..5f6e8074 100644 --- a/pool/boltdb.go +++ b/pool/boltdb.go @@ -42,6 +42,8 @@ var ( // Confirmed processed payments are sourced from the payment bucket and // archived. paymentArchiveBkt = []byte("paymentarchivebkt") + // hashDataBkt stores client identification and hashrate information. + hashDataBkt = []byte("hashdatabkt") // versionK is the key of the current version of the database. versionK = []byte("version") // lastPaymentCreatedOn is the key of the last time a payment was @@ -130,7 +132,11 @@ func createBuckets(db *BoltDB) error { if err != nil { return err } - return createNestedBucket(pbkt, paymentArchiveBkt) + err = createNestedBucket(pbkt, paymentArchiveBkt) + if err != nil { + return err + } + return createNestedBucket(pbkt, hashDataBkt) }) return err } @@ -1334,3 +1340,171 @@ func (db *BoltDB) pruneShares(minNano int64) error { return nil }) } + +// persistHashData saves the provided hash data to the database. +func (db *BoltDB) persistHashData(hashData *HashData) error { + const funcName = "persistHashData" + return db.DB.Update(func(tx *bolt.Tx) error { + bkt, err := fetchBucket(tx, hashDataBkt) + if err != nil { + return err + } + + // Do not persist already existing hash rate. + if bkt.Get([]byte(hashData.UUID)) != nil { + desc := fmt.Sprintf("%s: hash data %s already exists", funcName, + hashData.UUID) + return errs.DBError(errs.ValueFound, desc) + } + + hBytes, err := json.Marshal(hashData) + if err != nil { + desc := fmt.Sprintf("%s: unable to marshal hash data bytes: %v", + funcName, err) + return errs.DBError(errs.Parse, desc) + } + err = bkt.Put([]byte(hashData.UUID), hBytes) + if err != nil { + desc := fmt.Sprintf("%s: unable to persist hash data entry: %v", + funcName, err) + return errs.DBError(errs.PersistEntry, desc) + } + return nil + }) +} + +// updateHashData persists the updated hash data to the database. 
+func (db *BoltDB) updateHashData(hashData *HashData) error { + const funcName = "updateHashData" + return db.DB.Update(func(tx *bolt.Tx) error { + bkt, err := fetchBucket(tx, hashDataBkt) + if err != nil { + return err + } + + // Assert the hash data provided exists before updating. + id := []byte(hashData.UUID) + v := bkt.Get(id) + if v == nil { + desc := fmt.Sprintf("%s: hash data %s not found", + funcName, hashData.UUID) + return errs.DBError(errs.ValueNotFound, desc) + } + hBytes, err := json.Marshal(hashData) + if err != nil { + desc := fmt.Sprintf("%s: unable to marshal hash data bytes: %v", + funcName, err) + return errs.DBError(errs.PersistEntry, desc) + } + err = bkt.Put(id, hBytes) + if err != nil { + desc := fmt.Sprintf("%s: unable to persist hash data: %v", + funcName, err) + return errs.DBError(errs.PersistEntry, desc) + } + return nil + }) +} + +// fetchHashData fetches the hash data associated with the provided id. +func (db *BoltDB) fetchHashData(id string) (*HashData, error) { + const funcName = "fetchHashData" + var data HashData + + err := db.DB.View(func(tx *bolt.Tx) error { + bkt, err := fetchBucket(tx, hashDataBkt) + if err != nil { + return err + } + + v := bkt.Get([]byte(id)) + if v == nil { + desc := fmt.Sprintf("%s: no hash data found for id %s", + funcName, id) + return errs.DBError(errs.ValueNotFound, desc) + } + err = json.Unmarshal(v, &data) + if err != nil { + desc := fmt.Sprintf("%s: unable to unmarshal hash data: %v", + funcName, err) + return errs.DBError(errs.Parse, desc) + } + return nil + }) + if err != nil { + return nil, err + } + return &data, err +} + +// listHashData fetches all hash data updated after the provided minimum time. 
+func (db *BoltDB) listHashData(minNano int64) (map[string][]*HashData, error) { + const funcName = "listHashData" + data := make(map[string][]*HashData) + + err := db.DB.View(func(tx *bolt.Tx) error { + bkt, err := fetchBucket(tx, hashDataBkt) + if err != nil { + return err + } + + cursor := bkt.Cursor() + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + var hashData HashData + err = json.Unmarshal(v, &hashData) + if err != nil { + desc := fmt.Sprintf("%s: unable to unmarshal hash data: %v", + funcName, err) + return errs.DBError(errs.Parse, desc) + } + + // Only select hash data updated after the provided minimum time. + if hashData.UpdatedOn > minNano { + data[hashData.AccountID] = + append(data[hashData.AccountID], &hashData) + } + } + return nil + }) + if err != nil { + return nil, err + } + return data, err +} + +// pruneHashData prunes all hash data that have not been updated since +// the provided minimum time. +func (db *BoltDB) pruneHashData(minNano int64) error { + funcName := "pruneHashData" + + return db.DB.Update(func(tx *bolt.Tx) error { + bkt, err := fetchBucket(tx, hashDataBkt) + if err != nil { + return err + } + toDelete := [][]byte{} + cursor := bkt.Cursor() + for k, v := cursor.First(); k != nil; k, v = cursor.Next() { + var hashData HashData + err = json.Unmarshal(v, &hashData) + if err != nil { + desc := fmt.Sprintf("%s: unable to unmarshal hash data: %v", + funcName, err) + return errs.DBError(errs.Parse, desc) + } + + // Prune hash data that have not been updated since the + // provided minimum time. + if minNano > hashData.UpdatedOn { + toDelete = append(toDelete, k) + } + } + for _, entry := range toDelete { + err := bkt.Delete(entry) + if err != nil { + return err + } + } + return nil + }) +} diff --git a/pool/boltupgrades.go b/pool/boltupgrades.go index 9ff92d85..d56a1371 100644 --- a/pool/boltupgrades.go +++ b/pool/boltupgrades.go @@ -38,6 +38,10 @@ const ( // It adds the UUID field to payments. 
paymentUUIDVersion = 6 + // hashDataVersion is the seventh version of the database. + // It adds a hash data bucket to the database. + hashDataVersion = 7 + // BoltDBVersion is the latest version of the bolt database that is // understood by the program. Databases with recorded versions higher than // this will fail to open (meaning any upgrades prevent reverting to older @@ -54,6 +58,7 @@ var upgrades = [...]func(tx *bolt.Tx) error{ removeTxFeeReserveVersion - 1: removeTxFeeReserveUpgrade, shareCreatedOnVersion - 1: shareCreatedOnUpgrade, paymentUUIDVersion - 1: paymentUUIDUpgrade, + hashDataVersion - 1: hashDataUpgrade, } func fetchDBVersion(tx *bolt.Tx) (uint32, error) { @@ -399,7 +404,7 @@ func removeTxFeeReserveUpgrade(tx *bolt.Tx) error { } if dbVersion != oldVersion { - desc := fmt.Sprintf("%s: inappropriately called", err) + desc := fmt.Sprintf("%s: inappropriately called", funcName) return errs.DBError(errs.DBUpgrade, desc) } @@ -432,7 +437,7 @@ func shareCreatedOnUpgrade(tx *bolt.Tx) error { } if dbVersion != oldVersion { - desc := fmt.Sprintf("%s: inappropriately called", err) + desc := fmt.Sprintf("%s: inappropriately called", funcName) return errs.DBError(errs.DBUpgrade, desc) } @@ -501,7 +506,7 @@ func paymentUUIDUpgrade(tx *bolt.Tx) error { } if dbVersion != oldVersion { - desc := fmt.Sprintf("%s: inappropriately called", err) + desc := fmt.Sprintf("%s: inappropriately called", funcName) return errs.DBError(errs.DBUpgrade, desc) } @@ -584,6 +589,37 @@ func paymentUUIDUpgrade(tx *bolt.Tx) error { return setDBVersion(tx, newVersion) } +func hashDataUpgrade(tx *bolt.Tx) error { + const oldVersion = 6 + const newVersion = 7 + + const funcName = "hashDataUpgrade" + + dbVersion, err := fetchDBVersion(tx) + if err != nil { + return err + } + + if dbVersion != oldVersion { + desc := fmt.Sprintf("%s: inappropriately called", funcName) + return errs.DBError(errs.DBUpgrade, desc) + } + + pbkt := tx.Bucket(poolBkt) + if pbkt == nil { + desc := fmt.Sprintf("%s: 
bucket %s not found", funcName, + string(poolBkt)) + return errs.DBError(errs.StorageNotFound, desc) + } + + err = createNestedBucket(pbkt, hashDataBkt) + if err != nil { + return err + } + + return setDBVersion(tx, newVersion) +} + // upgradeDB checks whether any upgrades are necessary before the database is // ready for application usage. If any are, they are performed. func upgradeDB(db *BoltDB) error { diff --git a/pool/boltupgrades_test.go b/pool/boltupgrades_test.go index 0b879f48..a0e53cd6 100644 --- a/pool/boltupgrades_test.go +++ b/pool/boltupgrades_test.go @@ -26,6 +26,7 @@ var boltDBUpgradeTests = [...]struct { {verifyV4Upgrade, "v2.db.gz"}, {verifyV5Upgrade, "v4.db.gz"}, {verifyV6Upgrade, "v5.db.gz"}, + // No upgrade test for V6, it is a backwards-compatible upgrade } func TestBoltDBUpgrades(t *testing.T) { diff --git a/pool/chainstate.go b/pool/chainstate.go index 66e6e525..0fe76546 100644 --- a/pool/chainstate.go +++ b/pool/chainstate.go @@ -5,6 +5,7 @@ import ( "errors" "sync" "sync/atomic" + "time" "github.com/decred/dcrd/blockchain/standalone/v2" "github.com/decred/dcrd/chaincfg/chainhash" @@ -221,6 +222,23 @@ func (cs *ChainState) handleChainUpdates(ctx context.Context) { continue } + // Prune all hash data not updated in the past minute. + // A connected client should have updated multiple times + // in a minute. Only disconnected miners would not have + // updated within the timeframe. + nowNano := time.Now().Add(-time.Minute).UnixNano() + err = cs.cfg.db.pruneHashData(nowNano) + if err != nil { + // Errors generated pruning invalidated hash rate + // indicate an underlying issue accessing the + // database. The chainstate process will be + // terminated as a result. 
+ log.Errorf("unable to prune hash rate: %v", err) + close(msg.Done) + cs.cfg.Cancel() + continue + } + err = cs.pruneAcceptedWork(ctx, pruneLimit) if err != nil { // Errors generated pruning invalidated accepted diff --git a/pool/client.go b/pool/client.go index 92f1d643..cb76909d 100644 --- a/pool/client.go +++ b/pool/client.go @@ -1,4 +1,4 @@ -// Copyright (c) 2019 The Decred developers +// Copyright (c) 2019-2020 The Decred developers // Use of this source code is governed by an ISC // license that can be found in the LICENSE file. @@ -23,6 +23,7 @@ import ( "time" "github.com/decred/dcrd/blockchain/standalone/v2" + "github.com/decred/dcrd/chaincfg/chainhash" "github.com/decred/dcrd/chaincfg/v3" "github.com/decred/dcrd/dcrutil/v3" "github.com/decred/dcrd/wire" @@ -53,6 +54,9 @@ var ( // ZeroRat is the default value for a big.Rat. ZeroRat = new(big.Rat).SetInt64(0) + + // zeroHash is the default value for a chainhash.Hash. + zeroHash = chainhash.Hash{0} ) // readPayload is a convenience type that wraps a message and its @@ -276,6 +280,9 @@ func (c *Client) handleAuthorizeRequest(req *Request, allowed bool) error { c.name = name case true: + // Set a default account id. 
+ c.account = defaultAccountID + c.name = username } @@ -1103,6 +1110,41 @@ func (c *Client) hashMonitor() { hash := new(big.Rat).Quo(num, denom) c.setHashRate(hash) subs = submissions + + c.mtx.RLock() + miner := c.miner + c.mtx.RUnlock() + + hashID := hashDataID(c.account, c.extraNonce1) + hashData, err := c.cfg.db.fetchHashData(hashID) + if err != nil { + if errors.Is(err, errs.ValueNotFound) { + hashData = newHashData(miner, c.account, c.addr.String(), + c.extraNonce1, hash) + err = c.cfg.db.persistHashData(hashData) + if err != nil { + log.Errorf("unable to persist hash data with "+ + "id %s: %v", hashData.UUID, err) + } + + continue + } + + log.Errorf("unable to fetch hash data with id %s: %v", + hashID, err) + + c.cancel() + continue + } + + hashData.HashRate = hash + hashData.UpdatedOn = time.Now().UnixNano() + + err = c.cfg.db.updateHashData(hashData) + if err != nil { + log.Errorf("unable to update hash data with "+ + "id %s: %v", hashData.UUID, err) + } } } } diff --git a/pool/database.go b/pool/database.go index 684e1dfe..f7a84a19 100644 --- a/pool/database.go +++ b/pool/database.go @@ -63,6 +63,13 @@ type Database interface { persistJob(job *Job) error deleteJob(id string) error deleteJobsBeforeHeight(height uint32) error + + // Hash Data + persistHashData(hashData *HashData) error + updateHashData(hashData *HashData) error + fetchHashData(id string) (*HashData, error) + listHashData(minNano int64) (map[string][]*HashData, error) + pruneHashData(minNano int64) error } // BoltDB is a wrapper around bolt.DB which implements the Database interface. 
diff --git a/pool/db_test.go b/pool/db_test.go index 07a021ce..ac0b6b94 100644 --- a/pool/db_test.go +++ b/pool/db_test.go @@ -178,6 +178,10 @@ func Test_BoltDB_InitDB(t *testing.T) { if err == nil { return fmt.Errorf("expected paymentArchiveBkt to exist already") } + _, err = pbkt.CreateBucket(hashDataBkt) + if err == nil { + return fmt.Errorf("expected hashDataBkt to exist already") + } return nil }) if err != nil { diff --git a/pool/endpoint.go b/pool/endpoint.go index 3d638410..6afef546 100644 --- a/pool/endpoint.go +++ b/pool/endpoint.go @@ -192,9 +192,6 @@ func (e *Endpoint) connect(ctx context.Context) { e.wg.Add(1) go client.run() - // Signal the gui cache of the connected client. - e.cfg.SignalCache(ConnectedClient) - log.Debugf("Mining client connected. extranonce1=%s, addr=%s", client.extraNonce1, client.addr) diff --git a/pool/hashdata.go b/pool/hashdata.go new file mode 100644 index 00000000..565ffff8 --- /dev/null +++ b/pool/hashdata.go @@ -0,0 +1,43 @@ +// Copyright (c) 2020 The Decred developers +// Use of this source code is governed by an ISC +// license that can be found in the LICENSE file. + +package pool + +import ( + "bytes" + "math/big" + "time" +) + +// HashData represents client identification and hashrate information +// for a mining client. +type HashData struct { + UUID string `json:"uuid"` + AccountID string `json:"accountid"` + Miner string `json:"miner"` + IP string `json:"ip"` + HashRate *big.Rat `json:"hashrate"` + UpdatedOn int64 `json:"updatedon"` +} + +// hashDataID generates a unique hash data id. +func hashDataID(accountID string, extraNonce1 string) string { + var buf bytes.Buffer + _, _ = buf.WriteString(extraNonce1) + _, _ = buf.WriteString(accountID) + return buf.String() +} + +// newHashData creates a new hash data. 
+func newHashData(miner string, accountID string, ip string, extraNonce1 string, hashRate *big.Rat) *HashData { + nowNano := time.Now().UnixNano() + return &HashData{ + UUID: hashDataID(accountID, extraNonce1), + AccountID: accountID, + HashRate: hashRate, + IP: ip, + Miner: miner, + UpdatedOn: nowNano, + } +} diff --git a/pool/hashdata_test.go b/pool/hashdata_test.go new file mode 100644 index 00000000..c95b972e --- /dev/null +++ b/pool/hashdata_test.go @@ -0,0 +1,138 @@ +package pool + +import ( + "errors" + "math/big" + "testing" + "time" + + errs "github.com/decred/dcrpool/errors" +) + +func testHashData(t *testing.T) { + miner := ObeliskDCR1 + extraNonce1 := "ca750c60" + ip := "127.0.0.1:5550" + now := time.Now() + hashRate := new(big.Rat).SetInt64(100) + + hashData := newHashData(miner, xID, ip, extraNonce1, hashRate) + + // Ensure hash data can be persisted. + err := db.persistHashData(hashData) + if err != nil { + t.Fatal(err) + } + + // Ensure hash data can be fetched. + hashID := hashData.UUID + fetchedHashData, err := db.fetchHashData(hashID) + if err != nil { + t.Fatal(err) + } + + // Assert fetched values match expected values. 
+ if fetchedHashData.UpdatedOn != hashData.UpdatedOn { + t.Fatalf("expected updated on value of %v, got %v", + hashData.UpdatedOn, fetchedHashData.UpdatedOn) + } + + if fetchedHashData.HashRate.RatString() != hashData.HashRate.RatString() { + t.Fatalf("expected hash rate value of %v, got %v", + hashData.HashRate, fetchedHashData.HashRate) + } + + if fetchedHashData.IP != hashData.IP { + t.Fatalf("expected ip value of %v, got %v", + hashData.IP, fetchedHashData.IP) + } + + if fetchedHashData.Miner != hashData.Miner { + t.Fatalf("expected miner value of %v, got %v", + hashData.UpdatedOn, fetchedHashData.UpdatedOn) + } + + if fetchedHashData.AccountID != hashData.AccountID { + t.Fatalf("expected account id value of %v, got %v", + hashData.AccountID, fetchedHashData.AccountID) + } + + // Ensure fetching a non-existent hash data returns an error. + invalidHashID := hashDataID(yID, extraNonce1) + _, err = db.fetchHashData(invalidHashID) + if !errors.Is(err, errs.ValueNotFound) { + t.Fatalf("expected a value not found error for "+ + "non-existent hash data, got %v", err) + } + + fiveMinutesAfter := now.Add(time.Minute * 5).UnixNano() + fiveMinutesBefore := now.Add(-time.Minute * 5).UnixNano() + + // Ensure listing account hash data adheres to the minimum update + // time constraint. + dataset, err := db.listHashData(fiveMinutesAfter) + if err != nil { + t.Fatal(err) + } + + if len(dataset) > 0 { + t.Fatalf("expected no hash data, got %d", len(dataset)) + } + + dataset, err = db.listHashData(fiveMinutesBefore) + if err != nil { + t.Fatal(err) + } + + if len(dataset) != 1 { + t.Fatalf("expected one hash data, got %d", len(dataset)) + } + + // Ensure the account id is a key of the hash data set returned. + _, ok := dataset[xID] + if !ok { + t.Fatalf("expected dataset to have %s as an account id key", xID) + } + + newUpdatedOn := hashData.UpdatedOn + 100 + hashData.UpdatedOn = newUpdatedOn + + // Ensure hash data can be updated. 
+ err = db.updateHashData(hashData) + if err != nil { + t.Fatal(err) + } + + fetchedHashData, err = db.fetchHashData(hashID) + if err != nil { + t.Fatal(err) + } + + if fetchedHashData.UpdatedOn != hashData.UpdatedOn { + t.Fatalf("expected updated on time to be %d, got %d", + hashData.UpdatedOn, fetchedHashData.UpdatedOn) + } + + // Ensure pruning hash data adheres to the minimum update time constraint. + err = db.pruneHashData(fiveMinutesBefore) + if err != nil { + t.Fatalf("unexpected pruning error: %v", err) + } + + _, err = db.fetchHashData(hashID) + if err != nil { + t.Fatalf("expected a valid hash data returned, got %v", err) + } + + err = db.pruneHashData(fiveMinutesAfter) + if err != nil { + t.Fatalf("unexpected pruning error: %v", err) + } + + _, err = db.fetchHashData(hashID) + if !errors.Is(err, errs.ValueNotFound) { + t.Fatalf("expected a value not found error for "+ + "pruned hash data, got %v", err) + } + +} diff --git a/pool/hub.go b/pool/hub.go index 63c8b744..76765b8b 100644 --- a/pool/hub.go +++ b/pool/hub.go @@ -74,12 +74,6 @@ const ( // updated to unconfirmed due to a reorg. Unconfirmed - // ConnectedClient indicates a new client has connected to the pool. - ConnectedClient - - // DisconnectedClient indicates a client has disconnected from the pool. - DisconnectedClient - // ClaimedShare indicates work quotas for participating clients have // been updated. ClaimedShare @@ -577,16 +571,16 @@ func (h *Hub) Run(ctx context.Context) { h.shutdown() } -// FetchClients returns all connected pool clients. -func (h *Hub) FetchClients() []*Client { - clients := make([]*Client, 0) - h.endpoint.clientsMtx.Lock() - for _, c := range h.endpoint.clients { - clients = append(clients, c) +// FetchHashData returns all hash data from connected pool clients +// which have been updated in the last minute. 
+func (h *Hub) FetchHashData() (map[string][]*HashData, error) { + aMinuteAgo := time.Now().Add(-time.Minute).UnixNano() + hashData, err := h.cfg.DB.listHashData(aMinuteAgo) + if err != nil { + return nil, err } - h.endpoint.clientsMtx.Unlock() - return clients + return hashData, err } // FetchPendingPayments fetches all unpaid payments. diff --git a/pool/hub_test.go b/pool/hub_test.go index 003c2a97..0c55a8a2 100644 --- a/pool/hub_test.go +++ b/pool/hub_test.go @@ -384,10 +384,6 @@ func testHub(t *testing.T) { t.Fatal("expected hub to have connected clients") } - if len(hub.FetchClients()) != 1 { - t.Fatal("expected a connected cpu client") - } - // Force subscribe and authorize connected clients to allow // receiving work notifications. hub.endpoint.clientsMtx.Lock() diff --git a/pool/paymentmgr_test.go b/pool/paymentmgr_test.go index aa5d042c..7896b6a2 100644 --- a/pool/paymentmgr_test.go +++ b/pool/paymentmgr_test.go @@ -24,7 +24,6 @@ import ( ) var ( - zeroHash = chainhash.Hash{0} zeroSource = &PaymentSource{ BlockHash: zeroHash.String(), Coinbase: zeroHash.String(), diff --git a/pool/pool_test.go b/pool/pool_test.go index 9426f385..8b1cc1c4 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -92,6 +92,7 @@ func TestPool(t *testing.T) { "testClientRolledWork": testClientTimeRolledWork, "testClientMessageHandling": testClientMessageHandling, "testClientUpgrades": testClientUpgrades, + "testHashData": testHashData, "testPaymentMgrPPS": testPaymentMgrPPS, "testPaymentMgrPPLNS": testPaymentMgrPPLNS, "testPaymentMgrMaturity": testPaymentMgrMaturity, diff --git a/pool/postgres.go b/pool/postgres.go index 2cb5fb61..a9179352 100644 --- a/pool/postgres.go +++ b/pool/postgres.go @@ -94,6 +94,11 @@ func InitPostgresDB(host string, port uint32, user, pass, dbName string, purgeDB return nil, makeErr("accepted work", err) } + _, err = db.Exec(createTableHashData) + if err != nil { + return nil, makeErr("hashrate", err) + } + return &PostgresDB{db}, nil } @@ -222,6 
+227,44 @@ func decodeShareRows(rows *sql.Rows) ([]*Share, error) { return toReturn, nil } +// decodeHashDataRows deserializes the provided SQL rows into a slice of +// HashData structs. +func decodeHashDataRows(rows *sql.Rows) (map[string][]*HashData, error) { + const funcName = "decodeHashDataRows" + + toReturn := make(map[string][]*HashData) + for rows.Next() { + var uuid, accountID, miner, ip, hashRate string + var updatedOn int64 + err := rows.Scan(&uuid, &accountID, &miner, &ip, + &hashRate, &updatedOn) + if err != nil { + desc := fmt.Sprintf("%s: unable to scan hash data entry: %v", + funcName, err) + return nil, errs.DBError(errs.Decode, desc) + } + + hashRat, ok := new(big.Rat).SetString(hashRate) + if !ok { + desc := fmt.Sprintf("%s: unable to decode big.Rat string: %v", + funcName, err) + return nil, errs.DBError(errs.Parse, desc) + } + + hashData := &HashData{uuid, accountID, miner, ip, hashRat, updatedOn} + toReturn[accountID] = append(toReturn[accountID], hashData) + } + + err := rows.Err() + if err != nil { + desc := fmt.Sprintf("%s: unable to decode hash data: %v", + funcName, err) + return nil, errs.DBError(errs.Decode, desc) + } + + return toReturn, nil +} + // httpBackup is not implemented for postgres database. func (db *PostgresDB) httpBackup(w http.ResponseWriter) error { return errors.New("httpBackup is not implemented for postgres database") @@ -951,3 +994,113 @@ func (db *PostgresDB) deleteJobsBeforeHeight(height uint32) error { } return err } + +// persistHashData saves the provided hash data to the database. 
+func (db *PostgresDB) persistHashData(hashData *HashData) error { + const funcName = "persistHashData" + + _, err := db.DB.Exec(insertHashData, hashData.UUID, hashData.AccountID, + hashData.Miner, hashData.IP, hashData.HashRate.RatString(), + hashData.UpdatedOn) + if err != nil { + + var pqError *pq.Error + if errors.As(err, &pqError) { + if pqError.Code.Name() == "unique_violation" { + desc := fmt.Sprintf("%s: hash rate %s already exists", funcName, + hashData.UUID) + return errs.DBError(errs.ValueFound, desc) + } + } + + desc := fmt.Sprintf("%s: unable to persist hash rate: %v", funcName, err) + return errs.DBError(errs.PersistEntry, desc) + } + return nil +} + +// updateHashData persists the updated hash data to the database. +func (db *PostgresDB) updateHashData(hashData *HashData) error { + const funcName = "updateHashData" + + result, err := db.DB.Exec(updateHashData, + hashData.UUID, hashData.AccountID, hashData.Miner, + hashData.IP, hashData.HashRate.RatString(), hashData.UpdatedOn) + if err != nil { + return err + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + desc := fmt.Sprintf("%s: unable to update accepted hash data with id "+ + "(%s): %v", funcName, hashData.UUID, err) + return errs.DBError(errs.PersistEntry, desc) + } + + if rowsAffected == 0 { + desc := fmt.Sprintf("%s: hash data %s not found", funcName, hashData.UUID) + return errs.DBError(errs.ValueNotFound, desc) + } + + return nil +} + +// fetchHashData fetches the hash data associated with the provided id. 
+func (db *PostgresDB) fetchHashData(id string) (*HashData, error) { + const funcName = "fetchHashData" + var uuid, accountID, miner, ip, hashRate string + var updatedOn int64 + err := db.DB.QueryRow(selectHashData, id).Scan(&uuid, &accountID, &miner, + &ip, &hashRate, &updatedOn) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + desc := fmt.Sprintf("%s: no hash data found for id %s", funcName, id) + return nil, errs.DBError(errs.ValueNotFound, desc) + } + + desc := fmt.Sprintf("%s: unable to fetch hash data with id (%s): %v", + funcName, id, err) + return nil, errs.DBError(errs.FetchEntry, desc) + } + + hashRat, ok := new(big.Rat).SetString(hashRate) + if !ok { + desc := fmt.Sprintf("%s: unable to decode big.Rat string: %v", + funcName, err) + return nil, errs.DBError(errs.Parse, desc) + } + + return &HashData{uuid, accountID, miner, ip, hashRat, updatedOn}, nil +} + +// listHashData fetches all hash data updated after the provided minimum time. +func (db *PostgresDB) listHashData(minNano int64) (map[string][]*HashData, error) { + const funcName = "listHashData" + rows, err := db.DB.Query(listHashData, minNano) + if err != nil { + desc := fmt.Sprintf("%s: unable to list hash data: %v", + funcName, err) + return nil, errs.DBError(errs.FetchEntry, desc) + } + + hashData, err := decodeHashDataRows(rows) + if err != nil { + return nil, err + } + + return hashData, nil +} + +// pruneHashData prunes all hash data that have not been updated since +// the provided minimum time. 
+func (db *PostgresDB) pruneHashData(minNano int64) error { + const funcName = "pruneHashData" + + _, err := db.DB.Exec(pruneHashData, minNano) + if err != nil { + desc := fmt.Sprintf("%s: unable to prune hash data: %v", funcName, err) + return errs.DBError(errs.DeleteEntry, desc) + } + + return nil +} diff --git a/pool/sql_queries.go b/pool/sql_queries.go index 37b98c61..2ef47eaa 100644 --- a/pool/sql_queries.go +++ b/pool/sql_queries.go @@ -73,6 +73,16 @@ const ( confirmed BOOLEAN NOT NULL );` + createTableHashData = ` + CREATE TABLE IF NOT EXISTS hashdata ( + uuid TEXT PRIMARY KEY, + accountid TEXT NOT NULL, + miner TEXT NOT NULL, + ip TEXT NOT NULL, + hashrate TEXT NOT NULL, + updatedon INT8 NOT NULL + );` + purgeDB = `DROP TABLE IF EXISTS acceptedwork, accounts, @@ -80,7 +90,8 @@ const ( jobs, metadata, payments, - shares;` + shares, + hashdata;` selectPoolMode = ` SELECT value @@ -378,4 +389,44 @@ const ( deleteJob = `DELETE FROM jobs WHERE uuid=$1;` deleteJobBeforeHeight = `DELETE FROM jobs WHERE height < $1;` + + selectHashData = `SELECT + uuid, + accountid, + miner, + ip, + hashrate, + updatedon + FROM hashdata + WHERE uuid=$1;` + + listHashData = `SELECT + uuid, + accountid, + miner, + ip, + hashrate, + updatedon + FROM hashdata + WHERE updatedon > $1;` + + pruneHashData = `DELETE FROM hashdata WHERE updatedon < $1;` + + insertHashData = `INSERT INTO hashdata( + uuid, + accountid, + miner, + ip, + hashrate, + updatedon) VALUES ($1,$2,$3, $4, $5, $6);` + + updateHashData = ` + UPDATE hashdata + SET + accountid=$2, + miner=$3, + ip=$4, + hashrate=$5, + updatedon=$6 + WHERE uuid=$1;` )