Skip to content
This repository has been archived by the owner on Apr 5, 2024. It is now read-only.

Commit

Permalink
TCPCL: Refactore and document package
Browse files Browse the repository at this point in the history
  • Loading branch information
oxzi committed Oct 11, 2019
1 parent 1a5d9b4 commit 5b512f7
Show file tree
Hide file tree
Showing 10 changed files with 64 additions and 40 deletions.
6 changes: 6 additions & 0 deletions cla/tcpcl/0doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Package tcpcl provides a library for the Delay-Tolerant Networking TCP Convergence Layer Protocol Version 4,
// draft-ietf-dtn-tcpclv4-14.
//
// A new TCPCL server can be started by the Listener, which provides multiple connection to its Clients. To reach
// a remote server, a new Client connection can be dialed.
package tcpcl
33 changes: 19 additions & 14 deletions cla/tcpcl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ import (
// sessTermErr will be returned from a state handler iff a SESS_TERM was received.
var sessTermErr = errors.New("SESS_TERM received")

type TCPCLClient struct {
// Client is a TCPCL client for a bidirectional Bundle exchange. Thus, the Client type implements both
// cla.ConvergenceReceiver and cla.ConvergenceSender. A Client can be created by the Listener for incoming
// connections or dialed for outbounding connections.
type Client struct {
address string
started bool
permanent bool
Expand Down Expand Up @@ -69,8 +72,9 @@ type TCPCLClient struct {
reportChan chan cla.ConvergenceStatus
}

func NewTCPCLClient(conn net.Conn, endpointID bundle.EndpointID) *TCPCLClient {
return &TCPCLClient{
// NewClient creates a new Client on an existing connection. This function is used from the Listener.
func NewClient(conn net.Conn, endpointID bundle.EndpointID) *Client {
return &Client{
address: conn.RemoteAddr().String(),
conn: conn,
active: false,
Expand All @@ -83,8 +87,9 @@ func NewTCPCLClient(conn net.Conn, endpointID bundle.EndpointID) *TCPCLClient {
}
}

func Dial(address string, endpointID bundle.EndpointID, permanent bool) *TCPCLClient {
return &TCPCLClient{
// DialClient tries to establish a new TCPCL Client to a remote server.
func DialClient(address string, endpointID bundle.EndpointID, permanent bool) *Client {
return &Client{
address: address,
permanent: permanent,
active: true,
Expand All @@ -97,7 +102,7 @@ func Dial(address string, endpointID bundle.EndpointID, permanent bool) *TCPCLCl
}
}

func (client *TCPCLClient) String() string {
func (client *Client) String() string {
var b strings.Builder

fmt.Fprintf(&b, "TCPCL(")
Expand All @@ -113,14 +118,14 @@ func (client *TCPCLClient) String() string {
}

// log prepares a new log entry with predefined session data.
func (client *TCPCLClient) log() *log.Entry {
func (client *Client) log() *log.Entry {
return log.WithFields(log.Fields{
"session": client,
"state": client.state,
})
}

func (client *TCPCLClient) Start() (err error, retry bool) {
func (client *Client) Start() (err error, retry bool) {
if client.started {
if client.active {
client.conn = nil
Expand Down Expand Up @@ -162,27 +167,27 @@ func (client *TCPCLClient) Start() (err error, retry bool) {
return
}

func (client *TCPCLClient) Close() {
func (client *Client) Close() {
client.handleMetaStop <- struct{}{}
<-client.handleMetaStopAck
}

func (client *TCPCLClient) Channel() chan cla.ConvergenceStatus {
func (client *Client) Channel() chan cla.ConvergenceStatus {
return client.reportChan
}

func (client *TCPCLClient) Address() string {
func (client *Client) Address() string {
return client.address
}

func (client *TCPCLClient) IsPermanent() bool {
func (client *Client) IsPermanent() bool {
return client.permanent
}

func (client *TCPCLClient) GetEndpointID() bundle.EndpointID {
func (client *Client) GetEndpointID() bundle.EndpointID {
return client.endpointID
}

func (client *TCPCLClient) GetPeerEndpointID() bundle.EndpointID {
func (client *Client) GetPeerEndpointID() bundle.EndpointID {
return client.peerEndpointID
}
2 changes: 1 addition & 1 deletion cla/tcpcl/client_contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
// This file contains code for the Client's contact state.

// handleContact manges the contact state for the Contact Header exchange.
func (client *TCPCLClient) handleContact() error {
func (client *Client) handleContact() error {
switch {
case client.active && !client.contactSent, !client.active && !client.contactSent && client.contactRecv:
client.chSent = NewContactHeader(0)
Expand Down
6 changes: 3 additions & 3 deletions cla/tcpcl/client_established.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
// This file contains code for the Client's established state.

// handleEstablished manges the established state.
func (client *TCPCLClient) handleEstablished() (err error) {
func (client *Client) handleEstablished() (err error) {
defer func() {
if err != nil && client.keepaliveStarted {
client.keepaliveTicker.Stop()
Expand Down Expand Up @@ -140,12 +140,12 @@ func (client *TCPCLClient) handleEstablished() (err error) {
return nil
}

func (client *TCPCLClient) Send(bndl *bundle.Bundle) error {
func (client *Client) Send(bndl *bundle.Bundle) error {
client.transferOutMutex.Lock()
defer client.transferOutMutex.Unlock()

if !client.state.IsEstablished() {
return fmt.Errorf("TCPCLClient is not in an established state")
return fmt.Errorf("Client is not in an established state")
}

client.transferOutId += 1
Expand Down
12 changes: 8 additions & 4 deletions cla/tcpcl/client_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"github.com/dtn7/dtn7-go/cla"
)

func (client *TCPCLClient) handleMeta() {
// handleMeta supervises the other handlers and propagates shutdown signals.
func (client *Client) handleMeta() {
for range client.handleMetaStop {
client.log().Info("Handler received stop signal")

Expand All @@ -27,7 +28,8 @@ func (client *TCPCLClient) handleMeta() {
}
}

func (client *TCPCLClient) handleConnIn() {
// handleConnIn handles incoming connections.
func (client *Client) handleConnIn() {
defer func() {
client.log().Debug("Leaving incoming connection handler")
client.handleMetaStop <- struct{}{}
Expand Down Expand Up @@ -63,7 +65,8 @@ func (client *TCPCLClient) handleConnIn() {
}
}

func (client *TCPCLClient) handleConnOut() {
// handleConnOut handles outgoing connections.
func (client *Client) handleConnOut() {
defer func() {
client.log().Debug("Leaving outgoing connection handler")
client.handleMetaStop <- struct{}{}
Expand Down Expand Up @@ -99,7 +102,8 @@ func (client *TCPCLClient) handleConnOut() {
}
}

func (client *TCPCLClient) handleState() {
// handleState handles the current or future state and starts the state's handler.
func (client *Client) handleState() {
defer func() {
client.log().Debug("Leaving state handler")
client.handleMetaStop <- struct{}{}
Expand Down
2 changes: 1 addition & 1 deletion cla/tcpcl/client_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
// This file contains code for the Client's contact state.

// handleSessInit manges the initialization state.
func (client *TCPCLClient) handleSessInit() error {
func (client *Client) handleSessInit() error {
// XXX
const (
keepalive = 10
Expand Down
22 changes: 13 additions & 9 deletions cla/tcpcl/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ import (
"github.com/dtn7/dtn7-go/cla"
)

type TCPCLListener struct {
// Listener is a TCPCL server bound to a TCP port to accept incoming TCPCL connections.
// This type implements the cla.ConvergenceProvider and should be supervised by a cla.Manager.
type Listener struct {
listenAddress string
endpointID bundle.EndpointID
manager *cla.Manager
Expand All @@ -21,8 +23,10 @@ type TCPCLListener struct {
stopAck chan struct{}
}

func NewTCPCLListener(listenAddress string, endpointID bundle.EndpointID) *TCPCLListener {
return &TCPCLListener{
// NewListener creates a new Listener which should be bound to the given address and advertises the endpoint ID as
// its own node identifier.
func NewListener(listenAddress string, endpointID bundle.EndpointID) *Listener {
return &Listener{
listenAddress: listenAddress,
endpointID: endpointID,

Expand All @@ -31,11 +35,11 @@ func NewTCPCLListener(listenAddress string, endpointID bundle.EndpointID) *TCPCL
}
}

func (listener *TCPCLListener) RegisterManager(manager *cla.Manager) {
func (listener *Listener) RegisterManager(manager *cla.Manager) {
listener.manager = manager
}

func (listener *TCPCLListener) Start() error {
func (listener *Listener) Start() error {
tcpAddr, err := net.ResolveTCPAddr("tcp", listener.listenAddress)
if err != nil {
return err
Expand All @@ -58,11 +62,11 @@ func (listener *TCPCLListener) Start() error {
default:
if err := ln.SetDeadline(time.Now().Add(50 * time.Millisecond)); err != nil {
log.WithError(err).WithField("cla", listener).Warn(
"TCPCLListener failed to set deadline on TCP socket")
"Listener failed to set deadline on TCP socket")

listener.Close()
} else if conn, err := ln.Accept(); err == nil {
client := NewTCPCLClient(conn, listener.endpointID)
client := NewClient(conn, listener.endpointID)
listener.clas = append(listener.clas, client)
listener.manager.Register(client)
}
Expand All @@ -73,11 +77,11 @@ func (listener *TCPCLListener) Start() error {
return nil
}

func (listener *TCPCLListener) Close() {
func (listener *Listener) Close() {
close(listener.stopSyn)
<-listener.stopAck
}

func (listener TCPCLListener) String() string {
func (listener Listener) String() string {
return fmt.Sprintf("tcpcl://%s", listener.listenAddress)
}
15 changes: 10 additions & 5 deletions cla/tcpcl/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/dtn7/dtn7-go/cla"
)

func getRandomPort(t *testing.T) int {
func getRandomPort(t *testing.T) (port int) {
addr, err := net.ResolveTCPAddr("tcp", "localhost:0")
if err != nil {
t.Fatal(err)
Expand All @@ -23,8 +23,13 @@ func getRandomPort(t *testing.T) int {
t.Fatal(err)
}

defer l.Close()
return l.Addr().(*net.TCPAddr).Port
port = l.Addr().(*net.TCPAddr).Port

if err := l.Close(); err != nil {
t.Fatal(err)
}

return
}

func handleListener(serverAddr string, msgs, clients int, clientWg, serverWg *sync.WaitGroup, errs chan error) {
Expand All @@ -36,7 +41,7 @@ func handleListener(serverAddr string, msgs, clients int, clientWg, serverWg *sy
defer serverWg.Done()

manager := cla.NewManager()
manager.Register(NewTCPCLListener(serverAddr, bundle.MustNewEndpointID("dtn://server/")))
manager.Register(NewListener(serverAddr, bundle.MustNewEndpointID("dtn://server/")))

go func() {
for {
Expand Down Expand Up @@ -91,7 +96,7 @@ func handleClient(serverAddr string, clientNo, msgs, payload int, wg *sync.WaitG
var msgsRecv uint32

clientEid := fmt.Sprintf("dtn://client-%d/", clientNo)
client := Dial(serverAddr, bundle.MustNewEndpointID(clientEid), false)
client := DialClient(serverAddr, bundle.MustNewEndpointID(clientEid), false)
if err, _ := client.Start(); err != nil {
errs <- err
return
Expand Down
4 changes: 2 additions & 2 deletions cmd/dtnd/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func parseListen(conv convergenceConf, nodeId bundle.EndpointID) (cla.Convergabl
return mtcp.NewMTCPServer(conv.Endpoint, nodeId, true), msg, nil

case "tcpcl":
listener := tcpcl.NewTCPCLListener(conv.Endpoint, nodeId)
listener := tcpcl.NewListener(conv.Endpoint, nodeId)

msg := discovery.DiscoveryMessage{
Type: discovery.TCPCL,
Expand All @@ -111,7 +111,7 @@ func parsePeer(conv convergenceConf, nodeId bundle.EndpointID) (cla.ConvergenceS
return mtcp.NewMTCPClient(conv.Endpoint, endpointID, true), nil

case "tcpcl":
return tcpcl.Dial(conv.Endpoint, nodeId, true), nil
return tcpcl.DialClient(conv.Endpoint, nodeId, true), nil

default:
return nil, fmt.Errorf("Unknown peer.protocol \"%s\"", conv.Protocol)
Expand Down
2 changes: 1 addition & 1 deletion discovery/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (ds *DiscoveryService) handleDiscovery(dm DiscoveryMessage, addr string) {
client = mtcp.NewMTCPClient(fmt.Sprintf("%s:%d", addr, dm.Port), dm.Endpoint, false)

case TCPCL:
client = tcpcl.Dial(fmt.Sprintf("%s:%d", addr, dm.Port), ds.c.NodeId, false)
client = tcpcl.DialClient(fmt.Sprintf("%s:%d", addr, dm.Port), ds.c.NodeId, false)

default:
log.WithFields(log.Fields{
Expand Down

0 comments on commit 5b512f7

Please sign in to comment.