From 1dc06fb50d01f12bd4ed06bab50896491a0daf23 Mon Sep 17 00:00:00 2001 From: Matt Brittan Date: Mon, 19 Jul 2021 22:02:49 +1200 Subject: [PATCH] Add SetMaxResumePubInFlight option that can slow down the sending of stored publish packets when connection resumes. Ref #520 --- client.go | 39 ++++++++++- go.mod | 1 + go.sum | 2 + memstore_ordered.go | 161 ++++++++++++++++++++++++++++++++++++++++++++ options.go | 11 +++ 5 files changed, 213 insertions(+), 1 deletion(-) create mode 100644 memstore_ordered.go diff --git a/client.go b/client.go index 847daee6..38a9c5cf 100644 --- a/client.go +++ b/client.go @@ -19,6 +19,7 @@ package mqtt import ( "bytes" + "context" "errors" "fmt" "net" @@ -27,6 +28,8 @@ import ( "sync/atomic" "time" + "golang.org/x/sync/semaphore" + "github.com/eclipse/paho.mqtt.golang/packets" ) @@ -919,10 +922,42 @@ func (c *client) reserveStoredPublishIDs() { // Load all stored messages and resend them // Call this to ensure QOS > 1,2 even after an application crash // Note: This function will exit if c.stop is closed (this allows the shutdown to proceed avoiding a potential deadlock) -// +// other than that it does not return until all messages in the store have been sent (connect() does not complete its +// token before this completes) func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) { DEBUG.Println(STR, "enter Resume") + // Prior to sending a message getSemaphore will be called and once sent releaseSemaphore will be called + // with the token (so semaphore can be released when ACK received if applicable). + // Using a weighted semaphore rather than channels because this retains ordering + getSemaphore := func() {} // Default = do nothing + releaseSemaphore := func(_ *PublishToken) {} // Default = do nothing + var sem *semaphore.Weighted + if c.options.MaxResumePubInFlight > 0 { + sem = semaphore.NewWeighted(int64(c.options.MaxResumePubInFlight)) + ctx, cancel := context.WithCancel(context.Background()) // Context needed for semaphore + defer cancel() // ensure context gets cancelled + + go func() { + select { + case <-c.stop: // Request to stop (due to comm error etc) + cancel() + case <-ctx.Done(): // resume completed normally + } + }() + + getSemaphore = func() { sem.Acquire(ctx, 1) } + releaseSemaphore = func(token *PublishToken) { // Note: If token never completes then resume() may stall (will still exit on ctx.Done()) + go func() { + select { + case <-token.Done(): + case <-ctx.Done(): + } + sem.Release(1) + }() + } + } + storedKeys := c.persist.All() for _, key := range storedKeys { packet := c.persist.Get(key) @@ -986,12 +1021,14 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) { c.claimID(token, details.MessageID) DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID)) DEBUG.Println(STR, details) + getSemaphore() select { case c.obound <- &PacketAndToken{p: p, t: token}: case <-c.stop: DEBUG.Println(STR, "resume exiting due to stop") return } + releaseSemaphore(token) // If limiting simultaneous messages then we need to know when message is acknowledged default: ERROR.Println(STR, "invalid message type in store (discarded)") c.persist.Del(key) diff --git a/go.mod b/go.mod index fcbd7872..beb19918 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,5 @@ go 1.14 require ( github.com/gorilla/websocket v1.4.2 golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) diff --git a/go.sum b/go.sum index 7095afb1..4ce755ba 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0 h1:Jcxah/M+oLZ/R4/z5RzfPzGbPXnVDPkEDtf2JnuxN+U= golang.org/x/net v0.0.0-20200425230154-ff2c4b7c35a0/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/memstore_ordered.go b/memstore_ordered.go new file mode 100644 index 00000000..e2ae1ebe --- /dev/null +++ b/memstore_ordered.go @@ -0,0 +1,161 @@ +/* + * Copyright (c) 2013 IBM Corp. + * + * All rights reserved. This program and the accompanying materials + * are made available under the terms of the Eclipse Public License v1.0 + * which accompanies this distribution, and is available at + * http://www.eclipse.org/legal/epl-v10.html + * + * Contributors: + * Seth Hoenig + * Allan Stockdill-Mander + * Mike Robertson + */ + +package mqtt + +import ( + "sort" + "sync" + "time" + + "github.com/eclipse/paho.mqtt.golang/packets" +) + +// OrderedMemoryStore uses a map internally so the order in which All() returns packets is +// undefined. OrderedMemoryStore resolves this by storing the time the message is added +// and sorting based upon this. + +// storedMessage encapsulates a message and the time it was initially stored +type storedMessage struct { + ts time.Time + msg packets.ControlPacket +} + +// OrderedMemoryStore implements the store interface to provide a "persistence" +// mechanism wholly stored in memory. This is only useful for +// as long as the client instance exists. +type OrderedMemoryStore struct { + sync.RWMutex + messages map[string]storedMessage + opened bool +} + +// NewOrderedMemoryStore returns a pointer to a new instance of +// OrderedMemoryStore, the instance is not initialized and ready to +// use until Open() has been called on it. +func NewOrderedMemoryStore() *OrderedMemoryStore { + store := &OrderedMemoryStore{ + messages: make(map[string]storedMessage), + opened: false, + } + return store +} + +// Open initializes a OrderedMemoryStore instance. +func (store *OrderedMemoryStore) Open() { + store.Lock() + defer store.Unlock() + store.opened = true + DEBUG.Println(STR, "OrderedMemoryStore initialized") +} + +// Put takes a key and a pointer to a Message and stores the +// message. +func (store *OrderedMemoryStore) Put(key string, message packets.ControlPacket) { + store.Lock() + defer store.Unlock() + if !store.opened { + ERROR.Println(STR, "Trying to use memory store, but not open") + return + } + store.messages[key] = storedMessage{ts: time.Now(), msg: message} +} + +// Get takes a key and looks in the store for a matching Message +// returning either the Message pointer or nil. +func (store *OrderedMemoryStore) Get(key string) packets.ControlPacket { + store.RLock() + defer store.RUnlock() + if !store.opened { + ERROR.Println(STR, "Trying to use memory store, but not open") + return nil + } + mid := mIDFromKey(key) + m, ok := store.messages[key] + if !ok || m.msg == nil { + CRITICAL.Println(STR, "OrderedMemoryStore get: message", mid, "not found") + } else { + DEBUG.Println(STR, "OrderedMemoryStore get: message", mid, "found") + } + return m.msg +} + +// All returns a slice of strings containing all the keys currently +// in the OrderedMemoryStore. +func (store *OrderedMemoryStore) All() []string { + store.RLock() + defer store.RUnlock() + if !store.opened { + ERROR.Println(STR, "Trying to use memory store, but not open") + return nil + } + type tsAndKey struct { + ts time.Time + key string + } + + tsKeys := make([]tsAndKey, 0, len(store.messages)) + for k, v := range store.messages { + tsKeys = append(tsKeys, tsAndKey{ts: v.ts, key: k}) + } + sort.Slice(tsKeys, func(a int, b int) bool { return tsKeys[a].ts.Before(tsKeys[b].ts) }) + + keys := make([]string, len(tsKeys)) + for i := range tsKeys { + keys[i] = tsKeys[i].key + } + return keys +} + +// Del takes a key, searches the OrderedMemoryStore and if the key is found +// deletes the Message pointer associated with it. +func (store *OrderedMemoryStore) Del(key string) { + store.Lock() + defer store.Unlock() + if !store.opened { + ERROR.Println(STR, "Trying to use memory store, but not open") + return + } + mid := mIDFromKey(key) + _, ok := store.messages[key] + if !ok { + WARN.Println(STR, "OrderedMemoryStore del: message", mid, "not found") + } else { + delete(store.messages, key) + DEBUG.Println(STR, "OrderedMemoryStore del: message", mid, "was deleted") + } +} + +// Close will disallow modifications to the state of the store. +func (store *OrderedMemoryStore) Close() { + store.Lock() + defer store.Unlock() + if !store.opened { + ERROR.Println(STR, "Trying to close memory store, but not open") + return + } + store.opened = false + DEBUG.Println(STR, "OrderedMemoryStore closed") +} + +// Reset eliminates all persisted message data in the store. +func (store *OrderedMemoryStore) Reset() { + store.Lock() + defer store.Unlock() + if !store.opened { + ERROR.Println(STR, "Trying to reset memory store, but not open") + } + store.messages = make(map[string]storedMessage) + WARN.Println(STR, "OrderedMemoryStore wiped") +} diff --git a/options.go b/options.go index 4a1420c1..065fcf63 100644 --- a/options.go +++ b/options.go @@ -88,6 +88,7 @@ type ClientOptions struct { ResumeSubs bool HTTPHeaders http.Header WebsocketOptions *WebsocketOptions + MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming } // NewClientOptions will create a new ClientClientOptions type with some @@ -401,3 +402,13 @@ func (o *ClientOptions) SetWebsocketOptions(w *WebsocketOptions) *ClientOptions o.WebsocketOptions = w return o } + +// SetMaxResumePubInFlight sets the maximum simultaneous publish messages that will be sent while resuming. Note that +// this only applies to messages coming from the store (so additional sends may push us over the limit) +// Note that the connect token will not be flagged as complete until all messages have been sent from the +// store. If broker does not respond to messages then resume may not complete. +// This option was put in place because resuming after downtime can saturate low capacity links. +func (o *ClientOptions) SetMaxResumePubInFlight(MaxResumePubInFlight int) *ClientOptions { + o.MaxResumePubInFlight = MaxResumePubInFlight + return o +}