diff --git a/cla/tcpcl/0doc.go b/cla/tcpcl/0doc.go new file mode 100644 index 00000000..0878b752 --- /dev/null +++ b/cla/tcpcl/0doc.go @@ -0,0 +1,6 @@ +// Package tcpcl provides a library for the Delay-Tolerant Networking TCP Convergence Layer Protocol Version 4, +// draft-ietf-dtn-tcpclv4-14. +// +// A new TCPCL server can be started by the Listener, which provides multiple connection to its Clients. To reach +// a remote server, a new Client connection can be dialed. +package tcpcl diff --git a/cla/tcpcl/client.go b/cla/tcpcl/client.go index 76103ea8..1fa5cd8f 100644 --- a/cla/tcpcl/client.go +++ b/cla/tcpcl/client.go @@ -17,7 +17,10 @@ import ( // sessTermErr will be returned from a state handler iff a SESS_TERM was received. var sessTermErr = errors.New("SESS_TERM received") -type TCPCLClient struct { +// Client is a TCPCL client for a bidirectional Bundle exchange. Thus, the Client type implements both +// cla.ConvergenceReceiver and cla.ConvergenceSender. A Client can be created by the Listener for incoming +// connections or dialed for outbounding connections. +type Client struct { address string started bool permanent bool @@ -69,8 +72,9 @@ type TCPCLClient struct { reportChan chan cla.ConvergenceStatus } -func NewTCPCLClient(conn net.Conn, endpointID bundle.EndpointID) *TCPCLClient { - return &TCPCLClient{ +// NewClient creates a new Client on an existing connection. This function is used from the Listener. +func NewClient(conn net.Conn, endpointID bundle.EndpointID) *Client { + return &Client{ address: conn.RemoteAddr().String(), conn: conn, active: false, @@ -83,8 +87,9 @@ func NewTCPCLClient(conn net.Conn, endpointID bundle.EndpointID) *TCPCLClient { } } -func Dial(address string, endpointID bundle.EndpointID, permanent bool) *TCPCLClient { - return &TCPCLClient{ +// DialClient tries to establish a new TCPCL Client to a remote server. +func DialClient(address string, endpointID bundle.EndpointID, permanent bool) *Client { + return &Client{ address: address, permanent: permanent, active: true, @@ -97,7 +102,7 @@ func Dial(address string, endpointID bundle.EndpointID, permanent bool) *TCPCLCl } } -func (client *TCPCLClient) String() string { +func (client *Client) String() string { var b strings.Builder fmt.Fprintf(&b, "TCPCL(") @@ -113,14 +118,14 @@ func (client *TCPCLClient) String() string { } // log prepares a new log entry with predefined session data. -func (client *TCPCLClient) log() *log.Entry { +func (client *Client) log() *log.Entry { return log.WithFields(log.Fields{ "session": client, "state": client.state, }) } -func (client *TCPCLClient) Start() (err error, retry bool) { +func (client *Client) Start() (err error, retry bool) { if client.started { if client.active { client.conn = nil @@ -162,27 +167,27 @@ func (client *TCPCLClient) Start() (err error, retry bool) { return } -func (client *TCPCLClient) Close() { +func (client *Client) Close() { client.handleMetaStop <- struct{}{} <-client.handleMetaStopAck } -func (client *TCPCLClient) Channel() chan cla.ConvergenceStatus { +func (client *Client) Channel() chan cla.ConvergenceStatus { return client.reportChan } -func (client *TCPCLClient) Address() string { +func (client *Client) Address() string { return client.address } -func (client *TCPCLClient) IsPermanent() bool { +func (client *Client) IsPermanent() bool { return client.permanent } -func (client *TCPCLClient) GetEndpointID() bundle.EndpointID { +func (client *Client) GetEndpointID() bundle.EndpointID { return client.endpointID } -func (client *TCPCLClient) GetPeerEndpointID() bundle.EndpointID { +func (client *Client) GetPeerEndpointID() bundle.EndpointID { return client.peerEndpointID } diff --git a/cla/tcpcl/client_contact.go b/cla/tcpcl/client_contact.go index eecd3561..f20f937f 100644 --- a/cla/tcpcl/client_contact.go +++ b/cla/tcpcl/client_contact.go @@ -7,7 +7,7 @@ import ( // This file contains code for the Client's contact state. // handleContact manges the contact state for the Contact Header exchange. -func (client *TCPCLClient) handleContact() error { +func (client *Client) handleContact() error { switch { case client.active && !client.contactSent, !client.active && !client.contactSent && client.contactRecv: client.chSent = NewContactHeader(0) diff --git a/cla/tcpcl/client_established.go b/cla/tcpcl/client_established.go index 91b823c5..de2f2d90 100644 --- a/cla/tcpcl/client_established.go +++ b/cla/tcpcl/client_established.go @@ -14,7 +14,7 @@ import ( // This file contains code for the Client's established state. // handleEstablished manges the established state. -func (client *TCPCLClient) handleEstablished() (err error) { +func (client *Client) handleEstablished() (err error) { defer func() { if err != nil && client.keepaliveStarted { client.keepaliveTicker.Stop() @@ -140,12 +140,12 @@ func (client *TCPCLClient) handleEstablished() (err error) { return nil } -func (client *TCPCLClient) Send(bndl *bundle.Bundle) error { +func (client *Client) Send(bndl *bundle.Bundle) error { client.transferOutMutex.Lock() defer client.transferOutMutex.Unlock() if !client.state.IsEstablished() { - return fmt.Errorf("TCPCLClient is not in an established state") + return fmt.Errorf("Client is not in an established state") } client.transferOutId += 1 diff --git a/cla/tcpcl/client_handler.go b/cla/tcpcl/client_handler.go index 4303c70d..29ba6a66 100644 --- a/cla/tcpcl/client_handler.go +++ b/cla/tcpcl/client_handler.go @@ -10,7 +10,8 @@ import ( "github.com/dtn7/dtn7-go/cla" ) -func (client *TCPCLClient) handleMeta() { +// handleMeta supervises the other handlers and propagates shutdown signals. +func (client *Client) handleMeta() { for range client.handleMetaStop { client.log().Info("Handler received stop signal") @@ -27,7 +28,8 @@ func (client *TCPCLClient) handleMeta() { } } -func (client *TCPCLClient) handleConnIn() { +// handleConnIn handles incoming connections. +func (client *Client) handleConnIn() { defer func() { client.log().Debug("Leaving incoming connection handler") client.handleMetaStop <- struct{}{} @@ -63,7 +65,8 @@ func (client *TCPCLClient) handleConnIn() { } } -func (client *TCPCLClient) handleConnOut() { +// handleConnOut handles outgoing connections. +func (client *Client) handleConnOut() { defer func() { client.log().Debug("Leaving outgoing connection handler") client.handleMetaStop <- struct{}{} @@ -99,7 +102,8 @@ func (client *TCPCLClient) handleConnOut() { } } -func (client *TCPCLClient) handleState() { +// handleState handles the current or future state and starts the state's handler. +func (client *Client) handleState() { defer func() { client.log().Debug("Leaving state handler") client.handleMetaStop <- struct{}{} diff --git a/cla/tcpcl/client_init.go b/cla/tcpcl/client_init.go index f0116325..7b8d104a 100644 --- a/cla/tcpcl/client_init.go +++ b/cla/tcpcl/client_init.go @@ -12,7 +12,7 @@ import ( // This file contains code for the Client's contact state. // handleSessInit manges the initialization state. -func (client *TCPCLClient) handleSessInit() error { +func (client *Client) handleSessInit() error { // XXX const ( keepalive = 10 diff --git a/cla/tcpcl/listener.go b/cla/tcpcl/listener.go index 6fdc9b32..259598a9 100644 --- a/cla/tcpcl/listener.go +++ b/cla/tcpcl/listener.go @@ -11,7 +11,9 @@ import ( "github.com/dtn7/dtn7-go/cla" ) -type TCPCLListener struct { +// Listener is a TCPCL server bound to a TCP port to accept incoming TCPCL connections. +// This type implements the cla.ConvergenceProvider and should be supervised by a cla.Manager. +type Listener struct { listenAddress string endpointID bundle.EndpointID manager *cla.Manager @@ -21,8 +23,10 @@ type TCPCLListener struct { stopAck chan struct{} } -func NewTCPCLListener(listenAddress string, endpointID bundle.EndpointID) *TCPCLListener { - return &TCPCLListener{ +// NewListener creates a new Listener which should be bound to the given address and advertises the endpoint ID as +// its own node identifier. +func NewListener(listenAddress string, endpointID bundle.EndpointID) *Listener { + return &Listener{ listenAddress: listenAddress, endpointID: endpointID, @@ -31,11 +35,11 @@ func NewTCPCLListener(listenAddress string, endpointID bundle.EndpointID) *TCPCL } } -func (listener *TCPCLListener) RegisterManager(manager *cla.Manager) { +func (listener *Listener) RegisterManager(manager *cla.Manager) { listener.manager = manager } -func (listener *TCPCLListener) Start() error { +func (listener *Listener) Start() error { tcpAddr, err := net.ResolveTCPAddr("tcp", listener.listenAddress) if err != nil { return err @@ -58,11 +62,11 @@ func (listener *TCPCLListener) Start() error { default: if err := ln.SetDeadline(time.Now().Add(50 * time.Millisecond)); err != nil { log.WithError(err).WithField("cla", listener).Warn( - "TCPCLListener failed to set deadline on TCP socket") + "Listener failed to set deadline on TCP socket") listener.Close() } else if conn, err := ln.Accept(); err == nil { - client := NewTCPCLClient(conn, listener.endpointID) + client := NewClient(conn, listener.endpointID) listener.clas = append(listener.clas, client) listener.manager.Register(client) } @@ -73,11 +77,11 @@ func (listener *TCPCLListener) Start() error { return nil } -func (listener *TCPCLListener) Close() { +func (listener *Listener) Close() { close(listener.stopSyn) <-listener.stopAck } -func (listener TCPCLListener) String() string { +func (listener Listener) String() string { return fmt.Sprintf("tcpcl://%s", listener.listenAddress) } diff --git a/cla/tcpcl/network_test.go b/cla/tcpcl/network_test.go index 51c95d11..89d30165 100644 --- a/cla/tcpcl/network_test.go +++ b/cla/tcpcl/network_test.go @@ -12,7 +12,7 @@ import ( "github.com/dtn7/dtn7-go/cla" ) -func getRandomPort(t *testing.T) int { +func getRandomPort(t *testing.T) (port int) { addr, err := net.ResolveTCPAddr("tcp", "localhost:0") if err != nil { t.Fatal(err) @@ -23,8 +23,13 @@ func getRandomPort(t *testing.T) int { t.Fatal(err) } - defer l.Close() - return l.Addr().(*net.TCPAddr).Port + port = l.Addr().(*net.TCPAddr).Port + + if err := l.Close(); err != nil { + t.Fatal(err) + } + + return } func handleListener(serverAddr string, msgs, clients int, clientWg, serverWg *sync.WaitGroup, errs chan error) { @@ -36,7 +41,7 @@ func handleListener(serverAddr string, msgs, clients int, clientWg, serverWg *sy defer serverWg.Done() manager := cla.NewManager() - manager.Register(NewTCPCLListener(serverAddr, bundle.MustNewEndpointID("dtn://server/"))) + manager.Register(NewListener(serverAddr, bundle.MustNewEndpointID("dtn://server/"))) go func() { for { @@ -91,7 +96,7 @@ func handleClient(serverAddr string, clientNo, msgs, payload int, wg *sync.WaitG var msgsRecv uint32 clientEid := fmt.Sprintf("dtn://client-%d/", clientNo) - client := Dial(serverAddr, bundle.MustNewEndpointID(clientEid), false) + client := DialClient(serverAddr, bundle.MustNewEndpointID(clientEid), false) if err, _ := client.Start(); err != nil { errs <- err return diff --git a/cmd/dtnd/configuration.go b/cmd/dtnd/configuration.go index b77683e3..5e4fa4ba 100644 --- a/cmd/dtnd/configuration.go +++ b/cmd/dtnd/configuration.go @@ -85,7 +85,7 @@ func parseListen(conv convergenceConf, nodeId bundle.EndpointID) (cla.Convergabl return mtcp.NewMTCPServer(conv.Endpoint, nodeId, true), msg, nil case "tcpcl": - listener := tcpcl.NewTCPCLListener(conv.Endpoint, nodeId) + listener := tcpcl.NewListener(conv.Endpoint, nodeId) msg := discovery.DiscoveryMessage{ Type: discovery.TCPCL, @@ -111,7 +111,7 @@ func parsePeer(conv convergenceConf, nodeId bundle.EndpointID) (cla.ConvergenceS return mtcp.NewMTCPClient(conv.Endpoint, endpointID, true), nil case "tcpcl": - return tcpcl.Dial(conv.Endpoint, nodeId, true), nil + return tcpcl.DialClient(conv.Endpoint, nodeId, true), nil default: return nil, fmt.Errorf("Unknown peer.protocol \"%s\"", conv.Protocol) diff --git a/discovery/service.go b/discovery/service.go index a18cf7b2..5b143b1c 100644 --- a/discovery/service.go +++ b/discovery/service.go @@ -68,7 +68,7 @@ func (ds *DiscoveryService) handleDiscovery(dm DiscoveryMessage, addr string) { client = mtcp.NewMTCPClient(fmt.Sprintf("%s:%d", addr, dm.Port), dm.Endpoint, false) case TCPCL: - client = tcpcl.Dial(fmt.Sprintf("%s:%d", addr, dm.Port), ds.c.NodeId, false) + client = tcpcl.DialClient(fmt.Sprintf("%s:%d", addr, dm.Port), ds.c.NodeId, false) default: log.WithFields(log.Fields{