Skip to content

Commit

Permalink
Implement custom serialization. Fixes #138. Cleanup UnderlayFactory A…
Browse files Browse the repository at this point in the history
…PI. Fixes #82. Bump to v3
  • Loading branch information
plorenz committed Sep 6, 2024
1 parent 42df2e5 commit 2c4a2ac
Show file tree
Hide file tree
Showing 21 changed files with 155 additions and 96 deletions.
21 changes: 10 additions & 11 deletions accept_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package channel

import (
"github.com/michaelquigley/pfxlog"
"github.com/openziti/transport/v2"
"time"
)

Expand All @@ -29,28 +28,28 @@ type UnderlayAcceptor interface {
}

type UnderlayDispatcherConfig struct {
Listener UnderlayListener
ConnectTimeout time.Duration
TransportConfig transport.Configuration
Listener UnderlayListener
ConnectTimeout time.Duration
// TransportConfig transport.Configuration
Acceptors map[string]UnderlayAcceptor
DefaultAcceptor UnderlayAcceptor
}

// An UnderlayDispatcher accept underlays from an underlay listener and hands them off to
// UnderlayAcceptor instances, based on the TypeHeader.
type UnderlayDispatcher struct {
listener UnderlayListener
connectTimeout time.Duration
transportConfig transport.Configuration
listener UnderlayListener
connectTimeout time.Duration
//transportConfig transport.Configuration
acceptors map[string]UnderlayAcceptor
defaultAcceptor UnderlayAcceptor
}

