seems to be mostly working with lockfree ringbuffer

This commit is contained in:
Spencer Russell 2016-07-29 01:44:02 -04:00
parent e40933b97b
commit 77dcb8965c
2 changed files with 93 additions and 84 deletions

View file

@ -6,7 +6,7 @@ using SampledSignals
using Devectorize
using RingBuffers
using Compat
import Compat: UTF8String
import Compat: UTF8String, view
# Get binary dependencies loaded from BinDeps
include( "../deps/deps.jl")
@ -14,7 +14,13 @@ include("libportaudio.jl")
export PortAudioStream
# Size of the ringbuffer in frames. 85ms latency at 48kHz
const DEFAULT_BUFSIZE=4096
# data is passed to and from the ringbuffer in chunks with this many frames
# it should be at most the ringbuffer size, and must evenly divide into the
# the underlying portaudio buffer size. E.g. if PortAudio is running with a
# 2048-frame buffer period, the chunk size can be 2048, 1024, 512, 256, etc.
const CHUNKSIZE=128
function __init__()
# initialize PortAudio on module load
@ -68,17 +74,12 @@ function fieldptr{T}(obj::T, field::Symbol)
Ptr{FT}(pointer_from_objref(obj) + offset)
end
# Used to synchronize the portaudio callback and Julia task
@enum BufferState JuliaPending PortAudioPending
# we want this to be immutable so we can stack allocate it
immutable CallbackInfo{T}
inchannels::Int
inbuf::Ptr{T}
inbuf::LockFreeRingBuffer{T}
outchannels::Int
outbuf::Ptr{T}
taskhandle::Ptr{Void}
bufstate::Ptr{BufferState}
outbuf::LockFreeRingBuffer{T}
end
# paramaterized on the sample type and sampling rate type
@ -88,8 +89,6 @@ type PortAudioStream{T, U}
stream::PaStream
sink # untyped because of circular type definition
source # untyped because of circular type definition
taskwork::Base.SingleAsyncWork
bufstate::BufferState # used to synchronize the portaudio and julia sides
bufinfo::CallbackInfo{T} # immutable data used in the portaudio callback
# this inner constructor is generally called via the top-level outer
@ -104,16 +103,14 @@ type PortAudioStream{T, U}
Ref(Pa_StreamParameters(outdev.idx, outchans, type_to_fmt[T], 0.0, C_NULL))
this = new(sr, bufsize, C_NULL)
finalizer(this, close)
this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize;
prefill=false, underflow=PAD)
this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize;
prefill=true, overflow=OVERWRITE)
this.taskwork = Base.SingleAsyncWork(_ -> audiotask(this))
this.bufstate = PortAudioPending
this.bufinfo = CallbackInfo(inchans, pointer(this.source.pabuf),
outchans, pointer(this.sink.pabuf),
this.taskwork.handle,
fieldptr(this, :bufstate))
this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize)
this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize)
if inchans > 0 && outchans > 0
# we've got a duplex stream. initialize with the output buffer full
write(this.sink, SampleBuf(zeros(T, bufsize, outchans), sr))
end
this.bufinfo = CallbackInfo(inchans, this.source.ringbuf,
outchans, this.sink.ringbuf)
this.stream = Pa_OpenStream(inparams, outparams, float(sr), bufsize,
paNoFlag, pa_callbacks[T], fieldptr(this, :bufinfo))
@ -121,7 +118,6 @@ type PortAudioStream{T, U}
this
end
end
# this is the top-level outer constructor that all the other outer constructors
@ -169,6 +165,8 @@ function Base.close(stream::PortAudioStream)
if stream.stream != C_NULL
Pa_StopStream(stream.stream)
Pa_CloseStream(stream.stream)
close(stream.source)
close(stream.sink)
stream.stream = C_NULL
end
@ -203,27 +201,24 @@ for (TypeName, Super) in ((:PortAudioSink, :SampleSink),
@eval type $TypeName{T, U} <: $Super
name::UTF8String
stream::PortAudioStream{T, U}
jlbuf::Array{T, 2}
pabuf::Array{T, 2}
ringbuf::RingBuffer{T}
chunkbuf::Array{T, 2}
ringbuf::LockFreeRingBuffer{T}
nchannels::Int
function $TypeName(name, stream, channels, bufsize; prefill=false, ringbuf_args...)
function $TypeName(name, stream, channels, bufsize)
# portaudio data comes in interleaved, so we'll end up transposing
# it back and forth to julia column-major
jlbuf = zeros(T, bufsize, channels)
pabuf = zeros(T, channels, bufsize)
ringbuf = RingBuffer(T, bufsize, channels; ringbuf_args...)
if prefill
write(ringbuf, zeros(T, bufsize, channels))
end
new(name, stream, jlbuf, pabuf, ringbuf)
chunkbuf = zeros(T, channels, CHUNKSIZE)
ringbuf = LockFreeRingBuffer(T, bufsize * channels)
new(name, stream, chunkbuf, ringbuf, channels)
end
end
end
SampledSignals.nchannels(s::Union{PortAudioSink, PortAudioSource}) = size(s.jlbuf, 2)
SampledSignals.nchannels(s::Union{PortAudioSink, PortAudioSource}) = s.nchannels
SampledSignals.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.stream)
Base.eltype{T, U}(::Union{PortAudioSink{T, U}, PortAudioSource{T, U}}) = T
Base.close(s::Union{PortAudioSink, PortAudioSource}) = close(s.ringbuf)
function Base.show{T <: Union{PortAudioSink, PortAudioSource}}(io::IO, stream::T)
println(io, T, "(\"", stream.name, "\")")
@ -232,11 +227,43 @@ end
function SampledSignals.unsafe_write(sink::PortAudioSink, buf::SampleBuf)
write(sink.ringbuf, buf)
total = nframes(buf)
nwritten = 0
while nwritten < total
towrite = min(CHUNKSIZE, total-nwritten)
# make a buffer of interleaved samples
# TODO: don't directly access buf.data
transpose!(view(sink.chunkbuf, :, 1:towrite),
view(buf.data, (1:towrite)+nwritten, :))
while nwritable(sink.ringbuf) < towrite
wait(sink.ringbuf)
end
write(sink.ringbuf, sink.chunkbuf, towrite*nchannels(sink))
nwritten += towrite
end
nwritten
end
function SampledSignals.unsafe_read!(source::PortAudioSource, buf::SampleBuf)
read!(source.ringbuf, buf)
total = nframes(buf)
nread = 0
while nread < total
toread = min(CHUNKSIZE, total-nread)
while nreadable(source.ringbuf) < toread
wait(source.ringbuf)
end
read!(source.ringbuf, source.chunkbuf, toread*nchannels(source))
# de-interleave the samples
# TODO: don't directly access buf.data
transpose!(view(buf.data, (1:toread)+nread, :),
view(source.chunkbuf, :, 1:toread))
nread += toread
end
nread
end
# This is the callback function that gets called directly in the PortAudio
@ -244,43 +271,25 @@ end
function portaudio_callback{T}(inptr::Ptr{T}, outptr::Ptr{T},
nframes, timeinfo, flags, userdata::Ptr{CallbackInfo{T}})
info = unsafe_load(userdata)
insamples = nframes * info.inchannels
outsamples = nframes * info.outchannels
if(unsafe_load(info.bufstate) != PortAudioPending)
if nreadable(info.outbuf) < outsamples ||
nwritable(info.inbuf) < insamples
# xrun, copy zeros to outbuffer
# TODO: send a notification to an error msg ringbuf
# TODO (maybe): we could do a partial write if there's anything in the
# ringbuf, and minimize the dropout
memset(outptr, 0, sizeof(T)*nframes*info.outchannels)
return paContinue
end
unsafe_copy!(info.inbuf, inptr, nframes * info.inchannels)
unsafe_copy!(outptr, info.outbuf, nframes * info.outchannels)
unsafe_store!(info.bufstate, JuliaPending)
# notify the julia audio task
ccall(:uv_async_send, Void, (Ptr{Void},), info.taskhandle)
read!(info.outbuf, outptr, outsamples)
write(info.inbuf, inptr, insamples)
paContinue
end
# this gets called from uv_async_send, so it MUST NOT BLOCK
function audiotask{T, U}(stream::PortAudioStream{T, U})
try
if stream.bufstate != JuliaPending
return
end
transpose!(stream.source.jlbuf, stream.source.pabuf)
write(stream.source.ringbuf, stream.source.jlbuf)
read!(stream.sink.ringbuf, stream.sink.jlbuf)
transpose!(stream.sink.pabuf, stream.sink.jlbuf)
stream.bufstate = PortAudioPending
catch ex
warn("Audio Task died with exception: $ex")
Base.show_backtrace(STDOUT, catch_backtrace())
end
end
memset(buf, val, count) = ccall(:memset, Ptr{Void},
(Ptr{Void}, Cint, Csize_t),

View file

@ -7,6 +7,7 @@ else
end
using PortAudio
using SampledSignals
using RingBuffers
# these test are currently set up to run on OSX
@ -31,17 +32,16 @@ using SampledSignals
end
@testset "PortAudio Callback works and doesn't allocate" begin
inbuf = rand(Float32, 2, 8)
outbuf = Array(Float32, 2, 8)
sinkbuf = rand(Float32, 2, 8)
sourcebuf = Array(Float32, 2, 8)
state = Ref(PortAudio.PortAudioPending)
work = Base.SingleAsyncWork(data -> nothing)
inbuf = rand(Float32, 16) # simulate microphone input
sourcebuf = LockFreeRingBuffer(Float32, 64) # the microphone input should end up here
testin = zeros(Float32, 16)
info = PortAudio.CallbackInfo(2, pointer(sourcebuf),
2, pointer(sinkbuf),
work.handle,
Ptr{PortAudio.BufferState}(pointer_from_objref(state)))
outbuf = zeros(Float32, 24) # this is where the output should go
sinkbuf = LockFreeRingBuffer(Float32, 64) # the callback should copy this to outbuf
testout = rand(Float32, 24)
write(sinkbuf, testout) # fill the output ringbuffer
info = PortAudio.CallbackInfo(2, sourcebuf, 3, sinkbuf)
# handle any conversions here so they don't mess with the allocation
inptr = pointer(inbuf)
@ -53,18 +53,18 @@ using SampledSignals
ret = PortAudio.portaudio_callback(inptr, outptr, nframes, C_NULL, flags, infoptr)
@test isa(ret, Cint)
@test ret == PortAudio.paContinue
@test outbuf == sinkbuf
@test inbuf == sourcebuf
@test state[] == PortAudio.JuliaPending
@test outbuf == testout
read!(sourcebuf, testin)
@test inbuf == testin
# call again (underrun)
ret = PortAudio.portaudio_callback(inptr, outptr, nframes, C_NULL, flags, infoptr)
@test isa(ret, Cint)
@test ret == PortAudio.paContinue
@test outbuf == zeros(Float32, 2, 8)
@test outbuf == zeros(Float32, 24)
write(sinkbuf, testout) # fill the output ringbuffer
# test allocation
state[] = PortAudio.PortAudioPending
alloc = @allocated PortAudio.portaudio_callback(inptr, outptr, nframes, C_NULL, flags, infoptr)
@test alloc == 0
# now test allocation in underrun state
@ -74,15 +74,15 @@ using SampledSignals
@testset "Open Default Device" begin
stream = PortAudioStream()
buf = read(stream, 0.1s)
@test size(buf) == (round(Int, 0.1s * samplerate(stream)), nchannels(stream.source))
buf = read(stream, 0.001s)
@test size(buf) == (round(Int, 0.001s * samplerate(stream)), nchannels(stream.source))
write(stream, buf)
close(stream)
end
@testset "Open Device by name" begin
stream = PortAudioStream("Built-in Microph", "Built-in Output")
buf = read(stream, 0.1s)
@test size(buf) == (round(Int, 0.1s * samplerate(stream)), nchannels(stream.source))
buf = read(stream, 0.001s)
@test size(buf) == (round(Int, 0.001s * samplerate(stream)), nchannels(stream.source))
write(stream, buf)
io = IOBuffer()
show(io, stream)
@ -100,8 +100,8 @@ using SampledSignals
# no way to check that the right data is actually getting read or written here,
# but at least it's not crashing.
@testset "Queued Writing" begin
stream = PortAudioStream()
buf = SampleBuf(rand(eltype(stream), 48000, nchannels(stream.sink))*0.1, samplerate(stream))
stream = PortAudioStream(0, 2)
buf = SampleBuf(rand(eltype(stream), 48000, nchannels(stream.sink)), samplerate(stream))
t1 = @async write(stream, buf)
t2 = @async write(stream, buf)
@test wait(t1) == 48000
@ -109,7 +109,7 @@ using SampledSignals
close(stream)
end
@testset "Queued Reading" begin
stream = PortAudioStream()
stream = PortAudioStream(2, 0)
buf = SampleBuf(rand(eltype(stream), 48000, nchannels(stream.source)), samplerate(stream))
t1 = @async read!(stream, buf)
t2 = @async read!(stream, buf)