Skip to content
This repository has been archived by the owner on Apr 5, 2024. It is now read-only.

Commit

Permalink
TCPCL: Send and receive KEEPALIVE messages
Browse files Browse the repository at this point in the history
  • Loading branch information
oxzi committed Sep 9, 2019
1 parent ecc6c98 commit d9c49fe
Showing 1 changed file with 79 additions and 6 deletions.
85 changes: 79 additions & 6 deletions cla/tcpcl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,21 +62,30 @@ type TCPCLClient struct {
keepalive uint16
segmentMru uint64
transferMru uint64

// Established state fields:
keepaliveStarted bool
keepaliveStopSyn chan struct{}
keepaliveStopAck chan struct{}
}

func NewTCPCLClient(conn net.Conn, endpointID bundle.EndpointID) *TCPCLClient {
return &TCPCLClient{
conn: conn,
active: false,
endpointID: endpointID,
conn: conn,
active: false,
endpointID: endpointID,
keepaliveStopSyn: make(chan struct{}),
keepaliveStopAck: make(chan struct{}),
}
}

func Dial(address string, endpointID bundle.EndpointID) *TCPCLClient {
return &TCPCLClient{
address: address,
active: true,
endpointID: endpointID,
address: address,
active: true,
endpointID: endpointID,
keepaliveStopSyn: make(chan struct{}),
keepaliveStopAck: make(chan struct{}),
}
}

Expand Down Expand Up @@ -131,6 +140,14 @@ func (client *TCPCLClient) handler() {
client.terminate(TerminationUnknown)
return
}

case Established:
if err := client.handleEstablished(); err != nil {
logger.WithError(err).Warn("Error occured during established session")

// TODO
return
}
}
}
}
Expand Down Expand Up @@ -236,6 +253,62 @@ func (client *TCPCLClient) handleSessInit() error {
return nil
}

func (client *TCPCLClient) keepaliveHandler() {
var logger = log.WithField("session", client)

var keepaliveTicker = time.NewTicker(time.Duration(client.keepalive) * time.Second)
defer keepaliveTicker.Stop()

for {
select {
case <-keepaliveTicker.C:
var keepaliveMsg = NewKeepaliveMessage()
if err := keepaliveMsg.Marshal(client.rw); err != nil {
logger.WithError(err).Warn("Sending KEEPALIVE errored")
} else if err := client.rw.Flush(); err != nil {
logger.WithError(err).Warn("Flushing KEEPALIVE errored")
} else {
log.WithField("msg", keepaliveMsg).Debug("Sent KEEPALIVE message")
}

case <-client.keepaliveStopSyn:
close(client.keepaliveStopAck)
return
}
}
}

func (client *TCPCLClient) handleEstablished() error {
var logger = log.WithField("session", client)

if !client.keepaliveStarted {
go client.keepaliveHandler()
client.keepaliveStarted = true
}

nextMsg, nextMsgErr := client.rw.ReadByte()
if nextMsgErr != nil {
return nextMsgErr
} else if err := client.rw.UnreadByte(); err != nil {
return err
}

switch nextMsg {
case KEEPALIVE:
var keepaliveMsg KeepaliveMessage
if err := keepaliveMsg.Unmarshal(client.rw); err != nil {
return err
} else {
logger.WithField("msg", keepaliveMsg).Debug("Received KEEPALIVE message")
}

default:
logger.WithField("magic", nextMsg).Debug("Received unsupported magic")
}

return nil
}

// terminate sends a SESS_TERM message to its peer and closes the session afterwards.
func (client *TCPCLClient) terminate(code SessionTerminationCode) {
var logger = log.WithField("session", client)
Expand Down

0 comments on commit d9c49fe

Please sign in to comment.