Skip to content

Commit

Permalink
making the fileserver distributed server
Browse files Browse the repository at this point in the history
  • Loading branch information
gnana997 committed Feb 2, 2024
1 parent 0646617 commit b766b2f
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 30 deletions.
102 changes: 75 additions & 27 deletions file-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,11 @@ func NewFileServer(opts FileServerOpts) *FileServer {
}

type Message struct {
From string
Payload any
}

type DataMessage struct {
Key string
Value []byte
type MessageStoreFile struct {
Filename string
}

func (fs *FileServer) broadcast(msg *Message) error {
Expand All @@ -66,28 +64,59 @@ func (fs *FileServer) broadcast(msg *Message) error {
func (fs *FileServer) StoreData(key string, r io.Reader) error {
// 1. Store this file to disk
// 2. broadcast this file to all known peers in network
log.Println("Storing file", key)
// log.Println("Storing file", key)

buf := new(bytes.Buffer)
tee := io.TeeReader(r, buf)

if err := fs.store.Write(key, tee); err != nil {
msg := Message{
Payload: MessageStoreFile{
Filename: key,
},
}

if err := gob.NewEncoder(buf).Encode(msg); err != nil {
fmt.Printf("error with encoder: %v", err)
return err
}

// the reader is empty now
for _, peer := range fs.peers {
if err := peer.Send(buf.Bytes()); err != nil {
fmt.Printf("error with sending: %v", err)
return err
}
}

// time.Sleep(time.Second * 3)

fmt.Println(buf.Bytes())
// payload := []byte("This Large File")
// for _, peer := range fs.peers {
// if err := peer.Send(payload); err != nil {
// fmt.Printf("error with sending: %v", err)
// return err
// }
// }

p := &DataMessage{
Key: key,
Value: buf.Bytes(),
}
return nil
// buf := new(bytes.Buffer)
// tee := io.TeeReader(r, buf)

// if err := fs.store.Write(key, tee); err != nil {
// return err
// }

// // the reader is empty now

return fs.broadcast(&Message{
From: "TODO",
Payload: p,
})
// fmt.Println(buf.Bytes())

// p := &DataMessage{
// Key: key,
// Value: buf.Bytes(),
// }

// return fs.broadcast(&Message{
// From: "TODO",
// Payload: p,
// })
}

func (fs *FileServer) Stop() {
Expand Down Expand Up @@ -119,26 +148,41 @@ func (fs *FileServer) loop() {
case msg := <-fs.Transport.Consume():
var m Message
if err := gob.NewDecoder(bytes.NewReader(msg.Payload)).Decode(&m); err != nil {
fmt.Printf("error with decoder: %v", err)
log.Fatal(err)
}
fmt.Printf("received message: %+v\n", m.Payload)

if err := fs.handleMessage(&m); err != nil {
log.Printf("error handling message: %s", err)
peer, ok := fs.peers[msg.From]
if !ok {
panic("peer not found in peer map")
}

b := make([]byte, 1028)
if _, err := peer.Read(b); err != nil {
panic(err)
}

peer.Streamed()

fmt.Printf("received message: %v\n", string(b))
// if err := fs.handleMessage(&m); err != nil {
// log.Printf("error handling message: %s", err)
// }
case <-fs.quitch:
return
}
}
}

func (fs *FileServer) handleMessage(msg *Message) error {
switch m := msg.Payload.(type) {
case *DataMessage:
return fs.StoreData(m.Key, bytes.NewReader(m.Value))
default:
return fmt.Errorf("unknown message type: %T", msg.Payload)
}
}
// func (fs *FileServer) handleMessage(msg *Message) error {
// switch m := msg.Payload.(type) {
// case *DataMessage:
// return fs.StoreData(m.Key, bytes.NewReader(m.Value))
// default:
// return fmt.Errorf("unknown message type: %T", msg.Payload)
// }
// }

func (fs *FileServer) bootstrapNetwork() error {
for _, addr := range fs.BootstrapNodes {
Expand Down Expand Up @@ -172,3 +216,7 @@ func (fs *FileServer) Start() error {
func (fs *FileServer) Close() error {
return fs.store.Close()
}

func init() {
gob.Register(MessageStoreFile{})
}
7 changes: 5 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ func main() {
time.Sleep(2 * time.Second)

data := bytes.NewReader([]byte("It's a huge file"))
log.Fatal(s2.StoreData("test", data))

if err := s2.StoreData("test", data); err != nil {
fmt.Printf("main error: %s", err)
log.Fatal(err)
}
fmt.Println("s2 has sent message to peers")
select {}
}
2 changes: 1 addition & 1 deletion p2p/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func (dec GOBDecoder) Decode(r io.Reader, rpc *RPC) error {
type NOPDecoder struct{}

func (dec NOPDecoder) Decode(r io.Reader, rpc *RPC) error {
buf := make([]byte, 1024)
buf := make([]byte, 1028)
n, err := r.Read(buf)
if err != nil {
return err
Expand Down
11 changes: 11 additions & 0 deletions p2p/tcp_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ type TCPPeer struct {
// if we dial and retrieve a connection we are an outbound peer
// if we accept and retrieve a connection we are an inbound peer
outbound bool

Wg *sync.WaitGroup
}

func NewTCPPeer(conn net.Conn, outbound bool) *TCPPeer {
return &TCPPeer{
Conn: conn,
outbound: outbound,
Wg: &sync.WaitGroup{},
}
}

Expand All @@ -39,6 +42,10 @@ func (tp *TCPPeer) Send(payload []byte) error {
return nil
}

func (tp *TCPPeer) Streamed() {
tp.Wg.Done()
}

type TCPTransportOptions struct {
ListenAddr string
HandshakeFunc HandshakeFunc
Expand Down Expand Up @@ -146,7 +153,11 @@ func (t *TCPTransport) handleConn(conn net.Conn, outbound bool) {
}

rpc.From = conn.RemoteAddr()
peer.Wg.Add(1)
fmt.Println("waiting till stream is done")
t.rpcch <- rpc
peer.Wg.Wait()
fmt.Println("stream is done")
// write to the connection
// close the connection
}
Expand Down
1 change: 1 addition & 0 deletions p2p/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "net"
// remote nodes in the network.
type Peer interface {
net.Conn
Streamed()
Send([]byte) error
}

Expand Down

0 comments on commit b766b2f

Please sign in to comment.