Streaming real time audio (text-to-speech) #185
Comments
I've opened a PR implementing this feature: #189
Hey @batanus, I have tested your PR and it works; I was able to stream data. Thanks! On a separate note, how are you able to play and buffer the data as it comes in? Any tips?
Hey @morad @DanielhCarranza! So, the moment I opened the PR, I began implementing this functionality (thanks, @mihai8804858, for helping me understand the mechanics of AVPlayer's custom resource loader). To play an audio stream, you need to use AVPlayer with a custom resource loader that hands each new chunk of audio data to AVPlayer as it arrives. The problem with OpenAI is that, since the audio file is generated in real time, the URLResponse reports an expectedContentLength of -1, so the total length is not known up front. Here's the custom resource loader implementation:
import Foundation
import AVFoundation
final class ChunkedPlayerItem: AVPlayerItem {
private let resourceLoaderDelegate: ChunkedResourceLoaderDelegate
private let url: URL
init(stream: AsyncThrowingStream<(Data, URLResponse), Error>, fileExtension: String) {
self.url = URL(string: "audio-streamer://whatever/file.\(fileExtension)")!
self.resourceLoaderDelegate = ChunkedResourceLoaderDelegate(stream: stream)
let asset = AVURLAsset(url: url)
asset.resourceLoader.setDelegate(resourceLoaderDelegate, queue: .main)
super.init(asset: asset, automaticallyLoadedAssetKeys: nil)
}
deinit {
resourceLoaderDelegate.cancel()
}
}
private final class ChunkedResourceLoaderDelegate: NSObject, AVAssetResourceLoaderDelegate {
private let stream: AsyncThrowingStream<(Data, URLResponse), Error>
private var streamTask: Task<Void, Never>?
private var pendingRequests = Set<AVAssetResourceLoadingRequest>()
private var receivedData = Data()
private var receivedResponse: URLResponse?
private var receivedError: Error?
init(stream: AsyncThrowingStream<(Data, URLResponse), Error>) {
self.stream = stream
}
func resourceLoader(
_ resourceLoader: AVAssetResourceLoader,
shouldWaitForLoadingOfRequestedResource loadingRequest: AVAssetResourceLoadingRequest
) -> Bool {
startStreamIfNeeded()
pendingRequests.insert(loadingRequest)
processPendingRequests()
return true
}
func resourceLoader(
_ resourceLoader: AVAssetResourceLoader,
didCancel loadingRequest: AVAssetResourceLoadingRequest
) {
pendingRequests.remove(loadingRequest)
}
func cancel() {
streamTask?.cancel()
}
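// Consume the (Data, URLResponse) stream exactly once; each new chunk is appended
// to receivedData and the pending loading requests are re-evaluated.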
private func startStreamIfNeeded() {
guard streamTask == nil else { return }
streamTask = Task {
do {
for try await (data, response) in stream {
receivedResponse = response
receivedData.append(data)
processPendingRequests()
}
} catch {
receivedError = error
processPendingRequests()
}
}
}
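// Answer queued loading requests with whatever bytes have arrived so far; a request
// is finished only once its full requested range is available, or an error occurred.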
private func processPendingRequests() {
for request in pendingRequests {
fillContentInformationRequest(request.contentInformationRequest)
if let receivedError {
request.finishLoading(with: receivedError)
pendingRequests.remove(request)
} else if tryFulfillDataRequest(request.dataRequest) {
request.finishLoading()
pendingRequests.remove(request)
}
}
}
private func fillContentInformationRequest(_ contentInformationRequest: AVAssetResourceLoadingContentInformationRequest?) {
guard let contentInformationRequest, let receivedResponse else { return }
contentInformationRequest.contentType = receivedResponse.mimeType
// Problem here: receivedResponse.expectedContentLength = -1
contentInformationRequest.contentLength = receivedResponse.expectedContentLength
contentInformationRequest.isByteRangeAccessSupported = true
}
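// Respond with the bytes currently available for the requested range; returns true
// only when the entire requested range has been delivered.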
private func tryFulfillDataRequest(_ dataRequest: AVAssetResourceLoadingDataRequest?) -> Bool {
guard let dataRequest else { return false }
let requestedOffset = Int(dataRequest.requestedOffset)
let requestedLength = dataRequest.requestedLength
let currentOffset = Int(dataRequest.currentOffset)
guard receivedData.count > currentOffset else { return false }
let bytesToRespond = min(receivedData.count - currentOffset, requestedLength)
let dataToRespond = receivedData.subdata(in: currentOffset..<(currentOffset + bytesToRespond))
dataRequest.respond(with: dataToRespond)
return receivedData.count >= requestedLength + requestedOffset
}
deinit {
cancel()
}
}
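For context, here's a minimal sketch of how the player item above could be used. The stream has to come from a streaming speech call; speechStream below is a hypothetical placeholder for whatever API ends up providing the AsyncThrowingStream<(Data, URLResponse), Error>:
import AVFoundation

// speechStream is a hypothetical stand-in for a streaming TTS call that yields
// (Data, URLResponse) chunks as they arrive over the chunked HTTP response.
func playSpeech(speechStream: AsyncThrowingStream<(Data, URLResponse), Error>) -> AVPlayer {
    // The file extension must match the response_format requested from the API (e.g. "mp3").
    let item = ChunkedPlayerItem(stream: speechStream, fileExtension: "mp3")
    let player = AVPlayer(playerItem: item)
    player.play()
    return player // keep a strong reference to the player for the duration of playback
}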
Since the text-to-speech API uses chunked transfer encoding, the response does not report a total content length up front. I was able to implement real-time playback of the audio chunks as they arrive with the following approach: parse the incoming bytes with an AudioFileStream, wrap the parsed packets in CMSampleBuffers, and enqueue them into an AVSampleBufferAudioRenderer driven by an AVSampleBufferRenderSynchronizer.
Great! @mihai8804858, would you mind sharing some sample code for this? Thanks!
Sure, here's the code I came up with:
import Foundation
import AVFoundation
import AudioToolbox
import os
extension CMSampleBuffer: @unchecked Sendable {}
final class AudioPlayer {
private let queue = DispatchQueue(label: "audio.player.queue")
private var task: Task<Void, Never>?
private var buffers = OSAllocatedUnfairLock(initialState: [CMSampleBuffer]())
private var audioStreamID: AudioFileStreamID?
private var audioRenderer: AVSampleBufferAudioRenderer?
private var audioDescription: AudioStreamBasicDescription?
private var audioSynchronizer: AVSampleBufferRenderSynchronizer?
private var audioRendererStatusObservation: NSKeyValueObservation?
private var nextBufferTimeOffset = CMTime.zero
private var dataReceiveComplete = false
deinit {
stop()
}
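// Open an AudioFileStream parser and begin consuming the incoming data stream.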
func start(_ stream: AsyncThrowingStream<Data, Error>, type: AudioFileTypeID? = nil) {
stop()
openFileStream(type: type)
startReceivingData(from: stream)
}
func stop() {
cancelDataTask()
closeFileStream()
removeSampleBuffers()
audioSynchronizer = nil
audioDescription = nil
audioRenderer = nil
nextBufferTimeOffset = .zero
}
// MARK: - Audio File Stream
private func openFileStream(type: AudioFileTypeID?) {
closeFileStream()
let playerInstance = UnsafeMutableRawPointer(Unmanaged.passUnretained(self).toOpaque())
let status = AudioFileStreamOpen(
playerInstance,
{ playerInstance, _, propertyID, _ in
let player = Unmanaged<AudioPlayer>.fromOpaque(playerInstance).takeUnretainedValue()
player.onFileStreamPropertyReceived(propertyID: propertyID)
},
{ playerInstance, numberBytes, numberPackets, bytes, packets in
let player = Unmanaged<AudioPlayer>.fromOpaque(playerInstance).takeUnretainedValue()
player.onFileStreamPacketsReceived(
numberOfBytes: numberBytes,
bytes: bytes,
numberOfPackets: numberPackets,
packets: packets
)
},
type ?? 0,
&audioStreamID
)
}
private func closeFileStream() {
audioStreamID.flatMap { _ = AudioFileStreamClose($0) }
audioStreamID = nil
}
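// Called when the parser discovers a stream property; once the data format (ASBD)
// is known, set up the renderer and synchronizer and start requesting media data.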
private func onFileStreamPropertyReceived(propertyID: AudioFilePropertyID) {
guard let audioStreamID = audioStreamID, propertyID == kAudioFileStreamProperty_DataFormat else { return }
var asbdSize: UInt32 = 0
var asbd = AudioStreamBasicDescription()
let getInfoStatus = AudioFileStreamGetPropertyInfo(audioStreamID, propertyID, &asbdSize, nil)
guard getInfoStatus == noErr else { return }
let getPropertyStatus = AudioFileStreamGetProperty(audioStreamID, propertyID, &asbdSize, &asbd)
guard getPropertyStatus == noErr else { return }
let renderer = AVSampleBufferAudioRenderer()
let synchronizer = AVSampleBufferRenderSynchronizer()
synchronizer.addRenderer(renderer)
nextBufferTimeOffset = CMTime(value: 0, timescale: Int32(asbd.mSampleRate))
audioDescription = asbd
audioRenderer = renderer
audioSynchronizer = synchronizer
startRequestingMediaData(audioRenderer: renderer, audioSynchronizer: synchronizer)
}
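// Called with parsed audio packets; wrap them in a CMSampleBuffer stamped with a
// running time offset and queue the buffer for the renderer.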
private func onFileStreamPacketsReceived(
numberOfBytes: UInt32,
bytes: UnsafeRawPointer,
numberOfPackets: UInt32,
packets: UnsafeMutablePointer<AudioStreamPacketDescription>?
) {
guard let audioDescription else { return }
guard let buffer = makeSampleBuffer(
from: Data(bytes: bytes, count: Int(numberOfBytes)),
asbd: audioDescription,
presentationTimeStamp: nextBufferTimeOffset,
packetCount: numberOfPackets,
packetDescriptions: packets
) else { return }
let bufferStartTime = CMSampleBufferGetOutputPresentationTimeStamp(buffer)
let bufferDuration = CMSampleBufferGetOutputDuration(buffer)
nextBufferTimeOffset = bufferStartTime + bufferDuration
buffers.withLock { $0.append(buffer) }
}
// MARK: - Data Stream
private func startReceivingData(from stream: AsyncThrowingStream<Data, Error>) {
cancelDataTask()
task = Task { [weak self] in
guard let self else { return }
do {
for try await data in stream {
parseData(data)
}
finishDataParsing()
} catch {
return
}
}
}
private func cancelDataTask() {
dataReceiveComplete = false
task?.cancel()
task = nil
}
private func parseData(_ data: Data) {
guard let audioStreamID else { return }
data.withUnsafeBytes { pointer in
guard let baseAddress = pointer.baseAddress else { return }
AudioFileStreamParseBytes(audioStreamID, UInt32(data.count), baseAddress, [])
}
}
private func finishDataParsing() {
dataReceiveComplete = true
guard let audioStreamID else { return }
AudioFileStreamParseBytes(audioStreamID, 0, nil, [])
}
// MARK: - Sample Buffers
private func nextSampleBuffer() -> CMSampleBuffer? {
buffers.withLock { buffers in
if buffers.isEmpty { return nil }
return buffers.removeFirst()
}
}
private func removeSampleBuffers() {
buffers.withLock { $0.removeAll() }
audioRenderer?.flush()
}
private func makeSampleBuffer(
from data: Data,
asbd: AudioStreamBasicDescription,
presentationTimeStamp: CMTime,
packetCount: UInt32,
packetDescriptions: UnsafePointer<AudioStreamPacketDescription>?
) -> CMSampleBuffer? {
guard let blockBuffer = makeBlockBuffer(from: data) else { return nil }
guard let formatDescription = try? CMFormatDescription(audioStreamBasicDescription: asbd) else { return nil }
var sampleBuffer: CMSampleBuffer? = nil
let createStatus = CMAudioSampleBufferCreateReadyWithPacketDescriptions(
allocator: kCFAllocatorDefault,
dataBuffer: blockBuffer,
formatDescription: formatDescription,
sampleCount: CMItemCount(packetCount),
presentationTimeStamp: presentationTimeStamp,
packetDescriptions: packetDescriptions,
sampleBufferOut: &sampleBuffer
)
return sampleBuffer
}
private func makeBlockBuffer(from data: Data) -> CMBlockBuffer? {
var blockBuffer: CMBlockBuffer?
let createStatus = CMBlockBufferCreateWithMemoryBlock(
allocator: kCFAllocatorDefault,
memoryBlock: nil,
blockLength: data.count,
blockAllocator: kCFAllocatorDefault,
customBlockSource: nil,
offsetToData: 0,
dataLength: data.count,
flags: kCMBlockBufferAssureMemoryNowFlag,
blockBufferOut: &blockBuffer
)
guard createStatus == noErr else { return nil }
guard let blockBuffer else { return nil }
return data.withUnsafeBytes { pointer in
guard let baseAddress = pointer.baseAddress else { return nil }
let replaceStatus = CMBlockBufferReplaceDataBytes(
with: baseAddress,
blockBuffer: blockBuffer,
offsetIntoDestination: 0,
dataLength: data.count
)
return blockBuffer
}
}
// MARK: - Media Data
private func startRequestingMediaData(
audioRenderer: AVSampleBufferAudioRenderer,
audioSynchronizer: AVSampleBufferRenderSynchronizer
) {
audioRenderer.requestMediaDataWhenReady(on: queue) { [weak self] in
self?.provideMediaDataIfNeeded(
audioRenderer: audioRenderer,
audioSynchronizer: audioSynchronizer
)
}
}
private func provideMediaDataIfNeeded(
audioRenderer: AVSampleBufferAudioRenderer,
audioSynchronizer: AVSampleBufferRenderSynchronizer
) {
while audioRenderer.isReadyForMoreMediaData {
if let buffer = nextSampleBuffer() {
audioRenderer.enqueue(buffer)
startPlaybackIfCan(
audioRenderer: audioRenderer,
audioSynchronizer: audioSynchronizer
)
} else if dataReceiveComplete {
audioRenderer.stopRequestingMediaData()
break
}
}
}
private func startPlaybackIfCan(
audioRenderer: AVSampleBufferAudioRenderer,
audioSynchronizer: AVSampleBufferRenderSynchronizer
) {
guard audioRenderer.hasSufficientMediaDataForReliablePlaybackStart, audioSynchronizer.rate == 0 else { return }
audioSynchronizer.setRate(1.0, time: .zero)
}
}
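Here's a quick usage sketch, assuming you already have an AsyncThrowingStream<Data, Error> of raw audio chunks (for example, a TTS response requested as MP3); makeSpeechChunkStream() is just a hypothetical placeholder for that source:
import AudioToolbox

let player = AudioPlayer()

// Hypothetical source of audio chunks; any AsyncThrowingStream<Data, Error> will do.
let chunks: AsyncThrowingStream<Data, Error> = makeSpeechChunkStream()

// The file type hint is optional; kAudioFileMP3Type helps the parser when the
// container cannot be detected from the first few bytes alone.
player.start(chunks, type: kAudioFileMP3Type)

// Later, for example when the screen is dismissed:
// player.stop()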
I have created an open-source SPM package of this implementation.
The OpenAI API supports streaming real-time audio in chunks (just like token-by-token chat streaming).
Are there any plans to add support for this feature?
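In the meantime, for anyone who wants to prototype this before library support lands, here's a rough sketch (not this library's API) of turning the chunked HTTP response into an AsyncThrowingStream<Data, Error> with URLSession. The endpoint, body fields, and chunk size below are assumptions based on the public REST docs and should be double-checked:
import Foundation

func speechChunkStream(apiKey: String, text: String) -> AsyncThrowingStream<Data, Error> {
    AsyncThrowingStream { continuation in
        let task = Task {
            do {
                // Assumed endpoint and payload shape; verify against the current API reference.
                var request = URLRequest(url: URL(string: "https://api.openai.com/v1/audio/speech")!)
                request.httpMethod = "POST"
                request.setValue("Bearer \(apiKey)", forHTTPHeaderField: "Authorization")
                request.setValue("application/json", forHTTPHeaderField: "Content-Type")
                request.httpBody = try JSONSerialization.data(withJSONObject: [
                    "model": "tts-1", "voice": "alloy", "input": text, "response_format": "mp3"
                ])
                let (bytes, _) = try await URLSession.shared.bytes(for: request)
                // Re-batch the byte stream into small Data chunks before yielding them downstream.
                var chunk = Data()
                for try await byte in bytes {
                    chunk.append(byte)
                    if chunk.count >= 16 * 1024 {
                        continuation.yield(chunk)
                        chunk.removeAll(keepingCapacity: true)
                    }
                }
                if !chunk.isEmpty { continuation.yield(chunk) }
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
        continuation.onTermination = { _ in task.cancel() }
    }
}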