Skip to content

Commit

Permalink
refactored the file server to send the file stream to peers
Browse files Browse the repository at this point in the history
  • Loading branch information
gnana997 committed Feb 2, 2024
1 parent b766b2f commit 4763af5
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 65 deletions.
113 changes: 55 additions & 58 deletions file-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Message struct {

type MessageStoreFile struct {
Filename string
Size int64
}

func (fs *FileServer) broadcast(msg *Message) error {
Expand All @@ -61,62 +62,51 @@ func (fs *FileServer) broadcast(msg *Message) error {
return gob.NewEncoder(mw).Encode(msg)
}

// 1. Store this file to disk
// 2. broadcast this file to all known peers in network
func (fs *FileServer) StoreData(key string, r io.Reader) error {
// 1. Store this file to disk
// 2. broadcast this file to all known peers in network
// log.Println("Storing file", key)
var (
payloadBuffer = new(bytes.Buffer)
tee = io.TeeReader(r, payloadBuffer)
)

buf := new(bytes.Buffer)
size, err := fs.store.Write(key, tee)
if err != nil {
return err
}

msg := Message{
Payload: MessageStoreFile{
Filename: key,
Size: size,
},
}

if err := gob.NewEncoder(buf).Encode(msg); err != nil {
msgBuf := new(bytes.Buffer)

if err := gob.NewEncoder(msgBuf).Encode(msg); err != nil {
fmt.Printf("error with encoder: %v", err)
return err
}

for _, peer := range fs.peers {
if err := peer.Send(buf.Bytes()); err != nil {
if err := peer.Send(msgBuf.Bytes()); err != nil {
fmt.Printf("error with sending: %v", err)
return err
}
}

// time.Sleep(time.Second * 3)
for _, peer := range fs.peers {
n, err := io.Copy(peer, payloadBuffer)
if err != nil {
fmt.Printf("error with sending: %v", err)
return err
}

// payload := []byte("This Large File")
// for _, peer := range fs.peers {
// if err := peer.Send(payload); err != nil {
// fmt.Printf("error with sending: %v", err)
// return err
// }
// }
fmt.Println("received and written bytes to disk: ", n)
}

return nil
// buf := new(bytes.Buffer)
// tee := io.TeeReader(r, buf)

// if err := fs.store.Write(key, tee); err != nil {
// return err
// }

// // the reader is empty now

// fmt.Println(buf.Bytes())

// p := &DataMessage{
// Key: key,
// Value: buf.Bytes(),
// }

// return fs.broadcast(&Message{
// From: "TODO",
// Payload: p,
// })
}

func (fs *FileServer) Stop() {
Expand Down Expand Up @@ -151,38 +141,45 @@ func (fs *FileServer) loop() {
fmt.Printf("error with decoder: %v", err)
log.Fatal(err)
}
fmt.Printf("received message: %+v\n", m.Payload)

peer, ok := fs.peers[msg.From]
if !ok {
panic("peer not found in peer map")
}

b := make([]byte, 1028)
if _, err := peer.Read(b); err != nil {
panic(err)
if err := fs.handleMessage(msg.From, &m); err != nil {
log.Printf("error handling message: %s", err)
}

peer.Streamed()

fmt.Printf("received message: %v\n", string(b))
// if err := fs.handleMessage(&m); err != nil {
// log.Printf("error handling message: %s", err)
// }
case <-fs.quitch:
return
}
}
}

// func (fs *FileServer) handleMessage(msg *Message) error {
// switch m := msg.Payload.(type) {
// case *DataMessage:
// return fs.StoreData(m.Key, bytes.NewReader(m.Value))
// default:
// return fmt.Errorf("unknown message type: %T", msg.Payload)
// }
// }
func (fs *FileServer) handleMessage(from net.Addr, msg *Message) error {
switch m := msg.Payload.(type) {
case MessageStoreFile:
fmt.Printf("storing file: %s\n", m.Filename)
fs.handleMessageStoreFile(from, m)
}
return nil
}

func (fs *FileServer) handleMessageStoreFile(from net.Addr, msg MessageStoreFile) error {
peer, ok := fs.peers[from]
if !ok {
return fmt.Errorf("peer not found in peer map")
}
fmt.Printf("peer: %+v\n", peer)

// this io.Copyy is blocker here
// as the connection will not always
// send EOF for io to stop copying
if _, err := fs.store.Write(msg.Filename, io.LimitReader(peer, msg.Size)); err != nil {
return err
}

fmt.Printf("written %d bytes to disk: %s\n", msg.Size, msg.Filename)

peer.Streamed()

return nil
}

func (fs *FileServer) bootstrapNetwork() error {
for _, addr := range fs.BootstrapNodes {
Expand Down
1 change: 0 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,5 @@ func main() {
fmt.Printf("main error: %s", err)
log.Fatal(err)
}
fmt.Println("s2 has sent message to peers")
select {}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
It's a huge file
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
It's a huge file
12 changes: 6 additions & 6 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func (s *Store) Delete(key string) error {
return os.RemoveAll(firstPathWithRoot)
}

func (s *Store) Write(key string, r io.Reader) error {
func (s *Store) Write(key string, r io.Reader) (int64, error) {
return s.writeStream(key, r)
}

Expand Down Expand Up @@ -139,18 +139,18 @@ func (s *Store) readStream(key string) (io.ReadCloser, error) {
return os.Open(pathKeyWithRoot)
}

func (s *Store) writeStream(key string, r io.Reader) error {
func (s *Store) writeStream(key string, r io.Reader) (int64, error) {
pathKey := s.PathTransformFunc(key)
pathNameWithRoot := fmt.Sprintf("%s/%s", s.Root, pathKey.Path)
if err := os.MkdirAll(pathNameWithRoot, os.ModePerm); err != nil {
return err
return 0, err
}

fullPath := pathKey.FullPath()
fullPathWithRoot := fmt.Sprintf("%s/%s", s.Root, fullPath)
f, err := os.Create(fullPathWithRoot)
if err != nil {
return err
return 0, err
}
defer func() {
log.Printf("closed file: %s", fullPathWithRoot)
Expand All @@ -159,12 +159,12 @@ func (s *Store) writeStream(key string, r io.Reader) error {

n, err := io.Copy(f, r)
if err != nil {
return err
return 0, err
}

log.Printf("written (%d) bytes to disk: %s", n, fullPathWithRoot)

return nil
return n, nil
}

func (s *Store) Close() error {
Expand Down

0 comments on commit 4763af5

Please sign in to comment.