Skip to content

Commit

Permalink
need to implement p2p lib
Browse files Browse the repository at this point in the history
  • Loading branch information
gnana997 committed Jan 23, 2024
1 parent 5877c7d commit 596ad22
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 22 deletions.
8 changes: 7 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,13 @@ import (

func main() {

tr := p2p.NewTCPTransport(":3000")
tcpOpts := p2p.TCPTransportOptions{
ListenAddr: ":3000",
HandshakeFunc: p2p.NOPHandshakeFunc,
Decoder: p2p.NOPDecoder{},
}

tr := p2p.NewTCPTransport(tcpOpts)

if err := tr.ListenAndAccept(); err != nil {
log.Fatal(err)
Expand Down
28 changes: 26 additions & 2 deletions p2p/encoding.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,31 @@
package p2p

import "io"
import (
"bytes"
"encoding/gob"
"io"
)

type Decoder interface {
Decode(io.Reader, any) error
Decode(io.Reader, *RPC) error
}

type GOBDecoder struct{}

func (dec GOBDecoder) Decode(r io.Reader, rpc *RPC) error {
return gob.NewDecoder(r).Decode(rpc)
}

type NOPDecoder struct{}

func (dec NOPDecoder) Decode(r io.Reader, rpc *RPC) error {
buf := new(bytes.Buffer)
n, err := buf.ReadFrom(r)
if err != nil {
return err
}

rpc.Payload = buf.Bytes()[:n]

return nil
}
6 changes: 6 additions & 0 deletions p2p/handshake.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package p2p

import "errors"

//ErrInvalidHandshake is returned if the handshake between
// the local and remote node could not be established.
var ErrInvalidHandShake = errors.New("invalid handshake")

type HandshakeFunc func(Peer) error

func NOPHandshakeFunc(Peer) error {
Expand Down
11 changes: 11 additions & 0 deletions p2p/message.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package p2p

import "net"

// Message represents any arbitrary data
// that is being sent over the transport
// between two nodes in the network.
type RPC struct {
From net.Addr
Payload []byte
}
10 changes: 7 additions & 3 deletions p2p/tcp-transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ import (
)

func TestTCPTransport(t *testing.T) {
listenAddr := ":3000"
tr := NewTCPTransport(listenAddr)
tcpOpts := TCPTransportOptions{
ListenAddr: ":3000",
HandshakeFunc: NOPHandshakeFunc,
}

assert.Equal(t, tr.listenAddr, listenAddr)
tr := NewTCPTransport(tcpOpts)

assert.Equal(t, tr.ListenAddr, ":3000")

assert.Nil(t, tr.ListenAndAccept())
}
37 changes: 21 additions & 16 deletions p2p/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,29 @@ func NewTCPPeer(conn net.Conn, outbound bool) *TCPPeer {
}
}

type TCPTransportOptions struct {
ListenAddr string
HandshakeFunc HandshakeFunc
Decoder Decoder
}

type TCPTransport struct {
listenAddr string
listener net.Listener
shakeHands HandshakeFunc
decoder Decoder
TCPTransportOptions
listener net.Listener

mu sync.RWMutex
peers map[net.Addr]Peer
}

func NewTCPTransport(listenAddr string) *TCPTransport {
func NewTCPTransport(opts TCPTransportOptions) *TCPTransport {
return &TCPTransport{
listenAddr: listenAddr,
shakeHands: NOPHandshakeFunc,
TCPTransportOptions: opts,
}
}

func (t *TCPTransport) ListenAndAccept() error {
var err error
t.listener, err = net.Listen("tcp", t.listenAddr)
t.listener, err = net.Listen("tcp", t.ListenAddr)
if err != nil {
return err
}
Expand All @@ -65,31 +68,33 @@ func (t *TCPTransport) startAcceptLoop() {
}
}

type Msg struct{}

func (t *TCPTransport) handleConn(conn net.Conn) {
peer := NewTCPPeer(conn, true)
fmt.Printf("handling connection %+v\n", peer)
fmt.Printf("TCP handling connection %+v\n", peer)

if err := t.shakeHands(peer); err != nil {
fmt.Printf("handshake error: %v\n", err)
if err := t.HandshakeFunc(peer); err != nil {
fmt.Printf("TCP handshake error: %v\n", err)
conn.Close()
return
}

lenDecodeError := 0
//Read Loop
msg := &Msg{}
rpc := &RPC{}
for {
// read from the connection
if err := t.decoder.Decode(conn, msg); err != nil {
if err := t.Decoder.Decode(conn, rpc); err != nil {
lenDecodeError++
if lenDecodeError == 5 {
fmt.Printf("dropping connection due to multiple decode errors: %+v\n", peer)
fmt.Printf("TCP dropping connection due to multiple decode errors: %+v\n", peer)
return
}
fmt.Printf("decode error: %v\n", err)
continue
}

rpc.From = conn.RemoteAddr()
fmt.Printf("message: %+v\n", rpc)
// write to the connection
// close the connection
}
Expand Down

0 comments on commit 596ad22

Please sign in to comment.