Skip to content

Commit

Permalink
add missing Prometheus exports (#2620, #2619):
Browse files Browse the repository at this point in the history
paths_bytes_sent, srt_conns, srt_conns_bytes_received, srt_conns_bytes_sent
  • Loading branch information
rse committed Nov 5, 2023
1 parent 621a10a commit 9ca41b1
Show file tree
Hide file tree
Showing 11 changed files with 75 additions and 0 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,7 @@ Obtaining:
# metrics of every path
paths{name="[path_name]",state="[state]"} 1
paths_bytes_received{name="[path_name]",state="[state]"} 1234
paths_bytes_sent{name="[path_name]",state="[state]"} 1234
# metrics of every HLS muxer
hls_muxers{name="[name]"} 1
Expand Down Expand Up @@ -1480,6 +1481,11 @@ rtmp_conns{id="[id]",state="[state]"} 1
rtmp_conns_bytes_received{id="[id]",state="[state]"} 1234
rtmp_conns_bytes_sent{id="[id]",state="[state]"} 187
# metrics of every SRT connection
srt_conns{id="[id]",state="[state]"} 1
srt_conns_bytes_received{id="[id]",state="[state]"} 1234
srt_conns_bytes_sent{id="[id]",state="[state]"} 187
# metrics of every WebRTC session
webrtc_sessions{id="[id]"} 1
webrtc_sessions_bytes_received{id="[id]",state="[state]"} 1234
Expand Down
3 changes: 3 additions & 0 deletions apidocs/openapi.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,9 @@ components:
bytesReceived:
type: integer
format: int64
bytesSent:
type: integer
format: int64
readers:
type: array
items:
Expand Down
2 changes: 2 additions & 0 deletions internal/core/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,6 +422,7 @@ func TestAPIPathsList(t *testing.T) {
Ready bool `json:"ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}

type pathList struct {
Expand Down Expand Up @@ -625,6 +626,7 @@ func TestAPIPathsGet(t *testing.T) {
Ready bool `json:"Ready"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
}

var pathName string
Expand Down
1 change: 1 addition & 0 deletions internal/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ func (p *Core) createResources(initial bool) error {
p.conf.RunOnConnectRestart,
p.conf.RunOnDisconnect,
p.externalCmdPool,
p.metrics,
p.pathManager,
p,
)
Expand Down
25 changes: 25 additions & 0 deletions internal/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type metrics struct {
rtspServer apiRTSPServer
rtspsServer apiRTSPServer
rtmpServer apiRTMPServer
srtServer apiSRTServer
hlsManager apiHLSManager
webRTCManager apiWebRTCManager
}
Expand Down Expand Up @@ -96,6 +97,7 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
tags := "{name=\"" + i.Name + "\",state=\"" + state + "\"}"
out += metric("paths", tags, 1)
out += metric("paths_bytes_received", tags, int64(i.BytesReceived))
out += metric("paths_bytes_sent", tags, int64(i.BytesSent))
}
} else {
out += metric("paths", "", 0)
Expand Down Expand Up @@ -199,6 +201,22 @@ func (m *metrics) onMetrics(ctx *gin.Context) {
}
}

if !interfaceIsEmpty(m.srtServer) {
data, err := m.srtServer.apiConnsList()
if err == nil && len(data.Items) != 0 {
for _, i := range data.Items {
tags := "{id=\"" + i.ID.String() + "\",state=\"" + string(i.State) + "\"}"
out += metric("srt_conns", tags, 1)
out += metric("srt_conns_bytes_received", tags, int64(i.BytesReceived))
out += metric("srt_conns_bytes_sent", tags, int64(i.BytesSent))
}

Check warning on line 212 in internal/core/metrics.go

View check run for this annotation

Codecov / codecov/patch

internal/core/metrics.go#L207-L212

Added lines #L207 - L212 were not covered by tests
} else {
out += metric("srt_conns", "", 0)
out += metric("srt_conns_bytes_received", "", 0)
out += metric("srt_conns_bytes_sent", "", 0)
}
}

