switches to using RingBuffer to better handle sub-buffer writes
This commit is contained in:
parent
333bbbf8d8
commit
f868087e99
1 changed files with 31 additions and 76 deletions
107
src/PortAudio.jl
107
src/PortAudio.jl
|
@ -4,6 +4,7 @@ module PortAudio
|
||||||
|
|
||||||
using SampleTypes
|
using SampleTypes
|
||||||
using Devectorize
|
using Devectorize
|
||||||
|
using RingBuffers
|
||||||
|
|
||||||
# Get binary dependencies loaded from BinDeps
|
# Get binary dependencies loaded from BinDeps
|
||||||
include( "../deps/deps.jl")
|
include( "../deps/deps.jl")
|
||||||
|
@ -102,8 +103,10 @@ type PortAudioStream{T, U}
|
||||||
Ref(Pa_StreamParameters(outdev.idx, outchans, type_to_fmt[T], 0.0, C_NULL))
|
Ref(Pa_StreamParameters(outdev.idx, outchans, type_to_fmt[T], 0.0, C_NULL))
|
||||||
this = new(sr, bufsize, C_NULL)
|
this = new(sr, bufsize, C_NULL)
|
||||||
finalizer(this, close)
|
finalizer(this, close)
|
||||||
this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize)
|
this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize;
|
||||||
this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize)
|
prefill=false, underflow=PAD)
|
||||||
|
this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize;
|
||||||
|
prefill=true, overflow=OVERWRITE)
|
||||||
this.taskwork = Base.SingleAsyncWork(_ -> wakeaudiotask(this))
|
this.taskwork = Base.SingleAsyncWork(_ -> wakeaudiotask(this))
|
||||||
this.taskcond = Condition()
|
this.taskcond = Condition()
|
||||||
this.bufstate = PortAudioPending
|
this.bufstate = PortAudioPending
|
||||||
|
@ -183,16 +186,20 @@ for (TypeName, Super) in ((:PortAudioSink, :SampleSink),
|
||||||
@eval type $TypeName{T, U} <: $Super
|
@eval type $TypeName{T, U} <: $Super
|
||||||
name::UTF8String
|
name::UTF8String
|
||||||
stream::PortAudioStream{T, U}
|
stream::PortAudioStream{T, U}
|
||||||
waiters::Vector{Condition}
|
|
||||||
jlbuf::Array{T, 2}
|
jlbuf::Array{T, 2}
|
||||||
pabuf::Array{T, 2}
|
pabuf::Array{T, 2}
|
||||||
|
ringbuf::RingBuffer{T}
|
||||||
|
|
||||||
function $TypeName(name, stream, channels, bufsize)
|
function $TypeName(name, stream, channels, bufsize; prefill=false, ringbuf_args...)
|
||||||
# portaudio data comes in interleaved, so we'll end up transposing
|
# portaudio data comes in interleaved, so we'll end up transposing
|
||||||
# it back and forth to julia column-major
|
# it back and forth to julia column-major
|
||||||
jlbuf = zeros(T, bufsize, channels)
|
jlbuf = zeros(T, bufsize, channels)
|
||||||
pabuf = zeros(T, channels, bufsize)
|
pabuf = zeros(T, channels, bufsize)
|
||||||
new(name, stream, Condition[], jlbuf, pabuf)
|
ringbuf = RingBuffer(T, bufsize, channels; ringbuf_args...)
|
||||||
|
if prefill
|
||||||
|
write(ringbuf, zeros(T, bufsize, channels))
|
||||||
|
end
|
||||||
|
new(name, stream, jlbuf, pabuf, ringbuf)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -207,53 +214,11 @@ SampleTypes.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.
|
||||||
Base.eltype{T, U}(::Union{PortAudioSink{T, U}, PortAudioSource{T, U}}) = T
|
Base.eltype{T, U}(::Union{PortAudioSink{T, U}, PortAudioSource{T, U}}) = T
|
||||||
|
|
||||||
function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf)
|
function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf)
|
||||||
c = Condition()
|
write(sink.ringbuf, buf)
|
||||||
push!(sink.waiters, c)
|
|
||||||
|
|
||||||
total = nframes(buf)
|
|
||||||
written = 0
|
|
||||||
try
|
|
||||||
while written < total
|
|
||||||
donecond = wait(c)
|
|
||||||
n = min(size(sink.jlbuf, 1), total-written)
|
|
||||||
bufstart = 1+written
|
|
||||||
bufend = n+written
|
|
||||||
@devec sink.jlbuf[1:n, :] = buf[bufstart:bufend, :]
|
|
||||||
written += n
|
|
||||||
notify(donecond)
|
|
||||||
end
|
|
||||||
finally
|
|
||||||
# make sure we remove our condition even if the user ctrl-C'ed out
|
|
||||||
shift!(sink.waiters)
|
|
||||||
end
|
|
||||||
|
|
||||||
written
|
|
||||||
end
|
end
|
||||||
|
|
||||||
function SampleTypes.unsafe_read!(source::PortAudioSource, buf::SampleBuf)
|
function SampleTypes.unsafe_read!(source::PortAudioSource, buf::SampleBuf)
|
||||||
c = Condition()
|
read!(source.ringbuf, buf)
|
||||||
push!(source.waiters, c)
|
|
||||||
|
|
||||||
total = nframes(buf)
|
|
||||||
read = 0
|
|
||||||
|
|
||||||
try
|
|
||||||
while read < total
|
|
||||||
# we'll get woken up when the next data comes in
|
|
||||||
donecond = wait(c)
|
|
||||||
n = min(size(source.jlbuf, 1), total-read)
|
|
||||||
bufstart = 1+read
|
|
||||||
bufend = n+read
|
|
||||||
@devec buf[bufstart:bufend, :] = source.jlbuf[1:n, :]
|
|
||||||
read += n
|
|
||||||
notify(donecond)
|
|
||||||
end
|
|
||||||
finally
|
|
||||||
# make sure we remove our condition even if the user ctrl-C'ed out
|
|
||||||
shift!(source.waiters)
|
|
||||||
end
|
|
||||||
|
|
||||||
read
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# This is the callback function that gets called directly in the PortAudio
|
# This is the callback function that gets called directly in the PortAudio
|
||||||
|
@ -282,7 +247,6 @@ end
|
||||||
wakeaudiotask(stream::PortAudioStream) = notify(stream.taskcond)
|
wakeaudiotask(stream::PortAudioStream) = notify(stream.taskcond)
|
||||||
|
|
||||||
function audiotask{T, U}(stream::PortAudioStream{T, U})
|
function audiotask{T, U}(stream::PortAudioStream{T, U})
|
||||||
donecond = Condition()
|
|
||||||
while true
|
while true
|
||||||
try
|
try
|
||||||
wait(stream.taskcond)
|
wait(stream.taskcond)
|
||||||
|
@ -291,35 +255,26 @@ function audiotask{T, U}(stream::PortAudioStream{T, U})
|
||||||
continue
|
continue
|
||||||
end
|
end
|
||||||
|
|
||||||
# we notify the source waiters (readers) first so that if there's a loop
|
transpose!(stream.source.jlbuf, stream.source.pabuf)
|
||||||
# it will be able to feed back to the writers
|
write(stream.source.ringbuf, stream.source.jlbuf)
|
||||||
if length(stream.source.waiters) > 0
|
|
||||||
transpose!(stream.source.jlbuf, stream.source.pabuf)
|
read!(stream.sink.ringbuf, stream.sink.jlbuf)
|
||||||
notify(stream.source.waiters[1], donecond)
|
transpose!(stream.sink.pabuf, stream.sink.jlbuf)
|
||||||
wait(donecond)
|
|
||||||
end
|
|
||||||
if length(stream.sink.waiters) > 0
|
|
||||||
notify(stream.sink.waiters[1], donecond)
|
|
||||||
wait(donecond)
|
|
||||||
transpose!(stream.sink.pabuf, stream.sink.jlbuf)
|
|
||||||
else
|
|
||||||
fill!(stream.sink.pabuf, zero(T))
|
|
||||||
end
|
|
||||||
|
|
||||||
stream.bufstate = PortAudioPending
|
stream.bufstate = PortAudioPending
|
||||||
catch ex
|
catch ex
|
||||||
if isa(ex, InterruptException)
|
# if isa(ex, InterruptException)
|
||||||
for w in stream.source.waiters
|
# for w in stream.source.waiters
|
||||||
notify(w, ex; error=true)
|
# notify(w, ex; error=true)
|
||||||
end
|
# end
|
||||||
for w in stream.sink.waiters
|
# for w in stream.sink.waiters
|
||||||
notify(w, ex; error=true)
|
# notify(w, ex; error=true)
|
||||||
end
|
# end
|
||||||
else
|
# else
|
||||||
warn("Audio Task died with exception: $ex")
|
warn("Audio Task died with exception: $ex")
|
||||||
Base.show_backtrace(STDOUT, catch_backtrace())
|
Base.show_backtrace(STDOUT, catch_backtrace())
|
||||||
break
|
break
|
||||||
end
|
# end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue