diff --git a/file-server.go b/file-server.go index 6a4fff1..170ff0f 100644 --- a/file-server.go +++ b/file-server.go @@ -2,12 +2,16 @@ package main import ( "fmt" + "log" + "net" + "sync" "github.com/gnana997/decentralised-storage-go/p2p" ) type FileServerOpts struct { - RootFolder string + RootFolder string + BootstrapNodes []string Transport p2p.Transport PathTransformFunc PathTransformFunc @@ -16,6 +20,9 @@ type FileServerOpts struct { type FileServer struct { FileServerOpts + peerLock sync.Mutex + peers map[net.Addr]p2p.Peer + store *Store quitch chan struct{} } @@ -28,6 +35,7 @@ func NewFileServer(opts FileServerOpts) *FileServer { PathTransformFunc: opts.PathTransformFunc, }), quitch: make(chan struct{}), + peers: make(map[net.Addr]p2p.Peer), } } @@ -35,6 +43,17 @@ func (fs *FileServer) Stop() { close(fs.quitch) } +func (fs *FileServer) OnPeer(p p2p.Peer) error { + fs.peerLock.Lock() + defer fs.peerLock.Unlock() + + fs.peers[p.RemoteAddr()] = p + + log.Printf("FileServer handling new peer: %s", p.RemoteAddr()) + + return nil +} + func (fs *FileServer) loop() { defer func() { if err := fs.Transport.Close(); err != nil { @@ -54,11 +73,30 @@ func (fs *FileServer) loop() { } } +func (fs *FileServer) bootstrapNetwork() error { + for _, addr := range fs.BootstrapNodes { + if len(addr) == 0 { + continue + } + go func(addr string) { + fmt.Printf("dialing %s\n", addr) + if err := fs.Transport.Connect(addr); err != nil { + log.Printf("dial error: %s", err) + } + }(addr) + } + return nil +} + func (fs *FileServer) Start() error { if err := fs.Transport.ListenAndAccept(); err != nil { return err } + if len(fs.BootstrapNodes) != 0 { + fs.bootstrapNetwork() + } + fs.loop() return nil diff --git a/main.go b/main.go index dbd691d..c33d298 100644 --- a/main.go +++ b/main.go @@ -2,7 +2,7 @@ package main import ( "fmt" - "time" + "log" "github.com/gnana997/decentralised-storage-go/p2p" ) @@ -13,31 +13,37 @@ func OnPeer(peer p2p.Peer) error { return nil } -func main() { +func makeServer(listenAddr string, nodes ...string) *FileServer { tcpTransportOpts := p2p.TCPTransportOptions{ - ListenAddr: ":3000", + ListenAddr: listenAddr, HandshakeFunc: p2p.NOPHandshakeFunc, Decoder: p2p.NOPDecoder{}, } tcptransport := p2p.NewTCPTransport(tcpTransportOpts) fileServerOpts := FileServerOpts{ - RootFolder: "3000_network", - PathTransformFunc: CASPathTransformFunc, + RootFolder: listenAddr + "_network", + BootstrapNodes: nodes, - Transport: tcptransport, + PathTransformFunc: CASPathTransformFunc, + Transport: tcptransport, } - fs := NewFileServer(fileServerOpts) + s := NewFileServer(fileServerOpts) + + tcptransport.OnPeer = s.OnPeer + + return s +} + +func main() { + s1 := makeServer(":3000", "") + s2 := makeServer(":4000", ":3000") go func() { - time.Sleep(time.Second * 3) - fs.Stop() + log.Fatal(s1.Start()) }() - if err := fs.Start(); err != nil { - fmt.Println(err) - return - } + s2.Start() } diff --git a/p2p/tcp_transport.go b/p2p/tcp_transport.go index 85d34b0..550069f 100644 --- a/p2p/tcp_transport.go +++ b/p2p/tcp_transport.go @@ -25,6 +25,25 @@ func NewTCPPeer(conn net.Conn, outbound bool) *TCPPeer { } } +func (tp *TCPPeer) Send(payload []byte) error { + if tp.conn == nil { + return errors.New("peer is closed") + } + + _, err := tp.conn.Write(payload) + if err != nil { + return err + } + + return nil +} + +// RemoteAddr impements the peer interface and will return the +// remoteAddr of its underlying connection. +func (tp *TCPPeer) RemoteAddr() net.Addr { + return tp.conn.RemoteAddr() +} + // Close implements the peer interface. func (tp *TCPPeer) Close() error { return tp.conn.Close() @@ -60,6 +79,19 @@ func (t *TCPTransport) Consume() <-chan RPC { return t.rpcch } +// Connect implements the Transport interface, +// to connect to the peers over TCP. +func (t *TCPTransport) Connect(addr string) error { + conn, err := net.Dial("tcp", addr) + if err != nil { + return err + } + + go t.handleConn(conn, true) + + return nil +} + func (t *TCPTransport) ListenAndAccept() error { var err error t.listener, err = net.Listen("tcp", t.ListenAddr) @@ -85,18 +117,18 @@ func (t *TCPTransport) startAcceptLoop() { continue } - go t.handleConn(conn) + go t.handleConn(conn, false) } } -func (t *TCPTransport) handleConn(conn net.Conn) { +func (t *TCPTransport) handleConn(conn net.Conn, outbound bool) { var err error defer func() { fmt.Printf("droppig peer connection: %s", err) conn.Close() }() - peer := NewTCPPeer(conn, true) + peer := NewTCPPeer(conn, outbound) if err = t.HandshakeFunc(peer); err != nil { return diff --git a/p2p/transport.go b/p2p/transport.go index 22f3c35..8cbb7f3 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -1,8 +1,12 @@ package p2p +import "net" + // Peer is an interface that represents the // remote nodes in the network. type Peer interface { + Send([]byte) error + RemoteAddr() net.Addr Close() error } @@ -10,6 +14,7 @@ type Peer interface { // between peers in the network. This can be of // form (TCP, UDP, websocket, etc) type Transport interface { + Connect(addr string) error ListenAndAccept() error Consume() <-chan RPC Close() error