diff --git a/src/PortAudio.jl b/src/PortAudio.jl index 239a83b..69d6ffb 100644 --- a/src/PortAudio.jl +++ b/src/PortAudio.jl @@ -86,6 +86,7 @@ type PortAudioStream{T, U} sink # untyped because of circular type definition source # untyped because of circular type definition taskwork::Base.SingleAsyncWork + taskcond::Condition # used to wake the audio task when the PA callback happens bufstate::BufferState # used to synchronize the portaudio and julia sides bufinfo::CallbackInfo{T} # immutable data used in the portaudio callback @@ -103,7 +104,8 @@ type PortAudioStream{T, U} finalizer(this, close) this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize) this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize) - this.taskwork = Base.SingleAsyncWork(data -> audiotask(this)) + this.taskwork = Base.SingleAsyncWork(_ -> wakeaudiotask(this)) + this.taskcond = Condition() this.bufstate = PortAudioPending this.bufinfo = CallbackInfo(inchans, pointer(this.source.pabuf), outchans, pointer(this.sink.pabuf), @@ -111,6 +113,7 @@ type PortAudioStream{T, U} fieldptr(this, :bufstate)) this.stream = Pa_OpenStream(inparams, outparams, float(sr), bufsize, paNoFlag, pa_callbacks[T], fieldptr(this, :bufinfo)) + @schedule audiotask(this) Pa_StartStream(this.stream) @@ -165,9 +168,13 @@ function Base.close(stream::PortAudioStream) Pa_StopStream(stream.stream) Pa_CloseStream(stream.stream) stream.stream = C_NULL + # wake the audio task so it can exit + notify(stream.taskcond) end end +Base.isopen(stream::PortAudioStream) = stream.stream != C_NULL + SampleTypes.samplerate(stream::PortAudioStream) = stream.samplerate # Define our source and sink types @@ -199,76 +206,55 @@ SampleTypes.nchannels(s::Union{PortAudioSink, PortAudioSource}) = size(s.jlbuf, SampleTypes.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.stream) Base.eltype{T, U}(::Union{PortAudioSink{T, U}, PortAudioSource{T, U}}) = T -# function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf) -# if sink.busy -# c = Condition() -# push!(sink.waiters, c) -# wait(c) -# shift!(sink.waiters) -# end -# -# total = nframes(buf) -# written = 0 -# try -# sink.busy = true -# -# while written < total -# n = min(size(sink.pabuf, 2), total-written, Pa_GetStreamWriteAvailable(sink.stream)) -# bufstart = 1+written -# bufend = n+written -# @devec sink.jlbuf[1:n, :] = buf[bufstart:bufend, :] -# transpose!(sink.pabuf, sink.jlbuf) -# Pa_WriteStream(sink.stream, sink.pabuf, n, false) -# written += n -# sleep(POLL_SECONDS) -# end -# finally -# # make sure we release the busy flag even if the user ctrl-C'ed out -# sink.busy = false -# if length(sink.waiters) > 0 -# # let the next task in line go -# notify(sink.waiters[1]) -# end -# end -# -# written -# end -# -# function SampleTypes.unsafe_read!(source::PortAudioSource, buf::SampleBuf) -# if source.busy -# c = Condition() -# push!(source.waiters, c) -# wait(c) -# shift!(source.waiters) -# end -# -# total = nframes(buf) -# read = 0 -# -# try -# source.busy = true -# -# while read < total -# n = min(size(source.pabuf, 2), total-read, Pa_GetStreamReadAvailable(source.stream)) -# Pa_ReadStream(source.stream, source.pabuf, n, false) -# transpose!(source.jlbuf, source.pabuf) -# bufstart = 1+read -# bufend = n+read -# @devec buf[bufstart:bufend, :] = source.jlbuf[1:n, :] -# read += n -# sleep(POLL_SECONDS) -# end -# -# finally -# source.busy = false -# if length(source.waiters) > 0 -# # let the next task in line go -# notify(source.waiters[1]) -# end -# end -# -# read -# end +function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf) + c = Condition() + push!(sink.waiters, c) + + total = nframes(buf) + written = 0 + try + while written < total + donecond = wait(c) + n = min(size(sink.pabuf, 2), total-written) + bufstart = 1+written + bufend = n+written + @devec sink.jlbuf[1:n, :] = buf[bufstart:bufend, :] + written += n + notify(donecond) + end + finally + # make sure we remove our condition even if the user ctrl-C'ed out + shift!(sink.waiters) + end + + written +end + +function SampleTypes.unsafe_read!(source::PortAudioSource, buf::SampleBuf) + c = Condition() + push!(source.waiters, c) + + total = nframes(buf) + read = 0 + + try + while read < total + # we'll get woken up when the next data comes in + donecond = wait(c) + n = min(size(source.jlbuf, 1), total-read) + bufstart = 1+read + bufend = n+read + @devec buf[bufstart:bufend, :] = source.jlbuf[1:n, :] + read += n + notify(donecond) + end + finally + # make sure we remove our condition even if the user ctrl-C'ed out + shift!(source.waiters) + end + + read +end # This is the callback function that gets called directly in the PortAudio # audio thread, so it's critical that it not interact with the Julia GC @@ -293,17 +279,39 @@ function portaudio_callback{T}(inptr::Ptr{T}, outptr::Ptr{T}, paContinue end -# as of portaudio 19.20140130 (which is the HomeBrew version as of 20160319) -# noninterleaved data is not supported for the read/write interface on OSX -# so we need to use another buffer to interleave (transpose) +wakeaudiotask(stream::PortAudioStream) = notify(stream.taskcond) + function audiotask{T, U}(stream::PortAudioStream{T, U}) - if stream.bufstate != JuliaPending - return + donecond = Condition() + while true + try + wait(stream.taskcond) + isopen(stream) || break + if stream.bufstate != JuliaPending + continue + end + + # we notify the source waiters (readers) first so that if there's a loop + # it will be able to feed back to the writers + if length(stream.source.waiters) > 0 + transpose!(stream.source.jlbuf, stream.source.pabuf) + notify(stream.source.waiters[1], donecond) + wait(donecond) + end + if length(stream.sink.waiters) > 0 + notify(stream.sink.waiters[1], donecond) + wait(donecond) + transpose!(stream.sink.pabuf, stream.sink.jlbuf) + else + fill!(stream.sink.pabuf, zero(T)) + end + + stream.bufstate = PortAudioPending + catch ex + warn("Audio Task died with exception: $ex") + Base.show_backtrace(STDOUT, catch_backtrace()) + end end - - # do stuff - - stream.bufstate = PortAudioPending end memset(buf, val, count) = ccall(:memset, Ptr{Void},