Skip to content
This repository has been archived by the owner on Apr 5, 2024. It is now read-only.

Commit

Permalink
TCPCL: Seperate handlers, fix race condition
Browse files Browse the repository at this point in the history
  • Loading branch information
oxzi committed Oct 11, 2019
1 parent 8f27155 commit 1a5d9b4
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 148 deletions.
142 changes: 16 additions & 126 deletions cla/tcpcl/client.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package tcpcl

import (
"bufio"
"errors"
"fmt"
"io"
"net"
"strings"
"sync"
"sync/atomic"
"time"

log "github.com/sirupsen/logrus"
Expand All @@ -32,7 +29,11 @@ type TCPCLClient struct {
msgsOut chan Message
msgsIn chan Message

handleCounter int32
handleMetaStop chan struct{}
handleMetaStopAck chan struct{}
handlerConnInStop chan struct{}
handlerConnOutStop chan struct{}
handlerStateStop chan struct{}

active bool
state *ClientState
Expand Down Expand Up @@ -145,136 +146,25 @@ func (client *TCPCLClient) Start() (err error, retry bool) {

client.log().Info("Starting client")

client.handleMetaStop = make(chan struct{}, 10)
client.handleMetaStopAck = make(chan struct{}, 2)
client.handlerConnInStop = make(chan struct{}, 2)
client.handlerConnOutStop = make(chan struct{}, 2)
client.handlerStateStop = make(chan struct{}, 2)

client.reportChan = make(chan cla.ConvergenceStatus, 100)

client.handleCounter = 2
go client.handleConnection()
go client.handleMeta()
go client.handleConnIn()
go client.handleConnOut()
go client.handleState()

return
}

func (client *TCPCLClient) handleConnection() {
defer func() {
client.log().Debug("Leaving connection handler function")
client.state.Terminate()

atomic.AddInt32(&client.handleCounter, -1)
}()

// var rw = bufio.NewReadWriter(bufio.NewReader(client.conn), bufio.NewWriter(client.conn))
var r = bufio.NewReader(client.conn)
var w = bufio.NewWriter(client.conn)

for {
select {
case msg := <-client.msgsOut:
if err := msg.Marshal(w); err != nil {
client.log().WithError(err).WithField("msg", msg).Error("Sending message errored")
return
} else if err := w.Flush(); err != nil {
client.log().WithError(err).WithField("msg", msg).Error("Flushing errored")
return
} else {
client.log().WithField("msg", msg).Debug("Sent message")
}

if _, ok := msg.(*SessionTerminationMessage); ok {
client.log().WithField("msg", msg).Debug("Closing connection after sending SESS_TERM")

if err := client.conn.Close(); err != nil {
client.log().WithError(err).Warn("Failed to close TCP connection")
}
return
}

default:
if err := client.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond)); err != nil {
client.log().WithError(err).Error("Setting read deadline errored")
return
}

if msg, err := ReadMessage(r); err == nil {
client.log().WithField("msg", msg).Debug("Received message")
client.msgsIn <- msg
} else if err == io.EOF {
client.log().Info("Read EOF, closing down.")
return
} else if netErr, ok := err.(net.Error); ok && !netErr.Timeout() {
client.log().WithError(netErr).Error("Network error occured")
return
} else if !ok {
client.log().WithError(err).Error("Parsing next message errored")
return
}
}
}
}

func (client *TCPCLClient) handleState() {
defer func() {
client.log().Debug("Leaving state handler function")

atomic.AddInt32(&client.handleCounter, -1)
}()

for {
switch {
case !client.state.IsTerminated():
var stateHandler func() error

switch {
case client.state.IsContact():
stateHandler = client.handleContact
case client.state.IsInit():
stateHandler = client.handleSessInit
case client.state.IsEstablished():
stateHandler = client.handleEstablished
default:
client.log().WithField("state", client.state).Fatal("Illegal state")
}

if err := stateHandler(); err != nil {
if err == sessTermErr {
client.log().Info("Received SESS_TERM, switching to Termination state")
} else {
client.log().WithError(err).Warn("State handler errored")
}

client.state.Terminate()
goto terminationCase
}
break

terminationCase:
fallthrough

default:
client.log().Info("Entering Termination state")

var sessTerm = NewSessionTerminationMessage(0, TerminationUnknown)
client.msgsOut <- &sessTerm

emptyEndpoint := bundle.EndpointID{}
if client.endpointID != emptyEndpoint {
client.reportChan <- cla.NewConvergencePeerDisappeared(client, client.peerEndpointID)
}

return
}

if atomic.LoadInt32(&client.handleCounter) != 2 {
return
}
}
}

func (client *TCPCLClient) Close() {
client.state.Terminate()

for atomic.LoadInt32(&client.handleCounter) > 0 {
time.Sleep(time.Millisecond)
}
client.handleMetaStop <- struct{}{}
<-client.handleMetaStopAck
}

