Skip to content

Commit

Permalink
subscriptions: Add SubscriptionListener functionality
Browse files Browse the repository at this point in the history
This allows external components to install subscription listeners,
which are notified whenever a subscription is added to or removed
from the subscription manager.
  • Loading branch information
Jannis Pohlmann committed Dec 18, 2017
1 parent bdb6eab commit b829e73
Showing 1 changed file with 56 additions and 5 deletions.
61 changes: 56 additions & 5 deletions subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,16 @@ type ConnectionSubscriptions map[string]*Subscription
// subscription IDs to subscriptions.
type Subscriptions map[Connection]ConnectionSubscriptions

// SubscriptionListener defines an interface for components that
// are to be notified of added/removed subscriptions.
type SubscriptionListener interface {
// SubscriptionAdded is called whenever a new subscription is added.
SubscriptionAdded(Connection, *Subscription)

// SubscriptionRemoved is called whenever a subscription is removed.
SubscriptionRemoved(Connection, *Subscription)
}

// SubscriptionManager provides a high-level interface to managing
// and accessing the subscriptions made by GraphQL WS clients.
type SubscriptionManager interface {
Expand All @@ -77,6 +87,13 @@ type SubscriptionManager interface {

// RemoveSubscriptions removes all subscriptions of a client connection.
RemoveSubscriptions(Connection)

// AddSubscriptionListener adds a listener to be notified when subscriptions
// are added or removed.
AddSubscriptionListener(SubscriptionListener)

// RemoveSubscriptionListener removes a previously added listener.
RemoveSubscriptionListener(SubscriptionListener)
}

/**
Expand All @@ -87,15 +104,17 @@ type subscriptionManager struct {
subscriptions Subscriptions
schema *graphql.Schema
logger *log.Entry
listeners map[SubscriptionListener]bool
}

// NewSubscriptionManager creates a new subscription manager.
func NewSubscriptionManager(schema *graphql.Schema) SubscriptionManager {
manager := new(subscriptionManager)
manager.subscriptions = make(Subscriptions)
manager.logger = NewLogger("subscriptions")
manager.schema = schema
return manager
return &subscriptionManager{
subscriptions: make(Subscriptions),
logger: NewLogger("subscriptions"),
schema: schema,
listeners: map[SubscriptionListener]bool{},
}
}

func (m *subscriptionManager) Subscriptions() Subscriptions {
Expand Down Expand Up @@ -157,6 +176,9 @@ func (m *subscriptionManager) AddSubscription(

m.subscriptions[conn][subscription.ID] = subscription

// Notify listeners
m.notifySubscriptionAdded(conn, subscription)

return nil
}

Expand All @@ -169,6 +191,9 @@ func (m *subscriptionManager) RemoveSubscription(
"subscription": subscription.ID,
}).Info("Remove subscription")

// Notify listeners
m.notifySubscriptionRemoved(conn, subscription)

// Remove the subscription from its connections' subscription map
delete(m.subscriptions[conn], subscription.ID)

Expand All @@ -195,6 +220,32 @@ func (m *subscriptionManager) RemoveSubscriptions(conn Connection) {
}
}

func (m *subscriptionManager) AddSubscriptionListener(listener SubscriptionListener) {
m.listeners[listener] = true
}

func (m *subscriptionManager) RemoveSubscriptionListener(listener SubscriptionListener) {
delete(m.listeners, listener)
}

func (m *subscriptionManager) notifySubscriptionAdded(
conn Connection,
subscription *Subscription,
) {
for listener := range m.listeners {
listener.SubscriptionAdded(conn, subscription)
}
}

func (m *subscriptionManager) notifySubscriptionRemoved(
conn Connection,
subscription *Subscription,
) {
for listener := range m.listeners {
listener.SubscriptionRemoved(conn, subscription)
}
}

func validateSubscription(s *Subscription) []error {
errs := []error{}

Expand Down

0 comments on commit b829e73

Please sign in to comment.