adds queuing for writers

This commit is contained in:
Spencer Russell 2016-03-19 23:59:19 -04:00
parent 9bafaeaa45
commit 9b57992d83

View file

@ -44,18 +44,20 @@ type PortAudioSink{T, U} <: SampleSink
bufsize::Int bufsize::Int
buffer::Array{T, 2} buffer::Array{T, 2}
transbuf::Array{T, 2} transbuf::Array{T, 2}
waiters::Vector{Condition}
busy::Bool
function PortAudioSink(eltype, rate, channels, bufsize) function PortAudioSink(eltype, rate, channels, bufsize)
stream = Pa_OpenDefaultStream(0, channels, type_to_fmt[eltype], float(rate), bufsize) stream = Pa_OpenDefaultStream(0, channels, type_to_fmt[eltype], float(rate), bufsize)
writers = Condition[]
buffer = Array(eltype, bufsize, channels) buffer = Array(eltype, bufsize, channels)
# as of portaudio 19.20140130 (which is the HomeBrew version as of 20160319) # as of portaudio 19.20140130 (which is the HomeBrew version as of 20160319)
# noninterleaved data is not supported for the read/write interface on OSX # noninterleaved data is not supported for the read/write interface on OSX
transbuf = Array(eltype, channels, bufsize) transbuf = Array(eltype, channels, bufsize)
waiters = Condition[]
Pa_StartStream(stream) Pa_StartStream(stream)
this = new(stream, channels, rate, bufsize, buffer, transbuf) this = new(stream, channels, rate, bufsize, buffer, transbuf, waiters, false)
finalizer(this, close) finalizer(this, close)
this this
@ -81,8 +83,17 @@ Base.eltype{T, U}(sink::PortAudioSink{T, U}) = T
# end # end
function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf) function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf)
if sink.busy
c = Condition()
push!(sink.waiters, c)
wait(c)
shift!(sink.waiters)
end
sink.busy = true
total = nframes(buf) total = nframes(buf)
written = 0 written = 0
while written < total while written < total
n = min(size(sink.transbuf, 2), total-written, Pa_GetStreamWriteAvailable(sink.stream)) n = min(size(sink.transbuf, 2), total-written, Pa_GetStreamWriteAvailable(sink.stream))
bufstart = 1+written bufstart = 1+written
@ -93,6 +104,11 @@ function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf)
written += n written += n
sleep(0.005) sleep(0.005)
end end
sink.busy = false
if length(sink.waiters) > 0
# let the next task in line go
notify(sink.waiters[1])
end
written written
end end