Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
add more comments of supernode implements
Browse files Browse the repository at this point in the history
Signed-off-by [email protected]

Signed-off-by: henry.hj <[email protected]>
  • Loading branch information
henry.hj committed Apr 28, 2020
1 parent 154e79d commit ed59371
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 8 deletions.
17 changes: 14 additions & 3 deletions supernode/daemon/mgr/seed_task/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,14 @@ type TaskRegistryResponce struct {
}

type Manager struct {
/* interested in MaxSeedPerObject & PeerExpireTime */
cfg *config.Config
taskStore *dutil.Store /* taskid --> peerid set */
/* store all seed task info */
taskStore *dutil.Store
/* store all seed peer info */
p2pInfoStore *dutil.Store
ipPortMap *safeMap
/* create time of seed manager */
timeStamp time.Time
}

Expand Down Expand Up @@ -127,8 +131,13 @@ func ipPortToStr(ip strfmt.IPv4, port int32) string {
return fmt.Sprintf("%s-%d", ip.String(), port)
}

/*
1. Register peer(if not registered) & update heartbeat timestamp
2. if peer was restarted(with same ip-port pair), remove old peer info
3. try schedule a new seed task
*/
func (mgr *Manager) Register(ctx context.Context, request *types.TaskRegisterRequest) (*TaskRegistryResponce, error) {
logrus.Debugf("registry rt task %v", request)
logrus.Debugf("registry seed task %v", request)
request.TaskID = digest.Sha256(request.TaskURL)
resp := &TaskRegistryResponce{ TaskID: request.TaskID }

Expand All @@ -138,7 +147,7 @@ func (mgr *Manager) Register(ctx context.Context, request *types.TaskRegisterReq
Port: request.Port,
Version: request.Version,
}
// In real-time situation, cid == peer id
// cid == peer id
peerId := request.CID
p2pInfo := mgr.getOrCreateP2pInfo(ctx, peerId, peerCreateReq)
// update peer hb time
Expand Down Expand Up @@ -232,10 +241,12 @@ func (mgr *Manager) IsSeedTask(ctx context.Context, request *http.Request) bool
func (mgr *Manager) ReportPeerHealth (ctx context.Context, peerId string) (*types.HeartBeatResponse, error) {
p2pInfo, err := mgr.getP2pInfo(ctx, peerId)
if err != nil {
// tell peer to register again
return &types.HeartBeatResponse{ NeedRegister:true, Version:mgr.timeStamp.String() }, nil
}
p2pInfo.update()

// return all tasks peer owned
return &types.HeartBeatResponse{
SeedTaskIds: p2pInfo.taskIds.list(),
Version: mgr.timeStamp.String(),
Expand Down
1 change: 1 addition & 0 deletions supernode/daemon/mgr/seed_task/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func (scheduler *defaultScheduler) Schedule(nowTasks []*SeedTaskInfo, newTask *S
idx := 0
for idx < len(nowTasks) {
if nowTasks[idx] == nil {
/* number of seed < MaxSeedPerObj */
if busyPeer != nil {
busyPeer = nil
pos = idx
Expand Down
8 changes: 4 additions & 4 deletions supernode/daemon/mgr/seed_task/seed_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,14 @@ type SeedTaskInfo struct {

// point to a real-time task
type SeedTaskMap struct {
/* seed task id */
taskId string
lock *sync.RWMutex
/* latest access time */
accessTime int64
/* store all task-peer info */
tasks []*SeedTaskInfo
availTasks int
/* seed schedule method */
scheduler seedScheduler
}

Expand All @@ -61,7 +64,6 @@ func newSeedTaskMap(taskId string, maxTaskPeers int) *SeedTaskMap {
taskId: taskId,
tasks: make([]*SeedTaskInfo, maxTaskPeers),
lock: new(sync.RWMutex),
availTasks: 0,
accessTime: -1,
scheduler: &defaultScheduler{},
}
Expand Down Expand Up @@ -130,7 +132,6 @@ func (taskMap *SeedTaskMap) remove(id string) bool {
taskMap.tasks[i].P2pInfo.deleteTask(taskMap.taskId)
taskMap.tasks[i] = nil
left -= 1
taskMap.availTasks -= 1
}
i += 1
}
Expand All @@ -149,5 +150,4 @@ func (taskMap *SeedTaskMap) removeAllPeers() {
taskMap.tasks[idx] = nil
task.P2pInfo.deleteTask(task.TaskInfo.ID)
}
taskMap.availTasks = 0
}
2 changes: 1 addition & 1 deletion supernode/server/0.3_bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func (s *Server) reportServiceDown(ctx context.Context, rw http.ResponseWriter,
taskID := params.Get("taskId")
cID := params.Get("cid")

if req.Header.Get("X-report-resource") != "" {
if s.seedTaskMgr.IsSeedTask(ctx, req) {
err := s.seedTaskMgr.DeRegisterTask(ctx, cID, taskID)
if err != nil {
return err
Expand Down

0 comments on commit ed59371

Please sign in to comment.