From 9eb565e487f2fe78ccd00ef41406e9cb1611ce31 Mon Sep 17 00:00:00 2001 From: Spencer Russell Date: Thu, 2 Jan 2020 00:02:00 -0500 Subject: [PATCH] mostly working, but crashes sometimes --- Project.toml | 2 - src/PortAudio.jl | 222 +++++++++----------------------------------- src/libportaudio.jl | 43 +++++++-- src/pa_shim.jl | 18 ---- 4 files changed, 77 insertions(+), 208 deletions(-) delete mode 100644 src/pa_shim.jl diff --git a/Project.toml b/Project.toml index 34dab79..c70700c 100644 --- a/Project.toml +++ b/Project.toml @@ -6,12 +6,10 @@ version = "1.1.0" [deps] Libdl = "8f399da3-3557-5675-b5ff-fb832c97cbdb" LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" -RingBuffers = "f6d8bcc6-4e01-5431-93c4-9d6004abab90" SampledSignals = "bd7594eb-a658-542f-9e75-4c4d8908c167" libportaudio_jll = "2d7b7beb-0762-5160-978e-1ab83a1e8a31" [compat] -RingBuffers = "1.2" julia = "1.3" [extras] diff --git a/src/PortAudio.jl b/src/PortAudio.jl index 17abe05..bb280d8 100644 --- a/src/PortAudio.jl +++ b/src/PortAudio.jl @@ -1,6 +1,6 @@ module PortAudio -using libportaudio_jll, Libdl, SampledSignals, RingBuffers +using libportaudio_jll, SampledSignals import Base: eltype, show import Base: close, isopen @@ -11,30 +11,20 @@ import LinearAlgebra: transpose! export PortAudioStream - -# Get binary dependencies loaded from BinDeps -include("pa_shim.jl") include("libportaudio.jl") # These sizes are all in frames # the block size is what we request from portaudio if no blocksize is given. -# The ringbuffer and pre-fill will be twice the blocksize const DEFAULT_BLOCKSIZE=4096 -# data is passed to and from the ringbuffer in chunks with this many frames -# it should be at most the ringbuffer size, and must evenly divide into the -# the underlying portaudio buffer size. E.g. if PortAudio is running with a -# 2048-frame buffer period, the chunk size can be 2048, 1024, 512, 256, etc. +# data is passed to and from portaudio in chunks with this many frames, because +# we need to interleave the samples const CHUNKSIZE=128 -# ringbuffer to receive errors from the audio processing thread -const ERR_BUFSIZE=512 - function versioninfo(io::IO=stdout) println(io, Pa_GetVersionText()) println(io, "Version: ", Pa_GetVersion()) - println(io, "Shim Source Hash: ", shimhash()[1:10]) end mutable struct PortAudioDevice @@ -74,14 +64,11 @@ mutable struct PortAudioStream{T} warn_xruns::Bool sink # untyped because of circular type definition source # untyped because of circular type definition - errbuf::RingBuffer{pa_shim_errmsg_t} # used to send errors from the portaudio callback - errtask::Task - bufinfo::pa_shim_info_t # data used in the portaudio callback # this inner constructor is generally called via the top-level outer # constructor below function PortAudioStream{T}(indev::PortAudioDevice, outdev::PortAudioDevice, - inchans, outchans, sr, blocksize, synced, + inchans, outchans, sr, blocksize, warn_xruns) where {T} inchans = inchans == -1 ? indev.maxinchans : inchans outchans = outchans == -1 ? outdev.maxoutchans : outchans @@ -92,32 +79,15 @@ mutable struct PortAudioStream{T} Ptr{Pa_StreamParameters}(0) : Ref(Pa_StreamParameters(outdev.idx, outchans, type_to_fmt[T], 0.0, C_NULL)) this = new(sr, blocksize, C_NULL, warn_xruns) - finalizer(close, this) - this.sink = PortAudioSink{T}(outdev.name, this, outchans, blocksize*2) - this.source = PortAudioSource{T}(indev.name, this, inchans, blocksize*2) - this.errbuf = RingBuffer{pa_shim_errmsg_t}(1, ERR_BUFSIZE) - if synced && inchans > 0 && outchans > 0 - # we've got a synchronized duplex stream. initialize with the output buffer full - write(this.sink, SampleBuf(zeros(T, blocksize*2, outchans), sr)) - end - # pass NULL for input/output we're not using - this.bufinfo = pa_shim_info_t( - inchans > 0 ? bufpointer(this.source) : C_NULL, - outchans > 0 ? bufpointer(this.sink) : C_NULL, - pointer(this.errbuf), - synced, notifycb_c, - inchans > 0 ? notifyhandle(this.source) : C_NULL, - outchans > 0 ? notifyhandle(this.sink) : C_NULL, - notifyhandle(this.errbuf), - global_cond[].handle) # this is only needed for the libuv workaround + # finalizer(close, this) + this.sink = PortAudioSink{T}(outdev.name, this, outchans) + this.source = PortAudioSource{T}(indev.name, this, inchans) this.stream = suppress_err() do - Pa_OpenStream(inparams, outparams, float(sr), blocksize, paNoFlag, - shim_processcb_c, this.bufinfo) + Pa_OpenStream(inparams, outparams, sr, blocksize, paNoFlag, + nothing, nothing) end Pa_StartStream(this.stream) - this.errtask = @async handle_errors(this) - push!(active_streams, this) this end @@ -141,11 +111,6 @@ Options: * `samplerate`: Sample rate (defaults to device sample rate) * `blocksize`: Size of the blocks that are written to and read from the audio device. (Defaults to $DEFAULT_BLOCKSIZE) -* `synced`: Determines whether the input and output streams are kept in - sync. If `true`, you must read and write an equal number of - frames, and the round-trip latency is guaranteed constant. If - `false`, you are free to read and write separately, but - overflow or underflow can affect the round-trip latency. * `warn_xruns`: Display a warning if there is a stream overrun or underrun, which often happens when Julia is compiling, or with a particularly large GC run. This can be quite verbose so is @@ -153,7 +118,7 @@ Options: """ function PortAudioStream(indev::PortAudioDevice, outdev::PortAudioDevice, inchans=2, outchans=2; eltype=Float32, samplerate=-1, blocksize=DEFAULT_BLOCKSIZE, - synced=false, warn_xruns=false) + warn_xruns=false) if samplerate == -1 sampleratein = indev.defaultsamplerate samplerateout = outdev.defaultsamplerate @@ -168,7 +133,7 @@ function PortAudioStream(indev::PortAudioDevice, outdev::PortAudioDevice, samplerate = samplerateout end end - PortAudioStream{eltype}(indev, outdev, inchans, outchans, samplerate, blocksize, synced, warn_xruns) + PortAudioStream{eltype}(indev, outdev, inchans, outchans, samplerate, blocksize, warn_xruns) end # handle device names given as streams @@ -212,9 +177,6 @@ function PortAudioStream(inchans=2, outchans=2; kwargs...) end # handle do-syntax -# TODO: there seems to be a buffering issue here. Running this multiple -# times creates weird echos, and even the first time there might be something -# fishy going on. Needs investigation function PortAudioStream(fn::Function, args...; kwargs...) str = PortAudioStream(args...; kwargs...) try @@ -224,39 +186,11 @@ function PortAudioStream(fn::Function, args...; kwargs...) end end -const pa_inited = Ref(false) -const active_streams = Set{PortAudioStream}() - -function notify_active_streams() - # errors here can cause the system to hang if they're waiting on these - # conditions, so we do our own exception display for easier debugging - try - while true - wait(global_cond[]) - pa_inited[] || break - - for stream in active_streams - notify(stream.source.ringbuf.datanotify.cond) - notify(stream.sink.ringbuf.datanotify.cond) - notify(stream.errbuf.datanotify.cond) - end - end - catch ex - showerror(stderr, ex, backtrace()) - end -end - function close(stream::PortAudioStream) if stream.stream != C_NULL Pa_StopStream(stream.stream) Pa_CloseStream(stream.stream) - close(stream.source) - close(stream.sink) - close(stream.errbuf) stream.stream = C_NULL - # wait for the error task to clean up - fetch(stream.errtask) - delete!(active_streams, stream) end nothing @@ -285,32 +219,6 @@ function show(io::IO, stream::PortAudioStream) end end -""" - handle_errors(stream::PortAudioStream) - -Handle errors coming over the error stream from PortAudio. This is run as an -independent task while the stream is active. -""" -function handle_errors(stream::PortAudioStream) - err = Vector{pa_shim_errmsg_t}(undef, 1) - while true - nread = read!(stream.errbuf, err) - nread == 1 || break - if err[1] == PA_SHIM_ERRMSG_ERR_OVERFLOW - @warn "Error buffer overflowed on portaudio stream" - elseif err[1] == PA_SHIM_ERRMSG_OVERFLOW - stream.warn_xruns && @warn "Input overflowed from $(name(stream.source))" - elseif err[1] == PA_SHIM_ERRMSG_UNDERFLOW - stream.warn_xruns && @warn "Output underflowed to $(name(stream.sink))" - else - error(""" - Got unrecognized error code $(err[1]) from audio thread for - portaudio stream. Please file an issue at - https://github.com/juliaaudio/portaudio.jl/issues""") - end - end -end - ################################## # PortAudioSink & PortAudioSource ################################## @@ -322,15 +230,13 @@ for (TypeName, Super) in ((:PortAudioSink, :SampleSink), name::String stream::PortAudioStream{T} chunkbuf::Array{T, 2} - ringbuf::RingBuffer{T} nchannels::Int - function $TypeName{T}(name, stream, channels, ringbufsize) where {T} + function $TypeName{T}(name, stream, channels) where {T} # portaudio data comes in interleaved, so we'll end up transposing # it back and forth to julia column-major chunkbuf = zeros(T, channels, CHUNKSIZE) - ringbuf = RingBuffer{T}(channels, ringbufsize) - new(name, stream, chunkbuf, ringbuf, channels) + new(name, stream, chunkbuf, channels) end end end @@ -339,30 +245,36 @@ SampledSignals.nchannels(s::Union{PortAudioSink, PortAudioSource}) = s.nchannels SampledSignals.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.stream) SampledSignals.blocksize(s::Union{PortAudioSink, PortAudioSource}) = s.stream.blocksize eltype(::Union{PortAudioSink{T}, PortAudioSource{T}}) where {T} = T -close(s::Union{PortAudioSink, PortAudioSource}) = close(s.ringbuf) -isopen(s::Union{PortAudioSink, PortAudioSource}) = isopen(s.ringbuf) -RingBuffers.notifyhandle(s::Union{PortAudioSink, PortAudioSource}) = notifyhandle(s.ringbuf) -bufpointer(s::Union{PortAudioSink, PortAudioSource}) = pointer(s.ringbuf) +function close(s::Union{PortAudioSink, PortAudioSource}) + throw(ErrorException("Attempted to close PortAudioSink or PortAudioSource. + Close the containing PortAudioStream instead")) +end +isopen(s::Union{PortAudioSink, PortAudioSource}) = isopen(s.stream) name(s::Union{PortAudioSink, PortAudioSource}) = s.name -function show(io::IO, stream::T) where {T <: Union{PortAudioSink, PortAudioSource}} - println(io, T, "(\"", stream.name, "\")") - print(io, nchannels(stream), " channels") +function show(io::IO, ::Type{PortAudioSink{T}}) where T + print(io, "PortAudioSink{$T}") end -flush(sink::PortAudioSink) = flush(sink.ringbuf) +function show(io::IO, ::Type{PortAudioSource{T}}) where T + print(io, "PortAudioSource{$T}") +end + +function show(io::IO, stream::T) where {T <: Union{PortAudioSink, PortAudioSource}} + print(io, nchannels(stream), "-channel ", T, "(\"", stream.name, "\")") +end function SampledSignals.unsafe_write(sink::PortAudioSink, buf::Array, frameoffset, framecount) nwritten = 0 while nwritten < framecount - towrite = min(framecount-nwritten, CHUNKSIZE) + n = min(framecount-nwritten, CHUNKSIZE) # make a buffer of interleaved samples - transpose!(view(sink.chunkbuf, :, 1:towrite), - view(buf, (1:towrite) .+ nwritten .+ frameoffset, :)) - n = write(sink.ringbuf, sink.chunkbuf, towrite) + transpose!(view(sink.chunkbuf, :, 1:n), + view(buf, (1:n) .+ nwritten .+ frameoffset, :)) + # TODO: if the stream is closed we just want to return a + # shorter-than-requested frame count instead of throwing an error + Pa_WriteStream(sink.stream.stream, sink.chunkbuf, n, sink.stream.warn_xruns) nwritten += n - # break early if the stream is closed - n < towrite && break end nwritten @@ -371,82 +283,36 @@ end function SampledSignals.unsafe_read!(source::PortAudioSource, buf::Array, frameoffset, framecount) nread = 0 while nread < framecount - toread = min(framecount-nread, CHUNKSIZE) - n = read!(source.ringbuf, source.chunkbuf, toread) + n = min(framecount-nread, CHUNKSIZE) + # TODO: if the stream is closed we just want to return a + # shorter-than-requested frame count instead of throwing an error + Pa_ReadStream(source.stream.stream, source.chunkbuf, n, source.stream.warn_xruns) # de-interleave the samples - transpose!(view(buf, (1:toread) .+ nread .+ frameoffset, :), - view(source.chunkbuf, :, 1:toread)) + transpose!(view(buf, (1:n) .+ nread .+ frameoffset, :), + view(source.chunkbuf, :, 1:n)) - nread += toread - # break early if the stream is closed - n < toread && break + nread += n end nread end - -""" - PortAudio.shimhash() - -Return the sha256 hash(as a string) of the source file used to build the shim. -We may use this sometime to verify that the distributed binary stays in sync -with the rest of the package. -""" -shimhash() = unsafe_string(ccall((:pa_shim_getsourcehash, libpa_shim), Cstring, ())) - - -# this is called by the shim process callback to notify that there is new data. -# it's run in the audio context so don't do anything besides wake up the -# AsyncCondition handle associated with that ring buffer -notifycb(handle) = ccall(:uv_async_send, Cint, (Ref{Cvoid},), handle) - -global shim_processcb_c, notifycb_c - -function set_global_callbacks() - shim_dlib = Libdl.dlopen(libpa_shim) - - # pointer to the shim's process callback - global shim_processcb_c = Libdl.dlsym(shim_dlib, :pa_shim_processcb) - if shim_processcb_c == C_NULL - error("Got NULL pointer loading `pa_shim_processcb`") - end - - global notifycb_c = @cfunction notifycb Cint (Ptr{Cvoid},) -end - function suppress_err(dofunc::Function) nullfile = @static Sys.iswindows() ? "nul" : "/dev/null" open(nullfile, "w") do io - redirect_stdout(dofunc, io) + redirect_stderr(dofunc, io) end end -# this ref has to be set during __init__ to register itself properly with libuv -const global_cond = Ref{Base.AsyncCondition}() function __init__() - # currently libuv has issues when you try to notify more than one condition - # (see https://github.com/libuv/libuv/issues/1951). So as a workaround we use - # a global AsyncCondition that gets notified from the audio callback, and it - # handles notifying the individual RingBuffer AsyncConditions. - global_cond[] = Base.AsyncCondition() - set_global_callbacks() - + ENV["ALSA_CONFIG_DIR"] = "/usr/share/alsa" # initialize PortAudio on module load - suppress_err() do + # suppress_err() do Pa_Initialize() - end - pa_inited[] = true - notifier = @async notify_active_streams() + # end atexit() do - for str in active_streams - close(str) - end Pa_Terminate() - pa_inited[] = false - notify(global_cond[].cond) - fetch(notifier) end end diff --git a/src/libportaudio.jl b/src/libportaudio.jl index 3da627d..5037a54 100644 --- a/src/libportaudio.jl +++ b/src/libportaudio.jl @@ -43,6 +43,28 @@ const paContinue = PaStreamCallbackResult(0) const paComplete = PaStreamCallbackResult(1) const paAbort = PaStreamCallbackResult(2) +""" +Call the given expression in a separate thread, waiting on the result. This is +useful when running code that would otherwise block the Julia process (like a +`ccall` into a function that does IO). + +This will wait for the threaded call to complete even if an exception is thrown. +""" +macro tcall(ex) + quote + t = Base.Threads.@spawn $(esc(ex)) + try + fetch(t) + catch + # even if we got an exception (like an interrupt exception) make sure + # we wait for the spawned call to complete so we don't clean up + # any of its resources + wait(t) + rethrow() + end + end +end + function Pa_Initialize() err = ccall((:Pa_Initialize, libportaudio), PaError, ()) handle_status(err) @@ -170,8 +192,9 @@ function Pa_OpenStream(inParams, outParams, # matter because userdata should be GC-rooted anyways Ptr{Cvoid}), streamPtr, inParams, outParams, - sampleRate, framesPerBuffer, flags, callback, - pointer_from_objref(userdata)) + float(sampleRate), framesPerBuffer, flags, + callback === nothing ? C_NULL : callback, + userdata === nothing ? C_NULL : pointer_from_objref(userdata)) handle_status(err) streamPtr[] end @@ -211,9 +234,9 @@ end function Pa_ReadStream(stream::PaStream, buf::Array, frames::Integer=length(buf), show_warnings::Bool=true) frames <= length(buf) || error("Need a buffer at least $frames long") - err = ccall((:Pa_ReadStream, libportaudio), PaError, - (PaStream, Ref{Cvoid}, Culong), - stream, buf, frames) + err = @tcall ccall((:Pa_ReadStream, libportaudio), PaError, + (PaStream, Ptr{Cvoid}, Culong), + stream, buf, frames) handle_status(err, show_warnings) buf end @@ -221,9 +244,9 @@ end function Pa_WriteStream(stream::PaStream, buf::Array, frames::Integer=length(buf), show_warnings::Bool=true) frames <= length(buf) || error("Need a buffer at least $frames long") - err = ccall((:Pa_WriteStream, libportaudio), PaError, - (PaStream, Ref{Cvoid}, Culong), - stream, buf, frames) + err = @tcall ccall((:Pa_WriteStream, libportaudio), PaError, + (PaStream, Ptr{Cvoid}, Culong), + stream, buf, frames) handle_status(err, show_warnings) nothing end @@ -241,11 +264,11 @@ function handle_status(err::PaError, show_warnings::Bool=true) if show_warnings msg = ccall((:Pa_GetErrorText, libportaudio), Ptr{Cchar}, (PaError,), err) - warn("libportaudio: " * unsafe_string(msg)) + @warn("libportaudio: " * unsafe_string(msg)) end elseif err != PA_NO_ERROR msg = ccall((:Pa_GetErrorText, libportaudio), Ptr{Cchar}, (PaError,), err) - error("libportaudio: " * unsafe_string(msg)) + throw(ErrorException("libportaudio: " * unsafe_string(msg))) end end diff --git a/src/pa_shim.jl b/src/pa_shim.jl deleted file mode 100644 index 1e7c916..0000000 --- a/src/pa_shim.jl +++ /dev/null @@ -1,18 +0,0 @@ -const pa_shim_errmsg_t = Cint -const PA_SHIM_ERRMSG_OVERFLOW = Cint(0) # input overflow -const PA_SHIM_ERRMSG_UNDERFLOW = Cint(1) # output underflow -const PA_SHIM_ERRMSG_ERR_OVERFLOW = Cint(2) # error buffer overflowed - - -# This struct is shared with pa_shim.c -mutable struct pa_shim_info_t - inputbuf::Ptr{PaUtilRingBuffer} # ringbuffer for input - outputbuf::Ptr{PaUtilRingBuffer} # ringbuffer for output - errorbuf::Ptr{PaUtilRingBuffer} # ringbuffer to send error notifications - sync::Cint # keep input/output ring buffers synchronized (0/1) - notifycb::Ptr{Cvoid} # Julia callback to notify on updates (called from audio thread) - inputhandle::Ptr{Cvoid} # condition to notify on new input data - outputhandle::Ptr{Cvoid} # condition to notify when ready for output - errorhandle::Ptr{Cvoid} # condition to notify on new errors - globalhandle::Ptr{Cvoid} # only needed for libuv workaround -end