From b766b2fa07445f85c34b91ee08fd6da0ab325791 Mon Sep 17 00:00:00 2001 From: Gnana Siva Sai V Date: Fri, 2 Feb 2024 20:46:48 +0530 Subject: [PATCH] making the fileserver distributed server --- file-server.go | 102 +++++++++++++++++++++++++++++++------------ main.go | 7 ++- p2p/encoding.go | 2 +- p2p/tcp_transport.go | 11 +++++ p2p/transport.go | 1 + 5 files changed, 93 insertions(+), 30 deletions(-) diff --git a/file-server.go b/file-server.go index bc095cc..9431f18 100644 --- a/file-server.go +++ b/file-server.go @@ -43,13 +43,11 @@ func NewFileServer(opts FileServerOpts) *FileServer { } type Message struct { - From string Payload any } -type DataMessage struct { - Key string - Value []byte +type MessageStoreFile struct { + Filename string } func (fs *FileServer) broadcast(msg *Message) error { @@ -66,28 +64,59 @@ func (fs *FileServer) broadcast(msg *Message) error { func (fs *FileServer) StoreData(key string, r io.Reader) error { // 1. Store this file to disk // 2. broadcast this file to all known peers in network - log.Println("Storing file", key) + // log.Println("Storing file", key) buf := new(bytes.Buffer) - tee := io.TeeReader(r, buf) - if err := fs.store.Write(key, tee); err != nil { + msg := Message{ + Payload: MessageStoreFile{ + Filename: key, + }, + } + + if err := gob.NewEncoder(buf).Encode(msg); err != nil { + fmt.Printf("error with encoder: %v", err) return err } - // the reader is empty now + for _, peer := range fs.peers { + if err := peer.Send(buf.Bytes()); err != nil { + fmt.Printf("error with sending: %v", err) + return err + } + } + + // time.Sleep(time.Second * 3) - fmt.Println(buf.Bytes()) + // payload := []byte("This Large File") + // for _, peer := range fs.peers { + // if err := peer.Send(payload); err != nil { + // fmt.Printf("error with sending: %v", err) + // return err + // } + // } - p := &DataMessage{ - Key: key, - Value: buf.Bytes(), - } + return nil + // buf := new(bytes.Buffer) + // tee := io.TeeReader(r, buf) + + // if err := fs.store.Write(key, tee); err != nil { + // return err + // } + + // // the reader is empty now - return fs.broadcast(&Message{ - From: "TODO", - Payload: p, - }) + // fmt.Println(buf.Bytes()) + + // p := &DataMessage{ + // Key: key, + // Value: buf.Bytes(), + // } + + // return fs.broadcast(&Message{ + // From: "TODO", + // Payload: p, + // }) } func (fs *FileServer) Stop() { @@ -119,26 +148,41 @@ func (fs *FileServer) loop() { case msg := <-fs.Transport.Consume(): var m Message if err := gob.NewDecoder(bytes.NewReader(msg.Payload)).Decode(&m); err != nil { + fmt.Printf("error with decoder: %v", err) log.Fatal(err) } + fmt.Printf("received message: %+v\n", m.Payload) - if err := fs.handleMessage(&m); err != nil { - log.Printf("error handling message: %s", err) + peer, ok := fs.peers[msg.From] + if !ok { + panic("peer not found in peer map") } + + b := make([]byte, 1028) + if _, err := peer.Read(b); err != nil { + panic(err) + } + + peer.Streamed() + + fmt.Printf("received message: %v\n", string(b)) + // if err := fs.handleMessage(&m); err != nil { + // log.Printf("error handling message: %s", err) + // } case <-fs.quitch: return } } } -func (fs *FileServer) handleMessage(msg *Message) error { - switch m := msg.Payload.(type) { - case *DataMessage: - return fs.StoreData(m.Key, bytes.NewReader(m.Value)) - default: - return fmt.Errorf("unknown message type: %T", msg.Payload) - } -} +// func (fs *FileServer) handleMessage(msg *Message) error { +// switch m := msg.Payload.(type) { +// case *DataMessage: +// return fs.StoreData(m.Key, bytes.NewReader(m.Value)) +// default: +// return fmt.Errorf("unknown message type: %T", msg.Payload) +// } +// } func (fs *FileServer) bootstrapNetwork() error { for _, addr := range fs.BootstrapNodes { @@ -172,3 +216,7 @@ func (fs *FileServer) Start() error { func (fs *FileServer) Close() error { return fs.store.Close() } + +func init() { + gob.Register(MessageStoreFile{}) +} diff --git a/main.go b/main.go index 70a281c..e32ff5d 100644 --- a/main.go +++ b/main.go @@ -53,7 +53,10 @@ func main() { time.Sleep(2 * time.Second) data := bytes.NewReader([]byte("It's a huge file")) - log.Fatal(s2.StoreData("test", data)) - + if err := s2.StoreData("test", data); err != nil { + fmt.Printf("main error: %s", err) + log.Fatal(err) + } + fmt.Println("s2 has sent message to peers") select {} } diff --git a/p2p/encoding.go b/p2p/encoding.go index 64399b8..f8523fe 100644 --- a/p2p/encoding.go +++ b/p2p/encoding.go @@ -18,7 +18,7 @@ func (dec GOBDecoder) Decode(r io.Reader, rpc *RPC) error { type NOPDecoder struct{} func (dec NOPDecoder) Decode(r io.Reader, rpc *RPC) error { - buf := make([]byte, 1024) + buf := make([]byte, 1028) n, err := r.Read(buf) if err != nil { return err diff --git a/p2p/tcp_transport.go b/p2p/tcp_transport.go index 078f5dd..b050445 100644 --- a/p2p/tcp_transport.go +++ b/p2p/tcp_transport.go @@ -17,12 +17,15 @@ type TCPPeer struct { // if we dial and retrieve a connection we are an outbound peer // if we accept and retrieve a connection we are an inbound peer outbound bool + + Wg *sync.WaitGroup } func NewTCPPeer(conn net.Conn, outbound bool) *TCPPeer { return &TCPPeer{ Conn: conn, outbound: outbound, + Wg: &sync.WaitGroup{}, } } @@ -39,6 +42,10 @@ func (tp *TCPPeer) Send(payload []byte) error { return nil } +func (tp *TCPPeer) Streamed() { + tp.Wg.Done() +} + type TCPTransportOptions struct { ListenAddr string HandshakeFunc HandshakeFunc @@ -146,7 +153,11 @@ func (t *TCPTransport) handleConn(conn net.Conn, outbound bool) { } rpc.From = conn.RemoteAddr() + peer.Wg.Add(1) + fmt.Println("waiting till stream is done") t.rpcch <- rpc + peer.Wg.Wait() + fmt.Println("stream is done") // write to the connection // close the connection } diff --git a/p2p/transport.go b/p2p/transport.go index 4db088d..c36b1ee 100644 --- a/p2p/transport.go +++ b/p2p/transport.go @@ -6,6 +6,7 @@ import "net" // remote nodes in the network. type Peer interface { net.Conn + Streamed() Send([]byte) error }