diff --git a/README.md b/README.md index f0192c2ffef..7db9f2d821e 100644 --- a/README.md +++ b/README.md @@ -1450,6 +1450,7 @@ Obtaining: # metrics of every path paths{name="[path_name]",state="[state]"} 1 paths_bytes_received{name="[path_name]",state="[state]"} 1234 +paths_bytes_sent{name="[path_name]",state="[state]"} 1234 # metrics of every HLS muxer hls_muxers{name="[name]"} 1 @@ -1480,6 +1481,11 @@ rtmp_conns{id="[id]",state="[state]"} 1 rtmp_conns_bytes_received{id="[id]",state="[state]"} 1234 rtmp_conns_bytes_sent{id="[id]",state="[state]"} 187 +# metrics of every SRT connection +srt_conns{id="[id]",state="[state]"} 1 +srt_conns_bytes_received{id="[id]",state="[state]"} 1234 +srt_conns_bytes_sent{id="[id]",state="[state]"} 187 + # metrics of every WebRTC session webrtc_sessions{id="[id]"} 1 webrtc_sessions_bytes_received{id="[id]",state="[state]"} 1234 diff --git a/apidocs/openapi.yaml b/apidocs/openapi.yaml index 4db07544f23..cfb760db479 100644 --- a/apidocs/openapi.yaml +++ b/apidocs/openapi.yaml @@ -395,6 +395,9 @@ components: bytesReceived: type: integer format: int64 + bytesSent: + type: integer + format: int64 readers: type: array items: diff --git a/internal/core/api_test.go b/internal/core/api_test.go index 05278bce0ae..7964c8cac40 100644 --- a/internal/core/api_test.go +++ b/internal/core/api_test.go @@ -422,6 +422,7 @@ func TestAPIPathsList(t *testing.T) { Ready bool `json:"ready"` Tracks []string `json:"tracks"` BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` } type pathList struct { @@ -625,6 +626,7 @@ func TestAPIPathsGet(t *testing.T) { Ready bool `json:"Ready"` Tracks []string `json:"tracks"` BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` } var pathName string diff --git a/internal/core/core.go b/internal/core/core.go index 793351acf63..c9d21fee6e6 100644 --- a/internal/core/core.go +++ b/internal/core/core.go @@ -509,6 +509,7 @@ func (p *Core) createResources(initial bool) error { p.conf.RunOnConnectRestart, p.conf.RunOnDisconnect, p.externalCmdPool, + p.metrics, p.pathManager, p, ) diff --git a/internal/core/metrics.go b/internal/core/metrics.go index 1b3460f8533..8ff2132a825 100644 --- a/internal/core/metrics.go +++ b/internal/core/metrics.go @@ -32,6 +32,7 @@ type metrics struct { rtspServer apiRTSPServer rtspsServer apiRTSPServer rtmpServer apiRTMPServer + srtServer apiSRTServer hlsManager apiHLSManager webRTCManager apiWebRTCManager } @@ -96,6 +97,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) { tags := "{name=\"" + i.Name + "\",state=\"" + state + "\"}" out += metric("paths", tags, 1) out += metric("paths_bytes_received", tags, int64(i.BytesReceived)) + out += metric("paths_bytes_sent", tags, int64(i.BytesSent)) } } else { out += metric("paths", "", 0) @@ -199,6 +201,22 @@ func (m *metrics) onMetrics(ctx *gin.Context) { } } + if !interfaceIsEmpty(m.srtServer) { + data, err := m.srtServer.apiConnsList() + if err == nil && len(data.Items) != 0 { + for _, i := range data.Items { + tags := "{id=\"" + i.ID.String() + "\",state=\"" + string(i.State) + "\"}" + out += metric("srt_conns", tags, 1) + out += metric("srt_conns_bytes_received", tags, int64(i.BytesReceived)) + out += metric("srt_conns_bytes_sent", tags, int64(i.BytesSent)) + } + } else { + out += metric("srt_conns", "", 0) + out += metric("srt_conns_bytes_received", "", 0) + out += metric("srt_conns_bytes_sent", "", 0) + } + } + if !interfaceIsEmpty(m.webRTCManager) { data, err := m.webRTCManager.apiSessionsList() if err == nil && len(data.Items) != 0 { @@ -254,6 +272,13 @@ func (m *metrics) rtmpServerSet(s apiRTMPServer) { m.rtmpServer = s } +// srtServerSet is called by srtServer. +func (m *metrics) srtServerSet(s apiSRTServer) { + m.mutex.Lock() + defer m.mutex.Unlock() + m.srtServer = s +} + // webRTCManagerSet is called by webRTCManager. func (m *metrics) webRTCManagerSet(s apiWebRTCManager) { m.mutex.Lock() diff --git a/internal/core/metrics_test.go b/internal/core/metrics_test.go index 0b7a123f79b..eb8c93280dc 100644 --- a/internal/core/metrics_test.go +++ b/internal/core/metrics_test.go @@ -60,6 +60,9 @@ rtsps_sessions_bytes_sent 0 rtmp_conns 0 rtmp_conns_bytes_received 0 rtmp_conns_bytes_sent 0 +srt_conns 0 +srt_conns_bytes_received 0 +srt_conns_bytes_sent 0 webrtc_sessions 0 webrtc_sessions_bytes_received 0 webrtc_sessions_bytes_sent 0 @@ -110,10 +113,13 @@ webrtc_sessions_bytes_sent 0 require.Regexp(t, `^paths\{name=".*?",state="ready"\} 1`+"\n"+ `paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ `paths\{name=".*?",state="ready"\} 1`+"\n"+ `paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ `paths\{name=".*?",state="ready"\} 1`+"\n"+ `paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+ + `paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+ `hls_muxers\{name=".*?"\} 1`+"\n"+ `hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+ `hls_muxers\{name=".*?"\} 1`+"\n"+ @@ -135,6 +141,9 @@ webrtc_sessions_bytes_sent 0 `rtmp_conns\{id=".*?",state="publish"\} 1`+"\n"+ `rtmp_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+ `rtmp_conns_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+ + `srt_conns 0`+"\n"+ + `srt_conns_bytes_received 0`+"\n"+ + `srt_conns_bytes_sent 0`+"\n"+ `webrtc_sessions 0`+"\n"+ `webrtc_sessions_bytes_received 0`+"\n"+ `webrtc_sessions_bytes_sent 0`+"\n"+ diff --git a/internal/core/path.go b/internal/core/path.go index 27a419d9d00..1679690660a 100644 --- a/internal/core/path.go +++ b/internal/core/path.go @@ -718,6 +718,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) { } return pa.stream.BytesReceived() }(), + BytesSent: func() uint64 { + if pa.stream == nil { + return 0 + } + return pa.stream.BytesSent() + }(), Readers: func() []defs.APIPathSourceOrReader { ret := []defs.APIPathSourceOrReader{} for r := range pa.readers { diff --git a/internal/core/srt_server.go b/internal/core/srt_server.go index 2c482d13c02..b8876323563 100644 --- a/internal/core/srt_server.go +++ b/internal/core/srt_server.go @@ -67,6 +67,7 @@ type srtServer struct { runOnConnectRestart bool runOnDisconnect string externalCmdPool *externalcmd.Pool + metrics *metrics pathManager *pathManager parent srtServerParent @@ -96,6 +97,7 @@ func newSRTServer( runOnConnectRestart bool, runOnDisconnect string, externalCmdPool *externalcmd.Pool, + metrics *metrics, pathManager *pathManager, parent srtServerParent, ) (*srtServer, error) { @@ -120,6 +122,7 @@ func newSRTServer( runOnConnectRestart: runOnConnectRestart, runOnDisconnect: runOnDisconnect, externalCmdPool: externalCmdPool, + metrics: metrics, pathManager: pathManager, parent: parent, ctx: ctx, @@ -136,6 +139,10 @@ func newSRTServer( s.Log(logger.Info, "listener opened on "+address+" (UDP)") + if s.metrics != nil { + s.metrics.srtServerSet(s) + } + newSRTListener( s.ln, &s.wg, diff --git a/internal/defs/api.go b/internal/defs/api.go index bee536ce324..f0f9150bc80 100644 --- a/internal/defs/api.go +++ b/internal/defs/api.go @@ -35,6 +35,7 @@ type APIPath struct { ReadyTime *time.Time `json:"readyTime"` Tracks []string `json:"tracks"` BytesReceived uint64 `json:"bytesReceived"` + BytesSent uint64 `json:"bytesSent"` Readers []APIPathSourceOrReader `json:"readers"` } diff --git a/internal/stream/stream.go b/internal/stream/stream.go index 31aab9867f5..6e9e2e9f7d5 100644 --- a/internal/stream/stream.go +++ b/internal/stream/stream.go @@ -24,6 +24,7 @@ type Stream struct { desc *description.Session bytesReceived *uint64 + bytesSent *uint64 smedias map[*description.Media]*streamMedia mutex sync.RWMutex rtspStream *gortsplib.ServerStream @@ -40,6 +41,7 @@ func New( s := &Stream{ desc: desc, bytesReceived: new(uint64), + bytesSent: new(uint64), } s.smedias = make(map[*description.Media]*streamMedia) @@ -75,6 +77,18 @@ func (s *Stream) BytesReceived() uint64 { return atomic.LoadUint64(s.bytesReceived) } +// BytesSent returns sent bytes. +func (s *Stream) BytesSent() uint64 { + bytesSent := atomic.LoadUint64(s.bytesSent) + if s.rtspStream != nil { + bytesSent += s.rtspStream.BytesSent() + } + if s.rtspsStream != nil { + bytesSent += s.rtspsStream.BytesSent() + } + return bytesSent +} + // RTSPStream returns the RTSP stream. func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream { s.mutex.Lock() diff --git a/internal/stream/stream_format.go b/internal/stream/stream_format.go index 0899a9e784b..68dca2905d8 100644 --- a/internal/stream/stream_format.go +++ b/internal/stream/stream_format.go @@ -102,6 +102,7 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni for writer, cb := range sf.readers { ccb := cb writer.Push(func() error { + atomic.AddUint64(s.bytesSent, unitSize(u)) return ccb(u) }) }