mostly working, but crashes sometimes

This commit is contained in:
Spencer Russell 2020-01-02 00:02:00 -05:00
parent 4c2ad4dc06
commit 9eb565e487
4 changed files with 77 additions and 208 deletions

View file

@ -6,12 +6,10 @@ version = "1.1.0"
[deps]
Libdl = "8f399da3-3557-5675-b5ff-fb832c97cbdb"
LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e"
RingBuffers = "f6d8bcc6-4e01-5431-93c4-9d6004abab90"
SampledSignals = "bd7594eb-a658-542f-9e75-4c4d8908c167"
libportaudio_jll = "2d7b7beb-0762-5160-978e-1ab83a1e8a31"
[compat]
RingBuffers = "1.2"
julia = "1.3"
[extras]

View file

@ -1,6 +1,6 @@
module PortAudio
using libportaudio_jll, Libdl, SampledSignals, RingBuffers
using libportaudio_jll, SampledSignals
import Base: eltype, show
import Base: close, isopen
@ -11,30 +11,20 @@ import LinearAlgebra: transpose!
export PortAudioStream
# Get binary dependencies loaded from BinDeps
include("pa_shim.jl")
include("libportaudio.jl")
# These sizes are all in frames
# the block size is what we request from portaudio if no blocksize is given.
# The ringbuffer and pre-fill will be twice the blocksize
const DEFAULT_BLOCKSIZE=4096
# data is passed to and from the ringbuffer in chunks with this many frames
# it should be at most the ringbuffer size, and must evenly divide into the
# the underlying portaudio buffer size. E.g. if PortAudio is running with a
# 2048-frame buffer period, the chunk size can be 2048, 1024, 512, 256, etc.
# data is passed to and from portaudio in chunks with this many frames, because
# we need to interleave the samples
const CHUNKSIZE=128
# ringbuffer to receive errors from the audio processing thread
const ERR_BUFSIZE=512
function versioninfo(io::IO=stdout)
println(io, Pa_GetVersionText())
println(io, "Version: ", Pa_GetVersion())
println(io, "Shim Source Hash: ", shimhash()[1:10])
end
mutable struct PortAudioDevice
@ -74,14 +64,11 @@ mutable struct PortAudioStream{T}
warn_xruns::Bool
sink # untyped because of circular type definition
source # untyped because of circular type definition
errbuf::RingBuffer{pa_shim_errmsg_t} # used to send errors from the portaudio callback
errtask::Task
bufinfo::pa_shim_info_t # data used in the portaudio callback
# this inner constructor is generally called via the top-level outer
# constructor below
function PortAudioStream{T}(indev::PortAudioDevice, outdev::PortAudioDevice,
inchans, outchans, sr, blocksize, synced,
inchans, outchans, sr, blocksize,
warn_xruns) where {T}
inchans = inchans == -1 ? indev.maxinchans : inchans
outchans = outchans == -1 ? outdev.maxoutchans : outchans
@ -92,32 +79,15 @@ mutable struct PortAudioStream{T}
Ptr{Pa_StreamParameters}(0) :
Ref(Pa_StreamParameters(outdev.idx, outchans, type_to_fmt[T], 0.0, C_NULL))
this = new(sr, blocksize, C_NULL, warn_xruns)
finalizer(close, this)
this.sink = PortAudioSink{T}(outdev.name, this, outchans, blocksize*2)
this.source = PortAudioSource{T}(indev.name, this, inchans, blocksize*2)
this.errbuf = RingBuffer{pa_shim_errmsg_t}(1, ERR_BUFSIZE)
if synced && inchans > 0 && outchans > 0
# we've got a synchronized duplex stream. initialize with the output buffer full
write(this.sink, SampleBuf(zeros(T, blocksize*2, outchans), sr))
end
# pass NULL for input/output we're not using
this.bufinfo = pa_shim_info_t(
inchans > 0 ? bufpointer(this.source) : C_NULL,
outchans > 0 ? bufpointer(this.sink) : C_NULL,
pointer(this.errbuf),
synced, notifycb_c,
inchans > 0 ? notifyhandle(this.source) : C_NULL,
outchans > 0 ? notifyhandle(this.sink) : C_NULL,
notifyhandle(this.errbuf),
global_cond[].handle) # this is only needed for the libuv workaround
# finalizer(close, this)
this.sink = PortAudioSink{T}(outdev.name, this, outchans)
this.source = PortAudioSource{T}(indev.name, this, inchans)
this.stream = suppress_err() do
Pa_OpenStream(inparams, outparams, float(sr), blocksize, paNoFlag,
shim_processcb_c, this.bufinfo)
Pa_OpenStream(inparams, outparams, sr, blocksize, paNoFlag,
nothing, nothing)
end
Pa_StartStream(this.stream)
this.errtask = @async handle_errors(this)
push!(active_streams, this)
this
end
@ -141,11 +111,6 @@ Options:
* `samplerate`: Sample rate (defaults to device sample rate)
* `blocksize`: Size of the blocks that are written to and read from the audio
device. (Defaults to $DEFAULT_BLOCKSIZE)
* `synced`: Determines whether the input and output streams are kept in
sync. If `true`, you must read and write an equal number of
frames, and the round-trip latency is guaranteed constant. If
`false`, you are free to read and write separately, but
overflow or underflow can affect the round-trip latency.
* `warn_xruns`: Display a warning if there is a stream overrun or underrun,
which often happens when Julia is compiling, or with a
particularly large GC run. This can be quite verbose so is
@ -153,7 +118,7 @@ Options:
"""
function PortAudioStream(indev::PortAudioDevice, outdev::PortAudioDevice,
inchans=2, outchans=2; eltype=Float32, samplerate=-1, blocksize=DEFAULT_BLOCKSIZE,
synced=false, warn_xruns=false)
warn_xruns=false)
if samplerate == -1
sampleratein = indev.defaultsamplerate
samplerateout = outdev.defaultsamplerate
@ -168,7 +133,7 @@ function PortAudioStream(indev::PortAudioDevice, outdev::PortAudioDevice,
samplerate = samplerateout
end
end
PortAudioStream{eltype}(indev, outdev, inchans, outchans, samplerate, blocksize, synced, warn_xruns)
PortAudioStream{eltype}(indev, outdev, inchans, outchans, samplerate, blocksize, warn_xruns)
end
# handle device names given as streams
@ -212,9 +177,6 @@ function PortAudioStream(inchans=2, outchans=2; kwargs...)
end
# handle do-syntax
# TODO: there seems to be a buffering issue here. Running this multiple
# times creates weird echos, and even the first time there might be something
# fishy going on. Needs investigation
function PortAudioStream(fn::Function, args...; kwargs...)
str = PortAudioStream(args...; kwargs...)
try
@ -224,39 +186,11 @@ function PortAudioStream(fn::Function, args...; kwargs...)
end
end
const pa_inited = Ref(false)
const active_streams = Set{PortAudioStream}()
function notify_active_streams()
# errors here can cause the system to hang if they're waiting on these
# conditions, so we do our own exception display for easier debugging
try
while true
wait(global_cond[])
pa_inited[] || break
for stream in active_streams
notify(stream.source.ringbuf.datanotify.cond)
notify(stream.sink.ringbuf.datanotify.cond)
notify(stream.errbuf.datanotify.cond)
end
end
catch ex
showerror(stderr, ex, backtrace())
end
end
function close(stream::PortAudioStream)
if stream.stream != C_NULL
Pa_StopStream(stream.stream)
Pa_CloseStream(stream.stream)
close(stream.source)
close(stream.sink)
close(stream.errbuf)
stream.stream = C_NULL
# wait for the error task to clean up
fetch(stream.errtask)
delete!(active_streams, stream)
end
nothing
@ -285,32 +219,6 @@ function show(io::IO, stream::PortAudioStream)
end
end
"""
handle_errors(stream::PortAudioStream)
Handle errors coming over the error stream from PortAudio. This is run as an
independent task while the stream is active.
"""
function handle_errors(stream::PortAudioStream)
err = Vector{pa_shim_errmsg_t}(undef, 1)
while true
nread = read!(stream.errbuf, err)
nread == 1 || break
if err[1] == PA_SHIM_ERRMSG_ERR_OVERFLOW
@warn "Error buffer overflowed on portaudio stream"
elseif err[1] == PA_SHIM_ERRMSG_OVERFLOW
stream.warn_xruns && @warn "Input overflowed from $(name(stream.source))"
elseif err[1] == PA_SHIM_ERRMSG_UNDERFLOW
stream.warn_xruns && @warn "Output underflowed to $(name(stream.sink))"
else
error("""
Got unrecognized error code $(err[1]) from audio thread for
portaudio stream. Please file an issue at
https://github.com/juliaaudio/portaudio.jl/issues""")
end
end
end
##################################
# PortAudioSink & PortAudioSource
##################################
@ -322,15 +230,13 @@ for (TypeName, Super) in ((:PortAudioSink, :SampleSink),
name::String
stream::PortAudioStream{T}
chunkbuf::Array{T, 2}
ringbuf::RingBuffer{T}
nchannels::Int
function $TypeName{T}(name, stream, channels, ringbufsize) where {T}
function $TypeName{T}(name, stream, channels) where {T}
# portaudio data comes in interleaved, so we'll end up transposing
# it back and forth to julia column-major
chunkbuf = zeros(T, channels, CHUNKSIZE)
ringbuf = RingBuffer{T}(channels, ringbufsize)
new(name, stream, chunkbuf, ringbuf, channels)
new(name, stream, chunkbuf, channels)
end
end
end
@ -339,30 +245,36 @@ SampledSignals.nchannels(s::Union{PortAudioSink, PortAudioSource}) = s.nchannels
SampledSignals.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.stream)
SampledSignals.blocksize(s::Union{PortAudioSink, PortAudioSource}) = s.stream.blocksize
eltype(::Union{PortAudioSink{T}, PortAudioSource{T}}) where {T} = T
close(s::Union{PortAudioSink, PortAudioSource}) = close(s.ringbuf)
isopen(s::Union{PortAudioSink, PortAudioSource}) = isopen(s.ringbuf)
RingBuffers.notifyhandle(s::Union{PortAudioSink, PortAudioSource}) = notifyhandle(s.ringbuf)
bufpointer(s::Union{PortAudioSink, PortAudioSource}) = pointer(s.ringbuf)
function close(s::Union{PortAudioSink, PortAudioSource})
throw(ErrorException("Attempted to close PortAudioSink or PortAudioSource.
Close the containing PortAudioStream instead"))
end
isopen(s::Union{PortAudioSink, PortAudioSource}) = isopen(s.stream)
name(s::Union{PortAudioSink, PortAudioSource}) = s.name
function show(io::IO, stream::T) where {T <: Union{PortAudioSink, PortAudioSource}}
println(io, T, "(\"", stream.name, "\")")
print(io, nchannels(stream), " channels")
function show(io::IO, ::Type{PortAudioSink{T}}) where T
print(io, "PortAudioSink{$T}")
end
flush(sink::PortAudioSink) = flush(sink.ringbuf)
function show(io::IO, ::Type{PortAudioSource{T}}) where T
print(io, "PortAudioSource{$T}")
end
function show(io::IO, stream::T) where {T <: Union{PortAudioSink, PortAudioSource}}
print(io, nchannels(stream), "-channel ", T, "(\"", stream.name, "\")")
end
function SampledSignals.unsafe_write(sink::PortAudioSink, buf::Array, frameoffset, framecount)
nwritten = 0
while nwritten < framecount
towrite = min(framecount-nwritten, CHUNKSIZE)
n = min(framecount-nwritten, CHUNKSIZE)
# make a buffer of interleaved samples
transpose!(view(sink.chunkbuf, :, 1:towrite),
view(buf, (1:towrite) .+ nwritten .+ frameoffset, :))
n = write(sink.ringbuf, sink.chunkbuf, towrite)
transpose!(view(sink.chunkbuf, :, 1:n),
view(buf, (1:n) .+ nwritten .+ frameoffset, :))
# TODO: if the stream is closed we just want to return a
# shorter-than-requested frame count instead of throwing an error
Pa_WriteStream(sink.stream.stream, sink.chunkbuf, n, sink.stream.warn_xruns)
nwritten += n
# break early if the stream is closed
n < towrite && break
end
nwritten
@ -371,82 +283,36 @@ end
function SampledSignals.unsafe_read!(source::PortAudioSource, buf::Array, frameoffset, framecount)
nread = 0
while nread < framecount
toread = min(framecount-nread, CHUNKSIZE)
n = read!(source.ringbuf, source.chunkbuf, toread)
n = min(framecount-nread, CHUNKSIZE)
# TODO: if the stream is closed we just want to return a
# shorter-than-requested frame count instead of throwing an error
Pa_ReadStream(source.stream.stream, source.chunkbuf, n, source.stream.warn_xruns)
# de-interleave the samples
transpose!(view(buf, (1:toread) .+ nread .+ frameoffset, :),
view(source.chunkbuf, :, 1:toread))
transpose!(view(buf, (1:n) .+ nread .+ frameoffset, :),
view(source.chunkbuf, :, 1:n))
nread += toread
# break early if the stream is closed
n < toread && break
nread += n
end
nread
end
"""
PortAudio.shimhash()
Return the sha256 hash(as a string) of the source file used to build the shim.
We may use this sometime to verify that the distributed binary stays in sync
with the rest of the package.
"""
shimhash() = unsafe_string(ccall((:pa_shim_getsourcehash, libpa_shim), Cstring, ()))
# this is called by the shim process callback to notify that there is new data.
# it's run in the audio context so don't do anything besides wake up the
# AsyncCondition handle associated with that ring buffer
notifycb(handle) = ccall(:uv_async_send, Cint, (Ref{Cvoid},), handle)
global shim_processcb_c, notifycb_c
function set_global_callbacks()
shim_dlib = Libdl.dlopen(libpa_shim)
# pointer to the shim's process callback
global shim_processcb_c = Libdl.dlsym(shim_dlib, :pa_shim_processcb)
if shim_processcb_c == C_NULL
error("Got NULL pointer loading `pa_shim_processcb`")
end
global notifycb_c = @cfunction notifycb Cint (Ptr{Cvoid},)
end
function suppress_err(dofunc::Function)
nullfile = @static Sys.iswindows() ? "nul" : "/dev/null"
open(nullfile, "w") do io
redirect_stdout(dofunc, io)
redirect_stderr(dofunc, io)
end
end
# this ref has to be set during __init__ to register itself properly with libuv
const global_cond = Ref{Base.AsyncCondition}()
function __init__()
# currently libuv has issues when you try to notify more than one condition
# (see https://github.com/libuv/libuv/issues/1951). So as a workaround we use
# a global AsyncCondition that gets notified from the audio callback, and it
# handles notifying the individual RingBuffer AsyncConditions.
global_cond[] = Base.AsyncCondition()
set_global_callbacks()
ENV["ALSA_CONFIG_DIR"] = "/usr/share/alsa"
# initialize PortAudio on module load
suppress_err() do
# suppress_err() do
Pa_Initialize()
end
pa_inited[] = true
notifier = @async notify_active_streams()
# end
atexit() do
for str in active_streams
close(str)
end
Pa_Terminate()
pa_inited[] = false
notify(global_cond[].cond)
fetch(notifier)
end
end

View file

@ -43,6 +43,28 @@ const paContinue = PaStreamCallbackResult(0)
const paComplete = PaStreamCallbackResult(1)
const paAbort = PaStreamCallbackResult(2)
"""
Call the given expression in a separate thread, waiting on the result. This is
useful when running code that would otherwise block the Julia process (like a
`ccall` into a function that does IO).
This will wait for the threaded call to complete even if an exception is thrown.
"""
macro tcall(ex)
quote
t = Base.Threads.@spawn $(esc(ex))
try
fetch(t)
catch
# even if we got an exception (like an interrupt exception) make sure
# we wait for the spawned call to complete so we don't clean up
# any of its resources
wait(t)
rethrow()
end
end
end
function Pa_Initialize()
err = ccall((:Pa_Initialize, libportaudio), PaError, ())
handle_status(err)
@ -170,8 +192,9 @@ function Pa_OpenStream(inParams, outParams,
# matter because userdata should be GC-rooted anyways
Ptr{Cvoid}),
streamPtr, inParams, outParams,
sampleRate, framesPerBuffer, flags, callback,
pointer_from_objref(userdata))
float(sampleRate), framesPerBuffer, flags,
callback === nothing ? C_NULL : callback,
userdata === nothing ? C_NULL : pointer_from_objref(userdata))
handle_status(err)
streamPtr[]
end
@ -211,9 +234,9 @@ end
function Pa_ReadStream(stream::PaStream, buf::Array, frames::Integer=length(buf),
show_warnings::Bool=true)
frames <= length(buf) || error("Need a buffer at least $frames long")
err = ccall((:Pa_ReadStream, libportaudio), PaError,
(PaStream, Ref{Cvoid}, Culong),
stream, buf, frames)
err = @tcall ccall((:Pa_ReadStream, libportaudio), PaError,
(PaStream, Ptr{Cvoid}, Culong),
stream, buf, frames)
handle_status(err, show_warnings)
buf
end
@ -221,9 +244,9 @@ end
function Pa_WriteStream(stream::PaStream, buf::Array, frames::Integer=length(buf),
show_warnings::Bool=true)
frames <= length(buf) || error("Need a buffer at least $frames long")
err = ccall((:Pa_WriteStream, libportaudio), PaError,
(PaStream, Ref{Cvoid}, Culong),
stream, buf, frames)
err = @tcall ccall((:Pa_WriteStream, libportaudio), PaError,
(PaStream, Ptr{Cvoid}, Culong),
stream, buf, frames)
handle_status(err, show_warnings)
nothing
end
@ -241,11 +264,11 @@ function handle_status(err::PaError, show_warnings::Bool=true)
if show_warnings
msg = ccall((:Pa_GetErrorText, libportaudio),
Ptr{Cchar}, (PaError,), err)
warn("libportaudio: " * unsafe_string(msg))
@warn("libportaudio: " * unsafe_string(msg))
end
elseif err != PA_NO_ERROR
msg = ccall((:Pa_GetErrorText, libportaudio),
Ptr{Cchar}, (PaError,), err)
error("libportaudio: " * unsafe_string(msg))
throw(ErrorException("libportaudio: " * unsafe_string(msg)))
end
end

View file

@ -1,18 +0,0 @@
const pa_shim_errmsg_t = Cint
const PA_SHIM_ERRMSG_OVERFLOW = Cint(0) # input overflow
const PA_SHIM_ERRMSG_UNDERFLOW = Cint(1) # output underflow
const PA_SHIM_ERRMSG_ERR_OVERFLOW = Cint(2) # error buffer overflowed
# This struct is shared with pa_shim.c
mutable struct pa_shim_info_t
inputbuf::Ptr{PaUtilRingBuffer} # ringbuffer for input
outputbuf::Ptr{PaUtilRingBuffer} # ringbuffer for output
errorbuf::Ptr{PaUtilRingBuffer} # ringbuffer to send error notifications
sync::Cint # keep input/output ring buffers synchronized (0/1)
notifycb::Ptr{Cvoid} # Julia callback to notify on updates (called from audio thread)
inputhandle::Ptr{Cvoid} # condition to notify on new input data
outputhandle::Ptr{Cvoid} # condition to notify when ready for output
errorhandle::Ptr{Cvoid} # condition to notify on new errors
globalhandle::Ptr{Cvoid} # only needed for libuv workaround
end