func (client *TCPCLClient) Channel() chan cla.ConvergenceStatus {
Expand Down
159 changes: 159 additions & 0 deletions cla/tcpcl/client_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
package tcpcl

import (
"bufio"
"io"
"net"
"time"

"github.com/dtn7/dtn7-go/bundle"
"github.com/dtn7/dtn7-go/cla"
)

func (client *TCPCLClient) handleMeta() {
for range client.handleMetaStop {
client.log().Info("Handler received stop signal")

client.state.Terminate()

chans := []chan struct{}{client.handlerConnInStop, client.handlerConnOutStop, client.handlerStateStop}
for _, chn := range chans {
close(chn)
}

close(client.handleMetaStopAck)

return
}
}

func (client *TCPCLClient) handleConnIn() {
defer func() {
client.log().Debug("Leaving incoming connection handler")
client.handleMetaStop <- struct{}{}
}()

var r = bufio.NewReader(client.conn)

for {
select {
case <-client.handlerConnInStop:
return

default:
if err := client.conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
client.log().WithError(err).Error("Setting read deadline errored")
return
}

if msg, err := ReadMessage(r); err == nil {
client.log().WithField("msg", msg).Debug("Received message")
client.msgsIn <- msg
} else if err == io.EOF {
client.log().Info("Read EOF, closing down.")
return
} else if netErr, ok := err.(net.Error); ok && !netErr.Timeout() {
client.log().WithError(netErr).Error("Network error occurred")
return
} else if !ok {
client.log().WithError(err).Error("Parsing next message errored")
return
}
}
}
}

func (client *TCPCLClient) handleConnOut() {
defer func() {
client.log().Debug("Leaving outgoing connection handler")
client.handleMetaStop <- struct{}{}
}()

var w = bufio.NewWriter(client.conn)

for {
select {
case <-client.handlerConnOutStop:
return

case msg := <-client.msgsOut:
if err := msg.Marshal(w); err != nil {
client.log().WithError(err).WithField("msg", msg).Error("Sending message errored")
return
} else if err := w.Flush(); err != nil {
client.log().WithError(err).WithField("msg", msg).Error("Flushing errored")
return
} else {
client.log().WithField("msg", msg).Debug("Sent message")
}

if _, ok := msg.(*SessionTerminationMessage); ok {
client.log().WithField("msg", msg).Debug("Closing connection after sending SESS_TERM")

if err := client.conn.Close(); err != nil {
client.log().WithError(err).Warn("Failed to close TCP connection")
}
return
}
}
}
}

func (client *TCPCLClient) handleState() {
defer func() {
client.log().Debug("Leaving state handler")
client.handleMetaStop <- struct{}{}
}()

for {
select {
case <-client.handlerStateStop:
return

default:
switch {
case !client.state.IsTerminated():
var stateHandler func() error

switch {
case client.state.IsContact():
stateHandler = client.handleContact
case client.state.IsInit():
stateHandler = client.handleSessInit
case client.state.IsEstablished():
stateHandler = client.handleEstablished
default:
client.log().WithField("state", client.state).Fatal("Illegal state")
}

if err := stateHandler(); err != nil {
if err == sessTermErr {
client.log().Info("Received SESS_TERM, switching to Termination state")
} else {
client.log().WithError(err).Warn("State handler errored")
}

client.state.Terminate()
goto terminationCase
}
break

terminationCase:
fallthrough

default:
client.log().Info("Entering Termination state")

var sessTerm = NewSessionTerminationMessage(0, TerminationUnknown)
client.msgsOut <- &sessTerm

emptyEndpoint := bundle.EndpointID{}
if client.endpointID != emptyEndpoint {
client.reportChan <- cla.NewConvergencePeerDisappeared(client, client.peerEndpointID)
}

return
}
}
}
}
2 changes: 1 addition & 1 deletion cla/tcpcl/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ var messages = map[uint8]Message{
func NewMessage(typeCode uint8) (msg Message, err error) {
msgType, exists := messages[typeCode]
if !exists {
err = fmt.Errorf("No TCPCL Message registered for type code %d", typeCode)
err = fmt.Errorf("No TCPCL Message registered for type code %X", typeCode)
return
}

Expand Down
9 changes: 4 additions & 5 deletions cla/tcpcl/message_xfer_segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,11 @@ func (dtm *DataTransmissionMessage) Unmarshal(r io.Reader) error {
if err := binary.Read(r, binary.BigEndian, &dataLen); err != nil {
return err
} else if dataLen > 0 {
dataBuff := make([]byte, dataLen)

if _, err := io.ReadFull(r, dataBuff); err != nil {
dtm.Data = make([]byte, dataLen)
if _, err := io.ReadFull(r, dtm.Data); err != nil {
return err
} else {
dtm.Data = dataBuff
} else if dataLen != uint64(len(dtm.Data)) {
return fmt.Errorf("XFER_SEGMENT's data length should be %d, got %d bytes", dataLen, len(dtm.Data))
}
}

Expand Down
Loading

0 comments on commit 1a5d9b4

Please sign in to comment.