Skip to content

Commit

Permalink
tmp-debug
Browse files Browse the repository at this point in the history
  • Loading branch information
jberci committed Aug 22, 2019
1 parent f2e417b commit 1e6e48c
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 3 deletions.
17 changes: 16 additions & 1 deletion go/storage/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package client
import (
"context"
cryptorand "crypto/rand"
"fmt"
"io"
"math/rand"
"os"
"sync"
"time"

Expand Down Expand Up @@ -78,6 +80,7 @@ func (b *storageClientBackend) getStorageWatcher(runtimeID signature.MapKey) (st
defer b.runtimeWatchersLock.RUnlock()

watcher := b.runtimeWatchers[runtimeID]
fmt.Printf("getStorageWatcher on %p returning %p from %+v\n", b, watcher, b.runtimeWatchers)
if watcher == nil {
b.logger.Error("worker/storage/client: no watcher for runtime",
"runtime_id", runtimeID,
Expand Down Expand Up @@ -113,6 +116,8 @@ func (b *storageClientBackend) WatchRuntime(id signature.PublicKey) error {
// Watcher doesn't exist. Start new watcher.
watcher = newWatcher(b.ctx, id, b.identity.TLSCertificate, b.scheduler, b.registry)
b.runtimeWatchers[id.ToMapKey()] = watcher
fmt.Printf("inserted watcher %p into runtimeWatchers\n", watcher)
os.Stdout.Sync()

// Signal init when the first registered runtime is initialized.
if !b.signaledInit {
Expand Down Expand Up @@ -480,6 +485,7 @@ func (b *storageClientBackend) readWithClient(
}

clientStates := watcher.getClientStates()
fmt.Printf("client %p: client states array from watcher %p is %+v\n", b, watcher, clientStates)
n := len(clientStates)
if n == 0 {
b.logger.Error("readWithClient: no connected nodes for runtime",
Expand All @@ -498,7 +504,16 @@ func (b *storageClientBackend) readWithClient(
state := clientStates[randIndex]

// Skip this node when reading.
if state.node.ID.Equal(b.identity.NodeSigner.Public()) {
node := state.node
if node == nil {
fmt.Printf("\033[31;1m ** node is nil\033[0m %+v\n", state)
time.Sleep(2 * time.Second)
}
//nodeId := node.ID
identity := b.identity
signer := identity.NodeSigner
selfID := signer.Public()
if node != nil && node.ID.Equal(selfID) {
continue
}

Expand Down
19 changes: 18 additions & 1 deletion go/storage/client/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"sync"

"google.golang.org/grpc"
Expand Down Expand Up @@ -49,10 +51,12 @@ func (w *debugWatcherState) initialized() <-chan struct{} {
func newDebugWatcher(state *clientState) storageWatcher {
initCh := make(chan struct{})
close(initCh)
return &debugWatcherState{
ret := &debugWatcherState{
initCh: initCh,
clientState: state,
}
fmt.Printf("created debug watcher %p\n", ret)
return ret
}

// watcherState contains storage watcher state.
Expand Down Expand Up @@ -117,9 +121,14 @@ func (w *watcherState) getClientStates() []clientState {
w.RLock()
defer w.RUnlock()
clientStates := []clientState{}
fmt.Printf(">>>>\n")
fmt.Printf("current clientStates array is %+v\n", w.clientStates)
defer fmt.Printf("watcher %p returning actual array %+v\n", w, clientStates)
for _, state := range w.clientStates {
clientStates = append(clientStates, *state)
fmt.Printf("copied %+v into %+v\n", *state, clientStates[len(clientStates)-1])
}
fmt.Printf("<<<<\n")
return clientStates
}
func (w *watcherState) updateStorageNodeConnections() {
Expand Down Expand Up @@ -203,16 +212,21 @@ func (w *watcherState) updateStorageNodeConnections() {
manualResolver.UpdateState(resolverState)

numConnNodes++
if node == nil {
fmt.Printf("\033[31;1m ** node is nil in watcher\033[0m\n")
}
connClientStates = append(connClientStates, &clientState{
node: node,
client: storage.NewStorageClient(conn),
conn: conn,
resolverCleanupCb: cleanupCb,
})
fmt.Printf("appended new clientState: %+v\n", connClientStates[len(connClientStates)-1])
w.logger.Debug("storage node connection updated",
"node", node,
)
}
fmt.Printf("--------------\n")
if numConnNodes == 0 {
w.logger.Error("failed to connect to any of the storage committee members",
"nodes", nodeList,
Expand Down Expand Up @@ -262,6 +276,7 @@ func (w *watcherState) watch(ctx context.Context) {
nodeListCh, sub := w.registry.WatchNodeList()
defer sub.Close()

fmt.Printf("watcher %p starting up\n", w)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -330,6 +345,8 @@ func newWatcher(
scheduledNodes: make(map[signature.MapKey]bool),
clientStates: []*clientState{},
}
fmt.Printf("created watcher %p\n", watcher)
os.Stdout.Sync()

go watcher.watch(ctx)

Expand Down
2 changes: 1 addition & 1 deletion go/storage/mkvs/urkel/urkel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func writeLogToMap(wl writelog.WriteLog) map[string]string {
return writeLogSet
}

func foldWriteLogIterator(t *testing.T, w db.WriteLogIterator) writelog.WriteLog {
func foldWriteLogIterator(t *testing.T, w writelog.Iterator) writelog.WriteLog {
writeLog := writelog.WriteLog{}

for {
Expand Down

0 comments on commit 1e6e48c

Please sign in to comment.