diff --git a/go-libp2p-kad-dht b/go-libp2p-kad-dht index 3d18c6b..6b49032 160000 --- a/go-libp2p-kad-dht +++ b/go-libp2p-kad-dht @@ -1 +1 @@ -Subproject commit 3d18c6b2c73dae3977a9fddb75d4ac1427016c14 +Subproject commit 6b490320a6c1b70eba2031260a2515c26e7519fe diff --git a/pkg/cid-source/http_server.go b/pkg/cid-source/http_server.go index e42deb1..3867262 100644 --- a/pkg/cid-source/http_server.go +++ b/pkg/cid-source/http_server.go @@ -18,79 +18,84 @@ import ( ) type HttpCidSource struct { - port int - hostname string - lock sync.Mutex - server *http.Server - providerRecords []ProviderRecords - isStarted bool + port int + hostname string + lock sync.Mutex + server *http.Server + //receive cids from provider and send them to the discoverer interface + providerRecordsChannel chan ProviderRecords + isStarted bool } -func (httpCidSource *HttpCidSource) Dequeue() ProviderRecords { +/* func (httpCidSource *HttpCidSource) Dequeue() ProviderRecords { + httpCidSource.lock.Lock() + defer httpCidSource.lock.Unlock() if len(httpCidSource.providerRecords) == 0 { log.Debug("Queue is empty") return ProviderRecords{} } elem := httpCidSource.providerRecords[0] httpCidSource.providerRecords = httpCidSource.providerRecords[1:] - log.Debugf("Removed element from queue, lenth is now: %d", len(httpCidSource.providerRecords)) + log.Debugf("Removed element from queue, length is now: %d", len(httpCidSource.providerRecords)) return elem } func (httpCidSource *HttpCidSource) Enqueue(providerRecords ProviderRecords) { + httpCidSource.lock.Lock() + defer httpCidSource.lock.Unlock() httpCidSource.providerRecords = append(httpCidSource.providerRecords, providerRecords) log.Debugf("Added new element to queue, length is now: %d", len(httpCidSource.providerRecords)) -} +} */ func NewHttpCidSource(port int, hostname string) *HttpCidSource { return &HttpCidSource{ - port: port, - hostname: hostname, - server: nil, - isStarted: false, - providerRecords: []ProviderRecords{}, + port: port, + hostname: hostname, + server: nil, + isStarted: false, + providerRecordsChannel: make(chan ProviderRecords, 10), } } func (httpCidSource *HttpCidSource) ServeHTTP(w http.ResponseWriter, r *http.Request) { log.Info("Received request in server HTTP") - if r.URL.Path == "/ProviderRecord" { - if r.Method == http.MethodPost { - log.Debug("The request was a post method") - // create a new TrackableCid instance - var providerRecords ProviderRecords - - // decode the request body into the TrackableCid struct - err := json.NewDecoder(r.Body).Decode(&providerRecords) - if err == nil { - log.Info("Decoded new encapsulated json received from post") - - // add the trackableCid to the list - httpCidSource.Enqueue(providerRecords) - } else { - - http.Error(w, "Error decoding request body", http.StatusBadRequest) - } - - } else if r.Method == http.MethodGet { - log.Debug("The request was a get request") - // check if there are any trackableCids to return - if len(httpCidSource.providerRecords) != 0 { - // return the last unretrieved trackableCid + if r.Method == http.MethodPost { + log.Debug("The request was a post method") + // create a new TrackableCid instance + var providerRecords ProviderRecords + + // decode the request body into the TrackableCid struct + err := json.NewDecoder(r.Body).Decode(&providerRecords) + if err == nil { + log.Info("Decoded new encapsulated json received from post") + + //send the trackable cid over the channel received in method getNewCid + httpCidSource.providerRecordsChannel <- providerRecords + } else { + http.Error(w, "Error decoding request body", http.StatusBadRequest) + } + } else { + // return "Method Not Allowed" error + http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + } /* else if r.Method == http.MethodGet { + log.Debug("The request was a get request") + // check if there are any EncapsulatedJSONProviderRecords to return + if len(httpCidSource.providerRecords) != 0 { + // return the last unretrieved EncapsulatedJSONProviderRecords + var providerRecordsArray []ProviderRecords + // empty the trackable cid queue + for len(httpCidSource.providerRecords) != 0 { providerRecords := httpCidSource.Dequeue() log.Info("Sending new encapsulated json cid to user with get method") // send the trackableCid back to the client as a response - json.NewEncoder(w).Encode(providerRecords) - } else { - - http.Error(w, "No record available currently", http.StatusNoContent) + providerRecordsArray = append(providerRecordsArray, providerRecords) } - + json.NewEncoder(w).Encode(providerRecordsArray) } else { - // return "Method Not Allowed" error - http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + http.Error(w, "No record available currently", http.StatusNoContent) } - } + + } */ } @@ -108,42 +113,6 @@ func (httpCidSource *HttpCidSource) StartServer() error { Handler: httpCidSource, } - /* http.HandleFunc("/ProviderRecord", func(w http.ResponseWriter, r *http.Request) { - log.Info("Set handler for requests") - if r.Method == http.MethodPost { - // create a new TrackableCid instance - var providerRecords ProviderRecords - - // decode the request body into the TrackableCid struct - err := json.NewDecoder(r.Body).Decode(&providerRecords) - if err == nil { - log.Info("Decoded new encapsulated json received from post") - - // add the trackableCid to the list - httpCidSource.Enqueue(providerRecords) - } else { - - http.Error(w, "Error decoding request body", http.StatusBadRequest) - } - - } else if r.Method == http.MethodGet { - // check if there are any trackableCids to return - if len(httpCidSource.providerRecords) != 0 { - // return the last unretrieved trackableCid - providerRecords := httpCidSource.Dequeue() - log.Info("Sending new encapsulated json cid to user with get method") - // send the trackableCid back to the client as a response - json.NewEncoder(w).Encode(providerRecords) - } else { - - http.Error(w, "No record available currently", http.StatusNoContent) - } - - } else { - // return "Method Not Allowed" error - http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) - } - }) */ if err := httpCidSource.server.ListenAndServe(); err != nil { if err == http.ErrServerClosed { log.Infof("Server closed under request: %v", err) @@ -155,6 +124,7 @@ func (httpCidSource *HttpCidSource) StartServer() error { return nil } +//Gracefully shutdowns http server. Call after the provider has finished providing the cids func (httpCidSource *HttpCidSource) Shutdown(ctx context.Context) error { httpCidSource.lock.Lock() defer httpCidSource.lock.Unlock() @@ -168,6 +138,8 @@ func (httpCidSource *HttpCidSource) Shutdown(ctx context.Context) error { // We can use .Shutdown to gracefully shuts down the server without // interrupting any active connection httpCidSource.server.Shutdown(ctx) + close(httpCidSource.providerRecordsChannel) + log.Info("Shutted down http server") stop <- true }() @@ -177,102 +149,102 @@ func (httpCidSource *HttpCidSource) Shutdown(ctx context.Context) error { break case <-stop: log.Infof("Finished and shutting down http server") + break } return nil } -func GetNewHttpCid(source interface{}) ([]TrackableCid, error) { +func GetNewHttpCid(source interface{}, trackableCidArrayC chan<- []TrackableCid) ([]TrackableCid, error) { httpCidSource, ok := source.(*HttpCidSource) if !ok { return []TrackableCid{}, fmt.Errorf("Invalid source type: %T", source) } - // send a GET request to the server - url := fmt.Sprintf("http://%s:%d/ProviderRecord", httpCidSource.hostname, httpCidSource.port) - resp, err := http.Get(url) - if err != nil { - return []TrackableCid{}, err - } - defer resp.Body.Close() - - // check the status code - if resp.StatusCode == http.StatusNoContent { - return []TrackableCid{}, errors.New(fmt.Sprintf("Error while retrieving new cid from stack: %s", resp.Status)) - } else if resp.StatusCode != http.StatusOK { - return []TrackableCid{}, errors.New(fmt.Sprintf("Error retrieving trackableCid: %s", resp.Status)) - } - - // decode the response into a EncapsulatedJSONProviderRecord struct - var providerRecords ProviderRecords - err = json.NewDecoder(resp.Body).Decode(&providerRecords) - if err != nil { - return []TrackableCid{}, errors.Wrap(err, " while decoding trackable cid") - } - - if reflect.DeepEqual(providerRecords, ProviderRecords{}) { - return nil, errors.New("ended providing") - } - - var trackableCidPrs []TrackableCid - - for _, providerRecord := range providerRecords.EncapsulatedJSONProviderRecords { - log.Debug("Read a new PR from the web server:") - - log.Debugf("It's cid is: %s", providerRecord.CID) - newCid, err := cid.Parse(providerRecord.CID) - if err != nil { - log.Errorf("could not convert string to cid %s", err) - } - - log.Debugf("It's peer id is: %s", providerRecord.ID) - newPid, err := peer.Decode(providerRecord.ID) - if err != nil { - log.Errorf("could not convert string to pid %s", err) - } - - log.Debugf("It's creator is: %s", providerRecord.Creator) - newCreator, err := peer.Decode(providerRecord.Creator) - if err != nil { - log.Errorf("could not convert string to creator pid %s", err) - } - - log.Debugf("It's provide time is: %s", providerRecord.ProvideTime) - newProvideTime, err := time.ParseDuration(providerRecord.ProvideTime) - - if err != nil { - log.Errorf("Error while parsing provide time: %s", err) - } - - log.Debugf("It's publication time is: %s", providerRecord.PublicationTime) - newPublicationTime, err := time.Parse(time.RFC3339, providerRecord.PublicationTime) - - if err != nil { - log.Errorf("Error while parsing publication time: %s", err) - } - - log.Debugf("It's user agent is: %s", providerRecord.UserAgent) + log.Debug("Starting to listen for CIDs received from post request through channel.") + for { + select { + //channel filled up using post method + case providerRecords, ok := <-httpCidSource.providerRecordsChannel: + if !ok { + //return both nil to end the method + log.Debug("Received not ok message in http server from channel") + return nil, nil + } - multiaddresses := make([]ma.Multiaddr, 0) - for i := 0; i < len(providerRecord.Addresses); i++ { - multiaddr, err := ma.NewMultiaddr(providerRecord.Addresses[i]) - if err != nil { - //log.Errorf("could not convert string to multiaddress %s", err) - continue + if reflect.DeepEqual(providerRecords, nil) { + //return both nil to end the method + log.Debug("Received empty provider records from channel") + return nil, nil } - multiaddresses = append(multiaddresses, multiaddr) - } - log.Infof("generated new CID %s", newCid.Hash().B58String()) + // is an array of trackableCids that are created from encapsulated provider records + var trackableCidArray []TrackableCid + + for _, providerRecord := range providerRecords.EncapsulatedJSONProviderRecords { + log.Debug("Read a new PR from the web server:") + + log.Debugf("It's cid is: %s", providerRecord.CID) + newCid, err := cid.Parse(providerRecord.CID) + if err != nil { + log.Errorf("could not convert string to cid %s", err) + continue + } + + log.Debugf("It's peer id is: %s", providerRecord.ID) + newPid, err := peer.Decode(providerRecord.ID) + if err != nil { + log.Errorf("could not convert string to pid %s", err) + continue + } + + log.Debugf("It's creator is: %s", providerRecord.Creator) + newCreator, err := peer.Decode(providerRecord.Creator) + if err != nil { + log.Errorf("could not convert string to creator pid %s", err) + continue + } + + log.Debugf("It's provide time is: %s", providerRecord.ProvideTime) + newProvideTime, err := time.ParseDuration(providerRecord.ProvideTime) + + if err != nil { + log.Errorf("Error while parsing provide time: %s", err) + continue + } + + log.Debugf("It's publication time is: %s", providerRecord.PublicationTime) + newPublicationTime, err := time.Parse(time.RFC3339, providerRecord.PublicationTime) + + if err != nil { + log.Errorf("Error while parsing publication time: %s", err) + continue + } + + log.Debugf("It's user agent is: %s", providerRecord.UserAgent) + + multiaddresses := make([]ma.Multiaddr, 0) + for i := 0; i < len(providerRecord.Addresses); i++ { + multiaddr, err := ma.NewMultiaddr(providerRecord.Addresses[i]) + if err != nil { + log.Errorf("could not convert string to multiaddress %s", err) + continue + } + multiaddresses = append(multiaddresses, multiaddr) + } + + log.Infof("generated new CID %s", newCid.Hash().B58String()) + + log.Infof("Read a new provider ID %s.The multiaddresses are %v. The creator is %s. The new CID is %s", string(newPid), multiaddresses, newCreator, newCid) + trackableCid := NewTrackableCid(newPid, newCid, newCreator, multiaddresses, newPublicationTime, newProvideTime, providerRecord.UserAgent) + trackableCidArray = append(trackableCidArray, trackableCid) + } + trackableCidArrayC <- trackableCidArray + default: - log.Infof("Read a new provider ID %s.The multiaddresses are %v. The creator is %s. The new CID is %s", string(newPid), multiaddresses, newCreator, newCid) - trackableCid := NewTrackableCid(newPid, newCid, newCreator, multiaddresses, newPublicationTime, newProvideTime, providerRecord.UserAgent) - trackableCidPrs = append(trackableCidPrs, trackableCid) + } } - - return trackableCidPrs, nil - } func (httpCidSource *HttpCidSource) GetNewCid() (TrackableCid, error) { diff --git a/pkg/cid-source/http_server_test.go b/pkg/cid-source/http_server_test.go index fbe7459..66dc9ee 100644 --- a/pkg/cid-source/http_server_test.go +++ b/pkg/cid-source/http_server_test.go @@ -2,8 +2,9 @@ package cid_source import ( "bytes" + "context" "encoding/json" - "fmt" + "errors" "ipfs-cid-hoarder/pkg/config" "ipfs-cid-hoarder/pkg/models" "ipfs-cid-hoarder/pkg/p2p" @@ -32,7 +33,7 @@ func PostRequestProviders() { "/ip6/::1/udp/4001/quic", "/ip6/2602:ff16:6:0:1:1c1:0:1/udp/4001/quic", "/ip4/89.233.108.3/udp/4001/quic", "/ip4/127.0.0.1/tcp/4001", "/ip4/89.233.108.3/tcp/4001"}, "QmWCmp2w4MVvuWSwfYJyzDBNJxmub5mccYsSEhmKMq1zfW", - "0s", + "2023-01-16T14:04:42+02:00", "0s", "go-ipfs/0.7.0/", ), @@ -43,7 +44,7 @@ func PostRequestProviders() { "/ip6/2a01:4f9:c010:d4d4::1/udp/4001/quic", "/ip6/::1/udp/4001/quic", "/ip4/65.21.63.62/udp/4001/quic", "/ip4/65.21.63.62/tcp/4001", "/ip4/127.0.0.1/tcp/4001", "/ip6/::1/tcp/4001", "/ip4/127.0.0.1/udp/4001/quic"}, "QmWCmp2w4MVvuWSwfYJyzDBNJxmub5mccYsSEhmKMq1zfW", - "0s", + "2023-01-16T14:04:42+02:00", "0s", "go-ipfs/0.8.0/", ), @@ -56,7 +57,7 @@ func PostRequestProviders() { []string{"/ip4/45.63.7.28/tcp/4001", "/ip4/127.0.0.1/udp/4001/quic", "/ip4/45.63.7.28/udp/4001/quic", "/ip6/::1/udp/4001/quic", "/ip6/::1/tcp/4001", "/ip4/127.0.0.1/tcp/4001"}, "QmWCmp2w4MVvuWSwfYJyzDBNJxmub5mccYsSEhmKMq1zfW", - "0s", + "2023-01-16T14:04:42+02:00", "0s", "kubo/0.14.0/e0fabd6", ), @@ -66,7 +67,7 @@ func PostRequestProviders() { []string{"/ip4/127.0.0.1/udp/4001/quic", "/ip4/165.227.164.94/tcp/4001", "/ip6/64:ff9b::a5e3:a45e/udp/4001/quic", "/ip6/::1/udp/4001/quic", "/ip4/127.0.0.1/tcp/4001", "/ip6/::1/tcp/4001", "/ip4/165.227.164.94/udp/4001/quic"}, "QmWCmp2w4MVvuWSwfYJyzDBNJxmub5mccYsSEhmKMq1zfW", - "0s", + "2023-01-16T14:04:42+02:00", "0s", "go-ipfs/0.7.0/", ), @@ -77,7 +78,7 @@ func PostRequestProviders() { if err != nil { log.Errorf("Error marshalling provider records for cid: %s", err) } - req, err := http.NewRequest("POST", "http://localhost:8080/ProviderRecord", bytes.NewReader(data)) + req, err := http.NewRequest("POST", "http://localhost:8080/", bytes.NewReader(data)) if err != nil { log.Errorf("Error creating POST request: %s", err) } @@ -92,7 +93,7 @@ func PostRequestProviders() { if err != nil { log.Errorf("Error marshalling provider records for cid: %s", err) } - req, err = http.NewRequest("POST", "http://localhost:8080/ProviderRecord", bytes.NewReader(data)) + req, err = http.NewRequest("POST", "http://localhost:8080/", bytes.NewReader(data)) if err != nil { log.Errorf("Error creating POST request: %s", err) } @@ -103,11 +104,11 @@ func PostRequestProviders() { } // create a POST request - data, err = json.Marshal(ProviderRecords{}) + data, err = json.Marshal(nil) if err != nil { log.Errorf("Error marshalling provider records for cid: %s", err) } - req, err = http.NewRequest("POST", "http://localhost:8080/ProviderRecord", bytes.NewReader(data)) + req, err = http.NewRequest("POST", "http://localhost:8080/", bytes.NewReader(data)) if err != nil { log.Errorf("Error creating POST request: %s", err) } @@ -117,72 +118,132 @@ func PostRequestProviders() { log.Errorf("Error sending POST request: %s", err) } + /* // create a POST request + data, err = json.Marshal(nil) + if err != nil { + log.Errorf("Error marshalling provider records for cid: %s", err) + } + req, err = http.NewRequest("POST", "http://localhost:8080/", bytes.NewReader(data)) + if err != nil { + log.Errorf("Error creating POST request: %s", err) + } + // send the post request + _, err = http.DefaultClient.Do(req) + if err != nil { + log.Errorf("Error sending POST request: %s", err) + } */ + } -func GetRequest(httpSource *HttpCidSource) error { - trackableCids, err := GetNewHttpCid(httpSource) +func GetCidFromChannel(httpSource *HttpCidSource) error { - if trackableCids == nil { + trackableCidsChannel := make(chan []TrackableCid, 10) - return nil - } + go GetNewHttpCid(httpSource, trackableCidsChannel) + counter := 0 + for { + select { + case trackableCids, ok := <-trackableCidsChannel: + if !ok { + log.Debug("Received not ok message from channel") + return errors.New("Received not ok message from channel") + } + if trackableCids == nil { + log.Debug("Received nil trackable CIDs from channel") + return errors.New("Received nil trackable CIDs from channel") + } - if err != nil { - fmt.Errorf("Error %s while getting new cid", err) - return err - } - tr := trackableCids[0] - cidStr := tr.CID.Hash().B58String() + tr := trackableCids[0] + cidStr := tr.CID.Hash().B58String() - log.Debugf( - "New provide and CID received from channel. Cid:%s,Pid:%s,Mutliaddresses:%v,ProvideTime:%s,UserAgent:%s", - cidStr, tr.ID.String(), - tr.Addresses, tr.ProvideTime, tr.UserAgent, - ) + log.Debugf( + "New trackable CID array received from http channel. Cid:%s,ProvideTime:%s,PublicationTime:%s,Creator:%s. It's number is %d", + cidStr, tr.ProvideTime, tr.PublicationTime, tr.Creator, counter, + ) + counter++ - //the starting values for the discoverer - cidIn, err := cid.Parse(cidStr) + //the starting values for the discoverer + cidIn, err := cid.Parse(cidStr) - if err != nil { - log.Errorf("couldnt parse cid") - } + if err != nil { + log.Errorf("couldnt parse cid") + } + //dummy values for test + pingInterval, _ := time.ParseDuration("30m") - cidInfo := models.NewCidInfo(cidIn, 0, 0, config.JsonFileSource, - config.HttpServerSource, tr.Creator) - fetchRes := models.NewCidFetchResults(cidIn, 0) - - // generate a new CidFetchResults - //TODO starting data for the discoverer - fetchRes.TotalHops = 0 - fetchRes.HopsToClosest = 0 - for _, trackableCid := range trackableCids { - - cidInfo.AddProvideTime(trackableCid.ProvideTime) - - //TODO discoverer starting ping res - pingRes := models.NewPRPingResults( - cidIn, - trackableCid.ID, - //the below are starting data for the discoverer - 0, - time.Time{}, - 0, - true, - true, - p2p.NoConnError, - ) - cidInfo.AddCreator(trackableCid.Creator) - fetchRes.AddPRPingResults(pingRes) - - prHolderInfo := models.NewPeerInfo( - trackableCid.ID, - trackableCid.Addresses, - trackableCid.UserAgent, - ) - - cidInfo.AddPRHolder(prHolderInfo) + studyDuration, _ := time.ParseDuration("24h") + + cidInfo := models.NewCidInfo(cidIn, pingInterval, studyDuration, config.JsonFileSource, + "http-server", "") + + cidInfo.AddPublicationTime(tr.PublicationTime) + cidInfo.AddProvideTime(tr.ProvideTime) + cidInfo.AddCreator(tr.Creator) + + fetchRes := models.NewCidFetchResults(cidIn, 0) + + // generate a new CidFetchResults + //TODO starting data for the discoverer + fetchRes.TotalHops = 0 + fetchRes.HopsToClosest = 0 + for _, trackableCid := range trackableCids { + log.Debugf( + "For looping the trackable CID array for trackable CID: %d. The peer ID is: %s. The peer Multiaddresses are: %v. The user agent is: %s ", + counter-1, trackableCid.ID.String(), trackableCid.Addresses, trackableCid.UserAgent) + /* err := addPeerToProviderStore(ctx, discoverer.host, trackableCid.ID, trackableCid.CID, trackableCid.Addresses) + if err != nil { + log.Errorf("error %s calling addpeertoproviderstore method", err) + } else { + log.Debug("Added providers to provider store") + } + + err = addAgentVersionToProvideStore(discoverer.host, trackableCid.ID, trackableCid.UserAgent) + + if err != nil { + log.Errorf("error %s calling addAgentVersionToProvideStore", err) + } else { + log.Debug("Added agent version to provider store") + } */ + + //TODO discoverer starting ping res + pingRes := models.NewPRPingResults( + cidIn, + trackableCid.ID, + //the below are starting data for the discoverer + 0, + time.Time{}, + 0, + true, + true, + p2p.NoConnError, + ) + + /* log.Debugf("User agent received from provider store: %s", discoverer.host.GetUserAgentOfPeer(trackableCid.ID)) */ + + prHolderInfo := models.NewPeerInfo( + trackableCid.ID, + trackableCid.Addresses, + trackableCid.UserAgent, + ) + + cidInfo.AddPRHolder(prHolderInfo) + fetchRes.AddPRPingResults(pingRes) + + } + cidInfo.AddPRFetchResults(fetchRes) + + tot, success, failed := cidInfo.GetFetchResultSummaryOfRound(0) + if tot < 0 { + log.Warnf("no ping results for the PR provide round of Cid %s", cidInfo.CID.Hash().B58String()) + } else { + log.Infof("Cid %s - %d total PRHolders | %d successfull PRHolders | %d failed PRHolders", + cidIn, tot, success, failed) + } + default: + //log.Debug("haven't received anything yet") + } } - cidInfo.AddPRFetchResults(fetchRes) + return nil } @@ -192,16 +253,9 @@ func TestGetRequest(t *testing.T) { go httpSource.StartServer() PostRequestProviders() - err := GetRequest(httpSource) - if err != nil { - t.Errorf("%s", err) - } - err = GetRequest(httpSource) - if err != nil { + _ = GetCidFromChannel(httpSource) + /* if err != nil { t.Errorf("%s", err) - } - err = GetRequest(httpSource) - if err != nil { - t.Errorf("%s", err) - } + } */ + httpSource.Shutdown(context.TODO()) } diff --git a/pkg/hoarder/discoverer.go b/pkg/hoarder/discoverer.go index 8e78bd9..9246f0b 100644 --- a/pkg/hoarder/discoverer.go +++ b/pkg/hoarder/discoverer.go @@ -36,13 +36,24 @@ func NewCidDiscoverer(tracker *CidTracker) (*CidDiscoverer, error) { func (discoverer *CidDiscoverer) httpRun() { trackableCidsChannel := make(chan []src.TrackableCid, discoverer.Workers) // CID generator - var genWG sync.WaitGroup - genWG.Add(1) - go discoverer.generateCidsHttp(&genWG, trackableCidsChannel) + /* var genWG sync.WaitGroup + genWG.Add(1) */ + /* go discoverer.generateCidsHttp(&genWG, trackableCidsChannel)*/ + + // Receives provider records from http server + // they are received in form + //[trackableCids{ pr1, pr2, pr3, pr4}, + //trackableCids{ pr1, pr2, pr3, pr4}, + //trackableCids{ pr1, pr2, pr3, pr4}] + // for different cids each + go src.GetNewHttpCid(discoverer.CidSource, trackableCidsChannel) + var addProviderWG sync.WaitGroup + addProviderWG.Add(1) go discoverer.addProviderRecordsHttp(&addProviderWG, trackableCidsChannel) - genWG.Wait() + /* genWG.Wait() */ addProviderWG.Wait() + go discoverer.httpSource.Shutdown(discoverer.ctx) err := discoverer.host.Close() if err != nil { log.Errorf("failed to close host: %s", err) @@ -92,6 +103,7 @@ func (discoverer *CidDiscoverer) run() { func (discoverer *CidDiscoverer) addToMap(trackableCid *src.TrackableCid) { cidStr := trackableCid.CID.Hash().B58String() + //if cid entry exists append new trackable cid received. if typeInstance, ok := discoverer.CidMap[cidStr]; ok { discoverer.CidMap[cidStr] = append(typeInstance, trackableCid) } else { @@ -106,12 +118,13 @@ func (discoverer *CidDiscoverer) addProvider(addProviderWG *sync.WaitGroup, trac ctx := discoverer.ctx for { select { + //receive trackable cids from generateCids interface method case trackableCid, ok := <-trackableCidC: if !ok { - break + return } if trackableCid.IsEmpty() { - break + return } cidStr := trackableCid.CID.Hash().B58String() @@ -122,15 +135,16 @@ func (discoverer *CidDiscoverer) addProvider(addProviderWG *sync.WaitGroup, trac trackableCid.Addresses, trackableCid.ProvideTime, trackableCid.UserAgent, ) discoverer.m.Lock() + //adds to map to be later for looped (collects all of the provider records for a specific CID) discoverer.addToMap(trackableCid) - + //not exactly useful because we run the pinger on a seperate host err := addPeerToProviderStore(ctx, discoverer.host, trackableCid.ID, trackableCid.CID, trackableCid.Addresses) if err != nil { log.Errorf("error %s calling addpeertoproviderstore method", err) } else { log.Debug("Added providers to provider store") } - + //not exactly useful because we run the pinger on a seperate host err = addAgentVersionToProvideStore(discoverer.host, trackableCid.ID, trackableCid.UserAgent) if err != nil { @@ -153,24 +167,28 @@ func (discoverer *CidDiscoverer) addProvider(addProviderWG *sync.WaitGroup, trac func (discoverer *CidDiscoverer) addProviderRecordsHttp(addProviderWG *sync.WaitGroup, trackableCidsChannel <-chan []src.TrackableCid) { defer addProviderWG.Done() ctx := discoverer.ctx + counter := 0 for { select { + //receives a trackable cids array from the getNewCidHttp method when a post request is received. case trackableCids, ok := <-trackableCidsChannel: if !ok { - break + log.Debug("Received not ok message from channel") + return } if trackableCids == nil { - break + log.Debug("Received nil trackable CIDs from channel") + return } tr := trackableCids[0] cidStr := tr.CID.Hash().B58String() log.Debugf( - "New provide and CID received from channel. Cid:%s,Pid:%s,Mutliaddresses:%v,ProvideTime:%s,UserAgent:%s", - cidStr, tr.ID.String(), - tr.Addresses, tr.ProvideTime, tr.UserAgent, + "New trackable CID array received from http channel. Cid:%s,ProvideTime:%s,PublicationTime:%s,Creator:%s. It's number is %d", + cidStr, tr.ProvideTime, tr.PublicationTime, tr.Creator, counter, ) + counter++ //the starting values for the discoverer cidIn, err := cid.Parse(cidStr) @@ -181,6 +199,11 @@ func (discoverer *CidDiscoverer) addProviderRecordsHttp(addProviderWG *sync.Wait cidInfo := models.NewCidInfo(cidIn, discoverer.ReqInterval, discoverer.StudyDuration, config.JsonFileSource, discoverer.CidSource.Type(), "") + + cidInfo.AddPublicationTime(tr.PublicationTime) + cidInfo.AddProvideTime(tr.ProvideTime) + cidInfo.AddCreator(tr.Creator) + fetchRes := models.NewCidFetchResults(cidIn, 0) // generate a new CidFetchResults @@ -188,8 +211,10 @@ func (discoverer *CidDiscoverer) addProviderRecordsHttp(addProviderWG *sync.Wait fetchRes.TotalHops = 0 fetchRes.HopsToClosest = 0 for _, trackableCid := range trackableCids { - - err := addPeerToProviderStore(ctx, discoverer.host, trackableCid.ID, trackableCid.CID, trackableCid.Addresses) + log.Debugf( + "For looping the trackable CID array for trackable CID: %d. The peer ID is: %s. The peer Multiaddresses are: %v. The user agent is: %s ", + counter-1, trackableCid.ID.String(), trackableCid.Addresses, trackableCid.UserAgent) + /* err := addPeerToProviderStore(ctx, discoverer.host, trackableCid.ID, trackableCid.CID, trackableCid.Addresses) if err != nil { log.Errorf("error %s calling addpeertoproviderstore method", err) } else { @@ -202,9 +227,7 @@ func (discoverer *CidDiscoverer) addProviderRecordsHttp(addProviderWG *sync.Wait log.Errorf("error %s calling addAgentVersionToProvideStore", err) } else { log.Debug("Added agent version to provider store") - } - cidInfo.AddPublicationTime(trackableCid.PublicationTime) - cidInfo.AddProvideTime(trackableCid.ProvideTime) + } */ //TODO discoverer starting ping res pingRes := models.NewPRPingResults( @@ -218,21 +241,29 @@ func (discoverer *CidDiscoverer) addProviderRecordsHttp(addProviderWG *sync.Wait true, p2p.NoConnError, ) - cidInfo.AddCreator(trackableCid.Creator) - fetchRes.AddPRPingResults(pingRes) - log.Debugf("User agent received from provider store: %s", discoverer.host.GetUserAgentOfPeer(trackableCid.ID)) + /* log.Debugf("User agent received from provider store: %s", discoverer.host.GetUserAgentOfPeer(trackableCid.ID)) */ prHolderInfo := models.NewPeerInfo( trackableCid.ID, - discoverer.host.Peerstore().Addrs(trackableCid.ID), - discoverer.host.GetUserAgentOfPeer(trackableCid.ID), + trackableCid.Addresses, + trackableCid.UserAgent, ) cidInfo.AddPRHolder(prHolderInfo) + fetchRes.AddPRPingResults(pingRes) + } cidInfo.AddPRFetchResults(fetchRes) + tot, success, failed := cidInfo.GetFetchResultSummaryOfRound(0) + if tot < 0 { + log.Warnf("no ping results for the PR provide round of Cid %s", cidInfo.CID.Hash().B58String()) + } else { + log.Infof("Cid %s - %d total PRHolders | %d successfull PRHolders | %d failed PRHolders", + cidIn, tot, success, failed) + } + discoverer.DBCli.AddCidInfo(cidInfo) discoverer.DBCli.AddFetchResult(fetchRes) @@ -248,6 +279,7 @@ func (discoverer *CidDiscoverer) addProviderRecordsHttp(addProviderWG *sync.Wait } // This method essentially initializes the data for the pinger to be able to get information about the PR holders later. +// Run for each CID received by generateCids which is stored inside the map. func (discoverer *CidDiscoverer) discoveryProcess(discovererWG *sync.WaitGroup, cidstr string, trackableCidArr []*src.TrackableCid) { defer discovererWG.Done() //the starting values for the discoverer @@ -261,13 +293,12 @@ func (discoverer *CidDiscoverer) discoveryProcess(discovererWG *sync.WaitGroup, fetchRes := models.NewCidFetchResults(cidIn, 0) // generate a new CidFetchResults - //TODO starting data for the discoverer fetchRes.TotalHops = 0 fetchRes.HopsToClosest = 0 for _, val := range trackableCidArr { cidInfo.AddPublicationTime(val.PublicationTime) cidInfo.AddProvideTime(val.ProvideTime) - //TODO discoverer starting ping res + //discoverer starting ping results != publisher where we can have the actual data pingRes := models.NewPRPingResults( cidIn, val.ID, @@ -315,7 +346,7 @@ func addAddrtoPeerstore(h host.Host, pid peer.ID, multiaddr []ma.Multiaddr) { } */ -// Instead of adding directly to peerstore the API is the following +//DEPRECATED Instead of adding directly to peerstore the API is the following func addPeerToProviderStore(ctx context.Context, h *p2p.Host, pid peer.ID, cid cid.Cid, multiaddr []ma.Multiaddr) error { keyMH := cid.Hash() err := h.DHT.ProviderStore().AddProvider(ctx, keyMH, peer.AddrInfo{ID: pid, Addrs: multiaddr}) @@ -325,6 +356,7 @@ func addPeerToProviderStore(ctx context.Context, h *p2p.Host, pid peer.ID, cid c return nil } +//DEPRECATED func addAgentVersionToProvideStore(h *p2p.Host, pid peer.ID, useragent string) error { return h.Peerstore().Put(pid, "AgentVersion", useragent) } diff --git a/pkg/hoarder/pinger.go b/pkg/hoarder/pinger.go index c6c3f75..9e12a93 100644 --- a/pkg/hoarder/pinger.go +++ b/pkg/hoarder/pinger.go @@ -386,7 +386,7 @@ func (pinger *CidPinger) runPinger(wg *sync.WaitGroup, pingerID int) { }(pinger, cidInfo, cidFetchRes) wg.Add(1) - // recalculate the closest k peers to the content. This needs to be done due to node churn.(?) + // calculate the closest k peers to the content. go func(p *CidPinger, c *models.CidInfo, fetchRes *models.CidFetchResults) { defer wg.Done() t := time.Now() diff --git a/pkg/hoarder/tracker.go b/pkg/hoarder/tracker.go index 52fc938..5266d59 100644 --- a/pkg/hoarder/tracker.go +++ b/pkg/hoarder/tracker.go @@ -8,7 +8,6 @@ import ( log "github.com/sirupsen/logrus" src "ipfs-cid-hoarder/pkg/cid-source" - "ipfs-cid-hoarder/pkg/config" "ipfs-cid-hoarder/pkg/db" "ipfs-cid-hoarder/pkg/p2p" ) @@ -84,39 +83,18 @@ func NewCidTracker( }, nil } -// Receives provider records from http server -// they are received in form trackableCids{ pr1, pr2, pr3, pr4} for the same cid -func (tracker *CidTracker) generateCidsHttp(genWG *sync.WaitGroup, trackableCidArrayC chan<- []src.TrackableCid) { +/* func (tracker *CidTracker) generateCidsHttp(genWG *sync.WaitGroup, trackableCidArrayC chan<- []src.TrackableCid) { defer genWG.Done() - // generate a timer to determine + // generate a timer to determine when to start the next get request minTimeT := time.NewTicker(10 * time.Second) log.Debugf("Source is: %s and config source is: ", tracker.CidSource.Type(), config.HttpServerSource) - for true { - trackableCids, err := src.GetNewHttpCid(tracker.CidSource) - - if trackableCids == nil { - log.Debug("Received empty provider records") - trackableCidArrayC <- nil - close(trackableCidArrayC) - //gracefully shutdown server - go tracker.httpSource.Shutdown(tracker.ctx) - break - } - - if err != nil { - log.Errorf("error while getting new cid: %s", err) - // check if ticker for next iteration was raised - <-minTimeT.C - continue - } + go src.GetNewHttpCid(tracker.CidSource, trackableCidArrayC) - trackableCidArrayC <- trackableCids - // check if ticker for next iteration was raised - <-minTimeT.C + log.Debug("Exited from get new http cid") + return - } -} +} */ //Generates cids depending on the cid source func (tracker *CidTracker) generateCids(genWG *sync.WaitGroup, trackableCidC chan<- *src.TrackableCid) {