Skip to content

Commit

Permalink
implemented fileserver and refactored peer
Browse files Browse the repository at this point in the history
  • Loading branch information
gnana997 committed Jan 26, 2024
1 parent 1820429 commit 0bd2430
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 17 deletions.
40 changes: 39 additions & 1 deletion file-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package main

import (
"fmt"
"log"
"net"
"sync"

"github.com/gnana997/decentralised-storage-go/p2p"
)

type FileServerOpts struct {
RootFolder string
RootFolder string
BootstrapNodes []string

Transport p2p.Transport
PathTransformFunc PathTransformFunc
Expand All @@ -16,6 +20,9 @@ type FileServerOpts struct {
type FileServer struct {
FileServerOpts

peerLock sync.Mutex
peers map[net.Addr]p2p.Peer

store *Store
quitch chan struct{}
}
Expand All @@ -28,13 +35,25 @@ func NewFileServer(opts FileServerOpts) *FileServer {
PathTransformFunc: opts.PathTransformFunc,
}),
quitch: make(chan struct{}),
peers: make(map[net.Addr]p2p.Peer),
}
}

func (fs *FileServer) Stop() {
close(fs.quitch)
}

func (fs *FileServer) OnPeer(p p2p.Peer) error {
fs.peerLock.Lock()
defer fs.peerLock.Unlock()

fs.peers[p.RemoteAddr()] = p

log.Printf("FileServer handling new peer: %s", p.RemoteAddr())

return nil
}

func (fs *FileServer) loop() {
defer func() {
if err := fs.Transport.Close(); err != nil {
Expand All @@ -54,11 +73,30 @@ func (fs *FileServer) loop() {
}
}

func (fs *FileServer) bootstrapNetwork() error {
for _, addr := range fs.BootstrapNodes {
if len(addr) == 0 {
continue
}
go func(addr string) {
fmt.Printf("dialing %s\n", addr)
if err := fs.Transport.Connect(addr); err != nil {
log.Printf("dial error: %s", err)
}
}(addr)
}
return nil
}

func (fs *FileServer) Start() error {
if err := fs.Transport.ListenAndAccept(); err != nil {
return err
}

if len(fs.BootstrapNodes) != 0 {
fs.bootstrapNetwork()
}

fs.loop()

return nil
Expand Down
32 changes: 19 additions & 13 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package main

import (
"fmt"
"time"
"log"

"github.com/gnana997/decentralised-storage-go/p2p"
)
Expand All @@ -13,31 +13,37 @@ func OnPeer(peer p2p.Peer) error {
return nil
}

func main() {
func makeServer(listenAddr string, nodes ...string) *FileServer {

tcpTransportOpts := p2p.TCPTransportOptions{
ListenAddr: ":3000",
ListenAddr: listenAddr,
HandshakeFunc: p2p.NOPHandshakeFunc,
Decoder: p2p.NOPDecoder{},
}
tcptransport := p2p.NewTCPTransport(tcpTransportOpts)

fileServerOpts := FileServerOpts{
RootFolder: "3000_network",
PathTransformFunc: CASPathTransformFunc,
RootFolder: listenAddr + "_network",
BootstrapNodes: nodes,

Transport: tcptransport,
PathTransformFunc: CASPathTransformFunc,
Transport: tcptransport,
}

fs := NewFileServer(fileServerOpts)
s := NewFileServer(fileServerOpts)

tcptransport.OnPeer = s.OnPeer

return s
}

func main() {
s1 := makeServer(":3000", "")
s2 := makeServer(":4000", ":3000")

go func() {
time.Sleep(time.Second * 3)
fs.Stop()
log.Fatal(s1.Start())
}()

if err := fs.Start(); err != nil {
fmt.Println(err)
return
}
s2.Start()
}
38 changes: 35 additions & 3 deletions p2p/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,25 @@ func NewTCPPeer(conn net.Conn, outbound bool) *TCPPeer {
}
}

func (tp *TCPPeer) Send(payload []byte) error {
if tp.conn == nil {
return errors.New("peer is closed")
}

_, err := tp.conn.Write(payload)
if err != nil {
return err
}

return nil
}

// RemoteAddr impements the peer interface and will return the
// remoteAddr of its underlying connection.
func (tp *TCPPeer) RemoteAddr() net.Addr {
return tp.conn.RemoteAddr()
}

// Close implements the peer interface.
func (tp *TCPPeer) Close() error {
return tp.conn.Close()
Expand Down Expand Up @@ -60,6 +79,19 @@ func (t *TCPTransport) Consume() <-chan RPC {
return t.rpcch
}

// Connect implements the Transport interface,
// to connect to the peers over TCP.
func (t *TCPTransport) Connect(addr string) error {
conn, err := net.Dial("tcp", addr)
if err != nil {
return err
}

go t.handleConn(conn, true)

return nil
}

func (t *TCPTransport) ListenAndAccept() error {
var err error
t.listener, err = net.Listen("tcp", t.ListenAddr)
Expand All @@ -85,18 +117,18 @@ func (t *TCPTransport) startAcceptLoop() {
continue
}

go t.handleConn(conn)
go t.handleConn(conn, false)

}
}

func (t *TCPTransport) handleConn(conn net.Conn) {
func (t *TCPTransport) handleConn(conn net.Conn, outbound bool) {
var err error
defer func() {
fmt.Printf("droppig peer connection: %s", err)
conn.Close()
}()
peer := NewTCPPeer(conn, true)
peer := NewTCPPeer(conn, outbound)

if err = t.HandshakeFunc(peer); err != nil {
return
Expand Down
5 changes: 5 additions & 0 deletions p2p/transport.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
package p2p

import "net"

// Peer is an interface that represents the
// remote nodes in the network.
type Peer interface {
Send([]byte) error
RemoteAddr() net.Addr
Close() error
}

// Transport is anything that handles the communication
// between peers in the network. This can be of
// form (TCP, UDP, websocket, etc)
type Transport interface {
Connect(addr string) error
ListenAndAccept() error
Consume() <-chan RPC
Close() error
Expand Down

0 comments on commit 0bd2430

Please sign in to comment.