Skip to content

Commit

Permalink
Manage dynamic sessions the same as common sessions for acceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
haoyang1994 committed Aug 8, 2024
1 parent c07597e commit e78276a
Showing 1 changed file with 8 additions and 45 deletions.
53 changes: 8 additions & 45 deletions acceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,6 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
}
a.dynamicSessionChan <- dynamicSession
session = dynamicSession
defer session.stop()
}

a.sessionAddr.Store(sessID, netConn.RemoteAddr())
Expand All @@ -363,52 +362,16 @@ func (a *Acceptor) handleConnection(netConn net.Conn) {
}

func (a *Acceptor) dynamicSessionsLoop() {
var id int
var sessions = map[int]*session{}
var complete = make(chan int)
defer close(complete)
LOOP:
for {
select {
case session, ok := <-a.dynamicSessionChan:
if !ok {
for _, oldSession := range sessions {
oldSession.stop()
}
break LOOP
}
id++
sessionID := id
sessions[sessionID] = session
go func() {
session.run()
err := UnregisterSession(session.sessionID)
if err != nil {
a.globalLog.OnEventf("Unregister dynamic session %v failed: %v", session.sessionID, err)
return
}
complete <- sessionID
}()
case id := <-complete:
session, ok := sessions[id]
if ok {
a.sessionAddr.Delete(session.sessionID)
delete(sessions, id)
} else {
a.globalLog.OnEventf("Missing dynamic session %v!", id)
}
}
}

if len(sessions) == 0 {
return
}

for id := range complete {
delete(sessions, id)
if len(sessions) == 0 {
return
session, ok := <-a.dynamicSessionChan
if !ok {
break
}
a.sessionGroup.Add(1)
go func() {
session.run()
a.sessionGroup.Done()
}()
}
}

Expand Down

0 comments on commit e78276a

Please sign in to comment.