Skip to content

Commit

Permalink
multi: add ValidateAddress to stakepoold RPC (#406)
Browse files Browse the repository at this point in the history
  • Loading branch information
jholdstock authored and dajohi committed Jun 11, 2019
1 parent 2fc2544 commit b25f430
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 174 deletions.
9 changes: 9 additions & 0 deletions backend/stakepoold/rpc/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ service StakepooldService {
rpc ImportScript (ImportScriptRequest) returns (ImportScriptResponse);
rpc StakePoolUserInfo (StakePoolUserInfoRequest) returns (StakePoolUserInfoResponse);
rpc WalletInfo (WalletInfoRequest) returns (WalletInfoResponse);
rpc ValidateAddress (ValidateAddressRequest) returns (ValidateAddressResponse);
}

service VersionService {
Expand Down Expand Up @@ -69,6 +70,14 @@ message WalletInfoResponse {
uint32 VoteVersion = 1;
}

message ValidateAddressRequest {
string Address = 1;
}
message ValidateAddressResponse {
bool IsMine = 1;
string PubKeyAddr = 2;
}

message StakePoolUserTicket {
string Status = 1;
string Ticket = 2;
Expand Down
16 changes: 16 additions & 0 deletions backend/stakepoold/rpc/rpcserver/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,22 @@ func (ctx *AppContext) WalletInfo() (*wallettypes.WalletInfoResult, error) {
return response, nil
}

func (ctx *AppContext) ValidateAddress(address string) (*wallettypes.ValidateAddressWalletResult, error) {
addr, err := dcrutil.DecodeAddress(address)
if err != nil {
log.Errorf("ValidateAddress: ValidateAddress rpc failed: %v", err)
return nil, err
}

response, err := ctx.WalletConnection.ValidateAddress(addr)
if err != nil {
log.Errorf("ValidateAddress: ValidateAddress rpc failed: %v", err)
return nil, err
}

return response, nil
}

func (ctx *AppContext) UpdateUserData(newUserVotingConfig map[string]userdata.UserVotingConfig) {
log.Debug("updateUserData ctx.Lock")
ctx.Lock()
Expand Down
12 changes: 12 additions & 0 deletions backend/stakepoold/rpc/rpcserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,3 +189,15 @@ func (s *stakepooldServer) WalletInfo(ctx context.Context, req *pb.WalletInfoReq
VoteVersion: response.VoteVersion,
}, nil
}

func (s *stakepooldServer) ValidateAddress(ctx context.Context, req *pb.ValidateAddressRequest) (*pb.ValidateAddressResponse, error) {
response, err := s.appContext.ValidateAddress(req.Address)
if err != nil {
return nil, err
}

return &pb.ValidateAddressResponse{
IsMine: response.IsMine,
PubKeyAddr: response.PubKeyAddr,
}, nil
}
285 changes: 204 additions & 81 deletions backend/stakepoold/rpc/stakepoolrpc/api.pb.go

Large diffs are not rendered by default.

92 changes: 1 addition & 91 deletions controllers/dcrclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ import (
type functionName int

const (
validateAddressFn functionName = iota
createMultisigFn
createMultisigFn functionName = iota
getStakeInfoFn
connectedFn
)
Expand All @@ -45,18 +44,6 @@ var (
defaultAccountName = "default"
)

// validateAddressResponse
type validateAddressResponse struct {
addrInfo *wallettypes.ValidateAddressWalletResult
err error
}

// validateAddressMsg
type validateAddressMsg struct {
address dcrutil.Address
reply chan validateAddressResponse
}

// createMultisigResponse
type createMultisigResponse struct {
multisigInfo *wallettypes.CreateMultiSigResult
Expand Down Expand Up @@ -104,10 +91,6 @@ out:
select {
case m := <-w.msgChan:
switch msg := m.(type) {
case validateAddressMsg:
resp := w.executeInSequence(validateAddressFn, msg)
respTyped := resp.(*validateAddressResponse)
msg.reply <- *respTyped
case createMultisigMsg:
resp := w.executeInSequence(createMultisigFn, msg)
respTyped := resp.(*createMultisigResponse)
Expand Down Expand Up @@ -137,59 +120,6 @@ out:
// executeInSequence is the mainhandler of all the incoming client functions.
func (w *walletSvrManager) executeInSequence(fn functionName, msg interface{}) interface{} {
switch fn {
case validateAddressFn:
vam := msg.(validateAddressMsg)
resp := new(validateAddressResponse)
vawrs := make([]*wallettypes.ValidateAddressWalletResult, w.serversLen)
connectCount := 0
for i, s := range w.servers {
if w.servers[i] == nil {
continue
}
vawr, err := s.ValidateAddress(vam.address)
if err != nil && (err != rpcclient.ErrClientDisconnect &&
err != rpcclient.ErrClientShutdown) {
log.Infof("validateAddressFn failure on server %v: %v", i, err)
resp.err = err
return resp
} else if err != nil && (err == rpcclient.ErrClientDisconnect ||
err == rpcclient.ErrClientShutdown) {
vawrs[i] = nil
continue
}
connectCount++
vawrs[i] = vawr
}

if connectCount < w.minServers {
log.Errorf("Unable to check any servers for validateAddressFn")
resp.err = fmt.Errorf("not processing command; %v servers avail is below min of %v", connectCount, w.minServers)
return resp
}

for i := 0; i < w.serversLen; i++ {
if i == w.serversLen-1 {
break
}
if vawrs[i] == nil || vawrs[i+1] == nil {
continue
}
if vawrs[i].PubKey != vawrs[i+1].PubKey {
log.Infof("validateAddressFn nonequiv failure on servers "+
"%v, %v (%v != %v)", i, i+1, vawrs[i].PubKey, vawrs[i+1].PubKey)
resp.err = fmt.Errorf("non equivalent pubkey returned")
return resp
}
}

for i := range vawrs {
if vawrs[i] != nil {
resp.addrInfo = vawrs[i]
break
}
}
return resp

case createMultisigFn:
cmsm := msg.(createMultisigMsg)
resp := new(createMultisigResponse)
Expand Down Expand Up @@ -368,26 +298,6 @@ func (w *walletSvrManager) connected() ([]*wallettypes.WalletInfoResult, error)
return response.walletInfo, response.err
}

// ValidateAddress
//
// This should return equivalent results from all wallet RPCs. If this
// encounters a failure, it should be considered fatal.
func (w *walletSvrManager) ValidateAddress(addr dcrutil.Address) (*wallettypes.ValidateAddressWalletResult, error) {
// Assert that all servers are online.
_, err := w.connected()
if err != nil {
return nil, connectionError(err)
}

reply := make(chan validateAddressResponse)
w.msgChan <- validateAddressMsg{
address: addr,
reply: reply,
}
response := <-reply
return response.addrInfo, response.err
}

// CreateMultisig
//
// This should return equivalent results from all wallet RPCs. If this
Expand Down
4 changes: 2 additions & 2 deletions controllers/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (controller *MainController) APIAddress(c web.C, r *http.Request) ([]string
return nil, codes.Unavailable, "system error", errors.New("unable to process wallet commands")
}

poolValidateAddress, err := controller.rpcServers.ValidateAddress(pooladdress)
poolValidateAddress, err := controller.StakepooldServers.ValidateAddress(pooladdress)
if err != nil {
log.Errorf("unable to validate address: %v", err)
return nil, codes.Unavailable, "system error", errors.New("unable to process wallet commands")
Expand Down Expand Up @@ -754,7 +754,7 @@ func (controller *MainController) AddressPost(c web.C, r *http.Request) (string,
if controller.RPCIsStopped() {
return "/error", http.StatusSeeOther
}
poolValidateAddress, err := controller.rpcServers.ValidateAddress(pooladdress)
poolValidateAddress, err := controller.StakepooldServers.ValidateAddress(pooladdress)
if err != nil {
controller.handlePotentialFatalError("ValidateAddress pooladdress", err)
return "/error", http.StatusSeeOther
Expand Down
43 changes: 43 additions & 0 deletions stakepooldclient/stakepooldclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package stakepooldclient
import (
"errors"
"fmt"
"github.com/decred/dcrd/dcrutil"

"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
Expand Down Expand Up @@ -265,6 +266,48 @@ func (s *StakepooldManager) VoteVersion() (uint32, error) {
return lastVersion, nil
}

// ValidateAddress calls ValidateAddress RPC on all stakepoold servers.
// Returns an error if responses are not the same from all stakepoold instances.
func (s *StakepooldManager) ValidateAddress(addr dcrutil.Address) (*pb.ValidateAddressResponse, error) {
responses := make(map[int]*pb.ValidateAddressResponse)

// Get ValidateAddress response from all wallets
for i, conn := range s.grpcConnections {
client := pb.NewStakepooldServiceClient(conn)
req := &pb.ValidateAddressRequest{
Address: addr.EncodeAddress(),
}
resp, err := client.ValidateAddress(context.Background(), req)
if err != nil {
log.Errorf("ValidateAddress RPC failed on stakepoold instance %d: %v", i, err)
return nil, err
}
responses[i] = resp
}

// Ensure responses are identical
var lastResponse *pb.ValidateAddressResponse
lastServer := 0
firstrun := true
for k, v := range responses {
if firstrun {
firstrun = false
lastResponse = v
}

if v.IsMine != lastResponse.IsMine ||
v.PubKeyAddr != lastResponse.PubKeyAddr {
vErr := fmt.Errorf("wallets %d and %d have different ValideAddress responses",
k, lastServer)
return nil, vErr
}

lastServer = k
}

return lastResponse, nil
}

// ImportScript calls ImportScript RPC on all stakepoold instances. It stops
// executing and returns an error if any RPC call fails
func (s *StakepooldManager) ImportScript(script []byte) (heightImported int64, err error) {
Expand Down

0 comments on commit b25f430

Please sign in to comment.