Streaming real time audio (text-to-speech) #185

Open · batanus opened this issue Mar 22, 2024 · 7 comments


batanus commented Mar 22, 2024

The OpenAI API supports streaming real-time audio in chunks (just like token-by-token chat streaming).

[Screenshot: 2024-03-22 at 15:10:36]

Are there any plans to add support for this feature?
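
For context, consuming the endpoint directly with plain URLSession could look roughly like this (a sketch — the model and voice parameters are illustrative, and error handling is omitted):

import Foundation

// Sketch: request speech from the TTS endpoint and re-expose the chunked
// response body as an AsyncThrowingStream<Data, Error>.
func streamSpeech(text: String, apiKey: String) async throws -> AsyncThrowingStream<Data, Error> {
    var request = URLRequest(url: URL(string: "https://api.openai.com/v1/audio/speech")!)
    request.httpMethod = "POST"
    request.setValue("Bearer \(apiKey)", forHTTPHeaderField: "Authorization")
    request.setValue("application/json", forHTTPHeaderField: "Content-Type")
    request.httpBody = try JSONSerialization.data(withJSONObject: [
        "model": "tts-1", "input": text, "voice": "alloy"
    ])
    let (bytes, _) = try await URLSession.shared.bytes(for: request)
    return AsyncThrowingStream { continuation in
        Task {
            do {
                var chunk = Data()
                for try await byte in bytes {
                    chunk.append(byte)
                    if chunk.count >= 4096 { // forward in ~4 KB chunks rather than byte-by-byte
                        continuation.yield(chunk)
                        chunk.removeAll(keepingCapacity: true)
                    }
                }
                if !chunk.isEmpty { continuation.yield(chunk) }
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
    }
}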


batanus commented Mar 27, 2024

I've opened a PR implementing this feature: #189


morad commented Apr 24, 2024

Hey @batanus

I have tested your PR, and it works. I was able to stream data. Thanks!

On a separate note, how are you able to play and buffer the data as it comes in? Any tips?


batanus commented May 3, 2024

Hey @morad @DanielhCarranza! The moment I opened the PR, I began implementing this functionality (thanks, @mihai8804858, for helping me understand the mechanics of AVPlayer's custom resource loader). To play an audio stream, you need to use AVPlayer with a custom resource loader that feeds new audio data to AVPlayer as it arrives. The problem with OpenAI is that, since the audio file is generated in real time, the expectedContentLength field of URLResponse returns -1, while AVPlayer needs to know the total length (contentLength) of the file being played. I tried hardcoding contentLength to a fixed value (for example, 10_000 or 100_000), but because it doesn't match the actual size of the audio file, playback doesn't run to the end. If you have any ideas on how to get around this problem, I'd be glad to hear them.

Here's the custom resource loader implementation:

import Foundation
import AVFoundation

final class ChunkedPlayerItem: AVPlayerItem {
    private let resourceLoaderDelegate: ChunkedResourceLoaderDelegate
    private let url: URL

    init(stream: AsyncThrowingStream<(Data, URLResponse), Error>, fileExtension: String) {
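        // A custom (non-HTTP) scheme forces AVURLAsset to route all loading through our delegate.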
        self.url = URL(string: "audio-streamer://whatever/file.\(fileExtension)")!
        self.resourceLoaderDelegate = ChunkedResourceLoaderDelegate(stream: stream)
        let asset = AVURLAsset(url: url)
        asset.resourceLoader.setDelegate(resourceLoaderDelegate, queue: .main)
        super.init(asset: asset, automaticallyLoadedAssetKeys: nil)
    }

    deinit {
        resourceLoaderDelegate.cancel()
    }
}

private final class ChunkedResourceLoaderDelegate: NSObject, AVAssetResourceLoaderDelegate {
    private let stream: AsyncThrowingStream<(Data, URLResponse), Error>
    private var streamTask: Task<Void, Never>?

    private var pendingRequests = Set<AVAssetResourceLoadingRequest>()

    private var receivedData = Data()
    private var receivedResponse: URLResponse?
    private var receivedError: Error?

    init(stream: AsyncThrowingStream<(Data, URLResponse), Error>) {
        self.stream = stream
    }

    func resourceLoader(
        _ resourceLoader: AVAssetResourceLoader,
        shouldWaitForLoadingOfRequestedResource loadingRequest: AVAssetResourceLoadingRequest
    ) -> Bool {
        startStreamIfNeeded()
        pendingRequests.insert(loadingRequest)
        processPendingRequests()

        return true
    }

    func resourceLoader(
        _ resourceLoader: AVAssetResourceLoader,
        didCancel loadingRequest: AVAssetResourceLoadingRequest
    ) {
        pendingRequests.remove(loadingRequest)
    }

    func cancel() {
        streamTask?.cancel()
    }

    private func startStreamIfNeeded() {
        guard streamTask == nil else { return }
        streamTask = Task {
            do {
                for try await (data, response) in stream {
                    receivedResponse = response
                    receivedData.append(data)
                    processPendingRequests()
                }
            } catch {
                receivedError = error
                processPendingRequests()
            }
        }
    }

    private func processPendingRequests() {
        for request in pendingRequests {
            fillContentInformationRequest(request.contentInformationRequest)
            if let receivedError {
                request.finishLoading(with: receivedError)
                pendingRequests.remove(request)
            } else if tryFulfillDataRequest(request.dataRequest) {
                request.finishLoading()
                pendingRequests.remove(request)
            }
        }
    }

    private func fillContentInformationRequest(_ contentInformationRequest: AVAssetResourceLoadingContentInformationRequest?) {
        guard let contentInformationRequest, let receivedResponse else { return }
        contentInformationRequest.contentType = receivedResponse.mimeType
        // Problem here: receivedResponse.expectedContentLength = -1
        contentInformationRequest.contentLength = receivedResponse.expectedContentLength
        contentInformationRequest.isByteRangeAccessSupported = true
    }

    private func tryFulfillDataRequest(_ dataRequest: AVAssetResourceLoadingDataRequest?) -> Bool {
        guard let dataRequest else { return false }
        let requestedOffset = Int(dataRequest.requestedOffset)
        let requestedLength = dataRequest.requestedLength
        let currentOffset = Int(dataRequest.currentOffset)
        guard receivedData.count > currentOffset else { return false }
        let bytesToRespond = min(receivedData.count - currentOffset, requestedLength)
        let dataToRespond = receivedData.subdata(in: currentOffset..<(currentOffset + bytesToRespond))
        dataRequest.respond(with: dataToRespond)

        return receivedData.count >= requestedLength + requestedOffset
    }

    deinit {
        cancel()
    }
}
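
Usage is the standard AVPlayer flow. Here `audioStream` is just a placeholder for the `(Data, URLResponse)` stream returned by the API call:

// Hypothetical wiring — `audioStream` stands in for the chunked TTS response stream.
let item = ChunkedPlayerItem(stream: audioStream, fileExtension: "mp3")
let player = AVPlayer(playerItem: item)
player.play()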

mihai8804858 commented

> Hey @morad @DanielhCarranza! The moment I opened the PR, I began implementing this functionality. […]

Since the text-to-speech API uses chunked transfer encoding, Content-Length is omitted from the response, so AVPlayer doesn't know how much data to expect from the resource loader and playback breaks.

I was able to implement real-time playback of audio chunks as they arrive using the following approach: parse the incoming bytes with AudioFileStream, wrap the resulting packets in CMSampleBuffers, and enqueue them on an AVSampleBufferAudioRenderer driven by an AVSampleBufferRenderSynchronizer (code below).


morad commented May 9, 2024

Great! @mihai8804858, do you mind sharing your sample code for this? Thanks!

mihai8804858 commented

> Great! @mihai8804858, do you mind sharing your sample code for this? Thanks!

Sure, here's the code I came up with:

import Foundation
import AVFoundation
import AudioToolbox
import os

extension CMSampleBuffer: @unchecked Sendable {}

final class AudioPlayer {
    private let queue = DispatchQueue(label: "audio.player.queue")

    private var task: Task<Void, Never>?
    private var buffers = OSAllocatedUnfairLock(initialState: [CMSampleBuffer]())
    private var audioStreamID: AudioFileStreamID?
    private var audioRenderer: AVSampleBufferAudioRenderer?
    private var audioDescription: AudioStreamBasicDescription?
    private var audioSynchronizer: AVSampleBufferRenderSynchronizer?
    private var audioRendererStatusObservation: NSKeyValueObservation?
    private var nextBufferTimeOffset = CMTime.zero
    private var dataReceiveComplete = false

    deinit {
        stop()
    }

    func start(_ stream: AsyncThrowingStream<Data, Error>, type: AudioFileTypeID? = nil) {
        stop()
        openFileStream(type: type)
        startReceivingData(from: stream)
    }

    func stop() {
        cancelDataTask()
        closeFileStream()
        removeSampleBuffers()
        audioSynchronizer = nil
        audioDescription = nil
        audioRenderer = nil
        nextBufferTimeOffset = .zero
    }

    // MARK: - Audio File Stream

    private func openFileStream(type: AudioFileTypeID?) {
        closeFileStream()
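        // Pass `self` as the opaque context pointer so the C callbacks can reach back into the player.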
        let playerInstance = UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())
        let status = AudioFileStreamOpen(
            playerInstance,
            { playerInstance, _, propertyID, _ in
                let player = Unmanaged<AudioPlayer>.fromOpaque(playerInstance).takeUnretainedValue()
                player.onFileStreamPropertyReceived(propertyID: propertyID)
            },
            { playerInstance, numberBytes, numberPackets, bytes, packets in
                let player = Unmanaged<AudioPlayer>.fromOpaque(playerInstance).takeUnretainedValue()
                player.onFileStreamPacketsReceived(
                    numberOfBytes: numberBytes,
                    bytes: bytes,
                    numberOfPackets: numberPackets,
                    packets: packets
                )
            },
            type ?? 0,
            &audioStreamID
        )
    }

    private func closeFileStream() {
        audioStreamID.flatMap { _ = AudioFileStreamClose($0) }
        audioStreamID = nil
    }

    private func onFileStreamPropertyReceived(propertyID: AudioFilePropertyID) {
        guard let audioStreamID = audioStreamID, propertyID == kAudioFileStreamProperty_DataFormat else { return }
        var asbdSize: UInt32 = 0
        var asbd = AudioStreamBasicDescription()
        let getInfoStatus = AudioFileStreamGetPropertyInfo(audioStreamID, propertyID, &asbdSize, nil)
        guard getInfoStatus == noErr else { return }
        let getPropertyStatus = AudioFileStreamGetProperty(audioStreamID, propertyID, &asbdSize, &asbd)
        guard getPropertyStatus == noErr else { return }
        let renderer = AVSampleBufferAudioRenderer()
        let synchronizer = AVSampleBufferRenderSynchronizer()
        synchronizer.addRenderer(renderer)
        nextBufferTimeOffset = CMTime(value: 0, timescale: Int32(asbd.mSampleRate))
        audioDescription = asbd
        audioRenderer = renderer
        audioSynchronizer = synchronizer
        startRequestingMediaData(audioRenderer: renderer, audioSynchronizer: synchronizer)
    }

    private func onFileStreamPacketsReceived(
        numberOfBytes: UInt32,
        bytes: UnsafeRawPointer,
        numberOfPackets: UInt32,
        packets: UnsafeMutablePointer<AudioStreamPacketDescription>?
    ) {
        guard let audioDescription else { return }
        guard let buffer = makeSampleBuffer(
            from: Data(bytes: bytes, count: Int(numberOfBytes)),
            asbd: audioDescription,
            presentationTimeStamp: nextBufferTimeOffset,
            packetCount: numberOfPackets,
            packetDescriptions: packets
        ) else { return }
        let bufferStartTime = CMSampleBufferGetOutputPresentationTimeStamp(buffer)
        let bufferDuration = CMSampleBufferGetOutputDuration(buffer)
        nextBufferTimeOffset = bufferStartTime + bufferDuration
        buffers.withLock { $0.append(buffer) }
    }

    // MARK: - Data Stream

    private func startReceivingData(from stream: AsyncThrowingStream<Data, Error>) {
        cancelDataTask()
        task = Task { [weak self] in
            guard let self else { return }
            do {
                for try await data in stream {
                    parseData(data)
                }
                finishDataParsing()
            } catch {
                return
            }
        }
    }

    private func cancelDataTask() {
        dataReceiveComplete = false
        task?.cancel()
        task = nil
    }

    private func parseData(_ data: Data) {
        guard let audioStreamID else { return }
        data.withUnsafeBytes { pointer in
            guard let baseAddress = pointer.baseAddress else { return }
            AudioFileStreamParseBytes(audioStreamID, UInt32(data.count), baseAddress, [])
        }
    }

    private func finishDataParsing() {
        dataReceiveComplete = true
        guard let audioStreamID else { return }
        AudioFileStreamParseBytes(audioStreamID, 0, nil, [])
    }

    // MARK: - Sample Buffers

    private func nextSampleBuffer() -> CMSampleBuffer? {
        buffers.withLock { buffers in
            if buffers.isEmpty { return nil }
            return buffers.removeFirst()
        }
    }

    private func removeSampleBuffers() {
        buffers.withLock { $0.removeAll() }
        audioRenderer?.flush()
    }

    private func makeSampleBuffer(
        from data: Data,
        asbd: AudioStreamBasicDescription,
        presentationTimeStamp: CMTime,
        packetCount: UInt32,
        packetDescriptions: UnsafePointer<AudioStreamPacketDescription>?
    ) -> CMSampleBuffer? {
        guard let blockBuffer = makeBlockBuffer(from: data) else { return nil }
        guard let formatDescription = try? CMFormatDescription(audioStreamBasicDescription: asbd) else { return nil }
        var sampleBuffer: CMSampleBuffer? = nil
        let createStatus = CMAudioSampleBufferCreateReadyWithPacketDescriptions(
            allocator: kCFAllocatorDefault,
            dataBuffer: blockBuffer,
            formatDescription: formatDescription,
            sampleCount: CMItemCount(packetCount),
            presentationTimeStamp: presentationTimeStamp,
            packetDescriptions: packetDescriptions,
            sampleBufferOut: &sampleBuffer
        )
        guard createStatus == noErr else { return nil }

        return sampleBuffer
    }

    private func makeBlockBuffer(from data: Data) -> CMBlockBuffer? {
        var blockBuffer: CMBlockBuffer?
        let createStatus = CMBlockBufferCreateWithMemoryBlock(
            allocator: kCFAllocatorDefault,
            memoryBlock: nil,
            blockLength: data.count,
            blockAllocator: kCFAllocatorDefault,
            customBlockSource: nil,
            offsetToData: 0,
            dataLength: data.count,
            flags: kCMBlockBufferAssureMemoryNowFlag,
            blockBufferOut: &blockBuffer
        )
        guard createStatus == noErr else { return nil }
        guard let blockBuffer else { return nil }
        return data.withUnsafeBytes { pointer in
            guard let baseAddress = pointer.baseAddress else { return nil }
            let replaceStatus = CMBlockBufferReplaceDataBytes(
                with: baseAddress,
                blockBuffer: blockBuffer,
                offsetIntoDestination: 0,
                dataLength: data.count
            )
            guard replaceStatus == noErr else { return nil }

            return blockBuffer
        }
    }

    // MARK: - Media Data

    private func startRequestingMediaData(
        audioRenderer: AVSampleBufferAudioRenderer,
        audioSynchronizer: AVSampleBufferRenderSynchronizer
    ) {
        audioRenderer.requestMediaDataWhenReady(on: queue) { [weak self] in
            self?.provideMediaDataIfNeeded(
                audioRenderer: audioRenderer,
                audioSynchronizer: audioSynchronizer
            )
        }
    }

    private func provideMediaDataIfNeeded(
        audioRenderer: AVSampleBufferAudioRenderer,
        audioSynchronizer: AVSampleBufferRenderSynchronizer
    ) {
        while audioRenderer.isReadyForMoreMediaData {
            if let buffer = nextSampleBuffer() {
                audioRenderer.enqueue(buffer)
                startPlaybackIfCan(
                    audioRenderer: audioRenderer,
                    audioSynchronizer: audioSynchronizer
                )
            } else if dataReceiveComplete {
                audioRenderer.stopRequestingMediaData()
                break
            }
        }
    }

    private func startPlaybackIfCan(
        audioRenderer: AVSampleBufferAudioRenderer,
        audioSynchronizer: AVSampleBufferRenderSynchronizer
    ) {
        guard audioRenderer.hasSufficientMediaDataForReliablePlaybackStart, audioSynchronizer.rate == 0 else { return }
        audioSynchronizer.setRate(1.0, time: .zero)
    }
}
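
Usage is a couple of lines. `makeSpeechStream()` here is just a stand-in for whatever produces the chunked TTS byte stream (e.g. a URLSession-based helper like the one sketched earlier in this thread):

// Hypothetical wiring — feed the chunked response into the player as it arrives.
let player = AudioPlayer()
let speechStream: AsyncThrowingStream<Data, Error> = makeSpeechStream()
player.start(speechStream, type: kAudioFileMP3Type)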

mihai8804858 commented

> Great! @mihai8804858, do you mind sharing your sample code for this? Thanks!
>
> Sure, here's the code I came up with: […]

I have created an open-source SPM package of this implementation.
