diff --git a/src/PortAudio.jl b/src/PortAudio.jl index e1333e9..79e4211 100644 --- a/src/PortAudio.jl +++ b/src/PortAudio.jl @@ -6,7 +6,7 @@ using SampledSignals using Devectorize using RingBuffers using Compat -import Compat: UTF8String +import Compat: UTF8String, view # Get binary dependencies loaded from BinDeps include( "../deps/deps.jl") @@ -14,7 +14,13 @@ include("libportaudio.jl") export PortAudioStream +# Size of the ringbuffer in frames. 85ms latency at 48kHz const DEFAULT_BUFSIZE=4096 +# data is passed to and from the ringbuffer in chunks with this many frames +# it should be at most the ringbuffer size, and must evenly divide into the +# the underlying portaudio buffer size. E.g. if PortAudio is running with a +# 2048-frame buffer period, the chunk size can be 2048, 1024, 512, 256, etc. +const CHUNKSIZE=128 function __init__() # initialize PortAudio on module load @@ -68,17 +74,12 @@ function fieldptr{T}(obj::T, field::Symbol) Ptr{FT}(pointer_from_objref(obj) + offset) end -# Used to synchronize the portaudio callback and Julia task -@enum BufferState JuliaPending PortAudioPending - # we want this to be immutable so we can stack allocate it immutable CallbackInfo{T} inchannels::Int - inbuf::Ptr{T} + inbuf::LockFreeRingBuffer{T} outchannels::Int - outbuf::Ptr{T} - taskhandle::Ptr{Void} - bufstate::Ptr{BufferState} + outbuf::LockFreeRingBuffer{T} end # paramaterized on the sample type and sampling rate type @@ -88,8 +89,6 @@ type PortAudioStream{T, U} stream::PaStream sink # untyped because of circular type definition source # untyped because of circular type definition - taskwork::Base.SingleAsyncWork - bufstate::BufferState # used to synchronize the portaudio and julia sides bufinfo::CallbackInfo{T} # immutable data used in the portaudio callback # this inner constructor is generally called via the top-level outer @@ -104,16 +103,14 @@ type PortAudioStream{T, U} Ref(Pa_StreamParameters(outdev.idx, outchans, type_to_fmt[T], 0.0, C_NULL)) this = new(sr, bufsize, C_NULL) finalizer(this, close) - this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize; - prefill=false, underflow=PAD) - this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize; - prefill=true, overflow=OVERWRITE) - this.taskwork = Base.SingleAsyncWork(_ -> audiotask(this)) - this.bufstate = PortAudioPending - this.bufinfo = CallbackInfo(inchans, pointer(this.source.pabuf), - outchans, pointer(this.sink.pabuf), - this.taskwork.handle, - fieldptr(this, :bufstate)) + this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize) + this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize) + if inchans > 0 && outchans > 0 + # we've got a duplex stream. initialize with the output buffer full + write(this.sink, SampleBuf(zeros(T, bufsize, outchans), sr)) + end + this.bufinfo = CallbackInfo(inchans, this.source.ringbuf, + outchans, this.sink.ringbuf) this.stream = Pa_OpenStream(inparams, outparams, float(sr), bufsize, paNoFlag, pa_callbacks[T], fieldptr(this, :bufinfo)) @@ -121,7 +118,6 @@ type PortAudioStream{T, U} this end - end # this is the top-level outer constructor that all the other outer constructors @@ -169,6 +165,8 @@ function Base.close(stream::PortAudioStream) if stream.stream != C_NULL Pa_StopStream(stream.stream) Pa_CloseStream(stream.stream) + close(stream.source) + close(stream.sink) stream.stream = C_NULL end @@ -203,27 +201,24 @@ for (TypeName, Super) in ((:PortAudioSink, :SampleSink), @eval type $TypeName{T, U} <: $Super name::UTF8String stream::PortAudioStream{T, U} - jlbuf::Array{T, 2} - pabuf::Array{T, 2} - ringbuf::RingBuffer{T} + chunkbuf::Array{T, 2} + ringbuf::LockFreeRingBuffer{T} + nchannels::Int - function $TypeName(name, stream, channels, bufsize; prefill=false, ringbuf_args...) + function $TypeName(name, stream, channels, bufsize) # portaudio data comes in interleaved, so we'll end up transposing # it back and forth to julia column-major - jlbuf = zeros(T, bufsize, channels) - pabuf = zeros(T, channels, bufsize) - ringbuf = RingBuffer(T, bufsize, channels; ringbuf_args...) - if prefill - write(ringbuf, zeros(T, bufsize, channels)) - end - new(name, stream, jlbuf, pabuf, ringbuf) + chunkbuf = zeros(T, channels, CHUNKSIZE) + ringbuf = LockFreeRingBuffer(T, bufsize * channels) + new(name, stream, chunkbuf, ringbuf, channels) end end end -SampledSignals.nchannels(s::Union{PortAudioSink, PortAudioSource}) = size(s.jlbuf, 2) +SampledSignals.nchannels(s::Union{PortAudioSink, PortAudioSource}) = s.nchannels SampledSignals.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.stream) Base.eltype{T, U}(::Union{PortAudioSink{T, U}, PortAudioSource{T, U}}) = T +Base.close(s::Union{PortAudioSink, PortAudioSource}) = close(s.ringbuf) function Base.show{T <: Union{PortAudioSink, PortAudioSource}}(io::IO, stream::T) println(io, T, "(\"", stream.name, "\")") @@ -232,11 +227,43 @@ end function SampledSignals.unsafe_write(sink::PortAudioSink, buf::SampleBuf) - write(sink.ringbuf, buf) + total = nframes(buf) + nwritten = 0 + while nwritten < total + towrite = min(CHUNKSIZE, total-nwritten) + # make a buffer of interleaved samples + # TODO: don't directly access buf.data + transpose!(view(sink.chunkbuf, :, 1:towrite), + view(buf.data, (1:towrite)+nwritten, :)) + while nwritable(sink.ringbuf) < towrite + wait(sink.ringbuf) + end + write(sink.ringbuf, sink.chunkbuf, towrite*nchannels(sink)) + + nwritten += towrite + end + + nwritten end function SampledSignals.unsafe_read!(source::PortAudioSource, buf::SampleBuf) - read!(source.ringbuf, buf) + total = nframes(buf) + nread = 0 + while nread < total + toread = min(CHUNKSIZE, total-nread) + while nreadable(source.ringbuf) < toread + wait(source.ringbuf) + end + read!(source.ringbuf, source.chunkbuf, toread*nchannels(source)) + # de-interleave the samples + # TODO: don't directly access buf.data + transpose!(view(buf.data, (1:toread)+nread, :), + view(source.chunkbuf, :, 1:toread)) + + nread += toread + end + + nread end # This is the callback function that gets called directly in the PortAudio @@ -244,43 +271,25 @@ end function portaudio_callback{T}(inptr::Ptr{T}, outptr::Ptr{T}, nframes, timeinfo, flags, userdata::Ptr{CallbackInfo{T}}) info = unsafe_load(userdata) + insamples = nframes * info.inchannels + outsamples = nframes * info.outchannels - if(unsafe_load(info.bufstate) != PortAudioPending) + if nreadable(info.outbuf) < outsamples || + nwritable(info.inbuf) < insamples # xrun, copy zeros to outbuffer + # TODO: send a notification to an error msg ringbuf + # TODO (maybe): we could do a partial write if there's anything in the + # ringbuf, and minimize the dropout memset(outptr, 0, sizeof(T)*nframes*info.outchannels) return paContinue end - unsafe_copy!(info.inbuf, inptr, nframes * info.inchannels) - unsafe_copy!(outptr, info.outbuf, nframes * info.outchannels) - - unsafe_store!(info.bufstate, JuliaPending) - - # notify the julia audio task - ccall(:uv_async_send, Void, (Ptr{Void},), info.taskhandle) + read!(info.outbuf, outptr, outsamples) + write(info.inbuf, inptr, insamples) paContinue end -# this gets called from uv_async_send, so it MUST NOT BLOCK -function audiotask{T, U}(stream::PortAudioStream{T, U}) - try - if stream.bufstate != JuliaPending - return - end - - transpose!(stream.source.jlbuf, stream.source.pabuf) - write(stream.source.ringbuf, stream.source.jlbuf) - - read!(stream.sink.ringbuf, stream.sink.jlbuf) - transpose!(stream.sink.pabuf, stream.sink.jlbuf) - - stream.bufstate = PortAudioPending - catch ex - warn("Audio Task died with exception: $ex") - Base.show_backtrace(STDOUT, catch_backtrace()) - end -end memset(buf, val, count) = ccall(:memset, Ptr{Void}, (Ptr{Void}, Cint, Csize_t), diff --git a/test/runtests.jl b/test/runtests.jl index 0fcfcdc..4572de5 100755 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -7,6 +7,7 @@ else end using PortAudio using SampledSignals +using RingBuffers # these test are currently set up to run on OSX @@ -31,17 +32,16 @@ using SampledSignals end @testset "PortAudio Callback works and doesn't allocate" begin - inbuf = rand(Float32, 2, 8) - outbuf = Array(Float32, 2, 8) - sinkbuf = rand(Float32, 2, 8) - sourcebuf = Array(Float32, 2, 8) - state = Ref(PortAudio.PortAudioPending) - work = Base.SingleAsyncWork(data -> nothing) + inbuf = rand(Float32, 16) # simulate microphone input + sourcebuf = LockFreeRingBuffer(Float32, 64) # the microphone input should end up here + testin = zeros(Float32, 16) - info = PortAudio.CallbackInfo(2, pointer(sourcebuf), - 2, pointer(sinkbuf), - work.handle, - Ptr{PortAudio.BufferState}(pointer_from_objref(state))) + outbuf = zeros(Float32, 24) # this is where the output should go + sinkbuf = LockFreeRingBuffer(Float32, 64) # the callback should copy this to outbuf + testout = rand(Float32, 24) + write(sinkbuf, testout) # fill the output ringbuffer + + info = PortAudio.CallbackInfo(2, sourcebuf, 3, sinkbuf) # handle any conversions here so they don't mess with the allocation inptr = pointer(inbuf) @@ -53,18 +53,18 @@ using SampledSignals ret = PortAudio.portaudio_callback(inptr, outptr, nframes, C_NULL, flags, infoptr) @test isa(ret, Cint) @test ret == PortAudio.paContinue - @test outbuf == sinkbuf - @test inbuf == sourcebuf - @test state[] == PortAudio.JuliaPending + @test outbuf == testout + read!(sourcebuf, testin) + @test inbuf == testin # call again (underrun) ret = PortAudio.portaudio_callback(inptr, outptr, nframes, C_NULL, flags, infoptr) @test isa(ret, Cint) @test ret == PortAudio.paContinue - @test outbuf == zeros(Float32, 2, 8) + @test outbuf == zeros(Float32, 24) + write(sinkbuf, testout) # fill the output ringbuffer # test allocation - state[] = PortAudio.PortAudioPending alloc = @allocated PortAudio.portaudio_callback(inptr, outptr, nframes, C_NULL, flags, infoptr) @test alloc == 0 # now test allocation in underrun state @@ -74,15 +74,15 @@ using SampledSignals @testset "Open Default Device" begin stream = PortAudioStream() - buf = read(stream, 0.1s) - @test size(buf) == (round(Int, 0.1s * samplerate(stream)), nchannels(stream.source)) + buf = read(stream, 0.001s) + @test size(buf) == (round(Int, 0.001s * samplerate(stream)), nchannels(stream.source)) write(stream, buf) close(stream) end @testset "Open Device by name" begin stream = PortAudioStream("Built-in Microph", "Built-in Output") - buf = read(stream, 0.1s) - @test size(buf) == (round(Int, 0.1s * samplerate(stream)), nchannels(stream.source)) + buf = read(stream, 0.001s) + @test size(buf) == (round(Int, 0.001s * samplerate(stream)), nchannels(stream.source)) write(stream, buf) io = IOBuffer() show(io, stream) @@ -100,8 +100,8 @@ using SampledSignals # no way to check that the right data is actually getting read or written here, # but at least it's not crashing. @testset "Queued Writing" begin - stream = PortAudioStream() - buf = SampleBuf(rand(eltype(stream), 48000, nchannels(stream.sink))*0.1, samplerate(stream)) + stream = PortAudioStream(0, 2) + buf = SampleBuf(rand(eltype(stream), 48000, nchannels(stream.sink)), samplerate(stream)) t1 = @async write(stream, buf) t2 = @async write(stream, buf) @test wait(t1) == 48000 @@ -109,7 +109,7 @@ using SampledSignals close(stream) end @testset "Queued Reading" begin - stream = PortAudioStream() + stream = PortAudioStream(2, 0) buf = SampleBuf(rand(eltype(stream), 48000, nchannels(stream.source)), samplerate(stream)) t1 = @async read!(stream, buf) t2 = @async read!(stream, buf)