func NewUnderlayDispatcher(config UnderlayDispatcherConfig) *UnderlayDispatcher {
return &UnderlayDispatcher{
listener: config.Listener,
connectTimeout: config.ConnectTimeout,
transportConfig: config.TransportConfig,
listener: config.Listener,
connectTimeout: config.ConnectTimeout,
//transportConfig: config.TransportConfig,
acceptors: config.Acceptors,
defaultAcceptor: config.DefaultAcceptor,
}
Expand All @@ -62,7 +61,7 @@ func (self *UnderlayDispatcher) Run() {
defer log.Warn("exited")

for {
underlay, err := self.listener.Create(self.connectTimeout, self.transportConfig)
underlay, err := self.listener.Create(self.connectTimeout)
if err != nil {
log.WithError(err).Error("error accepting connection")
if err.Error() == "closed" {
Expand Down
2 changes: 1 addition & 1 deletion channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ type UnderlayListener interface {
// UnderlayFactory is used by Channel to obtain an Underlay instance. An underlay "dialer" or "listener" implement
// UnderlayFactory, to provide instances to Channel.
type UnderlayFactory interface {
Create(timeout time.Duration, tcfg transport.Configuration) (Underlay, error)
Create(timeout time.Duration) (Underlay, error)
}

// Underlay abstracts a physical communications channel, typically sitting on top of 'transport'.
Expand Down
4 changes: 2 additions & 2 deletions channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ func TestCloseInBind(t *testing.T) {
req.NoError(err)

clientId := &identity.TokenId{Token: "echo-client"}
underlayFactory := NewClassicDialer(clientId, addr, nil)
underlayFactory := NewClassicDialer(DialerConfig{Identity: clientId, Endpoint: addr})

errC := make(chan error, 1)

Expand Down Expand Up @@ -343,7 +343,7 @@ func dialServer(options *Options, t *testing.T, bindHandler BindHandler) Channel
req.NoError(err)

clientId := &identity.TokenId{Token: "echo-client"}
underlayFactory := NewClassicDialer(clientId, addr, nil)
underlayFactory := NewClassicDialer(DialerConfig{Identity: clientId, Endpoint: addr})

ch, err := NewChannel("echo-test", underlayFactory, bindHandler, options)
req.NoError(err)
Expand Down
36 changes: 22 additions & 14 deletions classic_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,18 +30,30 @@ type classicDialer struct {
endpoint transport.Address
localBinding string
headers map[int32][]byte
underlayFactory func(peer transport.Conn, version uint32) classicUnderlay
underlayFactory func(messageStrategy MessageStrategy, peer transport.Conn, version uint32) classicUnderlay
messageStrategy MessageStrategy
transportConfig transport.Configuration
}

func NewClassicDialerWithBindAddress(identity *identity.TokenId, endpoint transport.Address, localBinding string, headers map[int32][]byte) UnderlayFactory {
type DialerConfig struct {
Identity *identity.TokenId
Endpoint transport.Address
LocalBinding string
Headers map[int32][]byte
MessageStrategy MessageStrategy
TransportConfig transport.Configuration
}

func NewClassicDialer(cfg DialerConfig) UnderlayFactory {
result := &classicDialer{
identity: identity,
endpoint: endpoint,
localBinding: localBinding,
headers: headers,
identity: cfg.Identity,
endpoint: cfg.Endpoint,
localBinding: cfg.LocalBinding,
headers: cfg.Headers,
messageStrategy: cfg.MessageStrategy,
}

if endpoint.Type() == "dtls" {
if cfg.Endpoint.Type() == "dtls" {
result.underlayFactory = newDatagramUnderlay
} else {
result.underlayFactory = newClassicImpl
Expand All @@ -50,11 +62,7 @@ func NewClassicDialerWithBindAddress(identity *identity.TokenId, endpoint transp
return result
}

func NewClassicDialer(identity *identity.TokenId, endpoint transport.Address, headers map[int32][]byte) UnderlayFactory {
return NewClassicDialerWithBindAddress(identity, endpoint, "", headers)
}

func (self *classicDialer) Create(timeout time.Duration, tcfg transport.Configuration) (Underlay, error) {
func (self *classicDialer) Create(timeout time.Duration) (Underlay, error) {
log := pfxlog.ContextLogger(self.endpoint.String())
log.Debug("started")
defer log.Debug("exited")
Expand All @@ -71,12 +79,12 @@ func (self *classicDialer) Create(timeout time.Duration, tcfg transport.Configur
log.Debugf("Attempting to dial with bind: %s", self.localBinding)

for time.Now().Before(deadline) {
peer, err := self.endpoint.DialWithLocalBinding("classic", self.localBinding, self.identity, timeout, tcfg)
peer, err := self.endpoint.DialWithLocalBinding("classic", self.localBinding, self.identity, timeout, self.transportConfig)
if err != nil {
return nil, err
}

underlay := self.underlayFactory(peer, version)
underlay := self.underlayFactory(self.messageStrategy, peer, version)
if err = self.sendHello(underlay, deadline); err != nil {
if tryCount > 0 {
return nil, err
Expand Down
16 changes: 10 additions & 6 deletions classic_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,18 +129,22 @@ func (impl *classicImpl) getPeer() transport.Conn {
return impl.peer
}

func newClassicImpl(peer transport.Conn, version uint32) classicUnderlay {
func newClassicImpl(messageStrategy MessageStrategy, peer transport.Conn, version uint32) classicUnderlay {
readF := ReadV2
marshalF := MarshalV2

if version == 2 {
readF = ReadV2
marshalF = MarshalV2
} else if version == 3 { // currently only used for testing fallback to a common protocol version
readF = ReadV2
if version == 3 { // currently only used for testing fallback to a common protocol version
marshalF = marshalV3
}

if messageStrategy != nil && messageStrategy.GetStreamProducer() != nil {
readF = messageStrategy.GetStreamProducer()
}

if messageStrategy != nil && messageStrategy.GetMarshaller() != nil {
marshalF = messageStrategy.GetMarshaller()
}

return &classicImpl{
peer: peer,
readF: readF,
Expand Down
15 changes: 11 additions & 4 deletions classic_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ type classicListener struct {
headers map[int32][]byte
closed atomic.Bool
listenerPool goroutines.Pool
underlayFactory func(peer transport.Conn, version uint32) classicUnderlay
messageStrategy MessageStrategy
underlayFactory func(messageStrategy MessageStrategy, peer transport.Conn, version uint32) classicUnderlay
}

func DefaultListenerConfig() ListenerConfig {
Expand All @@ -57,6 +58,7 @@ type ListenerConfig struct {
TransportConfig transport.Configuration
PoolConfigurator func(config *goroutines.PoolConfig)
ConnectionHandlers []ConnectionHandler
MessageStrategy MessageStrategy
}

func newClassicListener(identity *identity.TokenId, endpoint transport.Address, config ListenerConfig) *classicListener {
Expand Down Expand Up @@ -95,12 +97,17 @@ func newClassicListener(identity *identity.TokenId, endpoint transport.Address,
return &classicListener{
identity: identity,
endpoint: endpoint,
socket: nil,
close: closeNotify,
handlers: config.ConnectionHandlers,
acceptF: nil,
created: nil,
connectOptions: config.ConnectOptions,
tcfg: config.TransportConfig,
headers: config.Headers,
closed: atomic.Bool{},
listenerPool: pool,
handlers: config.ConnectionHandlers,
messageStrategy: config.MessageStrategy,
underlayFactory: underlayFactory,
}
}
Expand Down Expand Up @@ -151,7 +158,7 @@ func (self *classicListener) Close() error {
return nil
}

func (self *classicListener) Create(_ time.Duration, _ transport.Configuration) (Underlay, error) {
func (self *classicListener) Create(_ time.Duration) (Underlay, error) {
if self.created == nil {
return nil, errors.New("this listener was not set up for Create to be called, programming error")
}
Expand All @@ -169,7 +176,7 @@ func (self *classicListener) Create(_ time.Duration, _ transport.Configuration)
func (self *classicListener) acceptConnection(peer transport.Conn) {
log := pfxlog.ContextLogger(self.endpoint.String())
err := self.listenerPool.Queue(func() {
impl := self.underlayFactory(peer, 2)
impl := self.underlayFactory(self.messageStrategy, peer, 2)

connectionId, err := NextConnectionId()
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions cmd/channel/subcmd/underlay/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ func runDialer(_ *cobra.Command, _ []string) {
var dialer channel.UnderlayFactory
switch dialerUnderlay {
case "classic":
dialer = channel.NewClassicDialer(id, endpoint, nil)
dialer = channel.NewClassicDialer(channel.DialerConfig{
Identity: id,
Endpoint: endpoint,
})
case "reconnecting":
dialer = channel.NewReconnectingDialer(id, endpoint, nil)
dialer = channel.NewReconnectingDialer(channel.ReconnectingDialerConfig{Identity: id, Endpoint: endpoint})
default:
panic(fmt.Errorf("unknown underlay [%s]", dialerUnderlay))
}
Expand Down
51 changes: 44 additions & 7 deletions datagram_underlay.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,42 @@ import (
"time"
)

type readPacketFunction func(buf []byte) (*Message, error)

type DatagramUnderlay struct {
id string
connectionId string
headers map[int32][]byte
peer transport.Conn
closed atomic.Bool
readF readPacketFunction
marshalF marshalFunction
}

func newDatagramUnderlay(peer transport.Conn, _ uint32) classicUnderlay {
func newDatagramUnderlay(messageStrategy MessageStrategy, peer transport.Conn, version uint32) classicUnderlay {
readF := func(buf []byte) (*Message, error) {
reader := bytes.NewBuffer(buf)
return ReadV2(reader)
}

marshalF := MarshalV2

if version == 3 { // currently only used for testing fallback to a common protocol version
marshalF = marshalV3
}

if messageStrategy != nil && messageStrategy.GetPacketProducer() != nil {
readF = messageStrategy.GetPacketProducer()
}

if messageStrategy != nil && messageStrategy.GetMarshaller() != nil {
marshalF = messageStrategy.GetMarshaller()
}

return &DatagramUnderlay{
peer: peer,
peer: peer,
readF: readF,
marshalF: marshalF,
}
}

Expand All @@ -49,20 +74,18 @@ func (self *DatagramUnderlay) GetRemoteAddr() net.Addr {
}

func (self *DatagramUnderlay) Rx() (*Message, error) {
buf := make([]byte, 65000)
buf := make([]byte, 2000)
n, err := self.peer.Read(buf)
if err != nil {
return nil, err
}

buf = buf[:n]

reader := bytes.NewBuffer(buf)
return ReadV2(reader)
return self.readF(buf)
}

func (self *DatagramUnderlay) Tx(m *Message) error {
data, err := MarshalV2(m)
data, err := self.marshalF(m)
if err != nil {
return err
}
Expand Down Expand Up @@ -126,3 +149,17 @@ func (impl *DatagramUnderlay) getPeer() transport.Conn {
func (self *DatagramUnderlay) rxHello() (*Message, error) {
return self.Rx()
}

type DatagramMessageStrategy PacketMessageProducer

func (self DatagramMessageStrategy) GetMarshaller() MessageMarshaller {
return MarshalV2WithRaw
}

func (self DatagramMessageStrategy) GetStreamProducer() StreamMessageProducer {
return nil
}

func (self DatagramMessageStrategy) GetPacketProducer() PacketMessageProducer {
return PacketMessageProducer(self)
}
2 changes: 1 addition & 1 deletion example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func Example() {
panic(err)
}
dialId := &identity.TokenId{Token: "echo-client"}
underlayFactory := channel.NewClassicDialer(dialId, addr, nil)
underlayFactory := channel.NewClassicDialer(channel.DialerConfig{Identity: dialId, Endpoint: addr})

ch, err := channel.NewChannel("echo-test", underlayFactory, nil, nil)
if err != nil {
Expand Down
3 changes: 1 addition & 2 deletions existing_conn_dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/identity"
"github.com/openziti/transport/v2"
"net"
"time"
)
Expand All @@ -40,7 +39,7 @@ func NewExistingConnDialer(id *identity.TokenId, peer net.Conn, headers map[int3
}
}

func (self *existingConnDialer) Create(timeout time.Duration, _ transport.Configuration) (Underlay, error) {
func (self *existingConnDialer) Create(timeout time.Duration) (Underlay, error) {
log := pfxlog.Logger()
log.Debug("started")
defer log.Debug("exited")
Expand Down
3 changes: 1 addition & 2 deletions existing_conn_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/identity"
"github.com/openziti/transport/v2"
"github.com/pkg/errors"
"net"
"time"
Expand All @@ -40,7 +39,7 @@ func NewExistingConnListener(identity *identity.TokenId, peer net.Conn, headers
}
}

func (self *existingConnListener) Create(timeout time.Duration, _ transport.Configuration) (Underlay, error) {
func (self *existingConnListener) Create(timeout time.Duration) (Underlay, error) {
log := pfxlog.Logger()

impl := newExistingImpl(self.peer, 2)
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/openziti/channel/v2
module github.com/openziti/channel/v3

go 1.21

Expand Down
Loading

0 comments on commit 2c4a2ac

Please sign in to comment.