Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix]gb主动关闭客户端 #100

Merged
merged 1 commit into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ type HlsConfig struct {
type GB28181Config struct {
Enable bool `json:"enable"` // gb28181使能标志
ListenAddr string `json:"listen_addr"` // gb28181监听地址
SipNetwork string `json:"sip_network"` // 传输协议,默认UDP,可选TCP
SipIP string `json:"sip_ip"` // sip 服务器公网IP
SipPort uint16 `json:"sip_port"` // sip 服务器端口,默认 5060
Serial string `json:"serial"` // sip 服务器 id, 默认 34020000002000000001
Expand Down
1 change: 0 additions & 1 deletion conf/lalmax.conf.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
"realm": "3402000000",
"sip_ip": "192.168.254.165",
"sip_port": 5060,
"sip_network": "udp",
"username": "",
"media_config": {
"media_ip": "192.168.254.165"
Expand Down
35 changes: 14 additions & 21 deletions gb28181/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,9 @@ func (channel *Channel) Invite(opt *InviteOptions, streamName string, playInfo *
nazalog.Error("invite failed, err:", err, " invite msg:", invite.String())

//jay 在media端口监听成功后,但是sip发送失败时
if !playInfo.SinglePort {
if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}
if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId, ""); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}
}

Expand Down Expand Up @@ -249,15 +247,15 @@ func (channel *Channel) Invite(opt *InviteOptions, streamName string, playInfo *
channel.ackReq = ackReq
channel.playInfo = playInfo

err = sipsvr.Send(ackReq)
err = channel.device.sipSvr.Send(ackReq)
} else {
if !playInfo.SinglePort {
if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}

if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(playInfo.NetWork, playInfo.SinglePort, channel.device.ID, channel.ChannelId, ""); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}
}

}
return
}
Expand All @@ -271,11 +269,9 @@ func (channel *Channel) GetCallId() string {
}
func (channel *Channel) stopMediaServer() (err error) {
if channel.playInfo != nil {
if !channel.playInfo.SinglePort {
if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(channel.playInfo.NetWork, channel.playInfo.SinglePort, channel.device.ID, channel.ChannelId); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}
if channel.observer != nil {
if err = channel.observer.OnStopMediaServer(channel.playInfo.NetWork, channel.playInfo.SinglePort, channel.device.ID, channel.ChannelId, channel.playInfo.StreamName); err != nil {
nazalog.Errorf("gb28181 MediaServer stop err:%s", err.Error())
}
}
}
Expand All @@ -288,21 +284,18 @@ func (channel *Channel) byeClear() (err error) {
return
}
func (channel *Channel) Bye(streamName string) (err error) {

if channel.ackReq != nil {
byeReq := channel.ackReq
channel.ackReq = nil
byeReq.SetMethod(sip.BYE)
seq, _ := byeReq.CSeq()
seq.SeqNo += 1
sipsvr.Send(byeReq)
channel.device.sipSvr.Send(byeReq)
} else {
err = errors.New("channel has been closed")
}

channel.stopMediaServer()
return err

}
func (channel *Channel) CreateRequst(Method sip.RequestMethod, conf config.GB28181Config) (req sip.Request) {
d := channel.device
Expand Down Expand Up @@ -357,7 +350,7 @@ func (channel *Channel) CreateRequst(Method sip.RequestMethod, conf config.GB281
nil,
)

req.SetTransport(conf.SipNetwork)
req.SetTransport(channel.device.network)
req.SetDestination(d.NetAddr)
return req
}
Expand Down
13 changes: 11 additions & 2 deletions gb28181/device.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gb28181

import (
"context"
"github.com/ghettovoice/gosip"
"net/http"
"strings"
"sync"
Expand Down Expand Up @@ -58,12 +59,20 @@ type Device struct {

observer IMediaOpObserver
conf config.GB28181Config

network string
sipSvr gosip.Server
}

func (d *Device) WithMediaServer(observer IMediaOpObserver) {
d.observer = observer
}

func (d *Device) WithSipSvr(sipSvr gosip.Server) *Device {
d.sipSvr = sipSvr
return d
}

func (d *Device) syncChannels() {
if time.Since(d.lastSyncTime) > 2*time.Second {
d.lastSyncTime = time.Now()
Expand Down Expand Up @@ -171,7 +180,7 @@ func (d *Device) CreateRequest(Method sip.RequestMethod, conf config.GB28181Conf
nil,
)

req.SetTransport(conf.SipNetwork)
req.SetTransport(d.network)
req.SetDestination(d.NetAddr)
return
}
Expand Down Expand Up @@ -330,5 +339,5 @@ func (d *Device) UpdateChannelPosition(channelId string, gpsTime string, lng str
}

func (d *Device) SipRequestForResponse(request sip.Request) (sip.Response, error) {
return sipsvr.RequestWithContext(context.Background(), request)
return d.sipSvr.RequestWithContext(context.Background(), request)
}
23 changes: 20 additions & 3 deletions gb28181/mediaserver/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"io"
"net"
"sync"
"time"

"github.com/q191201771/lalmax/gb28181/mpegps"
Expand Down Expand Up @@ -49,6 +50,10 @@ type Conn struct {

buffer *bytes.Buffer
key string

mediaServer *GB28181MediaServer
one sync.Once
oneSaveConn sync.Once
}

func NewConn(conn net.Conn, observer IGbObserver, lal logic.ILalServer) *Conn {
Expand All @@ -65,13 +70,16 @@ func NewConn(conn net.Conn, observer IGbObserver, lal logic.ILalServer) *Conn {

return c
}
func (c *Conn) SetMediaServer(mediaServer *GB28181MediaServer) {
c.mediaServer = mediaServer
}
func (c *Conn) SetKey(key string) {
c.key = key
}
func (c *Conn) Serve() (err error) {
defer func() {
nazalog.Info("conn close, err:", err)
c.conn.Close()
c.Close()

if c.observer != nil {
c.observer.NotifyClose(c.streamName)
Expand All @@ -85,7 +93,7 @@ func (c *Conn) Serve() (err error) {
nazalog.Info("gb28181 conn, remoteaddr:", c.conn.RemoteAddr().String(), " localaddr:", c.conn.LocalAddr().String())

for {
c.conn.SetReadDeadline(time.Now().Add(30 * time.Second))
c.conn.SetReadDeadline(time.Now().Add(10 * time.Second))
pkt := &rtp.Packet{}
if c.conn.RemoteAddr().Network() == "udp" {
buf := make([]byte, 1472*4)
Expand Down Expand Up @@ -137,6 +145,11 @@ func (c *Conn) Serve() (err error) {
}
c.check = true
c.streamName = mediaInfo.StreamName
c.oneSaveConn.Do(func() {
if c.mediaServer != nil {
c.mediaServer.conns.Store(c.streamName, c)
}
})
if len(mediaInfo.DumpFileName) > 0 {
c.psDumpFile = base.NewDumpFile()
if err = c.psDumpFile.OpenToWrite(mediaInfo.DumpFileName); err != nil {
Expand Down Expand Up @@ -253,7 +266,11 @@ func (c *Conn) OnFrame(frame []byte, cid mpegps.PsStreamType, pts uint64, dts ui
c.lalSession.FeedAvPacket(pkt)
}
}

func (c *Conn) Close() {
c.one.Do(func() {
c.conn.Close()
})
}
func getPayloadType(cid mpegps.PsStreamType) base.AvPacketPt {
switch cid {
case mpegps.PsStreamAac:
Expand Down
26 changes: 17 additions & 9 deletions gb28181/mediaserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ type GB28181MediaServer struct {
observer IGbObserver
mediaKey string

conn *Conn //增加链接对象,目前只适用于多端口
conns sync.Map //增加链接对象,目前只适用于多端口
}

func NewGB28181MediaServer(listenPort int, mediaKey string, observer IGbObserver, lal logic.ILalServer) *GB28181MediaServer {
Expand Down Expand Up @@ -61,21 +61,29 @@ func (s *GB28181MediaServer) Start(listener net.Listener) (err error) {

c := NewConn(conn, s.observer, s.lalServer)
c.SetKey(s.mediaKey)

s.conn = c
go c.Serve()
c.SetMediaServer(s)
go func() {
c.Serve()
s.conns.Delete(c.streamName)
}()
}
}()
}
return
}
func (s *GB28181MediaServer) CloseConn(streamName string) {
if v, ok := s.conns.Load(streamName); ok {
conn := v.(*Conn)
conn.Close()
}
}
func (s *GB28181MediaServer) Dispose() {
s.disposeOnce.Do(func() {

if s.conn != nil {
s.conn.conn.Close()
}

s.conns.Range(func(_, value any) bool {
conn := value.(*Conn)
conn.Close()
return true
})
if s.listener != nil {
s.listener.Close()
s.listener = nil
Expand Down
Loading
Loading