performance improvements

This commit is contained in:
Brandon Taylor 2021-07-03 17:22:49 -04:00
parent 3c6bb1a27f
commit 8d6abcb785

View file

@ -110,7 +110,9 @@ function initialize()
# ALSA will throw extraneous warnings on start-up
# send them to debug instead
debug_message = @capture_err handle_status(Pa_Initialize())
@debug debug_message
if !isempty(debug_message)
@debug debug_message
end
end
function terminate()
@ -260,9 +262,15 @@ abstract type Scribe end
struct SampledSignalsReader{Sample} <: Scribe
warn_xruns::Bool
end
function SampledSignalsReader(; Sample = Float32, warn_xruns = true)
SampledSignalsReader{Sample}(warn_xruns)
end
struct SampledSignalsWriter{Sample} <: Scribe
warn_xruns::Bool
end
function SampledSignalsWriter(; Sample = Float32, warn_xruns = true)
SampledSignalsWriter{Sample}(warn_xruns)
end
# define on types
# throw an error if not defined
@ -295,48 +303,59 @@ function get_output_type(::Type{<:Union{SampledSignalsReader, SampledSignalsWrit
Int
end
function cut_to_size(buffer, julia_buffer, use_frames, offset, already)
port_audio_range = 1:use_frames
(
view(buffer.data, :, port_audio_range),
# the julia buffer is longer, so we might need to start from the middle
view(julia_buffer, port_audio_range .+ offset .+ already, :),
function full_write!(buffer, julia_buffer, already)
chunk_frames = buffer.chunk_frames
@inbounds transpose!(
buffer.data,
view(julia_buffer, (1:chunk_frames) .+ already, :),
)
write_buffer(buffer, chunk_frames)
end
function (writer::SampledSignalsWriter)(buffer, (julia_buffer, offset, frame_count))
already = 0
chunk_frames = buffer.chunk_frames
# if we still have frames to write
while already < frame_count
# take either a whole chunk, or whatever is left if it's smaller
use_frames = min(frame_count - already, chunk_frames)
port_audio_view, julia_view =
cut_to_size(buffer, julia_buffer, use_frames, offset, already)
# we need to transpose column-major buffer from Julia back and forth between the row-major buffer from PortAudio
# transpose, then send the data
transpose!(port_audio_view, julia_view)
write_buffer(buffer, use_frames)
already += use_frames
end
already
foreach(
let buffer = buffer, julia_buffer = julia_buffer
already -> full_write!(buffer, julia_buffer, already)
end,
# keep going until there is less than a chunk left
offset:chunk_frames:(offset + frame_count - chunk_frames),
)
left = frame_count % chunk_frames
port_audio_range = 1:left
@inbounds transpose!(
view(buffer.data, :, port_audio_range),
view(julia_buffer, port_audio_range .+ (offset + frame_count - left), :),
)
write_buffer(buffer, left)
frame_count
end
function full_read!(buffer, julia_buffer, already)
chunk_frames = buffer.chunk_frames
read_buffer(buffer, chunk_frames)
@inbounds transpose!(
view(julia_buffer, (1:chunk_frames) .+ already, :),
buffer.data,
)
end
function (reader::SampledSignalsReader)(buffer, (julia_buffer, offset, frame_count))
already = 0
chunk_frames = buffer.chunk_frames
# if we still have frames to write
while already < frame_count
# take either a whole chunk, or whatever is left if it's smaller
use_frames = min(frame_count - already, chunk_frames)
# receive the data, then transpose
read_buffer(buffer, use_frames)
port_audio_view, julia_view =
cut_to_size(buffer, julia_buffer, use_frames, offset, already)
transpose!(julia_view, port_audio_view)
already += use_frames
end
already
foreach(
let buffer = buffer, julia_buffer = julia_buffer
already -> full_read!(buffer, julia_buffer, already)
end,
offset:chunk_frames:(offset + frame_count - chunk_frames),
)
left = frame_count % chunk_frames
port_audio_range = 1:left
read_buffer(buffer, left)
@inbounds transpose!(
view(julia_buffer, port_audio_range .+ (offset + frame_count - left), :),
view(buffer.data, :, port_audio_range),
)
frame_count
end
# a Buffer contains not just a buffer
@ -351,6 +370,7 @@ struct Buffer{Sample, Scribe, InputType, OutputType}
scribe::Scribe
inputs::Channel{InputType}
outputs::Channel{OutputType}
debug::IOStream
end
has_channels(something) = nchannels(something) > 0
@ -360,9 +380,10 @@ function buffer_task(
pointer_to,
device,
channels,
scribe::Scribe;
scribe::Scribe,
debug;
Sample = Float32,
chunk_frames = 128,
chunk_frames = 128
) where {Scribe}
InputType = get_input_type(scribe)
OutputType = get_output_type(scribe)
@ -379,16 +400,25 @@ function buffer_task(
scribe,
input_channel,
output_channel,
debug
)
# we will spawn new threads to read from and write to port audio
# while the reading thread is talking to PortAudio, the writing thread can be setting up, and vice versa
# start the scribe thread when its created
# if there's channels at all
# we can't make the task a field of the buffer, because the task uses the buffer
task = Task(() -> run_scribe(buffer))
task = Task(let buffer = buffer
# xruns will return an error code and send a duplicate warning to stderr
# since we handle the error codes, we don't need the duplicate warnings
# so we send them to a debug log
() -> redirect_stderr(buffer.debug) do
run_scribe(buffer)
end
end)
task.sticky = false
if has_channels(buffer)
schedule(task)
bind(output_channel, task)
else
close(input_channel)
close(output_channel)
@ -416,6 +446,7 @@ struct PortAudioStream{SinkBuffer, SourceBuffer}
sink_task::Task
source_buffer::SourceBuffer
source_task::Task
debug::IOStream
end
# portaudio uses codes instead of types for the sample format
@ -428,16 +459,12 @@ const TYPE_TO_FORMAT = Dict{Type, PaSampleFormat}(
UInt8 => 3,
)
# we need to convert nothing so it will be handled by C correctly
nothing_to_c_null(::Nothing) = C_NULL
nothing_to_c_null(something) = something
function make_parameters(
device,
channels,
latency;
Sample = Float32,
host_api_specific_stream_info = nothing,
host_api_specific_stream_info = C_NULL,
)
if channels == 0
# if we don't need any channels, we don't need the source/sink at all
@ -449,7 +476,7 @@ function make_parameters(
channels,
TYPE_TO_FORMAT[Sample],
latency,
nothing_to_c_null(host_api_specific_stream_info),
host_api_specific_stream_info,
),
)
end
@ -556,14 +583,15 @@ function PortAudioStream(
chunk_frames = 128,
frames_per_buffer = 0,
flags = paNoFlag,
call_back = nothing,
user_data = nothing,
input_info = nothing,
output_info = nothing,
call_back = C_NULL,
user_data = C_NULL,
input_info = C_NULL,
output_info = C_NULL,
stream_lock = ReentrantLock(),
# this is where you can insert custom readers or writers instead
writer = nothing,
reader = nothing,
writer = SampledSignalsWriter(; Sample = Sample, warn_xruns = warn_xruns),
reader = SampledSignalsReader(; Sample = Sample, warn_xruns = warn_xruns),
debug = mktemp()[2]
)
input_channels_filled =
fill_max_channels("input", input_device, input_device.input_bounds, input_channels)
@ -611,12 +639,6 @@ function PortAudioStream(
throw(ArgumentError("Input or output must have at least 1 channel"))
end
end
if writer === nothing
writer = SampledSignalsWriter{Sample}(warn_xruns)
end
if reader === nothing
reader = SampledSignalsReader{Sample}(warn_xruns)
end
# we need a mutable pointer so portaudio can set it for us
mutable_pointer = Ref{Ptr{PaStream}}(0)
handle_status(
@ -638,8 +660,8 @@ function PortAudioStream(
sample_rate,
frames_per_buffer,
flags,
nothing_to_c_null(call_back),
nothing_to_c_null(user_data),
call_back,
user_data,
),
)
pointer_to = mutable_pointer[]
@ -652,7 +674,8 @@ function PortAudioStream(
pointer_to,
output_device,
output_channels_filled,
writer;
writer,
debug;
Sample = Sample,
chunk_frames = chunk_frames,
)...,
@ -661,10 +684,12 @@ function PortAudioStream(
pointer_to,
input_device,
input_channels_filled,
reader;
reader,
debug;
Sample = Sample,
chunk_frames = chunk_frames,
)...,
debug
)
end
@ -717,17 +742,18 @@ function PortAudioStream(do_function::Function, arguments...; keywords...)
end
function close(stream::PortAudioStream)
# this will shut down the channels, which will shut down the threads
sink_buffer = stream.sink_buffer
if has_channels(sink_buffer)
close(sink_buffer)
# wait for tasks to finish to make sure any errors get caught
wait(stream.sink_task)
end
source_buffer = stream.source_buffer
sink_buffer = stream.sink_buffer
if has_channels(source_buffer)
close(source_buffer)
# this will shut down the channels, which will shut down the threads
close(source_buffer.inputs)
# wait for tasks to finish to make sure any errors get caught
wait(stream.source_task)
# output channels will close because they are bound to the task
end
if has_channels(sink_buffer)
close(sink_buffer.inputs)
wait(stream.sink_task)
end
pointer_to = stream.pointer_to
# only stop if it's not already stopped
@ -735,6 +761,11 @@ function close(stream::PortAudioStream)
handle_status(Pa_StopStream(pointer_to))
end
handle_status(Pa_CloseStream(pointer_to))
debug_log = read(stream.debug, String)
# this will contain duplicate xrun warnings mentioned above
if !isempty(debug_log)
@debug debug_log
end
end
function isopen(pointer_to::Ptr{PaStream})