diff --git a/src/PortAudio.jl b/src/PortAudio.jl index 1326b26..2bba706 100644 --- a/src/PortAudio.jl +++ b/src/PortAudio.jl @@ -4,6 +4,7 @@ module PortAudio using SampleTypes using Devectorize +using RingBuffers # Get binary dependencies loaded from BinDeps include( "../deps/deps.jl") @@ -102,8 +103,10 @@ type PortAudioStream{T, U} Ref(Pa_StreamParameters(outdev.idx, outchans, type_to_fmt[T], 0.0, C_NULL)) this = new(sr, bufsize, C_NULL) finalizer(this, close) - this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize) - this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize) + this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize; + prefill=false, underflow=PAD) + this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize; + prefill=true, overflow=OVERWRITE) this.taskwork = Base.SingleAsyncWork(_ -> wakeaudiotask(this)) this.taskcond = Condition() this.bufstate = PortAudioPending @@ -183,16 +186,20 @@ for (TypeName, Super) in ((:PortAudioSink, :SampleSink), @eval type $TypeName{T, U} <: $Super name::UTF8String stream::PortAudioStream{T, U} - waiters::Vector{Condition} jlbuf::Array{T, 2} pabuf::Array{T, 2} + ringbuf::RingBuffer{T} - function $TypeName(name, stream, channels, bufsize) + function $TypeName(name, stream, channels, bufsize; prefill=false, ringbuf_args...) # portaudio data comes in interleaved, so we'll end up transposing # it back and forth to julia column-major jlbuf = zeros(T, bufsize, channels) pabuf = zeros(T, channels, bufsize) - new(name, stream, Condition[], jlbuf, pabuf) + ringbuf = RingBuffer(T, bufsize, channels; ringbuf_args...) + if prefill + write(ringbuf, zeros(T, bufsize, channels)) + end + new(name, stream, jlbuf, pabuf, ringbuf) end end end @@ -207,53 +214,11 @@ SampleTypes.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s. Base.eltype{T, U}(::Union{PortAudioSink{T, U}, PortAudioSource{T, U}}) = T function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf) - c = Condition() - push!(sink.waiters, c) - - total = nframes(buf) - written = 0 - try - while written < total - donecond = wait(c) - n = min(size(sink.jlbuf, 1), total-written) - bufstart = 1+written - bufend = n+written - @devec sink.jlbuf[1:n, :] = buf[bufstart:bufend, :] - written += n - notify(donecond) - end - finally - # make sure we remove our condition even if the user ctrl-C'ed out - shift!(sink.waiters) - end - - written + write(sink.ringbuf, buf) end function SampleTypes.unsafe_read!(source::PortAudioSource, buf::SampleBuf) - c = Condition() - push!(source.waiters, c) - - total = nframes(buf) - read = 0 - - try - while read < total - # we'll get woken up when the next data comes in - donecond = wait(c) - n = min(size(source.jlbuf, 1), total-read) - bufstart = 1+read - bufend = n+read - @devec buf[bufstart:bufend, :] = source.jlbuf[1:n, :] - read += n - notify(donecond) - end - finally - # make sure we remove our condition even if the user ctrl-C'ed out - shift!(source.waiters) - end - - read + read!(source.ringbuf, buf) end # This is the callback function that gets called directly in the PortAudio @@ -282,7 +247,6 @@ end wakeaudiotask(stream::PortAudioStream) = notify(stream.taskcond) function audiotask{T, U}(stream::PortAudioStream{T, U}) - donecond = Condition() while true try wait(stream.taskcond) @@ -291,35 +255,26 @@ function audiotask{T, U}(stream::PortAudioStream{T, U}) continue end - # we notify the source waiters (readers) first so that if there's a loop - # it will be able to feed back to the writers - if length(stream.source.waiters) > 0 - transpose!(stream.source.jlbuf, stream.source.pabuf) - notify(stream.source.waiters[1], donecond) - wait(donecond) - end - if length(stream.sink.waiters) > 0 - notify(stream.sink.waiters[1], donecond) - wait(donecond) - transpose!(stream.sink.pabuf, stream.sink.jlbuf) - else - fill!(stream.sink.pabuf, zero(T)) - end + transpose!(stream.source.jlbuf, stream.source.pabuf) + write(stream.source.ringbuf, stream.source.jlbuf) + + read!(stream.sink.ringbuf, stream.sink.jlbuf) + transpose!(stream.sink.pabuf, stream.sink.jlbuf) stream.bufstate = PortAudioPending catch ex - if isa(ex, InterruptException) - for w in stream.source.waiters - notify(w, ex; error=true) - end - for w in stream.sink.waiters - notify(w, ex; error=true) - end - else - warn("Audio Task died with exception: $ex") - Base.show_backtrace(STDOUT, catch_backtrace()) - break - end + # if isa(ex, InterruptException) + # for w in stream.source.waiters + # notify(w, ex; error=true) + # end + # for w in stream.sink.waiters + # notify(w, ex; error=true) + # end + # else + warn("Audio Task died with exception: $ex") + Base.show_backtrace(STDOUT, catch_backtrace()) + break + # end end end end