Skip to content

Commit

Permalink
fix(sfu): lock spatial layer change
Browse files Browse the repository at this point in the history
  • Loading branch information
OrlandoCo committed May 10, 2021
1 parent 5f66903 commit cf118ae
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 10 deletions.
17 changes: 9 additions & 8 deletions pkg/sfu/downtrack.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
// to SFU Subscriber, the track handle the packets for simple, simulcast
// and SVC Publisher.
type DownTrack struct {
sync.Mutex
mu sync.Mutex
id string
peerID string
bound atomicBool
Expand Down Expand Up @@ -192,6 +192,8 @@ func (d *DownTrack) SetInitialLayers(spatialLayer, temporalLayer int64) {

func (d *DownTrack) SwitchSpatialLayer(targetLayer int64, setAsMax bool) {
if d.trackType == SimulcastDownTrack {
d.mu.Lock()
defer d.mu.Unlock()
layer := atomic.LoadInt32(&d.spatialLayer)
currentLayer := uint16(layer)
currentTargetLayer := uint16(layer >> 16)
Expand Down Expand Up @@ -283,6 +285,11 @@ func (d *DownTrack) UpdateStats(packetLen uint32) {
atomic.AddUint32(&d.packetCount, 1)
}

func (d *DownTrack) currentSpatialLayer() int {
spatialLayer := atomic.LoadInt32(&d.spatialLayer)
return int(spatialLayer & 0x0f)
}

func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
if d.reSync.get() {
if d.Kind() == webrtc.RTPCodecTypeVideo {
Expand Down Expand Up @@ -330,15 +337,13 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error {
func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket) error {
// Check if packet SSRC is different from before
// if true, the video source changed
d.Lock()
reSync := d.reSync.get()
lastSSRC := atomic.LoadUint32(&d.lastSSRC)
if lastSSRC != extPkt.Packet.SSRC || reSync {
layer := atomic.LoadInt32(&d.spatialLayer)
currentLayer := uint16(layer)
targetLayer := uint16(layer >> 16)
if currentLayer == targetLayer && !reSync {
d.Unlock()
return nil
}
// Wait for a keyframe to sync new source
Expand All @@ -347,16 +352,13 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket) error {
d.receiver.SendRTCP([]rtcp.Packet{
&rtcp.PictureLossIndication{SenderSSRC: d.ssrc, MediaSSRC: extPkt.Packet.SSRC},
})
d.Unlock()
return nil
}
if reSync && d.simulcast.lTSCalc != 0 {
d.simulcast.lTSCalc = extPkt.Arrival
}
// Switch is done remove sender from previous layer
// and update current layer
// Switch is done remove update current layer
if currentLayer != targetLayer && !reSync {
go d.receiver.DeleteDownTrack(int(currentLayer), d.peerID)
atomic.StoreInt32(&d.spatialLayer, int32(targetLayer)<<16|int32(targetLayer))
}

Expand Down Expand Up @@ -394,7 +396,6 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket) error {
}
newSN := extPkt.Packet.SequenceNumber - d.snOffset
newTS := extPkt.Packet.Timestamp - d.tsOffset
d.Unlock()

var (
picID uint16
Expand Down
7 changes: 5 additions & 2 deletions pkg/sfu/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,15 +346,18 @@ func (w *WebRTCReceiver) writeRTP(layer int) {

if w.isSimulcast && len(w.pendingTracks[layer]) > 0 {
if pkt.KeyFrame {
w.downTracks[layer] = append(w.downTracks[layer], w.pendingTracks[layer]...)
for _, dt := range w.pendingTracks[layer] {
w.downTracks[layer] = append(w.downTracks[layer], dt)
w.DeleteDownTrack(dt.currentSpatialLayer(), dt.peerID)
}
w.pendingTracks[layer] = w.pendingTracks[layer][:0]
} else {
w.SendRTCP(pli)
}
}

for idx, dt := range w.downTracks[layer] {
if err := dt.WriteRTP(pkt); err == io.EOF || err == io.ErrClosedPipe {
if err = dt.WriteRTP(pkt); err == io.EOF || err == io.ErrClosedPipe {
del = append(del, idx)
}
}
Expand Down

0 comments on commit cf118ae

Please sign in to comment.