Skip to content
This repository has been archived by the owner on Apr 5, 2024. It is now read-only.

[WIP] TCP Convergence Layer Protocol Version 4 #15

Merged
merged 53 commits into from
Oct 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
c599b88
TCPCL: Contact Header
oxzi Sep 2, 2019
902529b
TCPCL: Session Initialization Message
oxzi Sep 5, 2019
50df855
TCPCL: Keepalive/Session Upkeep Message
oxzi Sep 5, 2019
a0d3af0
TCPCL: Message Rejection Message
oxzi Sep 5, 2019
ca51240
TCPCL: Treat Contact Header's flags as bit flags
oxzi Sep 5, 2019
8e11f26
TCPCL: Data Transmission Message
oxzi Sep 5, 2019
fbf9215
TCPCL: Data Acknowledgement Message
oxzi Sep 5, 2019
1ba7f62
TCPCL: Transfer Refusal Message
oxzi Sep 5, 2019
88b7549
TCPCL: Session Termination Message
oxzi Sep 5, 2019
7737427
TCPCL: Check Reason Codes and ignore unknown Flags
oxzi Sep 5, 2019
bb0ff65
TCPCL: Marshal/Unmarshal with io.Writer/io.Reader
oxzi Sep 6, 2019
7bf47b8
TCPCL: First server/client, working Contact Header
oxzi Sep 6, 2019
7321847
TCPCL: Refactored Contact Header Handler
oxzi Sep 9, 2019
c836a45
TCPCL: SESS_INIT support for the client code
oxzi Sep 9, 2019
d0307f0
TCPCL: Negotiate Session Parameters
oxzi Sep 9, 2019
c24c7b0
TCPCL: Send and receive KEEPALIVE messages
oxzi Sep 9, 2019
c593f6e
TCPCL: Refactored Client state into own file
oxzi Sep 14, 2019
dec02ba
TCPCL: Move Client State code to own files
oxzi Sep 14, 2019
d5a25bb
TCPCL: Check lasts KEEPALIVE reception time
oxzi Sep 14, 2019
ab47ea0
TCPCL: Handle SESS_TERM
oxzi Sep 14, 2019
35bb2ca
TCPCL: Change client state Next function
oxzi Sep 14, 2019
40f17ff
TCPCL: Initial XFER_SEGMENT support
oxzi Sep 15, 2019
1cc5b52
TCPCL: Crop null bytes for outbunding Transfers
oxzi Sep 21, 2019
a0bfd1a
TCPCL: Generic Message type and parsing
oxzi Sep 21, 2019
447d6ef
TCPCL: Central message exchange handler
oxzi Sep 23, 2019
d37bbfe
TCPCL: Add KEEPALIVE to the Established state
oxzi Sep 23, 2019
0ea17ee
TCPCL: Synchronize shutting down handlers
oxzi Sep 23, 2019
ffce4b6
TCPCL: Protect against race conditions
oxzi Sep 24, 2019
dfe7e10
TCPCL: Channel to link Send and handleEstablished
oxzi Sep 24, 2019
11f9d8a
TCPCL: XFER_ACK for Transfers
oxzi Sep 24, 2019
c9665c1
TCPCL: Manage incoming Transfers
oxzi Sep 24, 2019
5a94f00
TCPCL: Deserialize Bundle from incoming Transfer
oxzi Sep 24, 2019
5831345
TCPCL: TCPCLient is Convergence{Receiver,Sender}
oxzi Oct 7, 2019
08710ef
TCPCL: Handle TCPCLClient's ConvergenceMessages
oxzi Oct 7, 2019
7b65132
TCPCL: Network test
oxzi Oct 7, 2019
ff218ff
TCPCL: Count messages in tests
oxzi Oct 7, 2019
4a40fa5
TCPCL: Server to Listener, bound to cla.Manager
oxzi Oct 7, 2019
3de7939
TCPCL: Closing Listener will also close Clients
oxzi Oct 7, 2019
c59c697
TCPCL: Send bundles bidirectionally for tests
oxzi Oct 7, 2019
590fccd
ConvergenceProvider type for middleware CLAs
oxzi Oct 7, 2019
979e264
TCPCL: Listener implements ConvergenceProvider
oxzi Oct 7, 2019
09ae000
TCPCL: Simplify type switches
oxzi Oct 7, 2019
d438824
Implement TCPCL in discovery
oxzi Oct 8, 2019
702bf6e
Add TCPCL to dtnd
oxzi Oct 8, 2019
db52c46
Discovery: Stop discovering yourself
oxzi Oct 8, 2019
81a3f14
Configure dtnd's CLAs with correct node IDs
oxzi Oct 8, 2019
253fda5
TCPCL: Fix XFER_SEGMENT's END flag
oxzi Oct 8, 2019
3213969
TCPCL: Ensure to fill buffer completely
oxzi Oct 9, 2019
9f3ad3c
TCPCL: Merge binary.Reads and Writes
oxzi Oct 9, 2019
0d13b25
TCPCL: Test Transfers
oxzi Oct 9, 2019
4f94560
TCPCL: Increase Client's verbosity
oxzi Oct 10, 2019
93bdcdf
TCPCL: Seperate handlers, fix race condition
oxzi Oct 11, 2019
744b30c
TCPCL: Refactore and document package
oxzi Oct 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions cla/convergence_layer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Package cla defines two interfaces for convergence layers.
// Package cla defines two interfaces for Convergence Layer Adapters.
//
// The ConvergenceReceiver specifies a type which receives bundles and forwards
// those to an exposed channel.
Expand All @@ -9,23 +9,38 @@
// An implemented convergence layer can be a ConvergenceReceiver,
// ConvergenceSender or even both. This depends on the convergence layer's
// specification and is an implemention matter.
//
// Furthermore, the ConvergenceProvider provides the ability to create new
// instances of Convergence objects.
//
// Those types are generalized by the Convergable interface.
//
// A centralized instance for CLA management offers the Manager, designed to
// work seamlessly with the types above.
package cla

