Skip to content

Commit

Permalink
Don't export Packet and Forward
Browse files Browse the repository at this point in the history
  • Loading branch information
ineiti committed Aug 14, 2024
1 parent 0a8a0e3 commit 6df84b4
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 43 deletions.
2 changes: 1 addition & 1 deletion mino/minogrpc/ptypes/overlay.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions mino/minows/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
const pathCall = "/call"
const pathStream = "/stream"

// Packet encapsulates a message sent over the network streams.
type Packet struct {
// packet encapsulates a message sent over the network streams.
type packet struct {
Source []byte
Payload []byte
}
Expand Down Expand Up @@ -278,7 +278,7 @@ func (r rpc) send(enc *gob.Encoder, msg serde.Message) error {
payload = bytes
}

err := enc.Encode(&Packet{Source: from, Payload: payload})
err := enc.Encode(&packet{Source: from, Payload: payload})
if errors.Is(err, network.ErrReset) {
return err
}
Expand All @@ -289,7 +289,7 @@ func (r rpc) send(enc *gob.Encoder, msg serde.Message) error {
}

func (r rpc) receive(dec *gob.Decoder) (ma.Multiaddr, serde.Message, error) {
var packet Packet
var packet packet
err := dec.Decode(&packet)
if errors.Is(err, network.ErrReset) {
return nil, nil, err
Expand Down Expand Up @@ -335,7 +335,7 @@ func (r rpc) createOrchestrator(ctx context.Context,
myAddr: myAddr,
rpc: r,
outs: encoders,
in: make(chan Packet, MaxUnreadAllowed),
in: make(chan packet, MaxUnreadAllowed),
}

var wg sync.WaitGroup
Expand Down Expand Up @@ -379,7 +379,7 @@ func (r rpc) createParticipant(stream network.Stream) participant {
myAddr: r.mino.myAddr,
rpc: r,
out: encoder,
in: make(chan Packet),
in: make(chan packet),
}

done := make(chan any)
Expand Down
60 changes: 30 additions & 30 deletions mino/minows/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var ErrNotPlayer = xerrors.New("not player")
// in orchestrator's incoming message buffer before pausing relaying
const MaxUnreadAllowed = 1e3

type Forward struct {
Packet
type forward struct {
packet
Destination []byte
}

Expand All @@ -31,7 +31,7 @@ type orchestrator struct {
rpc rpc
// Connects to the participants
outs map[peer.ID]*gob.Encoder
in chan Packet
in chan packet
}

func (o orchestrator) Send(msg serde.Message, addrs ...mino.Address) <-chan error {
Expand Down Expand Up @@ -67,58 +67,58 @@ func (o orchestrator) send(addr mino.Address, msg serde.Message) error {
return xerrors.Errorf("could not serialize message: %v", err)
}

err = encoder.Encode(&Packet{Source: src, Payload: payload})
err = encoder.Encode(&packet{Source: src, Payload: payload})
if err != nil {
return xerrors.Errorf("could not encode packet: %v", err)
}
return nil
}

func (o orchestrator) fetch(decoder *gob.Decoder) (Packet, mino.Address,
func (o orchestrator) fetch(decoder *gob.Decoder) (packet, mino.Address,
error) {
var forward Forward
var forward forward
err := decoder.Decode(&forward)
if err != nil {
return Packet{}, nil,
return packet{}, nil,
xerrors.Errorf("could not decode packet: %v", err)
}
dest := o.rpc.mino.GetAddressFactory().
FromText(forward.Destination)
if dest == nil {
return Packet{}, nil,
return packet{}, nil,
xerrors.New("could not unmarshal address")
}
return forward.Packet, dest, nil
return forward.packet, dest, nil
}

func (o orchestrator) relay(packet Packet, dest address) error {
func (o orchestrator) relay(pkt packet, dest address) error {
encoder, ok := o.outs[dest.identity]
if !ok {
return xerrors.Errorf("%v: %v", ErrNotPlayer, dest)
}
err := encoder.Encode(packet)
err := encoder.Encode(pkt)
if err != nil {
return xerrors.Errorf("could not encode packet: %v", err)
}
o.logger.Debug().Stringer("to", dest).Msgf("relayed packet")
return nil
}

func (o orchestrator) listen(decoder *gob.Decoder) (Packet, error) {
func (o orchestrator) listen(decoder *gob.Decoder) (packet, error) {
for {
packet, dest, err := o.fetch(decoder)
pkt, dest, err := o.fetch(decoder)
if err != nil {
return Packet{}, xerrors.Errorf("could not receive: %v", err)
return packet{}, xerrors.Errorf("could not receive: %v", err)
}
switch to := dest.(type) {
case orchestratorAddr:
if o.myAddr.Equal(to) {
return packet, nil
return pkt, nil
}
case address:
err := o.relay(packet, to)
err := o.relay(pkt, to)
if err != nil {
return Packet{}, xerrors.Errorf("could not relay: %v", err)
return packet{}, xerrors.Errorf("could not relay: %v", err)
}
}
}
Expand All @@ -131,7 +131,7 @@ type participant struct {
rpc rpc
// Connects to the orchestrator
out *gob.Encoder
in chan Packet
in chan packet
}

func (p participant) Send(msg serde.Message, addrs ...mino.Address) <-chan error {
Expand Down Expand Up @@ -165,8 +165,8 @@ func (p participant) send(addr mino.Address, msg serde.Message) error {
}

// Send to orchestrator to relay to the destination participant
forward := Forward{
Packet: Packet{Source: src, Payload: payload},
forward := forward{
packet: packet{Source: src, Payload: payload},
Destination: dest,
}
err = p.out.Encode(&forward)
Expand All @@ -176,13 +176,13 @@ func (p participant) send(addr mino.Address, msg serde.Message) error {
return nil
}

func (p participant) listen(decoder *gob.Decoder) (Packet, error) {
var packet Packet
err := decoder.Decode(&packet)
func (p participant) listen(decoder *gob.Decoder) (packet, error) {
var pkt packet
err := decoder.Decode(&pkt)
if err != nil {
return Packet{}, xerrors.Errorf("could not decode packet: %v", err)
return packet{}, xerrors.Errorf("could not decode packet: %v", err)
}
return packet, nil
return pkt, nil
}

type sendFn func(addr mino.Address, msg serde.Message) error
Expand Down Expand Up @@ -212,24 +212,24 @@ func doSend(addrs []mino.Address, msg serde.Message, send sendFn,
return errs
}

type unpackFn func(packet Packet) (mino.Address, serde.Message, error)
type unpackFn func(pkt packet) (mino.Address, serde.Message, error)

func unpacker(af mino.AddressFactory, f serde.Factory,
c serde.Context) unpackFn {
return func(packet Packet) (mino.Address, serde.Message, error) {
src := af.FromText(packet.Source)
return func(pkt packet) (mino.Address, serde.Message, error) {
src := af.FromText(pkt.Source)
if src == nil {
return nil, nil, xerrors.New("could not unmarshal address")
}
msg, err := f.Deserialize(c, packet.Payload)
msg, err := f.Deserialize(c, pkt.Payload)
if err != nil {
return src, nil, xerrors.Errorf("could not deserialize message: %v", err)
}
return src, msg, nil
}
}

func doReceive(ctx context.Context, in chan Packet,
func doReceive(ctx context.Context, in chan packet,
unpack unpackFn, logger zerolog.Logger) (mino.Address, serde.Message, error) {
select {
case <-ctx.Done():
Expand Down
2 changes: 1 addition & 1 deletion mino/router/tree/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,6 @@ func (t *dynTree) updateTree(to mino.Address) {
// Optimistic creation of a branch for this node. It assumes that none of
// the thoses addresses will come before the branches are created but this
// is not true. The tree will correct itself if that happens.
// TODO: auto-update by updates in types.Packet
// TODO: auto-update by updates in types.packet
t.branches[to] = set
}
10 changes: 5 additions & 5 deletions mino/router/tree/types/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var packetFormat = registry.NewSimpleRegistry()

// Packet describes a tree routing packet
//
// - implements router.Packet
// - implements router.packet
type Packet struct {
src mino.Address
dest []mino.Address
Expand All @@ -33,19 +33,19 @@ func NewPacket(src mino.Address, msg []byte, dest ...mino.Address) *Packet {
}
}

// GetSource implements router.Packet. It returns the source address of the
// GetSource implements router.packet. It returns the source address of the
// packet.
func (p *Packet) GetSource() mino.Address {
return p.src
}

// GetDestination implements router.Packet. It returns a list of addresses where
// GetDestination implements router.packet. It returns a list of addresses where
// the packet should be send to.
func (p *Packet) GetDestination() []mino.Address {
return append([]mino.Address{}, p.dest...)
}

// GetMessage implements router.Packet. It returns the byte buffer of the
// GetMessage implements router.packet. It returns the byte buffer of the
// message.
func (p *Packet) GetMessage() []byte {
return append([]byte{}, p.msg...)
Expand All @@ -63,7 +63,7 @@ func (p *Packet) Add(to mino.Address) {
p.dest = append(p.dest, to)
}

// Slice implements router.Packet. It removes the address from the destination
// Slice implements router.packet. It removes the address from the destination
// list and returns a packet with this single destination, if it exists.
// Otherwise the packet stays unchanged.
func (p *Packet) Slice(addr mino.Address) router.Packet {
Expand Down

0 comments on commit 6df84b4

Please sign in to comment.