diff --git a/politeiawww/addresswatcher.go b/politeiawww/cmsaddresswatcher.go similarity index 72% rename from politeiawww/addresswatcher.go rename to politeiawww/cmsaddresswatcher.go index 7e73e9888..f43ba895b 100644 --- a/politeiawww/addresswatcher.go +++ b/politeiawww/cmsaddresswatcher.go @@ -5,7 +5,6 @@ package main import ( - "context" "encoding/hex" "encoding/json" "fmt" @@ -14,8 +13,6 @@ import ( "github.com/decred/dcrd/dcrutil" pstypes "github.com/decred/dcrdata/pubsub/types/v2" - client "github.com/decred/dcrdata/pubsub/v2/psclient" - "github.com/decred/dcrdata/semver" pd "github.com/decred/politeia/politeiad/api/v1" "github.com/decred/politeia/politeiad/cache" cms "github.com/decred/politeia/politeiawww/api/cms/v1" @@ -24,85 +21,37 @@ import ( "github.com/decred/politeia/util" ) -const mainnetSubsidyAddr = "Dcur2mcGjmENx4DhNqDctW5wJCVyT3Qeqkx" +const ( + // mainnetSubsidyAddr is the mainnet address in which cms payments + // must come from in order to be considered a valid payment. + mainnetSubsidyAddr = "Dcur2mcGjmENx4DhNqDctW5wJCVyT3Qeqkx" +) -func (p *politeiawww) addWatchAddress(address string) error { - address = "address:" + address - if subd, _ := strInSlice(p.pubSubDcrdata.currentSubs, address); subd { - log.Infof("Already subscribed to %s.", address) - return nil - } - p.pubSubDcrdata.currentSubs = append(p.pubSubDcrdata.currentSubs, address) - _, err := p.pubSubDcrdata.client.Subscribe(address) +func (p *politeiawww) addWatchAddress(address string) { + err := p.wsDcrdata.subToAddr(address) if err != nil { - return fmt.Errorf("failed to subscribe: %v", err) + log.Errorf("addWatchAddress: subscribe '%v': %v", + address, err) + return } log.Infof("Subscribed to listen: %v", address) - return nil } -func (p *politeiawww) removeWatchAddress(address string) error { - address = "address:" + address - subd, i := strInSlice(p.pubSubDcrdata.currentSubs, address) - if !subd { - log.Infof("Not subscribed to %s.", address) - return nil - } - p.pubSubDcrdata.currentSubs = append(p.pubSubDcrdata.currentSubs[:i], - p.pubSubDcrdata.currentSubs[i+1:]...) - _, err := p.pubSubDcrdata.client.Unsubscribe(address) +func (p *politeiawww) removeWatchAddress(address string) { + err := p.wsDcrdata.unsubFromAddr(address) if err != nil { - return fmt.Errorf("failed to unsubscribe: %v", err) + log.Errorf("removeWatchAddress: unsubscribe '%v': %v", + address, err) + return } log.Infof("Unsubscribed: %v", address) - return nil } -func (p *politeiawww) addPing() error { - _, err := p.pubSubDcrdata.client.Subscribe("ping") - if err != nil { - return fmt.Errorf("failed to subscribe: %v", err) - } - log.Infof("Subscribed to ping") - return nil -} - -func (p *politeiawww) setupWatcher() error { - // Create the websocket connection. - wsURL, err := util.BlockExplorerURLForSubscriptions(activeNetParams.Params) - if err != nil { - return err - } - log.Infof("Connecting to ws at: %v", wsURL) - - opts := client.Opts{ - ReadTimeout: 3 * time.Second, - WriteTimeout: 3 * time.Second, - } - p.pubSubDcrdata.client, err = client.New(wsURL, context.Background(), &opts) - if err != nil { - log.Errorf("failed to connect to %s: %v", wsURL, err) - return err - } - serverVer, err := p.pubSubDcrdata.client.ServerVersion() - if err != nil { - log.Errorf("failed to get server version: %v", err) - return err - } - - clientSemVer := client.Version() - log.Infof("PubSub Server version: %s, Client version %v", serverVer, - clientSemVer) - serverSemVer := semver.NewSemver(serverVer.Major, serverVer.Minor, - serverVer.Patch) - if !semver.Compatible(clientSemVer, serverSemVer) { - return fmt.Errorf("pubsub server version is %v, but client is version %v", - serverSemVer, clientSemVer) - } - +func (p *politeiawww) setupCMSAddressWatcher() { + p.wsDcrdata.subToPing() go func() { for { - msg, ok := <-p.pubSubDcrdata.client.Receive() + msg, ok := <-p.wsDcrdata.client.Receive() if !ok { break } @@ -136,10 +85,9 @@ func (p *politeiawww) setupWatcher() error { } } }() - return nil } -func (p *politeiawww) restartAddressesWatching() error { +func (p *politeiawww) restartCMSAddressesWatching() error { approvedInvoices, err := p.cmsDB.InvoicesByStatus(int(cms.InvoiceStatusApproved)) if err != nil { return err @@ -182,15 +130,6 @@ func (p *politeiawww) restartAddressesWatching() error { return nil } -func strInSlice(sl []string, str string) (bool, int) { - for i, s := range sl { - if s == str { - return true, i - } - } - return false, -1 -} - // checkPayments checks to see if a given payment has been successfully paid. // It will return TRUE if paid, otherwise false. It utilizes the util // FetchTxsForAddressNotBefore which looks for transaction at a given address diff --git a/politeiawww/politeiawww.go b/politeiawww/politeiawww.go index a05a6b278..87d15f3a4 100644 --- a/politeiawww/politeiawww.go +++ b/politeiawww/politeiawww.go @@ -12,7 +12,6 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/decred/dcrd/chaincfg" - client "github.com/decred/dcrdata/pubsub/v2/psclient" "github.com/decred/politeia/politeiad/api/v1/mime" "github.com/decred/politeia/politeiad/cache" www "github.com/decred/politeia/politeiawww/api/www/v1" @@ -67,12 +66,6 @@ type wsContext struct { done chan struct{} // SHUT...DOWN...EVERYTHING... } -// wsDcrdata is the context for the dcrdata websocket connection. -type wsDcrdata struct { - client *client.Client - currentSubs []string -} - func (w *wsContext) String() string { u := w.uuid if u == "" { @@ -135,9 +128,9 @@ type politeiawww struct { cmsDB cmsdatabase.Database cron *cron.Cron - // pubSubDcrdata contains the client and list of current subscriptions to + // wsDcrdata contains the client and list of current subscriptions to // dcrdata's public subscription websocket - pubSubDcrdata *wsDcrdata + wsDcrdata *wsDcrdata } // XXX rig this up diff --git a/politeiawww/wsdcrdata.go b/politeiawww/wsdcrdata.go new file mode 100644 index 000000000..d3957f2d8 --- /dev/null +++ b/politeiawww/wsdcrdata.go @@ -0,0 +1,140 @@ +package main + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + client "github.com/decred/dcrdata/pubsub/v2/psclient" + "github.com/decred/dcrdata/semver" + "github.com/decred/politeia/util" +) + +const ( + // addrSubPrefix must be prefixed to the address when subscribing + // to address events. + addrSubPrefix = "address:" +) + +var ( + // errDuplicateSub is emitted when attempting to subscribe to an + // event that has already been subscribed to. + errDuplicateSub = errors.New("duplicate subscription") + + // errSubNotFound is emitted when attempting to unsubscribe to an + // event that has not yet been subscribed to. + errSubNotFound = errors.New("subscription not found") +) + +// wsDcrdata is the context used for managing a dcrdata websocket connection. +type wsDcrdata struct { + sync.RWMutex + client *client.Client // dcrdata websocket client + subscriptions map[string]struct{} // Client subscriptions +} + +// addSub adds an event subscription to the subscriptions map. +func (w *wsDcrdata) addSub(event string) { + w.Lock() + defer w.Unlock() + + w.subscriptions[event] = struct{}{} +} + +// removeSub removes an event subscription from the subscriptions map. +func (w *wsDcrdata) removeSub(event string) { + w.Lock() + defer w.Unlock() + + delete(w.subscriptions, event) +} + +// isSubscribed returns whether the client is subscribed to the provided event. +func (w *wsDcrdata) isSubscribed(event string) bool { + w.RLock() + defer w.RUnlock() + + _, ok := w.subscriptions[event] + return ok +} + +// subToPing subscribes to the dcrdata ping event. +func (w *wsDcrdata) subToPing() error { + _, err := w.client.Subscribe("ping") + if err != nil { + return fmt.Errorf("failed to subscribe: %v", err) + } + log.Debugf("wsDcrdata subscribed to ping") + return nil +} + +// subToAddr subscribes to dcrdata events for the provided address. +func (w *wsDcrdata) subToAddr(address string) error { + event := addrSubPrefix + address + if w.isSubscribed(event) { + return errDuplicateSub + } + _, err := w.client.Subscribe(event) + if err != nil { + return fmt.Errorf("failed to subscribe: %v", err) + } + w.addSub(event) + log.Debugf("wsDcrdata subscribed to addr: %v", address) + return nil +} + +// unsubFromAddr unsubscribes from dcrdata events for the provided address. +func (w *wsDcrdata) unsubFromAddr(address string) error { + event := addrSubPrefix + address + if !w.isSubscribed(event) { + return errSubNotFound + } + _, err := w.client.Unsubscribe(event) + if err != nil { + return fmt.Errorf("failed to unsubscribe: %v", err) + } + w.removeSub(event) + log.Debugf("wsDcrdata unsubscribed from addr: %v", address) + return nil +} + +// newWSDcrdata return a new wsDcrdata context. +func newWSDcrdata() (*wsDcrdata, error) { + // Init websocket client + u, err := util.BlockExplorerURLForSubscriptions(activeNetParams.Params) + if err != nil { + return nil, err + } + opts := client.Opts{ + ReadTimeout: 3 * time.Second, + WriteTimeout: 3 * time.Second, + } + c, err := client.New(u, context.Background(), &opts) + if err != nil { + return nil, fmt.Errorf("failed to connect to %v: %v", u, err) + } + + log.Infof("Dcrdata websocket host: %v", u) + + // Check client and server compatibility + v, err := c.ServerVersion() + if err != nil { + return nil, fmt.Errorf("server version failed: %v", err) + } + serverSemVer := semver.NewSemver(v.Major, v.Minor, v.Patch) + clientSemVer := client.Version() + if !semver.Compatible(clientSemVer, serverSemVer) { + return nil, fmt.Errorf("version mismatch; client %v, server %v", + serverSemVer, clientSemVer) + } + + log.Infof("Dcrdata pubsub server version: %v, client version %v", + serverSemVer, clientSemVer) + + return &wsDcrdata{ + client: c, + subscriptions: make(map[string]struct{}), + }, nil +} diff --git a/politeiawww/www.go b/politeiawww/www.go index f24723d9b..7fccca3b7 100644 --- a/politeiawww/www.go +++ b/politeiawww/www.go @@ -537,20 +537,18 @@ func _main() error { p.cron = cron.New() p.checkInvoiceNotifications() - p.pubSubDcrdata = &wsDcrdata{} - - // XXX how many addresses should we plan on storing? - p.pubSubDcrdata.currentSubs = make([]string, 0, 1048) - - p.setupWatcher() - - err = p.restartAddressesWatching() + // Setup address watcher + ws, err := newWSDcrdata() + if err != nil { + return fmt.Errorf("new wsDcrdata: %v", err) + } + p.wsDcrdata = ws + p.setupCMSAddressWatcher() + err = p.restartCMSAddressesWatching() if err != nil { log.Errorf("error restarting address watcher %v", err) } - p.addPing() - default: return fmt.Errorf("unknown mode: %v", p.cfg.Mode) } @@ -639,8 +637,9 @@ done: // Close user db connection p.db.Close() - if p.pubSubDcrdata != nil && p.pubSubDcrdata.client != nil { - p.pubSubDcrdata.client.Stop() + // Shutdown all dcrdata websockets + if p.wsDcrdata != nil { + p.wsDcrdata.client.Stop() } return nil