import "github.com/dtn7/dtn7-go/bundle"

// Convergable describes any kind of type which supports convergence layer-
// related services. This can be both a more specified Convergence interface
// type or a ConvergenceProvider.
type Convergable interface {
// Close signals this Convergable to shut down.
Close()
}

// Convergence is an interface to describe all kinds of Convergence Layer
// Adapters. There should not be a direct implemention of this interface. One
// must implement ConvergenceReceiver and/or ConvergenceSender, which are both
// extending this interface.
// A type can be both a ConvergenceReceiver and ConvergenceSender.
type Convergence interface {
Convergable

// Start starts this Convergence{Receiver,Sender} and might return an error
// and a boolean indicating if another Start should be tried later.
Start() (error, bool)

// Close signals this Convergence{Receiver,Send} to shut down.
Close()

// Channel represents a return channel for transmitted bundles, status
// messages, etc.
Channel() chan ConvergenceStatus
Expand Down Expand Up @@ -62,3 +77,20 @@ type ConvergenceSender interface {
// if it's known. Otherwise the zero endpoint will be returned.
GetPeerEndpointID() bundle.EndpointID
}

// ConvergenceProvider is a more general kind of CLA service which does not
// transfer any Bundles by itself, but supplies/creates new Convergence types.
// Those Convergence objects will be passed to a Manager. Thus, one might think
// of a ConvergenceProvider as some kind of middleware.
type ConvergenceProvider interface {
Convergable

// RegisterManager tells the ConvergenceProvider where to report new instances
// of Convergence to.
RegisterManager(*Manager)

// Start starts this ConvergenceProvider. Before being started, the the
// RegisterManager method tells this ConvergenceProvider its Manager. However,
// the Manager will both call the RegisterManager and Start methods.
Start() error
}
79 changes: 71 additions & 8 deletions cla/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ type Manager struct {
// convs: Map[string]*convergenceElem
convs *sync.Map

// providers is an array of ConvergenceProvider. Those will report their
// created Convergence objects to this Manager, which also supervises it.
providers []ConvergenceProvider
providersMutex sync.Mutex

// inChnl receives ConvergenceStatus while outChnl passes it on. Both channels
// are not buffered. While this is not a problem for inChnl, outChnl must
// always be read, otherwise the Manager will block.
Expand All @@ -48,7 +53,7 @@ func NewManager() *Manager {

convs: new(sync.Map),

inChnl: make(chan ConvergenceStatus),
inChnl: make(chan ConvergenceStatus, 100),
outChnl: make(chan ConvergenceStatus),

stopSyn: make(chan struct{}),
Expand All @@ -70,13 +75,19 @@ func (manager *Manager) handler() {
for {
select {
case <-manager.stopSyn:
log.Debug("CLA Manager received closing-signal")
log.Debug("CLA Manager received closing signal")

manager.convs.Range(func(_, convElem interface{}) bool {
manager.Unregister(convElem.(*convergenceElem).conv)
return true
})

manager.providersMutex.Lock()
for _, provider := range manager.providers {
provider.Close()
}
manager.providersMutex.Unlock()

close(manager.inChnl)
close(manager.outChnl)

Expand Down Expand Up @@ -146,12 +157,22 @@ func (manager *Manager) Close() {
<-manager.stopAck
}

// Register a new CLA.
func (manager *Manager) Register(conv Convergence) {
// Register any kind of Convergable.
func (manager *Manager) Register(conv Convergable) {
if manager.isStopped() {
return
}

if c, ok := conv.(Convergence); ok {
manager.registerConvergence(c)
} else if c, ok := conv.(ConvergenceProvider); ok {
manager.registerProvider(c)
} else {
log.WithField("convergence", conv).Warn("Unknown kind of Convergable")
}
}

func (manager *Manager) registerConvergence(conv Convergence) {
// Check if this CLA is already known. Re-activate a deactivated CLA or abort.
var ce *convergenceElem
if convElem, exists := manager.convs.Load(conv.Address()); exists {
Expand Down Expand Up @@ -192,8 +213,38 @@ func (manager *Manager) Register(conv Convergence) {
}
}

// Unregister an already known CLA.
func (manager *Manager) Unregister(conv Convergence) {
func (manager *Manager) registerProvider(conv ConvergenceProvider) {
manager.providersMutex.Lock()
defer manager.providersMutex.Unlock()

for _, provider := range manager.providers {
if conv == provider {
log.WithField("provider", conv).Debug("Provider registration aborted, already known")
return
}
}

manager.providers = append(manager.providers, conv)

conv.RegisterManager(manager)

if err := conv.Start(); err != nil {
log.WithError(err).WithField("provider", conv).Warn("Starting Provider errored")
}
}

// Unregister any kind of Convergable.
func (manager *Manager) Unregister(conv Convergable) {
if c, ok := conv.(Convergence); ok {
manager.unregisterConvergence(c)
} else if c, ok := conv.(ConvergenceProvider); ok {
manager.unregisterProvider(c)
} else {
log.WithField("convergence", conv).Warn("Unknown kind of Convergable")
}
}

func (manager *Manager) unregisterConvergence(conv Convergence) {
convElem, exists := manager.convs.Load(conv.Address())
if !exists {
log.WithFields(log.Fields{
Expand All @@ -208,8 +259,20 @@ func (manager *Manager) Unregister(conv Convergence) {
manager.convs.Delete(conv.Address())
}

// Restart an already known CLA.
func (manager *Manager) Restart(conv Convergence) {
func (manager *Manager) unregisterProvider(conv ConvergenceProvider) {
manager.providersMutex.Lock()
defer manager.providersMutex.Unlock()

for i, provider := range manager.providers {
if conv == provider {
manager.providers = append(manager.providers[:i], manager.providers[i+1:]...)
return
}
}
}

// Restart a known Convergable.
func (manager *Manager) Restart(conv Convergable) {
manager.Unregister(conv)
manager.Register(conv)
}
Expand Down
6 changes: 6 additions & 0 deletions cla/tcpcl/0doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// Package tcpcl provides a library for the Delay-Tolerant Networking TCP Convergence Layer Protocol Version 4,
// draft-ietf-dtn-tcpclv4-14.
//
// A new TCPCL server can be started by the Listener, which provides multiple connection to its Clients. To reach
// a remote server, a new Client connection can be dialed.
package tcpcl
Loading