From 8d6abcb7856eb92816f070fc6dbef7427301b139 Mon Sep 17 00:00:00 2001 From: Brandon Taylor Date: Sat, 3 Jul 2021 17:22:49 -0400 Subject: [PATCH] performance improvements --- src/PortAudio.jl | 165 ++++++++++++++++++++++++++++------------------- 1 file changed, 98 insertions(+), 67 deletions(-) diff --git a/src/PortAudio.jl b/src/PortAudio.jl index 3e89e29..ecd7986 100644 --- a/src/PortAudio.jl +++ b/src/PortAudio.jl @@ -110,7 +110,9 @@ function initialize() # ALSA will throw extraneous warnings on start-up # send them to debug instead debug_message = @capture_err handle_status(Pa_Initialize()) - @debug debug_message + if !isempty(debug_message) + @debug debug_message + end end function terminate() @@ -260,9 +262,15 @@ abstract type Scribe end struct SampledSignalsReader{Sample} <: Scribe warn_xruns::Bool end +function SampledSignalsReader(; Sample = Float32, warn_xruns = true) + SampledSignalsReader{Sample}(warn_xruns) +end struct SampledSignalsWriter{Sample} <: Scribe warn_xruns::Bool end +function SampledSignalsWriter(; Sample = Float32, warn_xruns = true) + SampledSignalsWriter{Sample}(warn_xruns) +end # define on types # throw an error if not defined @@ -295,48 +303,59 @@ function get_output_type(::Type{<:Union{SampledSignalsReader, SampledSignalsWrit Int end -function cut_to_size(buffer, julia_buffer, use_frames, offset, already) - port_audio_range = 1:use_frames - ( - view(buffer.data, :, port_audio_range), - # the julia buffer is longer, so we might need to start from the middle - view(julia_buffer, port_audio_range .+ offset .+ already, :), +function full_write!(buffer, julia_buffer, already) + chunk_frames = buffer.chunk_frames + @inbounds transpose!( + buffer.data, + view(julia_buffer, (1:chunk_frames) .+ already, :), ) + write_buffer(buffer, chunk_frames) end function (writer::SampledSignalsWriter)(buffer, (julia_buffer, offset, frame_count)) - already = 0 chunk_frames = buffer.chunk_frames - # if we still have frames to write - while already < frame_count - # take either a whole chunk, or whatever is left if it's smaller - use_frames = min(frame_count - already, chunk_frames) - port_audio_view, julia_view = - cut_to_size(buffer, julia_buffer, use_frames, offset, already) - # we need to transpose column-major buffer from Julia back and forth between the row-major buffer from PortAudio - # transpose, then send the data - transpose!(port_audio_view, julia_view) - write_buffer(buffer, use_frames) - already += use_frames - end - already + foreach( + let buffer = buffer, julia_buffer = julia_buffer + already -> full_write!(buffer, julia_buffer, already) + end, + # keep going until there is less than a chunk left + offset:chunk_frames:(offset + frame_count - chunk_frames), + ) + left = frame_count % chunk_frames + port_audio_range = 1:left + @inbounds transpose!( + view(buffer.data, :, port_audio_range), + view(julia_buffer, port_audio_range .+ (offset + frame_count - left), :), + ) + write_buffer(buffer, left) + frame_count +end + +function full_read!(buffer, julia_buffer, already) + chunk_frames = buffer.chunk_frames + read_buffer(buffer, chunk_frames) + @inbounds transpose!( + view(julia_buffer, (1:chunk_frames) .+ already, :), + buffer.data, + ) end function (reader::SampledSignalsReader)(buffer, (julia_buffer, offset, frame_count)) - already = 0 chunk_frames = buffer.chunk_frames - # if we still have frames to write - while already < frame_count - # take either a whole chunk, or whatever is left if it's smaller - use_frames = min(frame_count - already, chunk_frames) - # receive the data, then transpose - read_buffer(buffer, use_frames) - port_audio_view, julia_view = - cut_to_size(buffer, julia_buffer, use_frames, offset, already) - transpose!(julia_view, port_audio_view) - already += use_frames - end - already + foreach( + let buffer = buffer, julia_buffer = julia_buffer + already -> full_read!(buffer, julia_buffer, already) + end, + offset:chunk_frames:(offset + frame_count - chunk_frames), + ) + left = frame_count % chunk_frames + port_audio_range = 1:left + read_buffer(buffer, left) + @inbounds transpose!( + view(julia_buffer, port_audio_range .+ (offset + frame_count - left), :), + view(buffer.data, :, port_audio_range), + ) + frame_count end # a Buffer contains not just a buffer @@ -351,6 +370,7 @@ struct Buffer{Sample, Scribe, InputType, OutputType} scribe::Scribe inputs::Channel{InputType} outputs::Channel{OutputType} + debug::IOStream end has_channels(something) = nchannels(something) > 0 @@ -360,9 +380,10 @@ function buffer_task( pointer_to, device, channels, - scribe::Scribe; + scribe::Scribe, + debug; Sample = Float32, - chunk_frames = 128, + chunk_frames = 128 ) where {Scribe} InputType = get_input_type(scribe) OutputType = get_output_type(scribe) @@ -379,16 +400,25 @@ function buffer_task( scribe, input_channel, output_channel, + debug ) # we will spawn new threads to read from and write to port audio # while the reading thread is talking to PortAudio, the writing thread can be setting up, and vice versa # start the scribe thread when its created # if there's channels at all # we can't make the task a field of the buffer, because the task uses the buffer - task = Task(() -> run_scribe(buffer)) + task = Task(let buffer = buffer + # xruns will return an error code and send a duplicate warning to stderr + # since we handle the error codes, we don't need the duplicate warnings + # so we send them to a debug log + () -> redirect_stderr(buffer.debug) do + run_scribe(buffer) + end + end) task.sticky = false if has_channels(buffer) schedule(task) + bind(output_channel, task) else close(input_channel) close(output_channel) @@ -416,6 +446,7 @@ struct PortAudioStream{SinkBuffer, SourceBuffer} sink_task::Task source_buffer::SourceBuffer source_task::Task + debug::IOStream end # portaudio uses codes instead of types for the sample format @@ -428,16 +459,12 @@ const TYPE_TO_FORMAT = Dict{Type, PaSampleFormat}( UInt8 => 3, ) -# we need to convert nothing so it will be handled by C correctly -nothing_to_c_null(::Nothing) = C_NULL -nothing_to_c_null(something) = something - function make_parameters( device, channels, latency; Sample = Float32, - host_api_specific_stream_info = nothing, + host_api_specific_stream_info = C_NULL, ) if channels == 0 # if we don't need any channels, we don't need the source/sink at all @@ -449,7 +476,7 @@ function make_parameters( channels, TYPE_TO_FORMAT[Sample], latency, - nothing_to_c_null(host_api_specific_stream_info), + host_api_specific_stream_info, ), ) end @@ -556,14 +583,15 @@ function PortAudioStream( chunk_frames = 128, frames_per_buffer = 0, flags = paNoFlag, - call_back = nothing, - user_data = nothing, - input_info = nothing, - output_info = nothing, + call_back = C_NULL, + user_data = C_NULL, + input_info = C_NULL, + output_info = C_NULL, stream_lock = ReentrantLock(), # this is where you can insert custom readers or writers instead - writer = nothing, - reader = nothing, + writer = SampledSignalsWriter(; Sample = Sample, warn_xruns = warn_xruns), + reader = SampledSignalsReader(; Sample = Sample, warn_xruns = warn_xruns), + debug = mktemp()[2] ) input_channels_filled = fill_max_channels("input", input_device, input_device.input_bounds, input_channels) @@ -611,12 +639,6 @@ function PortAudioStream( throw(ArgumentError("Input or output must have at least 1 channel")) end end - if writer === nothing - writer = SampledSignalsWriter{Sample}(warn_xruns) - end - if reader === nothing - reader = SampledSignalsReader{Sample}(warn_xruns) - end # we need a mutable pointer so portaudio can set it for us mutable_pointer = Ref{Ptr{PaStream}}(0) handle_status( @@ -638,8 +660,8 @@ function PortAudioStream( sample_rate, frames_per_buffer, flags, - nothing_to_c_null(call_back), - nothing_to_c_null(user_data), + call_back, + user_data, ), ) pointer_to = mutable_pointer[] @@ -652,7 +674,8 @@ function PortAudioStream( pointer_to, output_device, output_channels_filled, - writer; + writer, + debug; Sample = Sample, chunk_frames = chunk_frames, )..., @@ -661,10 +684,12 @@ function PortAudioStream( pointer_to, input_device, input_channels_filled, - reader; + reader, + debug; Sample = Sample, chunk_frames = chunk_frames, )..., + debug ) end @@ -717,17 +742,18 @@ function PortAudioStream(do_function::Function, arguments...; keywords...) end function close(stream::PortAudioStream) - # this will shut down the channels, which will shut down the threads - sink_buffer = stream.sink_buffer - if has_channels(sink_buffer) - close(sink_buffer) - # wait for tasks to finish to make sure any errors get caught - wait(stream.sink_task) - end source_buffer = stream.source_buffer + sink_buffer = stream.sink_buffer if has_channels(source_buffer) - close(source_buffer) + # this will shut down the channels, which will shut down the threads + close(source_buffer.inputs) + # wait for tasks to finish to make sure any errors get caught wait(stream.source_task) + # output channels will close because they are bound to the task + end + if has_channels(sink_buffer) + close(sink_buffer.inputs) + wait(stream.sink_task) end pointer_to = stream.pointer_to # only stop if it's not already stopped @@ -735,6 +761,11 @@ function close(stream::PortAudioStream) handle_status(Pa_StopStream(pointer_to)) end handle_status(Pa_CloseStream(pointer_to)) + debug_log = read(stream.debug, String) + # this will contain duplicate xrun warnings mentioned above + if !isempty(debug_log) + @debug debug_log + end end function isopen(pointer_to::Ptr{PaStream})