Skip to content

Commit

Permalink
Merge pull request #1976 from dperny/fix-log-race
Browse files Browse the repository at this point in the history
Fix possible race in ListenSubscriptions
  • Loading branch information
aaronlehmann authored Mar 9, 2017
2 parents cefa878 + b87cbd6 commit 58bfef2
Showing 1 changed file with 39 additions and 16 deletions.
55 changes: 39 additions & 16 deletions manager/logbroker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type LogBroker struct {
subscriptionQueue *watch.Queue

registeredSubscriptions map[string]*subscription
connectedNodes map[string]struct{}
subscriptionsByNode map[string]map[*subscription]struct{}

pctx context.Context
cancelAll context.CancelFunc
Expand Down Expand Up @@ -70,7 +70,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
lb.logQueue = watch.NewQueue()
lb.subscriptionQueue = watch.NewQueue()
lb.registeredSubscriptions = make(map[string]*subscription)
lb.connectedNodes = make(map[string]struct{})
lb.subscriptionsByNode = make(map[string]map[*subscription]struct{})
lb.mu.Unlock()

select {
Expand Down Expand Up @@ -139,10 +139,13 @@ func (lb *LogBroker) registerSubscription(subscription *subscription) {
lb.registeredSubscriptions[subscription.message.ID] = subscription
lb.subscriptionQueue.Publish(subscription)

// Mark nodes that won't receive the message as done.
for _, node := range subscription.Nodes() {
if _, ok := lb.connectedNodes[node]; !ok {
if _, ok := lb.subscriptionsByNode[node]; !ok {
// Mark nodes that won't receive the message as done.
subscription.Done(node, fmt.Errorf("node %s is not available", node))
} else {
// otherwise, add the subscription to the node's subscriptions list
lb.subscriptionsByNode[node][subscription] = struct{}{}
}
}
}
Expand All @@ -153,6 +156,14 @@ func (lb *LogBroker) unregisterSubscription(subscription *subscription) {

delete(lb.registeredSubscriptions, subscription.message.ID)

// remove the subscription from all of the nodes
for _, node := range subscription.Nodes() {
// but only if a node exists
if _, ok := lb.subscriptionsByNode[node]; ok {
delete(lb.subscriptionsByNode[node], subscription)
}
}

subscription.Close()
lb.subscriptionQueue.Publish(subscription)
}
Expand Down Expand Up @@ -200,6 +211,21 @@ func (lb *LogBroker) publish(log *api.PublishLogsMessage) {
lb.logQueue.Publish(&logMessage{PublishLogsMessage: log})
}

// markDone wraps (*Subscription).Done() so that the removal of the sub from
// the node's subscription list is possible
func (lb *LogBroker) markDone(sub *subscription, nodeID string, err error) {
lb.mu.Lock()
defer lb.mu.Unlock()

// remove the subscription from the node's subscription list, if it exists
if _, ok := lb.subscriptionsByNode[nodeID]; ok {
delete(lb.subscriptionsByNode[nodeID], sub)
}

// mark the sub as done
sub.Done(nodeID, err)
}

// SubscribeLogs creates a log subscription and streams back logs
func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api.Logs_SubscribeLogsServer) error {
ctx := stream.Context()
Expand Down Expand Up @@ -260,14 +286,19 @@ func (lb *LogBroker) nodeConnected(nodeID string) {
lb.mu.Lock()
defer lb.mu.Unlock()

lb.connectedNodes[nodeID] = struct{}{}
if _, ok := lb.subscriptionsByNode[nodeID]; !ok {
lb.subscriptionsByNode[nodeID] = make(map[*subscription]struct{})
}
}

func (lb *LogBroker) nodeDisconnected(nodeID string) {
lb.mu.Lock()
defer lb.mu.Unlock()

delete(lb.connectedNodes, nodeID)
for sub := range lb.subscriptionsByNode[nodeID] {
sub.Done(nodeID, fmt.Errorf("node %s disconnected unexpectedly", nodeID))
}
delete(lb.subscriptionsByNode, nodeID)
}

// ListenSubscriptions returns a stream of matching subscriptions for the current node
Expand All @@ -292,12 +323,6 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
log.Debug("node registered")

activeSubscriptions := make(map[string]*subscription)
defer func() {
// If the worker quits, mark all active subscriptions as finished.
for _, subscription := range activeSubscriptions {
subscription.Done(remote.NodeID, fmt.Errorf("node %s disconnected unexpectedly", remote.NodeID))
}
}()

// Start by sending down all active subscriptions.
for _, subscription := range subscriptions {
Expand All @@ -323,15 +348,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
subscription := v.(*subscription)

if subscription.Closed() {
log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
delete(activeSubscriptions, subscription.message.ID)
} else {
// Avoid sending down the same subscription multiple times
if _, ok := activeSubscriptions[subscription.message.ID]; ok {
continue
}
activeSubscriptions[subscription.message.ID] = subscription
log.WithField("subscription.id", subscription.message.ID).Debug("subscription added")
}
if err := stream.Send(subscription.message); err != nil {
log.Error(err)
Expand All @@ -355,7 +378,7 @@ func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err er
var currentSubscription *subscription
defer func() {
if currentSubscription != nil {
currentSubscription.Done(remote.NodeID, err)
lb.markDone(currentSubscription, remote.NodeID, err)
}
}()

Expand Down Expand Up @@ -387,7 +410,7 @@ func (lb *LogBroker) PublishLogs(stream api.LogBroker_PublishLogsServer) (err er
if logMsg.Close {
// Mark done and then set to nil so if we error after this point,
// we don't try to close again in the defer
currentSubscription.Done(remote.NodeID, err)
lb.markDone(currentSubscription, remote.NodeID, err)
currentSubscription = nil
return nil
}
Expand Down

0 comments on commit 58bfef2

Please sign in to comment.