callback-based interface mostly working, but lots of dropouts

This commit is contained in:
Spencer Russell 2016-03-23 13:10:14 -04:00
parent f02e733fe7
commit 5854270183

View file

@ -86,6 +86,7 @@ type PortAudioStream{T, U}
sink # untyped because of circular type definition sink # untyped because of circular type definition
source # untyped because of circular type definition source # untyped because of circular type definition
taskwork::Base.SingleAsyncWork taskwork::Base.SingleAsyncWork
taskcond::Condition # used to wake the audio task when the PA callback happens
bufstate::BufferState # used to synchronize the portaudio and julia sides bufstate::BufferState # used to synchronize the portaudio and julia sides
bufinfo::CallbackInfo{T} # immutable data used in the portaudio callback bufinfo::CallbackInfo{T} # immutable data used in the portaudio callback
@ -103,7 +104,8 @@ type PortAudioStream{T, U}
finalizer(this, close) finalizer(this, close)
this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize) this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize)
this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize) this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize)
this.taskwork = Base.SingleAsyncWork(data -> audiotask(this)) this.taskwork = Base.SingleAsyncWork(_ -> wakeaudiotask(this))
this.taskcond = Condition()
this.bufstate = PortAudioPending this.bufstate = PortAudioPending
this.bufinfo = CallbackInfo(inchans, pointer(this.source.pabuf), this.bufinfo = CallbackInfo(inchans, pointer(this.source.pabuf),
outchans, pointer(this.sink.pabuf), outchans, pointer(this.sink.pabuf),
@ -111,6 +113,7 @@ type PortAudioStream{T, U}
fieldptr(this, :bufstate)) fieldptr(this, :bufstate))
this.stream = Pa_OpenStream(inparams, outparams, float(sr), bufsize, this.stream = Pa_OpenStream(inparams, outparams, float(sr), bufsize,
paNoFlag, pa_callbacks[T], fieldptr(this, :bufinfo)) paNoFlag, pa_callbacks[T], fieldptr(this, :bufinfo))
@schedule audiotask(this)
Pa_StartStream(this.stream) Pa_StartStream(this.stream)
@ -165,9 +168,13 @@ function Base.close(stream::PortAudioStream)
Pa_StopStream(stream.stream) Pa_StopStream(stream.stream)
Pa_CloseStream(stream.stream) Pa_CloseStream(stream.stream)
stream.stream = C_NULL stream.stream = C_NULL
# wake the audio task so it can exit
notify(stream.taskcond)
end end
end end
Base.isopen(stream::PortAudioStream) = stream.stream != C_NULL
SampleTypes.samplerate(stream::PortAudioStream) = stream.samplerate SampleTypes.samplerate(stream::PortAudioStream) = stream.samplerate
# Define our source and sink types # Define our source and sink types
@ -199,76 +206,55 @@ SampleTypes.nchannels(s::Union{PortAudioSink, PortAudioSource}) = size(s.jlbuf,
SampleTypes.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.stream) SampleTypes.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.stream)
Base.eltype{T, U}(::Union{PortAudioSink{T, U}, PortAudioSource{T, U}}) = T Base.eltype{T, U}(::Union{PortAudioSink{T, U}, PortAudioSource{T, U}}) = T
# function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf) function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf)
# if sink.busy c = Condition()
# c = Condition() push!(sink.waiters, c)
# push!(sink.waiters, c)
# wait(c) total = nframes(buf)
# shift!(sink.waiters) written = 0
# end try
# while written < total
# total = nframes(buf) donecond = wait(c)
# written = 0 n = min(size(sink.pabuf, 2), total-written)
# try bufstart = 1+written
# sink.busy = true bufend = n+written
# @devec sink.jlbuf[1:n, :] = buf[bufstart:bufend, :]
# while written < total written += n
# n = min(size(sink.pabuf, 2), total-written, Pa_GetStreamWriteAvailable(sink.stream)) notify(donecond)
# bufstart = 1+written end
# bufend = n+written finally
# @devec sink.jlbuf[1:n, :] = buf[bufstart:bufend, :] # make sure we remove our condition even if the user ctrl-C'ed out
# transpose!(sink.pabuf, sink.jlbuf) shift!(sink.waiters)
# Pa_WriteStream(sink.stream, sink.pabuf, n, false) end
# written += n
# sleep(POLL_SECONDS) written
# end end
# finally
# # make sure we release the busy flag even if the user ctrl-C'ed out function SampleTypes.unsafe_read!(source::PortAudioSource, buf::SampleBuf)
# sink.busy = false c = Condition()
# if length(sink.waiters) > 0 push!(source.waiters, c)
# # let the next task in line go
# notify(sink.waiters[1]) total = nframes(buf)
# end read = 0
# end
# try
# written while read < total
# end # we'll get woken up when the next data comes in
# donecond = wait(c)
# function SampleTypes.unsafe_read!(source::PortAudioSource, buf::SampleBuf) n = min(size(source.jlbuf, 1), total-read)
# if source.busy bufstart = 1+read
# c = Condition() bufend = n+read
# push!(source.waiters, c) @devec buf[bufstart:bufend, :] = source.jlbuf[1:n, :]
# wait(c) read += n
# shift!(source.waiters) notify(donecond)
# end end
# finally
# total = nframes(buf) # make sure we remove our condition even if the user ctrl-C'ed out
# read = 0 shift!(source.waiters)
# end
# try
# source.busy = true read
# end
# while read < total
# n = min(size(source.pabuf, 2), total-read, Pa_GetStreamReadAvailable(source.stream))
# Pa_ReadStream(source.stream, source.pabuf, n, false)
# transpose!(source.jlbuf, source.pabuf)
# bufstart = 1+read
# bufend = n+read
# @devec buf[bufstart:bufend, :] = source.jlbuf[1:n, :]
# read += n
# sleep(POLL_SECONDS)
# end
#
# finally
# source.busy = false
# if length(source.waiters) > 0
# # let the next task in line go
# notify(source.waiters[1])
# end
# end
#
# read
# end
# This is the callback function that gets called directly in the PortAudio # This is the callback function that gets called directly in the PortAudio
# audio thread, so it's critical that it not interact with the Julia GC # audio thread, so it's critical that it not interact with the Julia GC
@ -293,17 +279,39 @@ function portaudio_callback{T}(inptr::Ptr{T}, outptr::Ptr{T},
paContinue paContinue
end end
# as of portaudio 19.20140130 (which is the HomeBrew version as of 20160319) wakeaudiotask(stream::PortAudioStream) = notify(stream.taskcond)
# noninterleaved data is not supported for the read/write interface on OSX
# so we need to use another buffer to interleave (transpose)
function audiotask{T, U}(stream::PortAudioStream{T, U}) function audiotask{T, U}(stream::PortAudioStream{T, U})
if stream.bufstate != JuliaPending donecond = Condition()
return while true
try
wait(stream.taskcond)
isopen(stream) || break
if stream.bufstate != JuliaPending
continue
end
# we notify the source waiters (readers) first so that if there's a loop
# it will be able to feed back to the writers
if length(stream.source.waiters) > 0
transpose!(stream.source.jlbuf, stream.source.pabuf)
notify(stream.source.waiters[1], donecond)
wait(donecond)
end
if length(stream.sink.waiters) > 0
notify(stream.sink.waiters[1], donecond)
wait(donecond)
transpose!(stream.sink.pabuf, stream.sink.jlbuf)
else
fill!(stream.sink.pabuf, zero(T))
end
stream.bufstate = PortAudioPending
catch ex
warn("Audio Task died with exception: $ex")
Base.show_backtrace(STDOUT, catch_backtrace())
end
end end
# do stuff
stream.bufstate = PortAudioPending
end end
memset(buf, val, count) = ccall(:memset, Ptr{Void}, memset(buf, val, count) = ccall(:memset, Ptr{Void},