From 6de81d1227ca6de39d9d294b7a1bb6c1b8da136f Mon Sep 17 00:00:00 2001 From: Brandon Taylor Date: Sat, 26 Jun 2021 14:24:48 -0400 Subject: [PATCH] separate out SampledSignals part --- src/PortAudio.jl | 345 ++++++++++++++++++++++++++++------------------- 1 file changed, 205 insertions(+), 140 deletions(-) diff --git a/src/PortAudio.jl b/src/PortAudio.jl index ce516f0..383cdcb 100644 --- a/src/PortAudio.jl +++ b/src/PortAudio.jl @@ -1,5 +1,6 @@ module PortAudio +using Base: alloc_buf_hook, Bool using alsa_plugins_jll: alsa_plugins_jll import Base: close, eltype, getproperty, isopen, read, read!, show, showerror, write using Base.Threads: @spawn @@ -160,7 +161,7 @@ function PortAudioDevice(info::PaDeviceInfo, index) PortAudioDevice( unsafe_string(info.name), # replace host api code with its name - unsafe_string(safe_key(Pa_GetHostApiInfo, info.hostApi,).name), + unsafe_string(safe_key(Pa_GetHostApiInfo, info.hostApi).name), info.defaultSampleRate, index, Bounds( @@ -217,36 +218,157 @@ function devices() ] end -const BUFFER_TYPE{Sample} = Array{Sample, 2} +# because we're calling Pa_ReadStream and Pa_WriteStream from separate threads, +# we put a lock around these calls +function write_stream( + stream_lock, + pointer_to, + port_audio_buffer, + chunk_frames; + warn_xruns = true, +) + handle_status( + lock(stream_lock) do + Pa_WriteStream(pointer_to, port_audio_buffer, chunk_frames) + end, + warn_xruns = warn_xruns, + ) +end + +function read_stream( + stream_lock, + pointer_to, + port_audio_buffer, + chunk_frames; + warn_xruns = true, +) + handle_status( + lock(stream_lock) do + Pa_ReadStream(pointer_to, port_audio_buffer, chunk_frames) + end; + warn_xruns = warn_xruns, + ) +end + +# these will do the actual reading and writing +# you can switch out the SampledSignalsReader/Writer defaults if you want more direct access +# a ReaderOrWriter must implement the following interface: +# must have a warn_xruns::Bool field +# must have get_input_type and get_output_type methods +# must overload call for four arguments: +# 1) stream_lock: a lock around the stream +# 2) pointer_to: the pointer to the stream +# 3) port_audio_buffer: the source/sink buffer +# 4) custom inputs +# and return the output type +# this call method should make use of read_stream/write_stream methods above +abstract type ReaderOrWriter end + +struct SampledSignalsReader{Sample} <: ReaderOrWriter + warn_xruns::Bool +end +struct SampledSignalsWriter{Sample} <: ReaderOrWriter + warn_xruns::Bool +end + # inputs will be a triple of the last 3 arguments to unsafe_read/write # we will already have access to the stream itself -const INPUT_CHANNEL_TYPE{Sample} = Channel{Tuple{BUFFER_TYPE{Sample}, Int, Int}} -# outputs are the number of frames read/written -const OUTPUT_CHANNEL_TYPE = Channel{Int} +function get_input_type( + ::Union{<:SampledSignalsReader{Sample}, <:SampledSignalsWriter{Sample}}, +) where {Sample} + Tuple{Array{Sample, 2}, Int, Int} +end +# output is the number of frames read/written +function get_output_type(::Union{SampledSignalsReader, SampledSignalsWriter}) + Int +end + +# we need to transpose column-major buffer from Julia back and forth between the row-major buffer from PortAudio +function translate!( + julia_buffer, + port_audio_buffer, + chunk_frames, + offset, + already, + port_audio_to_julia, +) + port_audio_range = 1:chunk_frames + # the julia buffer is longer, so we might need to start from the middle + julia_view = view(julia_buffer, port_audio_range .+ offset .+ already, :) + port_audio_view = view(port_audio_buffer, :, port_audio_range) + if port_audio_to_julia + transpose!(julia_view, port_audio_view) + else + transpose!(port_audio_view, julia_view) + end +end + +function (writer::SampledSignalsWriter)( + stream_lock, + pointer_to, + port_audio_buffer, + (julia_buffer, offset, frame_count), +) + warn_xruns = writer.warn_xruns + already = 0 + # if we still have frames to write + while already < frame_count + # take either a whole chunk, or whatever is left if it's smaller + chunk_frames = min(frame_count - already, CHUNK_FRAMES) + # transpose, then send the data + translate!(julia_buffer, port_audio_buffer, chunk_frames, offset, already, false) + # TODO: if the stream is closed we just want to return a + # shorter-than-requested frame count instead of throwing an error + write_stream( + stream_lock, + pointer_to, + port_audio_buffer, + chunk_frames; + warn_xruns = warn_xruns, + ) + already += chunk_frames + end + already +end + +function (reader::SampledSignalsReader)( + stream_lock, + pointer_to, + port_audio_buffer, + (julia_buffer, offset, frame_count), +) + warn_xruns = reader.warn_xruns + already = 0 + # if we still have frames to write + while already < frame_count + # take either a whole chunk, or whatever is left if it's smaller + chunk_frames = min(frame_count - already, CHUNK_FRAMES) + # receive the data, then transpose + # TODO: if the stream is closed we just want to return a + # shorter-than-requested frame count instead of throwing an error + read_stream( + stream_lock, + pointer_to, + port_audio_buffer, + chunk_frames; + warn_xruns = warn_xruns, + ) + translate!(julia_buffer, port_audio_buffer, chunk_frames, offset, already, true) + already += chunk_frames + end + already +end # a Messanger contains # the PortAudio device -# the PortAudio buffer # the number of channels # an input channel, for passing inputs to the messanger # an output channel for sending outputs from the messanger -struct Messanger{Sample} +struct Messanger{InputType, OutputType} device::PortAudioDevice - port_audio_buffer::BUFFER_TYPE{Sample} number_of_channels::Int - inputs::INPUT_CHANNEL_TYPE{Sample} - outputs::OUTPUT_CHANNEL_TYPE -end - -function Messanger{Sample}(device, channels) where {Sample} - Messanger( - device, - zeros(Sample, channels, CHUNK_FRAMES), - channels, - # unbuffered channels so putting and taking will block till everyone's ready - INPUT_CHANNEL_TYPE{Sample}(0), - OUTPUT_CHANNEL_TYPE(0), - ) + inputs::Channel{InputType} + outputs::Channel{OutputType} end nchannels(messanger::Messanger) = messanger.number_of_channels @@ -261,12 +383,26 @@ end # PortAudioStream # -struct PortAudioStream{Sample} +struct PortAudioStream{Sample, SinkMessanger, SourceMessanger} sample_rate::Float64 # pointer to the c object pointer_to::Ptr{PaStream} - sink_messanger::Messanger{Sample} - source_messanger::Messanger{Sample} + sink_messanger::SinkMessanger + source_messanger::SourceMessanger +end + +function PortAudioStream{Sample}( + sample_rate::Float64, + pointer_to::Ptr{PaStream}, + sink_messanger::SinkMessanger, + source_messanger::SourceMessanger, +) where {Sample, SinkMessanger, SourceMessanger} + PortAudioStream{Sample, SinkMessanger, SourceMessanger}( + sample_rate, + pointer_to, + sink_messanger, + source_messanger, + ) end # portaudio uses codes instead of types for the sample format @@ -393,17 +529,18 @@ end # the messanger will be running on a separate thread in the background # alternating transposing and # waiting to pass inputs and outputs back and forth to PortAudio -function run_messanger( - a_function, +function run_reader_or_writer( + messanger, + reader_or_writer, + stream_lock, pointer_to, port_audio_buffer, - inputs, - outputs; - warn_xruns = true, ) + inputs = messanger.inputs + outputs = messanger.outputs while true output = if isopen(inputs) - a_function(pointer_to, port_audio_buffer, take!(inputs)...; warn_xruns = warn_xruns) + reader_or_writer(stream_lock, pointer_to, port_audio_buffer, take!(inputs)) else # no frames can be read/written if the input channel is closed 0 @@ -417,112 +554,30 @@ function run_messanger( end end -# we will spawn new threads to read from and write to to port audio -# while the reading thread is talking to PortAudio, the writing thread can be setting up -function start_messanger( - a_function, - Sample, - pointer_to, - device, - channels; - warn_xruns = true, -) - messanger = Messanger{Sample}(device, channels) - port_audio_buffer = messanger.port_audio_buffer - inputs = messanger.inputs - outputs = messanger.outputs +# we will spawn new threads to read from and write to port audio +# while the reading thread is talking to PortAudio, the writing thread can be setting up, and vice versa +function Messanger(reader_or_writer, stream_lock, Sample, pointer_to, device, channels) + InputType = get_input_type(reader_or_writer) + OutputType = get_output_type(reader_or_writer) + # unbuffered channels so putting and taking will block till everyone's ready + port_audio_buffer = zeros(Sample, channels, CHUNK_FRAMES) + messanger = Messanger{InputType, OutputType}( + device, + channels, + Channel{InputType}(0), + Channel{OutputType}(0), + ) # start the messanger thread when its created - @spawn run_messanger( - a_function, + @spawn run_reader_or_writer( + messanger, + reader_or_writer, + stream_lock, pointer_to, port_audio_buffer, - inputs, - outputs; - warn_xruns = warn_xruns, ) messanger end -# we need to transpose column-major buffer from Julia back and forth between the row-major buffer from PortAudio -function translate!( - julia_buffer, - port_audio_buffer, - chunk_frames, - offset, - already, - port_audio_to_julia, -) - port_audio_range = 1:chunk_frames - # the julia buffer is longer, so we might need to start from the middle - julia_view = view(julia_buffer, port_audio_range .+ offset .+ already, :) - port_audio_view = view(port_audio_buffer, :, port_audio_range) - if port_audio_to_julia - transpose!(julia_view, port_audio_view) - else - transpose!(port_audio_view, julia_view) - end -end - -# because we're calling Pa_ReadStream and Pa_WriteStream from separate threads, -# we put a lock around these calls -const PORT_AUDIO_LOCK = ReentrantLock() - -function real_write!( - pointer_to, - port_audio_buffer, - julia_buffer, - offset, - frame_count; - warn_xruns = true, -) - already = 0 - # if we still have frames to write - while already < frame_count - # take either a whole chunk, or whatever is left if it's smaller - chunk_frames = min(frame_count - already, CHUNK_FRAMES) - # transpose, then send the data - translate!(julia_buffer, port_audio_buffer, chunk_frames, offset, already, false) - # TODO: if the stream is closed we just want to return a - # shorter-than-requested frame count instead of throwing an error - handle_status( - lock(PORT_AUDIO_LOCK) do - Pa_WriteStream(pointer_to, port_audio_buffer, chunk_frames) - end, - warn_xruns = warn_xruns, - ) - already += chunk_frames - end - already -end - -function real_read!( - pointer_to, - port_audio_buffer, - julia_buffer, - offset, - frame_count; - warn_xruns = true, -) - already = 0 - # if we still have frames to write - while already < frame_count - # take either a whole chunk, or whatever is left if it's smaller - chunk_frames = min(frame_count - already, CHUNK_FRAMES) - # receive the data, then transpose - # TODO: if the stream is closed we just want to return a - # shorter-than-requested frame count instead of throwing an error - handle_status( - lock(PORT_AUDIO_LOCK) do - Pa_ReadStream(pointer_to, port_audio_buffer, chunk_frames) - end; - warn_xruns = warn_xruns, - ) - translate!(julia_buffer, port_audio_buffer, chunk_frames, offset, already, true) - already += chunk_frames - end - already -end - # this is the top-level outer constructor that all the other outer constructors end up calling """ PortAudioStream(input_channels = 2, output_channels = 2; options...) @@ -572,6 +627,10 @@ function PortAudioStream( user_data = nothing, input_info = nothing, output_info = nothing, + stream_lock = ReentrantLock(), + # this is where you can insert custom readers or writers instead + writer = SampledSignalsWriter{Sample}(warn_xruns), + reader = SampledSignalsReader{Sample}(warn_xruns), ) input_channels_filled, output_channels_filled = fill_both_channels(input_channels, input_device, output_channels, output_device) @@ -607,21 +666,21 @@ function PortAudioStream( PortAudioStream{Sample}( sample_rate, pointer_to, - start_messanger( - real_write!, + Messanger( + writer, + stream_lock, Sample, pointer_to, output_device, - output_channels_filled; - warn_xruns = warn_xruns, + output_channels_filled, ), - start_messanger( - real_read!, + Messanger( + reader, + stream_lock, Sample, pointer_to, input_device, - input_channels_filled; - warn_xruns = warn_xruns, + input_channels_filled, ), ) end @@ -699,7 +758,7 @@ end isopen(stream::PortAudioStream) = isopen(stream.pointer_to) samplerate(stream::PortAudioStream) = stream.sample_rate -eltype(::Type{PortAudioStream{Sample}}) where {Sample} = Sample +eltype(::Type{<:PortAudioStream{Sample}}) where {Sample} = Sample read(stream::PortAudioStream, arguments...) = read(stream.source, arguments...) read!(stream::PortAudioStream, arguments...) = read!(stream.source, arguments...) @@ -709,7 +768,10 @@ function write(sink::PortAudioStream, source::PortAudioStream, arguments...) end function show(io::IO, stream::PortAudioStream) - println(io, typeof(stream)) + # just show the first parameter (eltype) + print(io, "PortAudioStream{") + print(io, eltype(stream)) + println(io, "}") print(io, " Samplerate: ", samplerate(stream), "Hz") sink = stream.sink if nchannels(sink) > 0 @@ -737,6 +799,8 @@ for (TypeName, Super) in ((:PortAudioSink, :SampleSink), (:PortAudioSource, :Sam end # provided for backwards compatibility +# we should probably deprecate at some point +# because PortAudioSink and PortAudioSource would not be compatible with custom readers/writers function getproperty(stream::PortAudioStream, property::Symbol) if property === :sink PortAudioSink(stream) @@ -780,8 +844,9 @@ function show(io::IO, sink_or_source::Union{PortAudioSink, PortAudioSource}) ) end -# both reading and writing will outsource to the reading and writing demons +# both reading and writing will outsource to the readers or writers # so we just need to pass inputs in and take outputs out +# SampledSignals can take care of this feeding for us function exchange(messanger, arguments...) put!(messanger.inputs, arguments) take!(messanger.outputs)