separate out SampledSignals part

This commit is contained in:
Brandon Taylor 2021-06-26 14:24:48 -04:00
parent 05bcc26124
commit 6de81d1227

View file

@ -1,5 +1,6 @@
module PortAudio
using Base: alloc_buf_hook, Bool
using alsa_plugins_jll: alsa_plugins_jll
import Base: close, eltype, getproperty, isopen, read, read!, show, showerror, write
using Base.Threads: @spawn
@ -160,7 +161,7 @@ function PortAudioDevice(info::PaDeviceInfo, index)
PortAudioDevice(
unsafe_string(info.name),
# replace host api code with its name
unsafe_string(safe_key(Pa_GetHostApiInfo, info.hostApi,).name),
unsafe_string(safe_key(Pa_GetHostApiInfo, info.hostApi).name),
info.defaultSampleRate,
index,
Bounds(
@ -217,36 +218,157 @@ function devices()
]
end
const BUFFER_TYPE{Sample} = Array{Sample, 2}
# because we're calling Pa_ReadStream and Pa_WriteStream from separate threads,
# we put a lock around these calls
function write_stream(
stream_lock,
pointer_to,
port_audio_buffer,
chunk_frames;
warn_xruns = true,
)
handle_status(
lock(stream_lock) do
Pa_WriteStream(pointer_to, port_audio_buffer, chunk_frames)
end,
warn_xruns = warn_xruns,
)
end
function read_stream(
stream_lock,
pointer_to,
port_audio_buffer,
chunk_frames;
warn_xruns = true,
)
handle_status(
lock(stream_lock) do
Pa_ReadStream(pointer_to, port_audio_buffer, chunk_frames)
end;
warn_xruns = warn_xruns,
)
end
# these will do the actual reading and writing
# you can switch out the SampledSignalsReader/Writer defaults if you want more direct access
# a ReaderOrWriter must implement the following interface:
# must have a warn_xruns::Bool field
# must have get_input_type and get_output_type methods
# must overload call for four arguments:
# 1) stream_lock: a lock around the stream
# 2) pointer_to: the pointer to the stream
# 3) port_audio_buffer: the source/sink buffer
# 4) custom inputs
# and return the output type
# this call method should make use of read_stream/write_stream methods above
abstract type ReaderOrWriter end
struct SampledSignalsReader{Sample} <: ReaderOrWriter
warn_xruns::Bool
end
struct SampledSignalsWriter{Sample} <: ReaderOrWriter
warn_xruns::Bool
end
# inputs will be a triple of the last 3 arguments to unsafe_read/write
# we will already have access to the stream itself
const INPUT_CHANNEL_TYPE{Sample} = Channel{Tuple{BUFFER_TYPE{Sample}, Int, Int}}
# outputs are the number of frames read/written
const OUTPUT_CHANNEL_TYPE = Channel{Int}
function get_input_type(
::Union{<:SampledSignalsReader{Sample}, <:SampledSignalsWriter{Sample}},
) where {Sample}
Tuple{Array{Sample, 2}, Int, Int}
end
# output is the number of frames read/written
function get_output_type(::Union{SampledSignalsReader, SampledSignalsWriter})
Int
end
# we need to transpose column-major buffer from Julia back and forth between the row-major buffer from PortAudio
function translate!(
julia_buffer,
port_audio_buffer,
chunk_frames,
offset,
already,
port_audio_to_julia,
)
port_audio_range = 1:chunk_frames
# the julia buffer is longer, so we might need to start from the middle
julia_view = view(julia_buffer, port_audio_range .+ offset .+ already, :)
port_audio_view = view(port_audio_buffer, :, port_audio_range)
if port_audio_to_julia
transpose!(julia_view, port_audio_view)
else
transpose!(port_audio_view, julia_view)
end
end
function (writer::SampledSignalsWriter)(
stream_lock,
pointer_to,
port_audio_buffer,
(julia_buffer, offset, frame_count),
)
warn_xruns = writer.warn_xruns
already = 0
# if we still have frames to write
while already < frame_count
# take either a whole chunk, or whatever is left if it's smaller
chunk_frames = min(frame_count - already, CHUNK_FRAMES)
# transpose, then send the data
translate!(julia_buffer, port_audio_buffer, chunk_frames, offset, already, false)
# TODO: if the stream is closed we just want to return a
# shorter-than-requested frame count instead of throwing an error
write_stream(
stream_lock,
pointer_to,
port_audio_buffer,
chunk_frames;
warn_xruns = warn_xruns,
)
already += chunk_frames
end
already
end
function (reader::SampledSignalsReader)(
stream_lock,
pointer_to,
port_audio_buffer,
(julia_buffer, offset, frame_count),
)
warn_xruns = reader.warn_xruns
already = 0
# if we still have frames to write
while already < frame_count
# take either a whole chunk, or whatever is left if it's smaller
chunk_frames = min(frame_count - already, CHUNK_FRAMES)
# receive the data, then transpose
# TODO: if the stream is closed we just want to return a
# shorter-than-requested frame count instead of throwing an error
read_stream(
stream_lock,
pointer_to,
port_audio_buffer,
chunk_frames;
warn_xruns = warn_xruns,
)
translate!(julia_buffer, port_audio_buffer, chunk_frames, offset, already, true)
already += chunk_frames
end
already
end
# a Messanger contains
# the PortAudio device
# the PortAudio buffer
# the number of channels
# an input channel, for passing inputs to the messanger
# an output channel for sending outputs from the messanger
struct Messanger{Sample}
struct Messanger{InputType, OutputType}
device::PortAudioDevice
port_audio_buffer::BUFFER_TYPE{Sample}
number_of_channels::Int
inputs::INPUT_CHANNEL_TYPE{Sample}
outputs::OUTPUT_CHANNEL_TYPE
end
function Messanger{Sample}(device, channels) where {Sample}
Messanger(
device,
zeros(Sample, channels, CHUNK_FRAMES),
channels,
# unbuffered channels so putting and taking will block till everyone's ready
INPUT_CHANNEL_TYPE{Sample}(0),
OUTPUT_CHANNEL_TYPE(0),
)
inputs::Channel{InputType}
outputs::Channel{OutputType}
end
nchannels(messanger::Messanger) = messanger.number_of_channels
@ -261,12 +383,26 @@ end
# PortAudioStream
#
struct PortAudioStream{Sample}
struct PortAudioStream{Sample, SinkMessanger, SourceMessanger}
sample_rate::Float64
# pointer to the c object
pointer_to::Ptr{PaStream}
sink_messanger::Messanger{Sample}
source_messanger::Messanger{Sample}
sink_messanger::SinkMessanger
source_messanger::SourceMessanger
end
function PortAudioStream{Sample}(
sample_rate::Float64,
pointer_to::Ptr{PaStream},
sink_messanger::SinkMessanger,
source_messanger::SourceMessanger,
) where {Sample, SinkMessanger, SourceMessanger}
PortAudioStream{Sample, SinkMessanger, SourceMessanger}(
sample_rate,
pointer_to,
sink_messanger,
source_messanger,
)
end
# portaudio uses codes instead of types for the sample format
@ -393,17 +529,18 @@ end
# the messanger will be running on a separate thread in the background
# alternating transposing and
# waiting to pass inputs and outputs back and forth to PortAudio
function run_messanger(
a_function,
function run_reader_or_writer(
messanger,
reader_or_writer,
stream_lock,
pointer_to,
port_audio_buffer,
inputs,
outputs;
warn_xruns = true,
)
inputs = messanger.inputs
outputs = messanger.outputs
while true
output = if isopen(inputs)
a_function(pointer_to, port_audio_buffer, take!(inputs)...; warn_xruns = warn_xruns)
reader_or_writer(stream_lock, pointer_to, port_audio_buffer, take!(inputs))
else
# no frames can be read/written if the input channel is closed
0
@ -417,112 +554,30 @@ function run_messanger(
end
end
# we will spawn new threads to read from and write to to port audio
# while the reading thread is talking to PortAudio, the writing thread can be setting up
function start_messanger(
a_function,
Sample,
pointer_to,
device,
channels;
warn_xruns = true,
)
messanger = Messanger{Sample}(device, channels)
port_audio_buffer = messanger.port_audio_buffer
inputs = messanger.inputs
outputs = messanger.outputs
# we will spawn new threads to read from and write to port audio
# while the reading thread is talking to PortAudio, the writing thread can be setting up, and vice versa
function Messanger(reader_or_writer, stream_lock, Sample, pointer_to, device, channels)
InputType = get_input_type(reader_or_writer)
OutputType = get_output_type(reader_or_writer)
# unbuffered channels so putting and taking will block till everyone's ready
port_audio_buffer = zeros(Sample, channels, CHUNK_FRAMES)
messanger = Messanger{InputType, OutputType}(
device,
channels,
Channel{InputType}(0),
Channel{OutputType}(0),
)
# start the messanger thread when its created
@spawn run_messanger(
a_function,
@spawn run_reader_or_writer(
messanger,
reader_or_writer,
stream_lock,
pointer_to,
port_audio_buffer,
inputs,
outputs;
warn_xruns = warn_xruns,
)
messanger
end
# we need to transpose column-major buffer from Julia back and forth between the row-major buffer from PortAudio
function translate!(
julia_buffer,
port_audio_buffer,
chunk_frames,
offset,
already,
port_audio_to_julia,
)
port_audio_range = 1:chunk_frames
# the julia buffer is longer, so we might need to start from the middle
julia_view = view(julia_buffer, port_audio_range .+ offset .+ already, :)
port_audio_view = view(port_audio_buffer, :, port_audio_range)
if port_audio_to_julia
transpose!(julia_view, port_audio_view)
else
transpose!(port_audio_view, julia_view)
end
end
# because we're calling Pa_ReadStream and Pa_WriteStream from separate threads,
# we put a lock around these calls
const PORT_AUDIO_LOCK = ReentrantLock()
function real_write!(
pointer_to,
port_audio_buffer,
julia_buffer,
offset,
frame_count;
warn_xruns = true,
)
already = 0
# if we still have frames to write
while already < frame_count
# take either a whole chunk, or whatever is left if it's smaller
chunk_frames = min(frame_count - already, CHUNK_FRAMES)
# transpose, then send the data
translate!(julia_buffer, port_audio_buffer, chunk_frames, offset, already, false)
# TODO: if the stream is closed we just want to return a
# shorter-than-requested frame count instead of throwing an error
handle_status(
lock(PORT_AUDIO_LOCK) do
Pa_WriteStream(pointer_to, port_audio_buffer, chunk_frames)
end,
warn_xruns = warn_xruns,
)
already += chunk_frames
end
already
end
function real_read!(
pointer_to,
port_audio_buffer,
julia_buffer,
offset,
frame_count;
warn_xruns = true,
)
already = 0
# if we still have frames to write
while already < frame_count
# take either a whole chunk, or whatever is left if it's smaller
chunk_frames = min(frame_count - already, CHUNK_FRAMES)
# receive the data, then transpose
# TODO: if the stream is closed we just want to return a
# shorter-than-requested frame count instead of throwing an error
handle_status(
lock(PORT_AUDIO_LOCK) do
Pa_ReadStream(pointer_to, port_audio_buffer, chunk_frames)
end;
warn_xruns = warn_xruns,
)
translate!(julia_buffer, port_audio_buffer, chunk_frames, offset, already, true)
already += chunk_frames
end
already
end
# this is the top-level outer constructor that all the other outer constructors end up calling
"""
PortAudioStream(input_channels = 2, output_channels = 2; options...)
@ -572,6 +627,10 @@ function PortAudioStream(
user_data = nothing,
input_info = nothing,
output_info = nothing,
stream_lock = ReentrantLock(),
# this is where you can insert custom readers or writers instead
writer = SampledSignalsWriter{Sample}(warn_xruns),
reader = SampledSignalsReader{Sample}(warn_xruns),
)
input_channels_filled, output_channels_filled =
fill_both_channels(input_channels, input_device, output_channels, output_device)
@ -607,21 +666,21 @@ function PortAudioStream(
PortAudioStream{Sample}(
sample_rate,
pointer_to,
start_messanger(
real_write!,
Messanger(
writer,
stream_lock,
Sample,
pointer_to,
output_device,
output_channels_filled;
warn_xruns = warn_xruns,
output_channels_filled,
),
start_messanger(
real_read!,
Messanger(
reader,
stream_lock,
Sample,
pointer_to,
input_device,
input_channels_filled;
warn_xruns = warn_xruns,
input_channels_filled,
),
)
end
@ -699,7 +758,7 @@ end
isopen(stream::PortAudioStream) = isopen(stream.pointer_to)
samplerate(stream::PortAudioStream) = stream.sample_rate
eltype(::Type{PortAudioStream{Sample}}) where {Sample} = Sample
eltype(::Type{<:PortAudioStream{Sample}}) where {Sample} = Sample
read(stream::PortAudioStream, arguments...) = read(stream.source, arguments...)
read!(stream::PortAudioStream, arguments...) = read!(stream.source, arguments...)
@ -709,7 +768,10 @@ function write(sink::PortAudioStream, source::PortAudioStream, arguments...)
end
function show(io::IO, stream::PortAudioStream)
println(io, typeof(stream))
# just show the first parameter (eltype)
print(io, "PortAudioStream{")
print(io, eltype(stream))
println(io, "}")
print(io, " Samplerate: ", samplerate(stream), "Hz")
sink = stream.sink
if nchannels(sink) > 0
@ -737,6 +799,8 @@ for (TypeName, Super) in ((:PortAudioSink, :SampleSink), (:PortAudioSource, :Sam
end
# provided for backwards compatibility
# we should probably deprecate at some point
# because PortAudioSink and PortAudioSource would not be compatible with custom readers/writers
function getproperty(stream::PortAudioStream, property::Symbol)
if property === :sink
PortAudioSink(stream)
@ -780,8 +844,9 @@ function show(io::IO, sink_or_source::Union{PortAudioSink, PortAudioSource})
)
end
# both reading and writing will outsource to the reading and writing demons
# both reading and writing will outsource to the readers or writers
# so we just need to pass inputs in and take outputs out
# SampledSignals can take care of this feeding for us
function exchange(messanger, arguments...)
put!(messanger.inputs, arguments)
take!(messanger.outputs)