diff --git a/pkg/sfu/downtrack.go b/pkg/sfu/downtrack.go index 92047f3cb..2a1fa0e21 100644 --- a/pkg/sfu/downtrack.go +++ b/pkg/sfu/downtrack.go @@ -24,7 +24,7 @@ const ( // to SFU Subscriber, the track handle the packets for simple, simulcast // and SVC Publisher. type DownTrack struct { - sync.Mutex + mu sync.Mutex id string peerID string bound atomicBool @@ -192,6 +192,8 @@ func (d *DownTrack) SetInitialLayers(spatialLayer, temporalLayer int64) { func (d *DownTrack) SwitchSpatialLayer(targetLayer int64, setAsMax bool) { if d.trackType == SimulcastDownTrack { + d.mu.Lock() + defer d.mu.Unlock() layer := atomic.LoadInt32(&d.spatialLayer) currentLayer := uint16(layer) currentTargetLayer := uint16(layer >> 16) @@ -283,6 +285,11 @@ func (d *DownTrack) UpdateStats(packetLen uint32) { atomic.AddUint32(&d.packetCount, 1) } +func (d *DownTrack) currentSpatialLayer() int { + spatialLayer := atomic.LoadInt32(&d.spatialLayer) + return int(spatialLayer & 0x0f) +} + func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { if d.reSync.get() { if d.Kind() == webrtc.RTPCodecTypeVideo { @@ -330,7 +337,6 @@ func (d *DownTrack) writeSimpleRTP(extPkt *buffer.ExtPacket) error { func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket) error { // Check if packet SSRC is different from before // if true, the video source changed - d.Lock() reSync := d.reSync.get() lastSSRC := atomic.LoadUint32(&d.lastSSRC) if lastSSRC != extPkt.Packet.SSRC || reSync { @@ -338,7 +344,6 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket) error { currentLayer := uint16(layer) targetLayer := uint16(layer >> 16) if currentLayer == targetLayer && !reSync { - d.Unlock() return nil } // Wait for a keyframe to sync new source @@ -347,16 +352,13 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket) error { d.receiver.SendRTCP([]rtcp.Packet{ &rtcp.PictureLossIndication{SenderSSRC: d.ssrc, MediaSSRC: extPkt.Packet.SSRC}, }) - d.Unlock() return nil } if reSync && d.simulcast.lTSCalc != 0 { d.simulcast.lTSCalc = extPkt.Arrival } - // Switch is done remove sender from previous layer - // and update current layer + // Switch is done remove update current layer if currentLayer != targetLayer && !reSync { - go d.receiver.DeleteDownTrack(int(currentLayer), d.peerID) atomic.StoreInt32(&d.spatialLayer, int32(targetLayer)<<16|int32(targetLayer)) } @@ -394,7 +396,6 @@ func (d *DownTrack) writeSimulcastRTP(extPkt *buffer.ExtPacket) error { } newSN := extPkt.Packet.SequenceNumber - d.snOffset newTS := extPkt.Packet.Timestamp - d.tsOffset - d.Unlock() var ( picID uint16 diff --git a/pkg/sfu/receiver.go b/pkg/sfu/receiver.go index 4b591747d..444f04199 100644 --- a/pkg/sfu/receiver.go +++ b/pkg/sfu/receiver.go @@ -346,7 +346,10 @@ func (w *WebRTCReceiver) writeRTP(layer int) { if w.isSimulcast && len(w.pendingTracks[layer]) > 0 { if pkt.KeyFrame { - w.downTracks[layer] = append(w.downTracks[layer], w.pendingTracks[layer]...) + for _, dt := range w.pendingTracks[layer] { + w.downTracks[layer] = append(w.downTracks[layer], dt) + w.DeleteDownTrack(dt.currentSpatialLayer(), dt.peerID) + } w.pendingTracks[layer] = w.pendingTracks[layer][:0] } else { w.SendRTCP(pli) @@ -354,7 +357,7 @@ func (w *WebRTCReceiver) writeRTP(layer int) { } for idx, dt := range w.downTracks[layer] { - if err := dt.WriteRTP(pkt); err == io.EOF || err == io.ErrClosedPipe { + if err = dt.WriteRTP(pkt); err == io.EOF || err == io.ErrClosedPipe { del = append(del, idx) } }