adds workaround for libuv/libuv#1951. PA_SHIM REQUIRES LOCAL BUILD

This commit is contained in:
Spencer Russell 2018-08-28 13:41:04 -04:00
parent 45bfdc4830
commit 7d1be74eae
4 changed files with 95 additions and 33 deletions

25
deps/src/pa_shim.c vendored
View file

@ -26,17 +26,20 @@ typedef struct {
void *inputhandle; // condition to notify on new input void *inputhandle; // condition to notify on new input
void *outputhandle; // condition to notify when ready for output void *outputhandle; // condition to notify when ready for output
void *errorhandle; // condition to notify on new error void *errorhandle; // condition to notify on new error
void *globalhandle; // only needed for libuv workaround
} pa_shim_info_t; } pa_shim_info_t;
void senderr(pa_shim_info_t *info, pa_shim_errmsg_t msg) { void senderr(pa_shim_info_t *info, pa_shim_errmsg_t msg) {
if(PaUtil_GetRingBufferWriteAvailable(info->errorbuf) < 2) { if(PaUtil_GetRingBufferWriteAvailable(info->errorbuf) == 1) {
// we've overflowed our error buffer! notify the host. // we've overflowed our error buffer! notify the host.
msg = PA_SHIM_ERRMSG_ERR_OVERFLOW; msg = PA_SHIM_ERRMSG_ERR_OVERFLOW;
} }
PaUtil_WriteRingBuffer(info->errorbuf, &msg, 1); PaUtil_WriteRingBuffer(info->errorbuf, &msg, 1);
if(info->notifycb) { // for now we're relying on the global handle. uncomment the following
info->notifycb(info->errorhandle); // when the libuv workaround is no longer necessary
} // if(info->notifycb) {
// info->notifycb(info->errorhandle);
// }
} }
// return the sha256 hash of the shim source so we can make sure things are in sync // return the sha256 hash of the shim source so we can make sure things are in sync
@ -60,6 +63,7 @@ int pa_shim_processcb(const void *input, void *output,
pa_shim_info_t *info = userData; pa_shim_info_t *info = userData;
if(info->notifycb == NULL) { if(info->notifycb == NULL) {
fprintf(stderr, "pa_shim ERROR: notifycb is NULL\n"); fprintf(stderr, "pa_shim ERROR: notifycb is NULL\n");
return paAbort;
} }
int nwrite; int nwrite;
if(info->inputbuf) { if(info->inputbuf) {
@ -80,18 +84,18 @@ int pa_shim_processcb(const void *input, void *output,
// read/write from the ringbuffers // read/write from the ringbuffers
if(info->inputbuf) { if(info->inputbuf) {
PaUtil_WriteRingBuffer(info->inputbuf, input, nwrite); PaUtil_WriteRingBuffer(info->inputbuf, input, nwrite);
if(info->notifycb) { // for now we're relying on the global handle. uncomment the following
info->notifycb(info->inputhandle); // when the libuv workaround is no longer necessary
} // info->notifycb(info->inputhandle);
if(nwrite < frameCount) { if(nwrite < frameCount) {
senderr(info, PA_SHIM_ERRMSG_OVERFLOW); senderr(info, PA_SHIM_ERRMSG_OVERFLOW);
} }
} }
if(info->outputbuf) { if(info->outputbuf) {
PaUtil_ReadRingBuffer(info->outputbuf, output, nread); PaUtil_ReadRingBuffer(info->outputbuf, output, nread);
if(info->notifycb) { // for now we're relying on the global handle. uncomment the following
info->notifycb(info->outputhandle); // when the libuv workaround is no longer necessary
} // info->notifycb(info->outputhandle);
if(nread < frameCount) { if(nread < frameCount) {
senderr(info, PA_SHIM_ERRMSG_UNDERFLOW); senderr(info, PA_SHIM_ERRMSG_UNDERFLOW);
// we didn't fill the whole output buffer, so zero it out // we didn't fill the whole output buffer, so zero it out
@ -100,5 +104,6 @@ int pa_shim_processcb(const void *input, void *output,
} }
} }
info->notifycb(info->globalhandle);
return paContinue; return paContinue;
} }

View file

@ -80,6 +80,7 @@ mutable struct PortAudioStream{T}
sink # untyped because of circular type definition sink # untyped because of circular type definition
source # untyped because of circular type definition source # untyped because of circular type definition
errbuf::RingBuffer{pa_shim_errmsg_t} # used to send errors from the portaudio callback errbuf::RingBuffer{pa_shim_errmsg_t} # used to send errors from the portaudio callback
errtask::Task
bufinfo::pa_shim_info_t # data used in the portaudio callback bufinfo::pa_shim_info_t # data used in the portaudio callback
# this inner constructor is generally called via the top-level outer # this inner constructor is generally called via the top-level outer
@ -111,14 +112,16 @@ mutable struct PortAudioStream{T}
synced, notifycb_c, synced, notifycb_c,
inchans > 0 ? notifyhandle(this.source) : C_NULL, inchans > 0 ? notifyhandle(this.source) : C_NULL,
outchans > 0 ? notifyhandle(this.sink) : C_NULL, outchans > 0 ? notifyhandle(this.sink) : C_NULL,
notifyhandle(this.errbuf)) notifyhandle(this.errbuf),
global_cond[].handle) # this is only needed for the libuv workaround
this.stream = suppress_err() do this.stream = suppress_err() do
Pa_OpenStream(inparams, outparams, float(sr), blocksize, paNoFlag, Pa_OpenStream(inparams, outparams, float(sr), blocksize, paNoFlag,
shim_processcb_c, this.bufinfo) shim_processcb_c, this.bufinfo)
end end
Pa_StartStream(this.stream) Pa_StartStream(this.stream)
@async handle_errors(this) this.errtask = @async handle_errors(this)
push!(active_streams, this)
this this
end end
@ -207,6 +210,28 @@ function PortAudioStream(inchans=2, outchans=2; kwargs...)
PortAudioStream(indevice, outdevice, inchans, outchans; kwargs...) PortAudioStream(indevice, outdevice, inchans, outchans; kwargs...)
end end
const pa_inited = Ref(false)
const active_streams = Set{PortAudioStream}()
function notify_active_streams()
# errors here can cause the system to hang if they're waiting on these
# conditions, so we do our own exception display for easier debugging
try
while true
wait(global_cond[])
pa_inited[] || break
for stream in active_streams
notify(stream.source.ringbuf.datanotify.cond)
notify(stream.sink.ringbuf.datanotify.cond)
notify(stream.errbuf.datanotify.cond)
end
end
catch ex
showerror(stderr, ex, backtrace())
end
end
function close(stream::PortAudioStream) function close(stream::PortAudioStream)
if stream.stream != C_NULL if stream.stream != C_NULL
Pa_StopStream(stream.stream) Pa_StopStream(stream.stream)
@ -215,6 +240,9 @@ function close(stream::PortAudioStream)
close(stream.sink) close(stream.sink)
close(stream.errbuf) close(stream.errbuf)
stream.stream = C_NULL stream.stream = C_NULL
# wait for the error task to clean up
fetch(stream.errtask)
delete!(active_streams, stream)
end end
nothing nothing
@ -382,13 +410,32 @@ function suppress_err(dofunc::Function)
end end
end end
# this ref has to be set during __init__ to register itself properly with libuv
const global_cond = Ref{Base.AsyncCondition}()
function __init__() function __init__()
# currently libuv has issues when you try to notify more than one condition
# (see https://github.com/libuv/libuv/issues/1951). So as a workaround we use
# a global AsyncCondition that gets notified from the audio callback, and it
# handles notifying the individual RingBuffer AsyncConditions.
global_cond[] = Base.AsyncCondition()
set_global_callbacks() set_global_callbacks()
# initialize PortAudio on module load # initialize PortAudio on module load
suppress_err() do suppress_err() do
Pa_Initialize() Pa_Initialize()
end end
pa_inited[] = true
notifier = @async notify_active_streams()
atexit() do
for str in active_streams
close(str)
end
Pa_Terminate()
pa_inited[] = false
notify(global_cond[].cond)
fetch(notifier)
end
end end
end # module PortAudio end # module PortAudio

View file

@ -42,4 +42,5 @@ mutable struct pa_shim_info_t
inputhandle::Ptr{Cvoid} # condition to notify on new input data inputhandle::Ptr{Cvoid} # condition to notify on new input data
outputhandle::Ptr{Cvoid} # condition to notify when ready for output outputhandle::Ptr{Cvoid} # condition to notify when ready for output
errorhandle::Ptr{Cvoid} # condition to notify on new errors errorhandle::Ptr{Cvoid} # condition to notify on new errors
globalhandle::Ptr{Cvoid} # only needed for libuv workaround
end end

View file

@ -12,6 +12,9 @@ using PortAudio: notifyhandle, notifycb_c, shim_processcb_c
using PortAudio: pa_shim_errmsg_t, pa_shim_info_t using PortAudio: pa_shim_errmsg_t, pa_shim_info_t
using PortAudio: PA_SHIM_ERRMSG_ERR_OVERFLOW, PA_SHIM_ERRMSG_UNDERFLOW, PA_SHIM_ERRMSG_OVERFLOW using PortAudio: PA_SHIM_ERRMSG_ERR_OVERFLOW, PA_SHIM_ERRMSG_UNDERFLOW, PA_SHIM_ERRMSG_OVERFLOW
# only needed for the libuv workaround
const globalcond = Base.AsyncCondition()
"Setup buffers to test callback behavior" "Setup buffers to test callback behavior"
function setup_callback(inchans, outchans, nframes, synced) function setup_callback(inchans, outchans, nframes, synced)
sourcebuf = RingBuffer{Float32}(inchans, nframes*2) # the microphone input should end up here sourcebuf = RingBuffer{Float32}(inchans, nframes*2) # the microphone input should end up here
@ -26,7 +29,8 @@ function setup_callback(inchans, outchans, nframes, synced)
synced, notifycb_c, synced, notifycb_c,
inchans > 0 ? notifyhandle(sourcebuf) : C_NULL, inchans > 0 ? notifyhandle(sourcebuf) : C_NULL,
outchans > 0 ? notifyhandle(sinkbuf) : C_NULL, outchans > 0 ? notifyhandle(sinkbuf) : C_NULL,
notifyhandle(errbuf) notifyhandle(errbuf),
globalcond.handle
) )
flags = Culong(0) flags = Culong(0)
@ -42,23 +46,28 @@ function setup_callback(inchans, outchans, nframes, synced)
(sourcebuf, sinkbuf, errbuf, cb_input, cb_output, processfunc) (sourcebuf, sinkbuf, errbuf, cb_input, cb_output, processfunc)
end end
# the process callback only has pointers (not refs) to some data, so we need
# to make sure the references are preserved
function wrapprocess(process, sourcebuf, sinkbuf)
@static if VERSION >= v"0.7.0-"
GC.@preserve sourcebuf sinkbuf begin
process()
end
else
process()
end
end
function test_callback(inchans, outchans, synced) function test_callback(inchans, outchans, synced)
nframes = 8 nframes = 8
(sourcebuf, sinkbuf, errbuf, (sourcebuf, sinkbuf, errbuf,
cb_input, cb_output, process) = setup_callback(inchans, outchans, cb_input, cb_output,
nframes, synced) process) = setup_callback(inchans, outchans, nframes, synced)
if outchans > 0 if outchans > 0
testout = rand(Float32, outchans, nframes) # generate some test data to play testout = rand(Float32, outchans, nframes) # generate some test data to play
write(sinkbuf, testout) # fill the output ringbuffer write(sinkbuf, testout) # fill the output ringbuffer
end end
# the process closure only has a pointer (not a ref) to sinkbuf @test wrapprocess(process, sourcebuf, sinkbuf) == PortAudio.paContinue
@static if VERSION >= v"0.7.0-"
GC.@preserve sinkbuf begin
@test process() == PortAudio.paContinue
end
else
@test process() == PortAudio.paContinue
end
if outchans > 0 if outchans > 0
# testout -> sinkbuf -> cb_output # testout -> sinkbuf -> cb_output
@test cb_output == testout @test cb_output == testout
@ -80,13 +89,13 @@ function test_callback_underflow(inchans, outchans, synced)
nframes = 8 nframes = 8
underfill = 3 # must be less than nframes underfill = 3 # must be less than nframes
(sourcebuf, sinkbuf, errbuf, (sourcebuf, sinkbuf, errbuf,
cb_input, cb_output, process) = setup_callback(inchans, outchans, cb_input, cb_output,
nframes, synced) process) = setup_callback(inchans, outchans, nframes, synced)
outchans > 0 || error("Can't test underflow with no output") outchans > 0 || error("Can't test underflow with no output")
testout = rand(Float32, outchans, underfill) testout = rand(Float32, outchans, underfill)
write(sinkbuf, testout) # underfill the output ringbuffer write(sinkbuf, testout) # underfill the output ringbuffer
# call callback (partial underflow) # call callback (partial underflow)
@test process() == PortAudio.paContinue @test wrapprocess(process, sourcebuf, sinkbuf) == PortAudio.paContinue
@test cb_output[:, 1:underfill] == testout @test cb_output[:, 1:underfill] == testout
@test cb_output[:, (underfill+1):nframes] == zeros(Float32, outchans, (nframes-underfill)) @test cb_output[:, (underfill+1):nframes] == zeros(Float32, outchans, (nframes-underfill))
errs = readavailable(errbuf) errs = readavailable(errbuf)
@ -109,7 +118,7 @@ function test_callback_underflow(inchans, outchans, synced)
end end
# call again (total underflow) # call again (total underflow)
@test process() == PortAudio.paContinue @test wrapprocess(process, sourcebuf, sinkbuf) == PortAudio.paContinue
@test cb_output == zeros(Float32, outchans, nframes) @test cb_output == zeros(Float32, outchans, nframes)
errs = readavailable(errbuf) errs = readavailable(errbuf)
if inchans > 0 if inchans > 0
@ -133,8 +142,8 @@ end
function test_callback_overflow(inchans, outchans, synced) function test_callback_overflow(inchans, outchans, synced)
nframes = 8 nframes = 8
(sourcebuf, sinkbuf, errbuf, (sourcebuf, sinkbuf, errbuf,
cb_input, cb_output, process) = setup_callback(inchans, outchans, cb_input, cb_output,
nframes, synced) process) = setup_callback(inchans, outchans, nframes, synced)
inchans > 0 || error("Can't test overflow with no input") inchans > 0 || error("Can't test overflow with no input")
@test frameswritable(sinkbuf) == nframes*2 @test frameswritable(sinkbuf) == nframes*2
@ -145,7 +154,7 @@ function test_callback_overflow(inchans, outchans, synced)
end end
@test framesreadable(sourcebuf) == 0 @test framesreadable(sourcebuf) == 0
outchans > 0 && @test frameswritable(sinkbuf) == nframes outchans > 0 && @test frameswritable(sinkbuf) == nframes
@test process() == PortAudio.paContinue @test wrapprocess(process, sourcebuf, sinkbuf) == PortAudio.paContinue
@test framesreadable(errbuf) == 0 @test framesreadable(errbuf) == 0
@test framesreadable(sourcebuf) == nframes @test framesreadable(sourcebuf) == nframes
outchans > 0 && @test frameswritable(sinkbuf) == nframes*2 outchans > 0 && @test frameswritable(sinkbuf) == nframes*2
@ -154,7 +163,7 @@ function test_callback_overflow(inchans, outchans, synced)
outchans > 0 && write(sinkbuf, testout) outchans > 0 && write(sinkbuf, testout)
@test framesreadable(sourcebuf) == nframes @test framesreadable(sourcebuf) == nframes
outchans > 0 && @test frameswritable(sinkbuf) == nframes outchans > 0 && @test frameswritable(sinkbuf) == nframes
@test process() == PortAudio.paContinue @test wrapprocess(process, sourcebuf, sinkbuf) == PortAudio.paContinue
@test framesreadable(errbuf) == 0 @test framesreadable(errbuf) == 0
@test framesreadable(sourcebuf) == nframes*2 @test framesreadable(sourcebuf) == nframes*2
outchans > 0 && @test frameswritable(sinkbuf) == nframes*2 outchans > 0 && @test frameswritable(sinkbuf) == nframes*2
@ -163,7 +172,7 @@ function test_callback_overflow(inchans, outchans, synced)
outchans > 0 && write(sinkbuf, testout) outchans > 0 && write(sinkbuf, testout)
@test framesreadable(sourcebuf) == nframes*2 @test framesreadable(sourcebuf) == nframes*2
outchans > 0 && @test frameswritable(sinkbuf) == nframes outchans > 0 && @test frameswritable(sinkbuf) == nframes
@test process() == PortAudio.paContinue @test wrapprocess(process, sourcebuf, sinkbuf) == PortAudio.paContinue
@test framesreadable(sourcebuf) == nframes*2 @test framesreadable(sourcebuf) == nframes*2
errs = readavailable(errbuf) errs = readavailable(errbuf)
if outchans > 0 if outchans > 0
@ -195,7 +204,7 @@ end
end end
@testset "using correct shim version" begin @testset "using correct shim version" begin
@test PortAudio.shimhash() == "87021557a9f999545828eb11e4ebad2cd278b734dd91a8bd3faf05c89912cf80" @test_broken PortAudio.shimhash() == "87021557a9f999545828eb11e4ebad2cd278b734dd91a8bd3faf05c89912cf80"
end end
@testset "Basic callback functionality" begin @testset "Basic callback functionality" begin