Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP server fixes #15

Closed
wants to merge 18 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go-libp2p-kad-dht
Submodule go-libp2p-kad-dht updated 0 files
113 changes: 42 additions & 71 deletions pkg/cid-source/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,21 @@ type HttpCidSource struct {
}

func (httpCidSource *HttpCidSource) Dequeue() ProviderRecords {
httpCidSource.lock.Lock()
defer httpCidSource.lock.Unlock()
if len(httpCidSource.providerRecords) == 0 {
log.Debug("Queue is empty")
return ProviderRecords{}
}
elem := httpCidSource.providerRecords[0]
httpCidSource.providerRecords = httpCidSource.providerRecords[1:]
log.Debugf("Removed element from queue, lenth is now: %d", len(httpCidSource.providerRecords))
log.Debugf("Removed element from queue, length is now: %d", len(httpCidSource.providerRecords))
return elem
}

func (httpCidSource *HttpCidSource) Enqueue(providerRecords ProviderRecords) {
httpCidSource.lock.Lock()
defer httpCidSource.lock.Unlock()
httpCidSource.providerRecords = append(httpCidSource.providerRecords, providerRecords)
log.Debugf("Added new element to queue, length is now: %d", len(httpCidSource.providerRecords))
}
Expand All @@ -54,42 +58,40 @@ func NewHttpCidSource(port int, hostname string) *HttpCidSource {

func (httpCidSource *HttpCidSource) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log.Info("Received request in server HTTP")
if r.URL.Path == "/ProviderRecord" {
if r.Method == http.MethodPost {
log.Debug("The request was a post method")
// create a new TrackableCid instance
var providerRecords ProviderRecords

// decode the request body into the TrackableCid struct
err := json.NewDecoder(r.Body).Decode(&providerRecords)
if err == nil {
log.Info("Decoded new encapsulated json received from post")

// add the trackableCid to the list
httpCidSource.Enqueue(providerRecords)
} else {

http.Error(w, "Error decoding request body", http.StatusBadRequest)
}
if r.Method == http.MethodPost {
log.Debug("The request was a post method")
// create a new TrackableCid instance
var providerRecords ProviderRecords

// decode the request body into the TrackableCid struct
err := json.NewDecoder(r.Body).Decode(&providerRecords)
if err == nil {
log.Info("Decoded new encapsulated json received from post")

// add the trackableCid to the list
httpCidSource.Enqueue(providerRecords)
} else {

} else if r.Method == http.MethodGet {
log.Debug("The request was a get request")
// check if there are any trackableCids to return
if len(httpCidSource.providerRecords) != 0 {
// return the last unretrieved trackableCid
providerRecords := httpCidSource.Dequeue()
log.Info("Sending new encapsulated json cid to user with get method")
// send the trackableCid back to the client as a response
json.NewEncoder(w).Encode(providerRecords)
} else {

http.Error(w, "No record available currently", http.StatusNoContent)
}
http.Error(w, "Error decoding request body", http.StatusBadRequest)
}

} else if r.Method == http.MethodGet {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reasoning behind having a Get method in the API? I thought we just needed to notify with a POST of the CID that we want to track (and the related info)

Am I missing anything?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @cortze, the functionality of the HTTP server is that a post requests are received by the peer that is publishing, and encapsulated json arrays are then added to the queue.

type HttpCidSource struct {
	port            int
	hostname        string
	lock            sync.Mutex
	server          *http.Server
	providerRecords []ProviderRecords
	isStarted       bool
}

func (httpCidSource *HttpCidSource) Dequeue() ProviderRecords {
	httpCidSource.lock.Lock()
	defer httpCidSource.lock.Unlock()
	if len(httpCidSource.providerRecords) == 0 {
		log.Debug("Queue is empty")
		return ProviderRecords{}
	}
	elem := httpCidSource.providerRecords[0]
	httpCidSource.providerRecords = httpCidSource.providerRecords[1:]
	log.Debugf("Removed element from queue, length is now: %d", len(httpCidSource.providerRecords))
	return elem
}

func (httpCidSource *HttpCidSource) Enqueue(providerRecords ProviderRecords) {
	httpCidSource.lock.Lock()
	defer httpCidSource.lock.Unlock()
	httpCidSource.providerRecords = append(httpCidSource.providerRecords, providerRecords)
	log.Debugf("Added new element to queue, length is now: %d", len(httpCidSource.providerRecords))
}

//A container for the encapsulated struct.
//
//File containts a json array of provider records.
//[{ProviderRecord1},{ProviderRecord2},{ProviderRecord3}]
type ProviderRecords struct {
	EncapsulatedJSONProviderRecords []EncapsulatedJSONProviderRecord `json:"ProviderRecords"`
}

//This struct will be used to create,read and store the encapsulated data necessary for reading the
//provider records.
type EncapsulatedJSONProviderRecord struct {
	ID              string   `json:"PeerID"`
	CID             string   `json:"ContentID"`
	Creator         string   `json:"Creator"`
	PublicationTime string   `json:"PublicationTime"`
	ProvideTime     string   `json:"ProvideTime"`
	UserAgent       string   `json:"UserAgent"`
	Addresses       []string `json:"PeerMultiaddresses"`
}

This means that we wait for all the providers to be added into the array.
The get method get's called right now of a time interval of 5 seconds and we receive the first element of the queue using a dequeue method. I found it easier to be able to determine whether the queue is just empty, inferring that the publisher is just publishing slower than the queue empties itself, or the publisher will not publish any more elements.
If the queue is just empty the server responds back with NoContent, else if the publisher finished publishing it must send a nil encapsulated json:

http.Error(w, "No record available currently", http.StatusNoContent)

inside the interval function 
trackableCids, err := src.GetNewHttpCid(tracker.CidSource)
if trackableCids == nil {
	log.Debug("Received empty provider records")
	trackableCidArrayC <- nil
	close(trackableCidArrayC)
	//gracefully shutdown server
	go tracker.httpSource.Shutdown(tracker.ctx)
	break
}

if err != nil {
	log.Errorf("error while getting new cid: %s", err)
	// check if ticker for next iteration was raised
	<-minTimeT.C
	continue
}

Copy link
Owner

@cortze cortze Jan 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohhhhh I see, I thought that the API was organized in a different way.

My understanding was that we would initialize the API server in the Cid-Hoarder, and that the publisher would be able to POST the CidInfo and the related one to the PRHolders in the same POST query.
But being this POST query after the publication has been accomplished

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tried doing it like this but I was impossible to tell whether the publishing process was finished. It also would add a lot of extra logic by checking for each PR which CID it corresponds to and some type of waiting mechanism to add all the PRs.

Copy link
Owner

@cortze cortze Jan 16, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but are you taking into account having a single json sent over the POST method including the CidInfo and PRHolders?
(something like this)

{
     "CidInfo":{
            "cid": 
             ....
             "pr_holders":{
                    "peer_id": 
                    ....
             }
     }
}

this would simplify having to match PRHolders with CIDs

Copy link
Contributor Author

@FotiosBistas FotiosBistas Jan 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is exactly how I'm sending it:

  "ProviderRecords":[
  {
    "PeerID":"12D3KooWG1MhbRPTGZiiLf1iECH19YMFjurkGjGsmKstfEMvNF42",
    "ContentID":"QmNyxnsJ15nWD85R3xjTKCm37BSjPqyzMHcwBrGqhaWhy8",
    "Creator":"QmPod7yQRSQX8jDtQCBi65ZSGfbkmoAX5FH4g8wzzJmtnY",
    "PublicationTime":"2023-01-16T14:04:42+02:00",
    "ProvideTime":"5.4405671s",
    "UserAgent":"hydra-booster/0.7.4",
    "PeerMultiaddresses": ["/ip4/18.188.54.97/udp/30014/quic","/ip4/18.188.54.97/tcp/30014","/ip4/172.31.10.39/udp/30014/quic","/ip4/127.0.0.1/udp/30014/quic","/ip4/172.31.10.39/tcp/30014","/ip4/127.0.0.1/tcp/30014"]
  },
  {
    "PeerID":"12D3KooWA4m41sRq68mdhk5cSTBptwxSXSsrjsyRKf2WJbNE7h9x",
    "ContentID":"QmNyxnsJ15nWD85R3xjTKCm37BSjPqyzMHcwBrGqhaWhy8",
    "Creator":"QmPod7yQRSQX8jDtQCBi65ZSGfbkmoAX5FH4g8wzzJmtnY",
    "PublicationTime":"2023-01-16T14:04:42+02:00",
    "ProvideTime":"5.4405671s",
    "UserAgent":"go-ipfs/0.7.0/",
    "PeerMultiaddresses":["/ip4/127.0.0.1/tcp/4001","/ip6/2605:7380:1000:1310:c08b:37ff:fe37:5ec3/tcp/4001","/ip4/127.0.0.1/udp/4001/quic","/ip6/::1/tcp/4001","/ip4/209.50.56.24/udp/4001/quic","/ip6/2605:7380:1000:1310:c08b:37ff:fe37:5ec3/udp/4001/quic","/ip6/::1/udp/4001/quic","/ip4/209.50.56.24/tcp/4001"]
  },
  {
    "PeerID":"12D3KooWSFppQG3QwTXCVKxtfQEjwoY6sLVgbLCegntd87Dc8L8Q",
    "ContentID":"QmNyxnsJ15nWD85R3xjTKCm37BSjPqyzMHcwBrGqhaWhy8",
    "Creator":"QmPod7yQRSQX8jDtQCBi65ZSGfbkmoAX5FH4g8wzzJmtnY",
    "PublicationTime":"2023-01-16T14:04:42+02:00",
    "ProvideTime":"5.4405671s",
    "UserAgent":"go-ipfs/0.7.0/",
    "PeerMultiaddresses":["/ip4/127.0.0.1/udp/4001/quic","/ip4/119.94.38.201/tcp/4001","/ip6/2001:4451:1114:8600:10f7:c70b:bec3:b939/udp/4001/quic","/ip6/::1/tcp/4001","/ip6/2001:4451:1114:8600:10f7:c70b:bec3:b939/tcp/4001","/ip4/127.0.0.1/tcp/4001","/ip6/2001:4451:1114:8600:a4a2:76bb:267:6cee/tcp/4001","/ip6/2001:4451:1114:8600:a4a2:76bb:267:6cee/udp/4001/quic","/ip6/::1/udp/4001/quic","/ip4/119.94.38.201/udp/4001/quic"]
  },
]

Notice that the ContentID,Creator,ProvideTime,PublicationTime stay the same. After that is sent a new batch of provider records for a specific CID will be sent over the server.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the reasoning behind having a Get method in the API? I thought we just needed to notify with a POST of the CID that we want to track (and the related info)

Am I missing anything?

@cortze I removed the get method. Essentially the internal queue of the server would fill up and ,at some point later after a lot of CIDs have been inserted, would never empty itself. I replaced it, just like you mentioned at the above comment, with just a post method that sends the received CIDs through a channel and then inserts them into the database. It does manage to insert them all, unlike the previous method which would get stuck, but it still is slow. E.G. the database has inserted 1500/1900 CIDs (numbers are random here) immediately and then the rest 400 CIDs all will get inserted after a couple of hours. So I think some approaches would be:

  1. Make the process synchronous, wait for all of the CIDs to get published and then insert them to the hoarder (can be done very easily, but is essentially the same as the json file approach)
  2. Implement some type of batching system, like inserting 1000 cids at a time into the database (I think it would be better to be done from the database side)
  3. Leave it as it is and we get some of the benefits of the asynchronous pings

Would love to hear your thoughts!

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @FotiosBistas , thanks for taking into consideration the upper suggestion. Tbh it makes everything cleaner and easier to follow.
About your different approaches:

  1. If you wait for all the CIDs to be published, you cannot follow the timings in each ping accurately. We will synchronize all the pings, having spikes of heavy ping sessions. Having the publication of it spaced over time serve the purpose as load balancing, so I'll say that it is fine to keep it this way
  2. Adding batching over queries makes complete sense. I'll work on it in the following month.
  3. It must stay like this until the batching is implemented. I need to sneak some time to take action on this.

log.Debug("The request was a get request")
// check if there are any trackableCids to return
if len(httpCidSource.providerRecords) != 0 {
// return the last unretrieved trackableCid
providerRecords := httpCidSource.Dequeue()
log.Info("Sending new encapsulated json cid to user with get method")
// send the trackableCid back to the client as a response
json.NewEncoder(w).Encode(providerRecords)
} else {
// return "Method Not Allowed" error
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)

http.Error(w, "No record available currently", http.StatusNoContent)
}

} else {
// return "Method Not Allowed" error
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
}

}
Expand All @@ -108,42 +110,6 @@ func (httpCidSource *HttpCidSource) StartServer() error {
Handler: httpCidSource,
}

/* http.HandleFunc("/ProviderRecord", func(w http.ResponseWriter, r *http.Request) {
log.Info("Set handler for requests")
if r.Method == http.MethodPost {
// create a new TrackableCid instance
var providerRecords ProviderRecords

// decode the request body into the TrackableCid struct
err := json.NewDecoder(r.Body).Decode(&providerRecords)
if err == nil {
log.Info("Decoded new encapsulated json received from post")

// add the trackableCid to the list
httpCidSource.Enqueue(providerRecords)
} else {

http.Error(w, "Error decoding request body", http.StatusBadRequest)
}

} else if r.Method == http.MethodGet {
// check if there are any trackableCids to return
if len(httpCidSource.providerRecords) != 0 {
// return the last unretrieved trackableCid
providerRecords := httpCidSource.Dequeue()
log.Info("Sending new encapsulated json cid to user with get method")
// send the trackableCid back to the client as a response
json.NewEncoder(w).Encode(providerRecords)
} else {

http.Error(w, "No record available currently", http.StatusNoContent)
}

} else {
// return "Method Not Allowed" error
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
}
}) */
if err := httpCidSource.server.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
log.Infof("Server closed under request: %v", err)
Expand Down Expand Up @@ -190,7 +156,7 @@ func GetNewHttpCid(source interface{}) ([]TrackableCid, error) {
}

// send a GET request to the server
url := fmt.Sprintf("http://%s:%d/ProviderRecord", httpCidSource.hostname, httpCidSource.port)
url := fmt.Sprintf("http://%s:%d/", httpCidSource.hostname, httpCidSource.port)
resp, err := http.Get(url)
if err != nil {
return []TrackableCid{}, err
Expand Down Expand Up @@ -224,32 +190,37 @@ func GetNewHttpCid(source interface{}) ([]TrackableCid, error) {
newCid, err := cid.Parse(providerRecord.CID)
if err != nil {
log.Errorf("could not convert string to cid %s", err)
continue
}

log.Debugf("It's peer id is: %s", providerRecord.ID)
newPid, err := peer.Decode(providerRecord.ID)
if err != nil {
log.Errorf("could not convert string to pid %s", err)
continue
}

log.Debugf("It's creator is: %s", providerRecord.Creator)
newCreator, err := peer.Decode(providerRecord.Creator)
if err != nil {
log.Errorf("could not convert string to creator pid %s", err)
continue
}

log.Debugf("It's provide time is: %s", providerRecord.ProvideTime)
newProvideTime, err := time.ParseDuration(providerRecord.ProvideTime)

if err != nil {
log.Errorf("Error while parsing provide time: %s", err)
continue
}

log.Debugf("It's publication time is: %s", providerRecord.PublicationTime)
newPublicationTime, err := time.Parse(time.RFC3339, providerRecord.PublicationTime)

if err != nil {
log.Errorf("Error while parsing publication time: %s", err)
continue
}

log.Debugf("It's user agent is: %s", providerRecord.UserAgent)
Expand All @@ -258,7 +229,7 @@ func GetNewHttpCid(source interface{}) ([]TrackableCid, error) {
for i := 0; i < len(providerRecord.Addresses); i++ {
multiaddr, err := ma.NewMultiaddr(providerRecord.Addresses[i])
if err != nil {
//log.Errorf("could not convert string to multiaddress %s", err)
log.Errorf("could not convert string to multiaddress %s", err)
continue
}
multiaddresses = append(multiaddresses, multiaddr)
Expand Down
42 changes: 29 additions & 13 deletions pkg/hoarder/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,24 +153,27 @@ func (discoverer *CidDiscoverer) addProvider(addProviderWG *sync.WaitGroup, trac
func (discoverer *CidDiscoverer) addProviderRecordsHttp(addProviderWG *sync.WaitGroup, trackableCidsChannel <-chan []src.TrackableCid) {
defer addProviderWG.Done()
ctx := discoverer.ctx
counter := 0
for {
select {
case trackableCids, ok := <-trackableCidsChannel:
if !ok {
log.Debug("Received not ok message from channel")
break
}
if trackableCids == nil {
log.Debug("Received nil trackable CIDs from channel")
break
}

tr := trackableCids[0]
cidStr := tr.CID.Hash().B58String()

log.Debugf(
"New provide and CID received from channel. Cid:%s,Pid:%s,Mutliaddresses:%v,ProvideTime:%s,UserAgent:%s",
cidStr, tr.ID.String(),
tr.Addresses, tr.ProvideTime, tr.UserAgent,
"New trackable CID array received from http channel. Cid:%s,ProvideTime:%s,PublicationTime:%s,Creator:%s. It's number is %d",
cidStr, tr.ProvideTime, tr.PublicationTime, tr.Creator, counter,
)
counter++

//the starting values for the discoverer
cidIn, err := cid.Parse(cidStr)
Expand All @@ -181,15 +184,22 @@ func (discoverer *CidDiscoverer) addProviderRecordsHttp(addProviderWG *sync.Wait

cidInfo := models.NewCidInfo(cidIn, discoverer.ReqInterval, discoverer.StudyDuration, config.JsonFileSource,
discoverer.CidSource.Type(), "")

cidInfo.AddPublicationTime(tr.PublicationTime)
cidInfo.AddProvideTime(tr.ProvideTime)
cidInfo.AddCreator(tr.Creator)
cortze marked this conversation as resolved.
Show resolved Hide resolved

fetchRes := models.NewCidFetchResults(cidIn, 0)

// generate a new CidFetchResults
//TODO starting data for the discoverer
fetchRes.TotalHops = 0
fetchRes.HopsToClosest = 0
for _, trackableCid := range trackableCids {

err := addPeerToProviderStore(ctx, discoverer.host, trackableCid.ID, trackableCid.CID, trackableCid.Addresses)
log.Debugf(
"For looping the trackable CID array for trackable CID: %d. The peer ID is: %s. The peer Multiaddresses are: %v. The user agent is: %s ",
counter, trackableCid.ID.String(), trackableCid.Addresses, trackableCid.UserAgent)
/* err := addPeerToProviderStore(ctx, discoverer.host, trackableCid.ID, trackableCid.CID, trackableCid.Addresses)
if err != nil {
log.Errorf("error %s calling addpeertoproviderstore method", err)
} else {
Expand All @@ -202,9 +212,7 @@ func (discoverer *CidDiscoverer) addProviderRecordsHttp(addProviderWG *sync.Wait
log.Errorf("error %s calling addAgentVersionToProvideStore", err)
} else {
log.Debug("Added agent version to provider store")
}
cidInfo.AddPublicationTime(trackableCid.PublicationTime)
cidInfo.AddProvideTime(trackableCid.ProvideTime)
} */

//TODO discoverer starting ping res
pingRes := models.NewPRPingResults(
cortze marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -218,21 +226,29 @@ func (discoverer *CidDiscoverer) addProviderRecordsHttp(addProviderWG *sync.Wait
true,
p2p.NoConnError,
)
cidInfo.AddCreator(trackableCid.Creator)
fetchRes.AddPRPingResults(pingRes)

log.Debugf("User agent received from provider store: %s", discoverer.host.GetUserAgentOfPeer(trackableCid.ID))
/* log.Debugf("User agent received from provider store: %s", discoverer.host.GetUserAgentOfPeer(trackableCid.ID)) */

prHolderInfo := models.NewPeerInfo(
trackableCid.ID,
discoverer.host.Peerstore().Addrs(trackableCid.ID),
discoverer.host.GetUserAgentOfPeer(trackableCid.ID),
trackableCid.Addresses,
trackableCid.UserAgent,
)

cidInfo.AddPRHolder(prHolderInfo)
fetchRes.AddPRPingResults(pingRes)

}
cidInfo.AddPRFetchResults(fetchRes)

tot, success, failed := cidInfo.GetFetchResultSummaryOfRound(0)
if tot < 0 {
log.Warnf("no ping results for the PR provide round of Cid %s", cidInfo.CID.Hash().B58String())
} else {
log.Infof("Cid %s - %d total PRHolders | %d successfull PRHolders | %d failed PRHolders",
cidIn, tot, success, failed)
}

discoverer.DBCli.AddCidInfo(cidInfo)
discoverer.DBCli.AddFetchResult(fetchRes)

Expand Down
7 changes: 4 additions & 3 deletions pkg/hoarder/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ func NewCidTracker(
func (tracker *CidTracker) generateCidsHttp(genWG *sync.WaitGroup, trackableCidArrayC chan<- []src.TrackableCid) {
defer genWG.Done()
// generate a timer to determine
minTimeT := time.NewTicker(10 * time.Second)
minTimeT := time.NewTicker(5 * time.Second)
log.Debugf("Source is: %s and config source is: ", tracker.CidSource.Type(), config.HttpServerSource)

counter := 0
for true {
trackableCids, err := src.GetNewHttpCid(tracker.CidSource)

Expand All @@ -110,7 +110,8 @@ func (tracker *CidTracker) generateCidsHttp(genWG *sync.WaitGroup, trackableCidA
<-minTimeT.C
continue
}

log.Debugf("Sending CID number from get request: %d", counter)
counter++
trackableCidArrayC <- trackableCids
// check if ticker for next iteration was raised
<-minTimeT.C
Expand Down