if !interfaceIsEmpty(m.webRTCManager) {
data, err := m.webRTCManager.apiSessionsList()
if err == nil && len(data.Items) != 0 {
Expand Down Expand Up @@ -254,6 +272,13 @@ func (m *metrics) rtmpServerSet(s apiRTMPServer) {
m.rtmpServer = s
}

// srtServerSet is called by srtServer.
func (m *metrics) srtServerSet(s apiSRTServer) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.srtServer = s
}

// webRTCManagerSet is called by webRTCManager.
func (m *metrics) webRTCManagerSet(s apiWebRTCManager) {
m.mutex.Lock()
Expand Down
9 changes: 9 additions & 0 deletions internal/core/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ rtsps_sessions_bytes_sent 0
rtmp_conns 0
rtmp_conns_bytes_received 0
rtmp_conns_bytes_sent 0
srt_conns 0
srt_conns_bytes_received 0
srt_conns_bytes_sent 0
webrtc_sessions 0
webrtc_sessions_bytes_received 0
webrtc_sessions_bytes_sent 0
Expand Down Expand Up @@ -110,10 +113,13 @@ webrtc_sessions_bytes_sent 0
require.Regexp(t,
`^paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`paths\{name=".*?",state="ready"\} 1`+"\n"+
`paths_bytes_received\{name=".*?",state="ready"\} 0`+"\n"+
`paths_bytes_sent\{name=".*?",state="ready"\} 0`+"\n"+
`hls_muxers\{name=".*?"\} 1`+"\n"+
`hls_muxers_bytes_sent\{name=".*?"\} [0-9]+`+"\n"+
`hls_muxers\{name=".*?"\} 1`+"\n"+
Expand All @@ -135,6 +141,9 @@ webrtc_sessions_bytes_sent 0
`rtmp_conns\{id=".*?",state="publish"\} 1`+"\n"+
`rtmp_conns_bytes_received\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`rtmp_conns_bytes_sent\{id=".*?",state="publish"\} [0-9]+`+"\n"+
`srt_conns 0`+"\n"+
`srt_conns_bytes_received 0`+"\n"+
`srt_conns_bytes_sent 0`+"\n"+
`webrtc_sessions 0`+"\n"+
`webrtc_sessions_bytes_received 0`+"\n"+
`webrtc_sessions_bytes_sent 0`+"\n"+
Expand Down
6 changes: 6 additions & 0 deletions internal/core/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,6 +718,12 @@ func (pa *path) doAPIPathsGet(req pathAPIPathsGetReq) {
}
return pa.stream.BytesReceived()
}(),
BytesSent: func() uint64 {
if pa.stream == nil {
return 0
}
return pa.stream.BytesSent()
}(),
Readers: func() []defs.APIPathSourceOrReader {
ret := []defs.APIPathSourceOrReader{}
for r := range pa.readers {
Expand Down
7 changes: 7 additions & 0 deletions internal/core/srt_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type srtServer struct {
runOnConnectRestart bool
runOnDisconnect string
externalCmdPool *externalcmd.Pool
metrics *metrics
pathManager *pathManager
parent srtServerParent

Expand Down Expand Up @@ -96,6 +97,7 @@ func newSRTServer(
runOnConnectRestart bool,
runOnDisconnect string,
externalCmdPool *externalcmd.Pool,
metrics *metrics,
pathManager *pathManager,
parent srtServerParent,
) (*srtServer, error) {
Expand All @@ -120,6 +122,7 @@ func newSRTServer(
runOnConnectRestart: runOnConnectRestart,
runOnDisconnect: runOnDisconnect,
externalCmdPool: externalCmdPool,
metrics: metrics,
pathManager: pathManager,
parent: parent,
ctx: ctx,
Expand All @@ -136,6 +139,10 @@ func newSRTServer(

s.Log(logger.Info, "listener opened on "+address+" (UDP)")

if s.metrics != nil {
s.metrics.srtServerSet(s)
}

newSRTListener(
s.ln,
&s.wg,
Expand Down
1 change: 1 addition & 0 deletions internal/defs/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ type APIPath struct {
ReadyTime *time.Time `json:"readyTime"`
Tracks []string `json:"tracks"`
BytesReceived uint64 `json:"bytesReceived"`
BytesSent uint64 `json:"bytesSent"`
Readers []APIPathSourceOrReader `json:"readers"`
}

Expand Down
14 changes: 14 additions & 0 deletions internal/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Stream struct {
desc *description.Session

bytesReceived *uint64
bytesSent *uint64
smedias map[*description.Media]*streamMedia
mutex sync.RWMutex
rtspStream *gortsplib.ServerStream
Expand All @@ -40,6 +41,7 @@ func New(
s := &Stream{
desc: desc,
bytesReceived: new(uint64),
bytesSent: new(uint64),
}

s.smedias = make(map[*description.Media]*streamMedia)
Expand Down Expand Up @@ -75,6 +77,18 @@ func (s *Stream) BytesReceived() uint64 {
return atomic.LoadUint64(s.bytesReceived)
}

// BytesSent returns sent bytes.
func (s *Stream) BytesSent() uint64 {
bytesSent := atomic.LoadUint64(s.bytesSent)
if s.rtspStream != nil {
bytesSent += s.rtspStream.BytesSent()
}
if s.rtspsStream != nil {
bytesSent += s.rtspsStream.BytesSent()
}
return bytesSent
}

// RTSPStream returns the RTSP stream.
func (s *Stream) RTSPStream(server *gortsplib.Server) *gortsplib.ServerStream {
s.mutex.Lock()
Expand Down
1 change: 1 addition & 0 deletions internal/stream/stream_format.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (sf *streamFormat) writeUnitInner(s *Stream, medi *description.Media, u uni
for writer, cb := range sf.readers {
ccb := cb
writer.Push(func() error {
atomic.AddUint64(s.bytesSent, unitSize(u))
return ccb(u)
})
}
Expand Down

0 comments on commit 9ca41b1

Please sign in to comment.