Skip to content

Commit

Permalink
Ref #47
Browse files Browse the repository at this point in the history
Omtimize send/recv routines

Signed-off-by: Artur Troian <[email protected]>
  • Loading branch information
troian committed Aug 31, 2017
1 parent 25b9c8f commit f82ea41
Show file tree
Hide file tree
Showing 9 changed files with 548 additions and 786 deletions.
11 changes: 6 additions & 5 deletions clients/sessions.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,21 +89,22 @@ func NewManager(c *Config) (*Manager, error) {
allowReplace: c.AllowReplace,
offlineQoS0: c.OfflineQoS0,
quit: make(chan struct{}),
//sessions: make(map[string]*session),
//subscribers: make(map[string]subscriber.ConnectionProvider),
log: configuration.GetProdLogger().Named("sessions"),
log: configuration.GetProdLogger().Named("sessions"),
}

m.persistence, _ = c.Persist.Sessions()

var err error

m.log.Info("Loading sessions. Might take a while")

// load sessions for fill systree
// those sessions having either will delay or expire are created with and timer started
if err := m.loadSessions(); err != nil {
if err = m.loadSessions(); err != nil {
return nil, err
}

if err := m.loadSubscribers(); err != nil {
if err = m.loadSubscribers(); err != nil {
return nil, err
}

Expand Down
231 changes: 123 additions & 108 deletions connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,37 +81,35 @@ type Config struct {

// Type session
type Type struct {
conn *netConn
pubIn *ackQueue
pubOut *ackQueue
publisher *publisher
flowControl *packetsFlowControl
onDisconnect onDisconnect
subscriber subscriber.ConnectionProvider
messenger types.TopicMessenger
auth auth.SessionPermissions
id string
username string
quit chan struct{}
onStart types.Once
onStop types.Once
conn net.Conn
pubIn *ackQueue
pubOut *ackQueue
flowControl *packetsFlowControl
tx *transmitter
rx *receiver
onDisconnect onDisconnect
subscriber subscriber.ConnectionProvider
messenger types.TopicMessenger
auth auth.SessionPermissions
id string
username string
quit chan struct{}
onStart types.Once
//onStop types.Once
onConnDisconnect types.Once
started sync.WaitGroup
//started sync.WaitGroup

retained struct {
lock sync.Mutex
list []*packet.Publish
}

log struct {
prod *zap.Logger
dev *zap.Logger
}

log *zap.Logger
sendQuota int32
offlineQoS0 bool
clean bool
version packet.ProtocolVersion
will bool
}

type unacknowledgedPublish struct {
Expand All @@ -128,104 +126,156 @@ func New(c *Config) (s *Type, err error) {
messenger: c.Messenger,
version: c.Version,
clean: c.Clean,
conn: c.Conn,
onDisconnect: c.OnDisconnect,
sendQuota: int32(c.SendQuota),
quit: make(chan struct{}),
will: true,
}

s.started.Add(1)
s.publisher = newPublisher(s.quit)
s.pubIn = newAckQueue(s.onReleaseIn)
s.pubOut = newAckQueue(s.onReleaseOut)
s.flowControl = newFlowControl(s.quit, c.PreserveOrder)
s.log = configuration.GetProdLogger().Named("connection." + s.id)

s.tx = newTransmitter(&transmitterConfig{
quit: s.quit,
log: s.log,
id: s.id,
pubIn: s.pubIn,
pubOut: s.pubOut,
flowControl: s.flowControl,
conn: s.conn,
onDisconnect: s.onConnectionClose,
})

s.log.prod = configuration.GetProdLogger().Named("connection." + s.id)
s.log.dev = configuration.GetDevLogger().Named("connection." + s.id)
s.rx = newReceiver(&receiverConfig{
quit: s.quit,
conn: s.conn,
onDisconnect: s.onConnectionClose,
onPacket: s.processIncoming,
version: s.version,
will: &s.will,
})

if c.KeepAlive > 0 {
s.rx.keepAlive = time.Second * time.Duration(c.KeepAlive)
s.rx.keepAlive = s.rx.keepAlive + (s.rx.keepAlive / 2)
}

// publisher queue is ready, assign to subscriber new online callback
// transmitter queue is ready, assign to subscriber new online callback
// and signal it to forward messages to online callback by creating channel
s.subscriber.Online(s.onSubscribedPublish)
if !s.clean && c.State != nil {
// restore persisted state of the session if any
s.loadPersistence(c.State) // nolint: errcheck
}

s.pubIn = newAckQueue(s.onReleaseIn)
s.pubOut = newAckQueue(s.onReleaseOut)

// Run connection
// error check check at deferred call
s.conn, err = newNet(
&netConfig{
id: s.id,
conn: c.Conn,
keepAlive: c.KeepAlive,
protoVersion: c.Version,
on: onProcess{
publish: s.onPublish,
ack: s.onAck,
subscribe: s.onSubscribe,
unSubscribe: s.onUnSubscribe,
disconnect: s.onConnectionClose,
},
packetsMetric: c.Metric.Packets(),
})
return
}

// Start signal session to start processing messages.
// Effective only first invoke
// Start run connection
func (s *Type) Start() {
s.onStart.Do(func() {
s.conn.start()

s.publisher.stopped.Add(1)
s.publisher.started.Add(1)
go s.publishWorker()
s.publisher.started.Wait()

s.started.Done()
s.tx.run()
s.rx.run()
})
}

// Stop session. Function assumed to be invoked once server about to either shutdown, disconnect
// or session is being replaced
// Effective only first invoke
func (s *Type) Stop(reason packet.ReasonCode) {
s.onStop.Do(func() {
s.conn.stop(&reason)
})
s.onConnectionClose(true)
}

func (s *Type) loadPersistence(state *persistenceTypes.SessionMessages) (err error) {
//defer s.publisher.cond.L.Unlock()
//s.publisher.cond.L.Lock()
func (s *Type) processIncoming(p packet.Provider) error {
var err error
var resp packet.Provider

switch pkt := p.(type) {
case *packet.Publish:
resp = s.onPublish(pkt)
case *packet.Ack:
resp = s.onAck(pkt)
case *packet.Subscribe:
resp = s.onSubscribe(pkt)
case *packet.UnSubscribe:
resp = s.onUnSubscribe(pkt)
case *packet.PingReq:
// For PINGREQ message, we should send back PINGRESP
mR, _ := packet.NewMessage(s.version, packet.PINGRESP)
resp, _ = mR.(*packet.PingResp)
case *packet.Disconnect:
// For DISCONNECT message, we should quit without sending Will
s.will = false

//if s.version == packet.ProtocolV50 {
// // FIXME: CodeRefusedBadUsernameOrPassword has same id as CodeDisconnectWithWill
// if m.ReasonCode() == packet.CodeRefusedBadUsernameOrPassword {
// s.will = true
// }
//
// expireIn := time.Duration(0)
// if val, e := m.PropertyGet(packet.PropertySessionExpiryInterval); e == nil {
// expireIn = time.Duration(val.(uint32))
// }
//
// // If the Session Expiry Interval in the CONNECT packet was zero, then it is a Protocol Error to set a non-
// // zero Session Expiry Interval in the DISCONNECT packet sent by the Client. If such a non-zero Session
// // Expiry Interval is received by the Server, it does not treat it as a valid DISCONNECT packet. The Server
// // uses DISCONNECT with Reason Code 0x82 (Protocol Error) as described in section 4.13.
// if s.expireIn != nil && *s.expireIn == 0 && expireIn != 0 {
// m, _ := packet.NewMessage(packet.ProtocolV50, packet.DISCONNECT)
// msg, _ := m.(*packet.Disconnect)
// msg.SetReasonCode(packet.CodeProtocolError)
// s.WriteMessage(msg, true) // nolint: errcheck
// }
//}
//return
err = errors.New("disconnect")
default:
s.log.Error("Unsupported incoming message type",
zap.String("ClientID", s.id),
zap.String("type", p.Type().Name()))
return nil
}

if resp != nil {
s.tx.sendPacket(resp)
}

return err
}

func (s *Type) loadPersistence(state *persistenceTypes.SessionMessages) (err error) {
for _, d := range state.OutMessages {
var msg packet.Provider
if msg, _, err = packet.Decode(s.version, d); err != nil {
s.log.prod.Error("Couldn't decode persisted message", zap.Error(err))
s.log.Error("Couldn't decode persisted message", zap.Error(err))
err = ErrPersistence
return
}

switch m := msg.(type) {
case *packet.Publish:
if m.QoS() == packet.QoS2 && m.Dup() {
s.log.prod.Error("3: QoS2 DUP")
s.log.Error("3: QoS2 DUP")
}
}

s.publisher.pushFront(msg)
s.tx.loadFront(msg)
}

for _, d := range state.UnAckMessages {
var msg packet.Provider
if msg, _, err = packet.Decode(s.version, d); err != nil {
s.log.prod.Error("Couldn't decode persisted message", zap.Error(err))
s.log.Error("Couldn't decode persisted message", zap.Error(err))
err = ErrPersistence
return
}

s.publisher.pushFront(&unacknowledgedPublish{msg: msg})
s.tx.loadFront(&unacknowledgedPublish{msg: msg})
}

return
Expand All @@ -238,15 +288,15 @@ func (s *Type) loadPersistence(state *persistenceTypes.SessionMessages) (err err
// For the server, when this method is called, it means there's a message that
// should be published to the client on the other end of this connection. So we
// will call publish() to send the message.
func (s *Type) onSubscribedPublish(msg *packet.Publish) error {
_m, _ := packet.NewMessage(s.version, packet.PUBLISH)
m := _m.(*packet.Publish)
func (s *Type) onSubscribedPublish(p *packet.Publish) error {
_pkt, _ := packet.NewMessage(s.version, packet.PUBLISH)
pkt := _pkt.(*packet.Publish)

// [MQTT-3.3.1-9]
// [MQTT-3.3.1-3]
m.Set(msg.Topic(), msg.Payload(), msg.QoS(), false, false) // nolint: errcheck
pkt.Set(p.Topic(), p.Payload(), p.QoS(), false, false) // nolint: errcheck

s.publisher.pushBack(m)
s.tx.queuePacket(pkt)

return nil
}
Expand All @@ -256,7 +306,7 @@ func (s *Type) publishToTopic(msg *packet.Publish) error {
// [MQTT-3.3.1.3]
if msg.Retain() {
if err := s.messenger.Retain(msg); err != nil {
s.log.prod.Error("Error retaining message", zap.String("ClientID", s.id), zap.Error(err))
s.log.Error("Error retaining message", zap.String("ClientID", s.id), zap.Error(err))
}

// [MQTT-3.3.1-7]
Expand All @@ -272,7 +322,7 @@ func (s *Type) publishToTopic(msg *packet.Publish) error {
}

if err := s.messenger.Publish(msg); err != nil {
s.log.prod.Error("Couldn't publish", zap.String("ClientID", s.id), zap.Error(err))
s.log.Error("Couldn't publish", zap.String("ClientID", s.id), zap.Error(err))
}

return nil
Expand All @@ -298,38 +348,3 @@ func (s *Type) onReleaseOut(msg packet.Provider) {
s.flowControl.release(id)
}
}

// publishWorker publish messages coming from subscribed topics
func (s *Type) publishWorker() {
//defer s.Stop(message.CodeSuccess)
defer s.publisher.stopped.Done()
s.publisher.started.Done()

value := s.publisher.waitForMessage()
for ; value != nil; value = s.publisher.waitForMessage() {
var msg packet.Provider
switch m := value.(type) {
case *packet.Publish:
if m.QoS() != packet.QoS0 {
if id, err := s.flowControl.acquire(); err == nil {
m.SetPacketID(id)
s.pubOut.store(m)
} else {
// if acquire id returned error session is about to exit. Queue message back and get away
s.publisher.pushBack(m)
return
}
}
msg = m
case *unacknowledgedPublish:
msg = m.msg
s.pubOut.store(msg)
}

if _, err := s.conn.WriteMessage(msg, false); err != nil {
// Couldn't deliver message to client thus requeue it back
// Error during write means connection has been closed/timed-out etc
return
}
}
}
Loading

0 comments on commit f82ea41

Please sign in to comment.