Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

www: Update wsDcrdata context #933

Merged
merged 2 commits into from
Jun 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 19 additions & 80 deletions politeiawww/addresswatcher.go → politeiawww/cmsaddresswatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package main

import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
Expand All @@ -14,8 +13,6 @@ import (

"github.com/decred/dcrd/dcrutil"
pstypes "github.com/decred/dcrdata/pubsub/types/v2"
client "github.com/decred/dcrdata/pubsub/v2/psclient"
"github.com/decred/dcrdata/semver"
pd "github.com/decred/politeia/politeiad/api/v1"
"github.com/decred/politeia/politeiad/cache"
cms "github.com/decred/politeia/politeiawww/api/cms/v1"
Expand All @@ -24,85 +21,37 @@ import (
"github.com/decred/politeia/util"
)

const mainnetSubsidyAddr = "Dcur2mcGjmENx4DhNqDctW5wJCVyT3Qeqkx"
const (
// mainnetSubsidyAddr is the mainnet address in which cms payments
// must come from in order to be considered a valid payment.
mainnetSubsidyAddr = "Dcur2mcGjmENx4DhNqDctW5wJCVyT3Qeqkx"
)

func (p *politeiawww) addWatchAddress(address string) error {
address = "address:" + address
if subd, _ := strInSlice(p.pubSubDcrdata.currentSubs, address); subd {
log.Infof("Already subscribed to %s.", address)
return nil
}
p.pubSubDcrdata.currentSubs = append(p.pubSubDcrdata.currentSubs, address)
_, err := p.pubSubDcrdata.client.Subscribe(address)
func (p *politeiawww) addWatchAddress(address string) {
err := p.wsDcrdata.subToAddr(address)
if err != nil {
return fmt.Errorf("failed to subscribe: %v", err)
log.Errorf("addWatchAddress: subscribe '%v': %v",
address, err)
return
}
log.Infof("Subscribed to listen: %v", address)
return nil
}

func (p *politeiawww) removeWatchAddress(address string) error {
address = "address:" + address
subd, i := strInSlice(p.pubSubDcrdata.currentSubs, address)
if !subd {
log.Infof("Not subscribed to %s.", address)
return nil
}
p.pubSubDcrdata.currentSubs = append(p.pubSubDcrdata.currentSubs[:i],
p.pubSubDcrdata.currentSubs[i+1:]...)
_, err := p.pubSubDcrdata.client.Unsubscribe(address)
func (p *politeiawww) removeWatchAddress(address string) {
err := p.wsDcrdata.unsubFromAddr(address)
if err != nil {
return fmt.Errorf("failed to unsubscribe: %v", err)
log.Errorf("removeWatchAddress: unsubscribe '%v': %v",
address, err)
return
}
log.Infof("Unsubscribed: %v", address)
return nil
}

func (p *politeiawww) addPing() error {
_, err := p.pubSubDcrdata.client.Subscribe("ping")
if err != nil {
return fmt.Errorf("failed to subscribe: %v", err)
}
log.Infof("Subscribed to ping")
return nil
}

func (p *politeiawww) setupWatcher() error {
// Create the websocket connection.
wsURL, err := util.BlockExplorerURLForSubscriptions(activeNetParams.Params)
if err != nil {
return err
}
log.Infof("Connecting to ws at: %v", wsURL)

opts := client.Opts{
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
}
p.pubSubDcrdata.client, err = client.New(wsURL, context.Background(), &opts)
if err != nil {
log.Errorf("failed to connect to %s: %v", wsURL, err)
return err
}
serverVer, err := p.pubSubDcrdata.client.ServerVersion()
if err != nil {
log.Errorf("failed to get server version: %v", err)
return err
}

clientSemVer := client.Version()
log.Infof("PubSub Server version: %s, Client version %v", serverVer,
clientSemVer)
serverSemVer := semver.NewSemver(serverVer.Major, serverVer.Minor,
serverVer.Patch)
if !semver.Compatible(clientSemVer, serverSemVer) {
return fmt.Errorf("pubsub server version is %v, but client is version %v",
serverSemVer, clientSemVer)
}

func (p *politeiawww) setupCMSAddressWatcher() {
p.wsDcrdata.subToPing()
go func() {
for {
msg, ok := <-p.pubSubDcrdata.client.Receive()
msg, ok := <-p.wsDcrdata.client.Receive()
if !ok {
break
}
Expand Down Expand Up @@ -136,10 +85,9 @@ func (p *politeiawww) setupWatcher() error {
}
}
}()
return nil
}

func (p *politeiawww) restartAddressesWatching() error {
func (p *politeiawww) restartCMSAddressesWatching() error {
approvedInvoices, err := p.cmsDB.InvoicesByStatus(int(cms.InvoiceStatusApproved))
if err != nil {
return err
Expand Down Expand Up @@ -182,15 +130,6 @@ func (p *politeiawww) restartAddressesWatching() error {
return nil
}

func strInSlice(sl []string, str string) (bool, int) {
for i, s := range sl {
if s == str {
return true, i
}
}
return false, -1
}

// checkPayments checks to see if a given payment has been successfully paid.
// It will return TRUE if paid, otherwise false. It utilizes the util
// FetchTxsForAddressNotBefore which looks for transaction at a given address
Expand Down
11 changes: 2 additions & 9 deletions politeiawww/politeiawww.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (

"github.com/davecgh/go-spew/spew"
"github.com/decred/dcrd/chaincfg"
client "github.com/decred/dcrdata/pubsub/v2/psclient"
"github.com/decred/politeia/politeiad/api/v1/mime"
"github.com/decred/politeia/politeiad/cache"
www "github.com/decred/politeia/politeiawww/api/www/v1"
Expand Down Expand Up @@ -67,12 +66,6 @@ type wsContext struct {
done chan struct{} // SHUT...DOWN...EVERYTHING...
}

// wsDcrdata is the context for the dcrdata websocket connection.
type wsDcrdata struct {
client *client.Client
currentSubs []string
}

func (w *wsContext) String() string {
u := w.uuid
if u == "" {
Expand Down Expand Up @@ -135,9 +128,9 @@ type politeiawww struct {
cmsDB cmsdatabase.Database
cron *cron.Cron

// pubSubDcrdata contains the client and list of current subscriptions to
// wsDcrdata contains the client and list of current subscriptions to
// dcrdata's public subscription websocket
pubSubDcrdata *wsDcrdata
wsDcrdata *wsDcrdata
}

// XXX rig this up
Expand Down
140 changes: 140 additions & 0 deletions politeiawww/wsdcrdata.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
package main

import (
"context"
"errors"
"fmt"
"sync"
"time"

client "github.com/decred/dcrdata/pubsub/v2/psclient"
"github.com/decred/dcrdata/semver"
"github.com/decred/politeia/util"
)

const (
// addrSubPrefix must be prefixed to the address when subscribing
// to address events.
addrSubPrefix = "address:"
)

var (
// errDuplicateSub is emitted when attempting to subscribe to an
// event that has already been subscribed to.
errDuplicateSub = errors.New("duplicate subscription")

// errSubNotFound is emitted when attempting to unsubscribe to an
// event that has not yet been subscribed to.
errSubNotFound = errors.New("subscription not found")
)

// wsDcrdata is the context used for managing a dcrdata websocket connection.
type wsDcrdata struct {
sync.RWMutex
client *client.Client // dcrdata websocket client
subscriptions map[string]struct{} // Client subscriptions
}

// addSub adds an event subscription to the subscriptions map.
func (w *wsDcrdata) addSub(event string) {
w.Lock()
defer w.Unlock()

w.subscriptions[event] = struct{}{}
}

// removeSub removes an event subscription from the subscriptions map.
func (w *wsDcrdata) removeSub(event string) {
w.Lock()
defer w.Unlock()

delete(w.subscriptions, event)
}

// isSubscribed returns whether the client is subscribed to the provided event.
func (w *wsDcrdata) isSubscribed(event string) bool {
w.RLock()
defer w.RUnlock()

_, ok := w.subscriptions[event]
return ok
}

// subToPing subscribes to the dcrdata ping event.
func (w *wsDcrdata) subToPing() error {
_, err := w.client.Subscribe("ping")
if err != nil {
return fmt.Errorf("failed to subscribe: %v", err)
}
log.Debugf("wsDcrdata subscribed to ping")
return nil
}

// subToAddr subscribes to dcrdata events for the provided address.
func (w *wsDcrdata) subToAddr(address string) error {
event := addrSubPrefix + address
if w.isSubscribed(event) {
return errDuplicateSub
}
_, err := w.client.Subscribe(event)
if err != nil {
return fmt.Errorf("failed to subscribe: %v", err)
}
w.addSub(event)
log.Debugf("wsDcrdata subscribed to addr: %v", address)
return nil
}

// unsubFromAddr unsubscribes from dcrdata events for the provided address.
func (w *wsDcrdata) unsubFromAddr(address string) error {
event := addrSubPrefix + address
if !w.isSubscribed(event) {
return errSubNotFound
}
_, err := w.client.Unsubscribe(event)
if err != nil {
return fmt.Errorf("failed to unsubscribe: %v", err)
}
w.removeSub(event)
log.Debugf("wsDcrdata unsubscribed from addr: %v", address)
return nil
}

// newWSDcrdata return a new wsDcrdata context.
func newWSDcrdata() (*wsDcrdata, error) {
// Init websocket client
u, err := util.BlockExplorerURLForSubscriptions(activeNetParams.Params)
if err != nil {
return nil, err
}
opts := client.Opts{
ReadTimeout: 3 * time.Second,
WriteTimeout: 3 * time.Second,
}
c, err := client.New(u, context.Background(), &opts)
if err != nil {
return nil, fmt.Errorf("failed to connect to %v: %v", u, err)
}

log.Infof("Dcrdata websocket host: %v", u)

// Check client and server compatibility
v, err := c.ServerVersion()
if err != nil {
return nil, fmt.Errorf("server version failed: %v", err)
}
serverSemVer := semver.NewSemver(v.Major, v.Minor, v.Patch)
clientSemVer := client.Version()
if !semver.Compatible(clientSemVer, serverSemVer) {
return nil, fmt.Errorf("version mismatch; client %v, server %v",
serverSemVer, clientSemVer)
}

log.Infof("Dcrdata pubsub server version: %v, client version %v",
serverSemVer, clientSemVer)

return &wsDcrdata{
client: c,
subscriptions: make(map[string]struct{}),
}, nil
}
23 changes: 11 additions & 12 deletions politeiawww/www.go
Original file line number Diff line number Diff line change
Expand Up @@ -537,20 +537,18 @@ func _main() error {
p.cron = cron.New()
p.checkInvoiceNotifications()

p.pubSubDcrdata = &wsDcrdata{}

// XXX how many addresses should we plan on storing?
p.pubSubDcrdata.currentSubs = make([]string, 0, 1048)

p.setupWatcher()

err = p.restartAddressesWatching()
// Setup address watcher
ws, err := newWSDcrdata()
if err != nil {
return fmt.Errorf("new wsDcrdata: %v", err)
}
p.wsDcrdata = ws
p.setupCMSAddressWatcher()
err = p.restartCMSAddressesWatching()
if err != nil {
log.Errorf("error restarting address watcher %v", err)
}

p.addPing()

default:
return fmt.Errorf("unknown mode: %v", p.cfg.Mode)
}
Expand Down Expand Up @@ -639,8 +637,9 @@ done:
// Close user db connection
p.db.Close()

if p.pubSubDcrdata != nil && p.pubSubDcrdata.client != nil {
p.pubSubDcrdata.client.Stop()
// Shutdown all dcrdata websockets
if p.wsDcrdata != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we not need to verify that p.wsDcrdata.client is not nil here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No because wsDcrdata is initialized using the function newWSDcrdata, which ensures that the client is setup correctly. If it's being initialized without using newWSDcrdata then its being initialized incorrectly.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool thanks for clarifying. Looks good.

p.wsDcrdata.client.Stop()
}

return nil
Expand Down