callback creation/scheduling seems to be working

This commit is contained in:
Spencer Russell 2016-03-22 22:45:40 -04:00
parent bea06357b8
commit f02e733fe7
3 changed files with 258 additions and 228 deletions

View file

@ -1,3 +1,5 @@
__precompile__()
module PortAudio
using SampleTypes
@ -7,7 +9,7 @@ using Devectorize
include( "../deps/deps.jl")
include("libportaudio.jl")
export PortAudioSink, PortAudioSource
export PortAudioStream, PortAudioSink, PortAudioSource
const DEFAULT_BUFSIZE=4096
@ -15,27 +17,14 @@ function __init__()
# initialize PortAudio on module load
Pa_Initialize()
global const portaudio_callback_float =
cfunction(portaudio_callback, Cint,
(Ptr{Float32}, Ptr{Float32}, Culong, Ptr{Void}, Culong,
Ptr{CallbackInfo{Float32}}))
global const portaudio_callback_int32 =
cfunction(portaudio_callback, Cint,
(Ptr{Int32}, Ptr{Int32}, Culong, Ptr{Void}, Culong,
Ptr{CallbackInfo{Int32}}))
# TODO: figure out how we're handling Int24
global const portaudio_callback_int16 =
cfunction(portaudio_callback, Cint,
(Ptr{Int16}, Ptr{Int16}, Culong, Ptr{Void}, Culong,
Ptr{CallbackInfo{Int16}}))
global const portaudio_callback_int8 =
cfunction(portaudio_callback, Cint,
(Ptr{Int8}, Ptr{Int8}, Culong, Ptr{Void}, Culong,
Ptr{CallbackInfo{Int8}}))
global const portaudio_callback_uint8 =
cfunction(portaudio_callback, Cint,
(Ptr{UInt8}, Ptr{UInt8}, Culong, Ptr{Void}, Culong,
Ptr{CallbackInfo{UInt8}}))
# the portaudio callbacks are parametric on the sample type
global const pa_callbacks = Dict{Type, Ptr{Void}}()
for T in (Float32, Int32, Int16, Int8, UInt8)
pa_callbacks[T] = cfunction(portaudio_callback, Cint,
(Ptr{T}, Ptr{T}, Culong, Ptr{Void}, Culong,
Ptr{CallbackInfo{T}}))
end
end
function versioninfo(io::IO=STDOUT)
@ -71,12 +60,13 @@ devnames() = join(["\"$(dev.name)\"" for dev in devices()], "\n")
function fieldptr{T}(obj::T, field::Symbol)
fieldnum = findfirst(fieldnames(T), field)
offset = fieldoffsets(T)[fieldnum]
FT = fieldtype(T, field)
pointer_from_objref(obj) + offset
Ptr{FT}(pointer_from_objref(obj) + offset)
end
# Used to synchronize the portaudio callback and Julia task
@enum BufferState JuliaPending PortaudioPending
@enum BufferState JuliaPending PortAudioPending
# we want this to be immutable so we can stack allocate it
immutable CallbackInfo{T}
@ -90,50 +80,50 @@ end
# paramaterized on the sample type and sampling rate type
type PortAudioStream{T, U}
stream::PaStream
name::UTF8String
samplerate::U
bufsize::Int
stream::PaStream
sink # untyped because of circular type definition
source # untyped because of circular type definition
bufinfo::CallbackInfo{T}
bufstate::BufferState
taskwork::Base.SingleAsyncWork
bufstate::BufferState # used to synchronize the portaudio and julia sides
bufinfo::CallbackInfo{T} # immutable data used in the portaudio callback
function PortAudioStream(T, stream, sr, inchans, outchans, bufsize, name)
this = new(stream, utf8(name), sr, bufsize)
# this inner constructor is generally called via the top-level outer
# constructor below
function PortAudioStream(indev::PortAudioDevice, outdev::PortAudioDevice,
sr, inchans, outchans, bufsize)
inparams = (inchans == 0) ?
Ptr{Pa_StreamParameters}(0) :
Ref(Pa_StreamParameters(indev.idx, inchans, type_to_fmt[T], 0.0, C_NULL))
outparams = (outchans == 0) ?
Ptr{Pa_StreamParameters}(0) :
Ref(Pa_StreamParameters(outdev.idx, outchans, type_to_fmt[T], 0.0, C_NULL))
this = new(sr, bufsize, C_NULL)
finalizer(this, close)
this.sink = PortAudioSink{T, U}(this, outchans, bufsize)
this.source = PortAudioSource{T, U}(this, inchans, bufsize)
this.sink = PortAudioSink{T, U}(outdev.name, this, outchans, bufsize)
this.source = PortAudioSource{T, U}(indev.name, this, inchans, bufsize)
this.taskwork = Base.SingleAsyncWork(data -> audiotask(this))
inbuf = pointer_from_objref(this.source) + fieldoffsets(PortAudioSource)[]
this.bufstate = PortAudioPending
this.bufinfo = CallbackInfo(inchans, fieldptr(this.source, :pabuf),
outchans, fieldptr(this.sink, :pabuf),
this.bufinfo = CallbackInfo(inchans, pointer(this.source.pabuf),
outchans, pointer(this.sink.pabuf),
this.taskwork.handle,
fieldptr(this, bufstate))
fieldptr(this, :bufstate))
this.stream = Pa_OpenStream(inparams, outparams, float(sr), bufsize,
paNoFlag, pa_callbacks[T], fieldptr(this, :bufinfo))
Pa_StartStream(stream)
Pa_StartStream(this.stream)
this
end
end
# this is the to-level outer constructor that all the other outer constructors
# end up calling
function PortAudioStream(indev::PortAudioDevice, outdev::PortAudioDevice,
eltype=Float32, sr=48000Hz, inchans=2, outchans=2, bufsize=DEFAULT_BUFSIZE)
if inchans == 0
inparams = Ptr{Pa_StreamParameters}(0)
else
inparams = Ref(Pa_StreamParameters(indev.idx, inchans, type_to_fmt[eltype], 0.0, C_NULL))
end
if outchans == 0
outparams = Ptr{Pa_StreamParameters}(0)
else
outparams = Ref(Pa_StreamParameters(outdev.idx, outchans, type_to_fmt[eltype], 0.0, C_NULL))
end
stream = Pa_OpenStream(inparams, outparams, float(sr), bufsize, paNoFlag)
PortAudioStream{eltype, typeof(sr)}(eltype, stream, sr, inchans, outchans, bufsize, device.name)
PortAudioStream{eltype, typeof(sr)}(indev, outdev, sr, inchans, outchans, bufsize)
end
function PortAudioStream(indevname::AbstractString, outdevname::AbstractString, args...)
@ -161,37 +151,13 @@ end
PortAudioStream(device::PortAudioDevice, args...) = PortAudioStream(device, device, args...)
PortAudioStream(device::AbstractString, args...) = PortAudioStream(device, device, args...)
# use the default input and output devices
function PortAudioStream(args...)
outidx = Pa_GetDefaultOutputDevice()
outdevice = PortAudioDevice(Pa_GetDeviceInfo(outidx), outidx)
inidx = Pa_GetDefaultInputDevice()
indevice = PortAudioDevice(Pa_GetDeviceInfo(inidx), inidx)
PortAudioSink(indevice, outdevice, args...)
end
for (TypeName, Super) in ((:PortAudioSink, :SampleSink),
(:PortAudioSource, :SampleSource))
@eval type $TypeName{T, U} <: $Super
stream::PortAudioStream{T, U}
waiters::Vector{Condition}
jlbuf::Array{T, 2}
pabuf::Array{T, 2}
function $TypeName(stream, channels, bufsize)
jlbuf = zeros(T, busize, channels)
pabuf = zeros(T, channels, bufsize)
new(stream, Condition[], jlbuf, pabuf)
end
end
end
# most of these methods are the same for Sources and Sinks, so define them on
# the union
typealias PortAudioStream{T, U} Union{PortAudioSink{T, U}, PortAudioSource{T, U}}
function Base.show{T <: PortAudioStream}(io::IO, stream::T)
println(io, T, "(\"", stream.name, "\")")
print(io, nchannels(stream), " channels sampled at ", samplerate(stream))
outidx = Pa_GetDefaultOutputDevice()
outdevice = PortAudioDevice(Pa_GetDeviceInfo(outidx), outidx)
PortAudioStream(indevice, outdevice, args...)
end
function Base.close(stream::PortAudioStream)
@ -202,99 +168,124 @@ function Base.close(stream::PortAudioStream)
end
end
SampleTypes.nchannels(stream::PortAudioStream) = size(stream.jlbuf, 2)
SampleTypes.samplerate(stream::PortAudioStream) = stream.samplerate
Base.eltype{T, U}(::PortAudioStream{T, U}) = T
function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf)
if sink.busy
c = Condition()
push!(sink.waiters, c)
wait(c)
shift!(sink.waiters)
# Define our source and sink types
for (TypeName, Super) in ((:PortAudioSink, :SampleSink),
(:PortAudioSource, :SampleSource))
@eval type $TypeName{T, U} <: $Super
name::UTF8String
stream::PortAudioStream{T, U}
waiters::Vector{Condition}
jlbuf::Array{T, 2}
pabuf::Array{T, 2}
function $TypeName(name, stream, channels, bufsize)
# portaudio data comes in interleaved, so we'll end up transposing
# it back and forth to julia column-major
jlbuf = zeros(T, bufsize, channels)
pabuf = zeros(T, channels, bufsize)
new(name, stream, Condition[], jlbuf, pabuf)
end
total = nframes(buf)
written = 0
try
sink.busy = true
while written < total
n = min(size(sink.pabuf, 2), total-written, Pa_GetStreamWriteAvailable(sink.stream))
bufstart = 1+written
bufend = n+written
@devec sink.jlbuf[1:n, :] = buf[bufstart:bufend, :]
transpose!(sink.pabuf, sink.jlbuf)
Pa_WriteStream(sink.stream, sink.pabuf, n, false)
written += n
sleep(POLL_SECONDS)
end
finally
# make sure we release the busy flag even if the user ctrl-C'ed out
sink.busy = false
if length(sink.waiters) > 0
# let the next task in line go
notify(sink.waiters[1])
end
end
written
end
# function Base.show{T <: Union{PortAudioSink, PortAudioSource}}(io::IO, stream::T)
# println(io, T, "(\"", stream.name, "\")")
# print(io, nchannels(stream), " channels sampled at ", samplerate(stream))
# end
function SampleTypes.unsafe_read!(source::PortAudioSource, buf::SampleBuf)
if source.busy
c = Condition()
push!(source.waiters, c)
wait(c)
shift!(source.waiters)
end
SampleTypes.nchannels(s::Union{PortAudioSink, PortAudioSource}) = size(s.jlbuf, 2)
SampleTypes.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.stream)
Base.eltype{T, U}(::Union{PortAudioSink{T, U}, PortAudioSource{T, U}}) = T
total = nframes(buf)
read = 0
# function SampleTypes.unsafe_write(sink::PortAudioSink, buf::SampleBuf)
# if sink.busy
# c = Condition()
# push!(sink.waiters, c)
# wait(c)
# shift!(sink.waiters)
# end
#
# total = nframes(buf)
# written = 0
# try
# sink.busy = true
#
# while written < total
# n = min(size(sink.pabuf, 2), total-written, Pa_GetStreamWriteAvailable(sink.stream))
# bufstart = 1+written
# bufend = n+written
# @devec sink.jlbuf[1:n, :] = buf[bufstart:bufend, :]
# transpose!(sink.pabuf, sink.jlbuf)
# Pa_WriteStream(sink.stream, sink.pabuf, n, false)
# written += n
# sleep(POLL_SECONDS)
# end
# finally
# # make sure we release the busy flag even if the user ctrl-C'ed out
# sink.busy = false
# if length(sink.waiters) > 0
# # let the next task in line go
# notify(sink.waiters[1])
# end
# end
#
# written
# end
#
# function SampleTypes.unsafe_read!(source::PortAudioSource, buf::SampleBuf)
# if source.busy
# c = Condition()
# push!(source.waiters, c)
# wait(c)
# shift!(source.waiters)
# end
#
# total = nframes(buf)
# read = 0
#
# try
# source.busy = true
#
# while read < total
# n = min(size(source.pabuf, 2), total-read, Pa_GetStreamReadAvailable(source.stream))
# Pa_ReadStream(source.stream, source.pabuf, n, false)
# transpose!(source.jlbuf, source.pabuf)
# bufstart = 1+read
# bufend = n+read
# @devec buf[bufstart:bufend, :] = source.jlbuf[1:n, :]
# read += n
# sleep(POLL_SECONDS)
# end
#
# finally
# source.busy = false
# if length(source.waiters) > 0
# # let the next task in line go
# notify(source.waiters[1])
# end
# end
#
# read
# end
try
source.busy = true
while read < total
n = min(size(source.pabuf, 2), total-read, Pa_GetStreamReadAvailable(source.stream))
Pa_ReadStream(source.stream, source.pabuf, n, false)
transpose!(source.jlbuf, source.pabuf)
bufstart = 1+read
bufend = n+read
@devec buf[bufstart:bufend, :] = source.jlbuf[1:n, :]
read += n
sleep(POLL_SECONDS)
end
finally
source.busy = false
if length(source.waiters) > 0
# let the next task in line go
notify(source.waiters[1])
end
end
read
end
"""This is the callback function that gets called directly in the PortAudio
audio thread, so it's critical that it not interact with the Julia GC"""
# This is the callback function that gets called directly in the PortAudio
# audio thread, so it's critical that it not interact with the Julia GC
function portaudio_callback{T}(inptr::Ptr{T}, outptr::Ptr{T},
nframes, timeinfo, flags, userdata::Ptr{Ptr{Void}})
infoptr = Ptr{BufferInfo{T}}(unsafe_load(userdata, 1))
info = unsafe_load(infoptr)
bufstateptr = Ptr{BufferState}(unsafe_load(userdata, 2))
bufstate = unsafe_load(bufstateptr)
nframes, timeinfo, flags, userdata::Ptr{CallbackInfo{T}})
info = unsafe_load(userdata)
if(bufstate != PortAudioPending)
if(unsafe_load(info.bufstate) != PortAudioPending)
# xrun, copy zeros to outbuffer
memset(info.outbuf, 0, sizeof(T)*nframes*info.outchannels)
return
memset(outptr, 0, sizeof(T)*nframes*info.outchannels)
return paContinue
end
unsafe_copy!(info.inbuf, inptr, nframes * info.inchannels)
unsafe_copy!(outptr, info.outbuf, nframes * info.outchannels)
unsafe_store!(bufstateptr, JuliaPending)
unsafe_store!(info.bufstate, JuliaPending)
# notify the julia audio task
ccall(:uv_async_send, Void, (Ptr{Void},), info.taskhandle)
@ -305,21 +296,18 @@ end
# as of portaudio 19.20140130 (which is the HomeBrew version as of 20160319)
# noninterleaved data is not supported for the read/write interface on OSX
# so we need to use another buffer to interleave (transpose)
function audiotask{T}(userdata::Ptr{Ptr{Void}})
infoptr = Ptr{BufferInfo{T}}(unsafe_load(userdata, 1))
info = unsafe_load(infoptr)
bufstateptr = Ptr{BufferState}(unsafe_load(userdata, 2))
bufstate = unsafe_load(bufstateptr)
if info.bufstate != JuliaPending
function audiotask{T, U}(stream::PortAudioStream{T, U})
if stream.bufstate != JuliaPending
return
end
unsafe_store!(bufstateptr, PortaudioPending)
end
# do stuff
end # module PortAudio
stream.bufstate = PortAudioPending
end
memset(buf, val, count) = ccall(:memset, Ptr{Void},
(Ptr{Void}, Cint, Csize_t),
buf, val, count)
end # module PortAudio

