Skip to content

Commit

Permalink
- added file path to startRecordingResponse
Browse files Browse the repository at this point in the history
- send recordingRtpStatusChanged only on actual flow status change
- added pion interceptors to webrtc
  • Loading branch information
arthurpro committed Feb 24, 2023
1 parent afeb0f5 commit 67168b1
Show file tree
Hide file tree
Showing 6 changed files with 38 additions and 8 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ journalctl -u bbb-webrtc-recorder -f
status: "ok" | "failed",
error: undefined | <String>,
sdp: <String | undefined>, // answer
fileName: <String | undefined>, // full path to recording
}
```

Expand Down
5 changes: 4 additions & 1 deletion internal/pubsub/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,14 @@ func (e *StartRecording) Fail(err error) *StartRecordingResponse {
return &r
}

func (e *StartRecording) Success(sdp string) *StartRecordingResponse {
func (e *StartRecording) Success(sdp, fileName string) *StartRecordingResponse {
r := StartRecordingResponse{
Id: "startRecordingResponse",
SessionId: e.SessionId,
Status: "ok",
Error: nil,
SDP: pointer.ToString(sdp),
FileName: pointer.ToString(fileName),
}
return &r
}
Expand All @@ -84,6 +85,7 @@ startRecordingResponse (Recorder -> SFU)
status: ‘ok’ | ‘failed’,
error: undefined | <String>,
sdp: <String | undefined>, // answer
fileName: <String | undefined>, // full path to recording
}
```
*/
Expand All @@ -94,6 +96,7 @@ type StartRecordingResponse struct {
Status string `json:"status,omitempty"`
Error *string `json:"error,omitempty"`
SDP *string `json:"sdp,omitempty"`
FileName *string `json:"fileName,omitempty"`
}

/*
Expand Down
4 changes: 1 addition & 3 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,7 @@ func (s *Server) HandlePubSub(ctx context.Context, msg []byte) {
sess := NewSession(e.SessionId, s, wrtc, rec)
s.sessions.Store(e.SessionId, sess)
sdp = sess.StartRecording(e.SDP)
s.PublishPubSub(e.Success(sdp))
log.WithField("session", ctx.Value("session")).
Debug(sdp)
s.PublishPubSub(e.Success(sdp, rec.GetFilePath()))
}()
if err != nil {
log.WithField("session", ctx.Value("session")).
Expand Down
1 change: 1 addition & 0 deletions internal/webrtc/recorder/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
type FlowCallbackFn func(isFlowing bool, keyframeSequence int64, videoTimestamp time.Duration, closed bool)

type Recorder interface {
GetFilePath() string
Push(rtp *rtp.Packet, track *webrtc.TrackRemote)
SetContext(ctx context.Context)
Close() time.Duration
Expand Down
11 changes: 9 additions & 2 deletions internal/webrtc/recorder/webm.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewWebmRecorder(file string, fn FlowCallbackFn) *WebmRecorder {
case <-r.statusTickerChan:
return
case <-r.statusTicker.C:
if ts == r.videoTimestamp {
if ts == r.videoTimestamp && r.flowing {
r.flowing = false
r.flowCallbackFn(r.flowing, r.keyframeSequence, r.videoTimestamp, r.closed)
}
Expand All @@ -71,6 +71,10 @@ func NewWebmRecorder(file string, fn FlowCallbackFn) *WebmRecorder {
return r
}

func (r *WebmRecorder) GetFilePath() string {
return r.file
}

func (r *WebmRecorder) SetContext(ctx context.Context) {
r.ctx = ctx
}
Expand Down Expand Up @@ -147,6 +151,7 @@ func (r *WebmRecorder) pushVP8(rtpPacket *rtp.Packet) {

var ts time.Duration
for {
flowing := r.flowing
sample := r.videoBuilder.Pop()
if sample == nil {
return
Expand Down Expand Up @@ -180,12 +185,14 @@ func (r *WebmRecorder) pushVP8(rtpPacket *rtp.Packet) {
panic(err)
}
}

if ts == r.videoTimestamp {
r.flowing = false
} else {
ts = r.videoTimestamp
r.flowing = true
}
if !r.flowing || videoKeyframe {
if r.flowing != flowing {
r.flowCallbackFn(r.flowing, r.keyframeSequence, r.videoTimestamp, r.closed)
}

Expand Down
24 changes: 22 additions & 2 deletions internal/webrtc/webrtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"github.com/bigbluebutton/bbb-webrtc-recorder/internal/config"
"github.com/bigbluebutton/bbb-webrtc-recorder/internal/webrtc/recorder"
"github.com/bigbluebutton/bbb-webrtc-recorder/internal/webrtc/utils"
"github.com/pion/interceptor"
"github.com/pion/rtcp"
"github.com/pion/sdp/v3"
"github.com/pion/webrtc/v3"
log "github.com/sirupsen/logrus"
"io"
Expand Down Expand Up @@ -33,6 +35,12 @@ func (w WebRTC) Init(offer webrtc.SessionDescription, r recorder.Recorder, connS
// Create a MediaEngine object to configure the supported codec
m := &webrtc.MediaEngine{}

sdpOffer := sdp.SessionDescription{}

if err := sdpOffer.Unmarshal([]byte(offer.SDP)); err != nil {
panic(err)
}

// Setup the codecs you want to use.
// Only support VP8 and OPUS, this makes our WebM muxer code simpler
if err := m.RegisterCodec(webrtc.RTPCodecParameters{
Expand All @@ -50,10 +58,22 @@ func (w WebRTC) Init(offer webrtc.SessionDescription, r recorder.Recorder, connS

se := &webrtc.SettingEngine{}
se.SetSRTPReplayProtectionWindow(1024)
se.SetEphemeralUDPPortRange(w.cfg.RTCMinPort, w.cfg.RTCMaxPort)
if err := se.SetEphemeralUDPPortRange(w.cfg.RTCMinPort, w.cfg.RTCMaxPort); err != nil {
panic(err)
}

i := &interceptor.Registry{}
// Use the default set of Interceptors
if err := webrtc.RegisterDefaultInterceptors(m, i); err != nil {
panic(err)
}

// Create the API object with the MediaEngine
api := webrtc.NewAPI(webrtc.WithMediaEngine(m), webrtc.WithSettingEngine(*se))
api := webrtc.NewAPI(
webrtc.WithMediaEngine(m),
webrtc.WithSettingEngine(*se),
webrtc.WithInterceptorRegistry(i),
)

// Create a new RTCPeerConnection
peerConnection, err := api.NewPeerConnection(cfg)
Expand Down

0 comments on commit 67168b1

Please sign in to comment.