From 4763af526e77273850364747df4787d36c7536a3 Mon Sep 17 00:00:00 2001 From: Gnana Siva Sai V Date: Fri, 2 Feb 2024 22:02:25 +0530 Subject: [PATCH] refactored the file server to send the file stream to peers --- file-server.go | 113 +++++++++--------- main.go | 1 - .../a94a8fe5ccb19ba61c4c0873d391e987982fbbd3 | 1 + .../a94a8fe5ccb19ba61c4c0873d391e987982fbbd3 | 1 + storage.go | 12 +- 5 files changed, 63 insertions(+), 65 deletions(-) create mode 100644 network_3000/a94a8/fe5cc/b19ba/61c4c/0873d/391e9/87982/fbbd3/a94a8fe5ccb19ba61c4c0873d391e987982fbbd3 create mode 100644 network_4000/a94a8/fe5cc/b19ba/61c4c/0873d/391e9/87982/fbbd3/a94a8fe5ccb19ba61c4c0873d391e987982fbbd3 diff --git a/file-server.go b/file-server.go index 9431f18..1a3d686 100644 --- a/file-server.go +++ b/file-server.go @@ -48,6 +48,7 @@ type Message struct { type MessageStoreFile struct { Filename string + Size int64 } func (fs *FileServer) broadcast(msg *Message) error { @@ -61,62 +62,51 @@ func (fs *FileServer) broadcast(msg *Message) error { return gob.NewEncoder(mw).Encode(msg) } +// 1. Store this file to disk +// 2. broadcast this file to all known peers in network func (fs *FileServer) StoreData(key string, r io.Reader) error { - // 1. Store this file to disk - // 2. broadcast this file to all known peers in network - // log.Println("Storing file", key) + var ( + payloadBuffer = new(bytes.Buffer) + tee = io.TeeReader(r, payloadBuffer) + ) - buf := new(bytes.Buffer) + size, err := fs.store.Write(key, tee) + if err != nil { + return err + } msg := Message{ Payload: MessageStoreFile{ Filename: key, + Size: size, }, } - if err := gob.NewEncoder(buf).Encode(msg); err != nil { + msgBuf := new(bytes.Buffer) + + if err := gob.NewEncoder(msgBuf).Encode(msg); err != nil { fmt.Printf("error with encoder: %v", err) return err } for _, peer := range fs.peers { - if err := peer.Send(buf.Bytes()); err != nil { + if err := peer.Send(msgBuf.Bytes()); err != nil { fmt.Printf("error with sending: %v", err) return err } } - // time.Sleep(time.Second * 3) + for _, peer := range fs.peers { + n, err := io.Copy(peer, payloadBuffer) + if err != nil { + fmt.Printf("error with sending: %v", err) + return err + } - // payload := []byte("This Large File") - // for _, peer := range fs.peers { - // if err := peer.Send(payload); err != nil { - // fmt.Printf("error with sending: %v", err) - // return err - // } - // } + fmt.Println("received and written bytes to disk: ", n) + } return nil - // buf := new(bytes.Buffer) - // tee := io.TeeReader(r, buf) - - // if err := fs.store.Write(key, tee); err != nil { - // return err - // } - - // // the reader is empty now - - // fmt.Println(buf.Bytes()) - - // p := &DataMessage{ - // Key: key, - // Value: buf.Bytes(), - // } - - // return fs.broadcast(&Message{ - // From: "TODO", - // Payload: p, - // }) } func (fs *FileServer) Stop() { @@ -151,38 +141,45 @@ func (fs *FileServer) loop() { fmt.Printf("error with decoder: %v", err) log.Fatal(err) } - fmt.Printf("received message: %+v\n", m.Payload) - - peer, ok := fs.peers[msg.From] - if !ok { - panic("peer not found in peer map") - } - b := make([]byte, 1028) - if _, err := peer.Read(b); err != nil { - panic(err) + if err := fs.handleMessage(msg.From, &m); err != nil { + log.Printf("error handling message: %s", err) } - - peer.Streamed() - - fmt.Printf("received message: %v\n", string(b)) - // if err := fs.handleMessage(&m); err != nil { - // log.Printf("error handling message: %s", err) - // } case <-fs.quitch: return } } } -// func (fs *FileServer) handleMessage(msg *Message) error { -// switch m := msg.Payload.(type) { -// case *DataMessage: -// return fs.StoreData(m.Key, bytes.NewReader(m.Value)) -// default: -// return fmt.Errorf("unknown message type: %T", msg.Payload) -// } -// } +func (fs *FileServer) handleMessage(from net.Addr, msg *Message) error { + switch m := msg.Payload.(type) { + case MessageStoreFile: + fmt.Printf("storing file: %s\n", m.Filename) + fs.handleMessageStoreFile(from, m) + } + return nil +} + +func (fs *FileServer) handleMessageStoreFile(from net.Addr, msg MessageStoreFile) error { + peer, ok := fs.peers[from] + if !ok { + return fmt.Errorf("peer not found in peer map") + } + fmt.Printf("peer: %+v\n", peer) + + // this io.Copyy is blocker here + // as the connection will not always + // send EOF for io to stop copying + if _, err := fs.store.Write(msg.Filename, io.LimitReader(peer, msg.Size)); err != nil { + return err + } + + fmt.Printf("written %d bytes to disk: %s\n", msg.Size, msg.Filename) + + peer.Streamed() + + return nil +} func (fs *FileServer) bootstrapNetwork() error { for _, addr := range fs.BootstrapNodes { diff --git a/main.go b/main.go index e32ff5d..cd91ab1 100644 --- a/main.go +++ b/main.go @@ -57,6 +57,5 @@ func main() { fmt.Printf("main error: %s", err) log.Fatal(err) } - fmt.Println("s2 has sent message to peers") select {} } diff --git a/network_3000/a94a8/fe5cc/b19ba/61c4c/0873d/391e9/87982/fbbd3/a94a8fe5ccb19ba61c4c0873d391e987982fbbd3 b/network_3000/a94a8/fe5cc/b19ba/61c4c/0873d/391e9/87982/fbbd3/a94a8fe5ccb19ba61c4c0873d391e987982fbbd3 new file mode 100644 index 0000000..1a68b40 --- /dev/null +++ b/network_3000/a94a8/fe5cc/b19ba/61c4c/0873d/391e9/87982/fbbd3/a94a8fe5ccb19ba61c4c0873d391e987982fbbd3 @@ -0,0 +1 @@ +It's a huge file \ No newline at end of file diff --git a/network_4000/a94a8/fe5cc/b19ba/61c4c/0873d/391e9/87982/fbbd3/a94a8fe5ccb19ba61c4c0873d391e987982fbbd3 b/network_4000/a94a8/fe5cc/b19ba/61c4c/0873d/391e9/87982/fbbd3/a94a8fe5ccb19ba61c4c0873d391e987982fbbd3 new file mode 100644 index 0000000..1a68b40 --- /dev/null +++ b/network_4000/a94a8/fe5cc/b19ba/61c4c/0873d/391e9/87982/fbbd3/a94a8fe5ccb19ba61c4c0873d391e987982fbbd3 @@ -0,0 +1 @@ +It's a huge file \ No newline at end of file diff --git a/storage.go b/storage.go index 9a0c513..0f6ed02 100644 --- a/storage.go +++ b/storage.go @@ -111,7 +111,7 @@ func (s *Store) Delete(key string) error { return os.RemoveAll(firstPathWithRoot) } -func (s *Store) Write(key string, r io.Reader) error { +func (s *Store) Write(key string, r io.Reader) (int64, error) { return s.writeStream(key, r) } @@ -139,18 +139,18 @@ func (s *Store) readStream(key string) (io.ReadCloser, error) { return os.Open(pathKeyWithRoot) } -func (s *Store) writeStream(key string, r io.Reader) error { +func (s *Store) writeStream(key string, r io.Reader) (int64, error) { pathKey := s.PathTransformFunc(key) pathNameWithRoot := fmt.Sprintf("%s/%s", s.Root, pathKey.Path) if err := os.MkdirAll(pathNameWithRoot, os.ModePerm); err != nil { - return err + return 0, err } fullPath := pathKey.FullPath() fullPathWithRoot := fmt.Sprintf("%s/%s", s.Root, fullPath) f, err := os.Create(fullPathWithRoot) if err != nil { - return err + return 0, err } defer func() { log.Printf("closed file: %s", fullPathWithRoot) @@ -159,12 +159,12 @@ func (s *Store) writeStream(key string, r io.Reader) error { n, err := io.Copy(f, r) if err != nil { - return err + return 0, err } log.Printf("written (%d) bytes to disk: %s", n, fullPathWithRoot) - return nil + return n, nil } func (s *Store) Close() error {