View file

@ -39,9 +39,9 @@ const type_to_fmt = Dict{Type, PaSampleFormat}(
typealias PaStreamCallbackResult Cint
# Callback return values
const paContinue 0
const paComplete 1
const paAbort 2
const paContinue = PaStreamCallbackResult(0)
const paComplete = PaStreamCallbackResult(1)
const paAbort = PaStreamCallbackResult(2)
function Pa_Initialize()
err = ccall((:Pa_Initialize, libportaudio), PaError, ())

View file

@ -7,67 +7,109 @@ using SampleTypes
# these test are currently set up to run on OSX
@testset "PortAudio Tests" begin
@testset "Reports version" begin
io = IOBuffer()
PortAudio.versioninfo(io)
result = takebuf_string(io)
# make sure this is the same version I tested with
@test result ==
"""PortAudio V19-devel (built Aug 6 2014 17:54:39)
Version Number: 1899
"""
end
@testset "Open Default Device" begin
devs = PortAudio.devices()
source = PortAudioSource()
sink = PortAudioSink()
buf = read(source, 0.1s)
@test size(buf) == (round(Int, 0.1s * samplerate(source)), nchannels(source))
write(sink, buf)
close(source)
close(sink)
end
@testset "Open Device by name" begin
devs = PortAudio.devices()
source = PortAudioSource("Built-in Microph")
sink = PortAudioSink("Built-in Output")
buf = read(source, 0.1s)
@test size(buf) == (round(Int, 0.1s * samplerate(source)), nchannels(source))
write(sink, buf)
io = IOBuffer()
show(io, source)
@test takebuf_string(io) ==
"""PortAudio.PortAudioSource{Float32,SIUnits.SIQuantity{Int64,0,0,-1,0,0,0,0,0,0}}("Built-in Microph")
2 channels sampled at 48000 s⁻¹"""
show(io, sink)
@test takebuf_string(io) ==
"""PortAudio.PortAudioSink{Float32,SIUnits.SIQuantity{Int64,0,0,-1,0,0,0,0,0,0}}("Built-in Output")
2 channels sampled at 48000 s⁻¹"""
close(source)
close(sink)
end
@testset "Error on wrong name" begin
@test_throws ErrorException PortAudioSource("foobarbaz")
@test_throws ErrorException PortAudioSink("foobarbaz")
end
# no way to check that the right data is actually getting read or written here,
# but at least it's not crashing.
@testset "Queued Writing" begin
sink = PortAudioSink()
buf = SampleBuf(rand(eltype(sink), 48000, nchannels(sink))*0.1, samplerate(sink))
t1 = @async write(sink, buf)
t2 = @async write(sink, buf)
@test wait(t1) == 48000
@test wait(t2) == 48000
close(sink)
end
@testset "Queued Reading" begin
source = PortAudioSource()
buf = SampleBuf(rand(eltype(source), 48000, nchannels(source)), samplerate(source))
t1 = @async read!(source, buf)
t2 = @async read!(source, buf)
@test wait(t1) == 48000
@test wait(t2) == 48000
close(source)
# @testset "Reports version" begin
# io = IOBuffer()
# PortAudio.versioninfo(io)
# result = takebuf_string(io)
# # make sure this is the same version I tested with
# @test result ==
# """PortAudio V19-devel (built Aug 6 2014 17:54:39)
# Version Number: 1899
# """
# end
@testset "PortAudio Callback works and doesn't allocate" begin
inbuf = rand(Float32, 2, 8)
outbuf = Array(Float32, 2, 8)
sinkbuf = rand(Float32, 2, 8)
sourcebuf = Array(Float32, 2, 8)
state = Ref(PortAudio.PortAudioPending)
work = Base.SingleAsyncWork(data -> nothing)
info = PortAudio.CallbackInfo(2, pointer(sourcebuf),
2, pointer(sinkbuf),
work.handle,
Ptr{PortAudio.BufferState}(pointer_from_objref(state)))
# handle any conversions here so they don't mess with the allocation
inptr = pointer(inbuf)
outptr = pointer(outbuf)
nframes = Culong(8)
flags = Culong(0)
infoptr = Ptr{PortAudio.CallbackInfo{Float32}}(pointer_from_objref(info))
ret = PortAudio.portaudio_callback(inptr, outptr, nframes, C_NULL, flags, infoptr)
@test isa(ret, Cint)
@test ret == PortAudio.paContinue
@test outbuf == sinkbuf
@test inbuf == sourcebuf
@test state[] == PortAudio.JuliaPending
# call again (underrun)
ret = PortAudio.portaudio_callback(inptr, outptr, nframes, C_NULL, flags, infoptr)
@test isa(ret, Cint)
@test ret == PortAudio.paContinue
@test outbuf == zeros(Float32, 2, 8)
# test allocation
state[] = PortAudio.PortAudioPending
alloc = @allocated PortAudio.portaudio_callback(inptr, outptr, nframes, C_NULL, flags, infoptr)
@test alloc == 0
# now test allocation in underrun state
alloc = @allocated PortAudio.portaudio_callback(inptr, outptr, nframes, C_NULL, flags, infoptr)
@test alloc == 0
end
# @testset "Open Default Device" begin
# devs = PortAudio.devices()
# source = PortAudioSource()
# sink = PortAudioSink()
# buf = read(source, 0.1s)
# @test size(buf) == (round(Int, 0.1s * samplerate(source)), nchannels(source))
# write(sink, buf)
# close(source)
# close(sink)
# end
# @testset "Open Device by name" begin
# devs = PortAudio.devices()
# source = PortAudioSource("Built-in Microph")
# sink = PortAudioSink("Built-in Output")
# buf = read(source, 0.1s)
# @test size(buf) == (round(Int, 0.1s * samplerate(source)), nchannels(source))
# write(sink, buf)
# io = IOBuffer()
# show(io, source)
# @test takebuf_string(io) ==
# """PortAudio.PortAudioSource{Float32,SIUnits.SIQuantity{Int64,0,0,-1,0,0,0,0,0,0}}("Built-in Microph")
# 2 channels sampled at 48000 s⁻¹"""
# show(io, sink)
# @test takebuf_string(io) ==
# """PortAudio.PortAudioSink{Float32,SIUnits.SIQuantity{Int64,0,0,-1,0,0,0,0,0,0}}("Built-in Output")
# 2 channels sampled at 48000 s⁻¹"""
# close(source)
# close(sink)
# end
# @testset "Error on wrong name" begin
# @test_throws ErrorException PortAudioSource("foobarbaz")
# @test_throws ErrorException PortAudioSink("foobarbaz")
# end
# # no way to check that the right data is actually getting read or written here,
# # but at least it's not crashing.
# @testset "Queued Writing" begin
# sink = PortAudioSink()
# buf = SampleBuf(rand(eltype(sink), 48000, nchannels(sink))*0.1, samplerate(sink))
# t1 = @async write(sink, buf)
# t2 = @async write(sink, buf)
# @test wait(t1) == 48000
# @test wait(t2) == 48000
# close(sink)
# end
# @testset "Queued Reading" begin
# source = PortAudioSource()
# buf = SampleBuf(rand(eltype(source), 48000, nchannels(source)), samplerate(source))
# t1 = @async read!(source, buf)
# t2 = @async read!(source, buf)
# @test wait(t1) == 48000
# @test wait(t2) == 48000
# close(source)
# end
end