separate messanger from buffer

This commit is contained in:
Brandon Taylor 2021-07-11 17:29:38 -04:00
parent d836fe655d
commit ce8c66b223
2 changed files with 220 additions and 198 deletions

View file

@ -143,9 +143,7 @@ function __init__()
end end
end end
initialize() initialize()
atexit() do atexit(() -> terminate())
terminate()
end
end end
function versioninfo(io::IO = stdout) function versioninfo(io::IO = stdout)
@ -224,18 +222,21 @@ end
function devices() function devices()
# need to use 0 indexing for C # need to use 0 indexing for C
map(get_device_info, (1:handle_status(Pa_GetDeviceCount())) .- 1) map(get_device_info, 0:(handle_status(Pa_GetDeviceCount()) - 1))
end end
# we can handle reading and writing from buffers in a similar way # we can handle reading and writing from buffers in a similar way
function read_or_write(a_function, buffer, args...) function read_or_write(a_function, buffer, use_frames)
handle_status( handle_status(
# because we're calling Pa_ReadStream and Pa_WriteStream from separate threads, # because we're calling Pa_ReadStream and Pa_WriteStream from separate threads,
# we put a lock around these calls # we put a lock around these calls
lock(buffer.stream_lock) do lock(
a_function(buffer.pointer_to, buffer.data, args...) let a_function = a_function, pointer_to = buffer.pointer_to, data = buffer.data, use_frames = use_frames
end, () -> a_function(pointer_to, data, use_frames)
warn_xruns = buffer.scribe.warn_xruns, end,
buffer.stream_lock
),
warn_xruns = buffer.warn_xruns,
) )
end end
@ -250,26 +251,18 @@ end
# these will do the actual reading and writing # these will do the actual reading and writing
# you can switch out the SampledSignalsReader/Writer defaults if you want more direct access # you can switch out the SampledSignalsReader/Writer defaults if you want more direct access
# a Scribe must implement the following interface: # a Scribe must implement the following interface:
# must have a warn_xruns::Bool field
# must have get_input_type and get_output_type methods # must have get_input_type and get_output_type methods
# must overload call for 2 arguments: # must overload call for 2 arguments:
# 1) the buffer # 1) the buffer
# 2) a tuple of custom inputs # 2) a tuple of custom input_channel
# and return the output type # and return the output type
# this call method can make use of read_buffer/write_buffer methods above # this call method can make use of read_buffer/write_buffer methods above
abstract type Scribe end abstract type Scribe end
struct SampledSignalsReader{Sample} <: Scribe struct SampledSignalsReader{Sample} <: Scribe
warn_xruns::Bool
end
function SampledSignalsReader(; Sample = Float32, warn_xruns = true)
SampledSignalsReader{Sample}(warn_xruns)
end end
struct SampledSignalsWriter{Sample} <: Scribe struct SampledSignalsWriter{Sample} <: Scribe
warn_xruns::Bool
end
function SampledSignalsWriter(; Sample = Float32, warn_xruns = true)
SampledSignalsWriter{Sample}(warn_xruns)
end end
# define on types # define on types
@ -290,7 +283,7 @@ function get_output_type(::Thing) where {Thing}
get_output_type(Thing) get_output_type(Thing)
end end
# SampledSignals inputs will be a triple of the last 3 arguments to unsafe_read/write # SampledSignals input_channel will be a triple of the last 3 arguments to unsafe_read/write
# we will already have access to the stream itself # we will already have access to the stream itself
function get_input_type( function get_input_type(
::Type{<:Union{<:SampledSignalsReader{Sample}, <:SampledSignalsWriter{Sample}}}, ::Type{<:Union{<:SampledSignalsReader{Sample}, <:SampledSignalsWriter{Sample}}},
@ -303,120 +296,173 @@ function get_output_type(::Type{<:Union{SampledSignalsReader, SampledSignalsWrit
Int Int
end end
# write a full chunk, starting from the middle of the julia buffer # the julia buffer is bigger than the port audio buffer
function full_write!(buffer, julia_buffer, already) # so we need to split it up into chunks
chunk_frames = buffer.chunk_frames # we do this the same way for both reading and writing
@inbounds transpose!( function split_up(buffer, julia_buffer, already, frame_count, whole_function, partial_function)
buffer.data, frames_per_buffer = buffer.frames_per_buffer
view(julia_buffer, (1:chunk_frames) .+ already, :), # when we're done, we'll have written this many frames
goal = already + frame_count
# this is what we'll have left after doing all complete chunks
left = frame_count % frames_per_buffer
# this is how many we'll have written after doing all complete chunks
even = goal - left
foreach(
let whole_function = whole_function, buffer = buffer, julia_buffer = julia_buffer, frames_per_buffer = frames_per_buffer
already -> whole_function(buffer, julia_buffer, (already + 1):(already + frames_per_buffer), frames_per_buffer)
end,
# start at the already, keep going until there is less than a chunk left
already:frames_per_buffer:(even - frames_per_buffer),
# each time we loop, add chunk frames to already
# after the last loop, we'll reach "even"
) )
write_buffer(buffer, chunk_frames) # now we just have to read/write what's left
if left > 0
partial_function(buffer, julia_buffer, (even + 1):goal, left)
end
frame_count
end end
function (writer::SampledSignalsWriter)(buffer, (julia_buffer, offset, frame_count)) # the full version doesn't have to make a view, but the partial version does
chunk_frames = buffer.chunk_frames function full_write!(buffer, julia_buffer, julia_range, frames)
foreach(
let buffer = buffer, julia_buffer = julia_buffer
already -> full_write!(buffer, julia_buffer, already)
end,
# start at the offset, keep going until there is less than a chunk left
offset:chunk_frames:(offset + frame_count - chunk_frames),
)
# write what's left
left = frame_count % chunk_frames
@inbounds transpose!( @inbounds transpose!(
view(buffer.data, :, 1:left), buffer.data,
# the last left frames of the julia buffer view(julia_buffer, julia_range, :),
view(julia_buffer, (1:left) .+ (offset + frame_count - left), :),
) )
write_buffer(buffer, left) write_buffer(buffer, frames)
frame_count end
function partial_write!(buffer, julia_buffer, julia_range, frames)
@inbounds transpose!(
view(buffer.data, :, 1:frames),
view(julia_buffer, julia_range, :),
)
write_buffer(buffer, frames)
end
function (writer::SampledSignalsWriter)(buffer, arguments)
split_up(buffer, arguments..., full_write!, partial_write!)
end end
# similar to above # similar to above
function full_read!(buffer, julia_buffer, already) function full_read!(buffer, julia_buffer, julia_range, frames_per_buffer)
chunk_frames = buffer.chunk_frames read_buffer(buffer, frames_per_buffer)
read_buffer(buffer, chunk_frames)
@inbounds transpose!( @inbounds transpose!(
view(julia_buffer, (1:chunk_frames) .+ already, :), view(julia_buffer, julia_range, :),
buffer.data, buffer.data,
) )
end end
function (reader::SampledSignalsReader)(buffer, (julia_buffer, offset, frame_count)) function partial_read!(buffer, julia_buffer, end_range, left)
chunk_frames = buffer.chunk_frames
foreach(
let buffer = buffer, julia_buffer = julia_buffer
already -> full_read!(buffer, julia_buffer, already)
end,
offset:chunk_frames:(offset + frame_count - chunk_frames),
)
left = frame_count % chunk_frames
read_buffer(buffer, left) read_buffer(buffer, left)
@inbounds transpose!( @inbounds transpose!(
view(julia_buffer, (1:left) .+ (offset + frame_count - left), :), view(julia_buffer, end_range, :),
view(buffer.data, :, 1:left), view(buffer.data, :, 1:left),
) )
frame_count
end end
# a Buffer contains not just a buffer function (reader::SampledSignalsReader)(buffer, arguments)
# but everything you might need for a source or sink split_up(buffer, arguments..., full_read!, partial_read!)
# hopefully, because it is immutable, the optimizer can just pass what it needs end
struct Buffer{Sample, Scribe, InputType, OutputType}
# a buffer is contains just what we need to do reading/writing
struct Buffer{Sample}
stream_lock::ReentrantLock stream_lock::ReentrantLock
pointer_to::Ptr{PaStream} pointer_to::Ptr{PaStream}
device::PortAudioDevice
data::Array{Sample} data::Array{Sample}
number_of_channels::Int number_of_channels::Int
chunk_frames::Int frames_per_buffer::Int
scribe::Scribe warn_xruns::Bool
inputs::Channel{InputType}
outputs::Channel{OutputType}
debug_io::IOStream
end end
has_channels(something) = nchannels(something) > 0 function Buffer(stream_lock, pointer_to, number_of_channels; Sample = Float32, frames_per_buffer = 128, warn_xruns = true)
Buffer{Sample}(
function buffer_task(
stream_lock,
pointer_to,
device,
channels,
scribe::Scribe,
debug_io;
Sample = Float32,
chunk_frames = 128
) where {Scribe}
InputType = get_input_type(scribe)
OutputType = get_output_type(scribe)
input_channel = Channel{InputType}(0)
output_channel = Channel{OutputType}(0)
# unbuffered channels so putting and taking will block till everyone's ready
buffer = Buffer{Sample, Scribe, InputType, OutputType}(
stream_lock, stream_lock,
pointer_to, pointer_to,
device, zeros(Sample, number_of_channels, frames_per_buffer),
zeros(Sample, channels, chunk_frames), number_of_channels,
channels, frames_per_buffer,
chunk_frames, warn_xruns
)
end
eltype(::Type{Buffer{Sample}}) where Sample = Sample
nchannels(buffer::Buffer) = buffer.number_of_channels
# the messanger will send tasks to the scribe
# the scribe will read/write from the buffer
struct Messanger{Sample, Scribe, Input, Output}
device_name::String
buffer::Buffer{Sample}
scribe::Scribe
input_channel::Channel{Input}
output_channel::Channel{Output}
end
eltype(::Type{Messanger{Sample}}) where {Sample} = Sample
name(messanger::Messanger) = messanger.device_name
nchannels(messanger::Messanger) = nchannels(messanger.buffer)
# the scribe will be running on a separate thread in the background
# alternating transposing and
# waiting to pass inputs and outputs back and forth to PortAudio
function send(messanger)
buffer = messanger.buffer
scribe = messanger.scribe
input_channel = messanger.input_channel
output_channel = messanger.output_channel
while true
input = try
take!(input_channel)
catch an_error
# if the input channel is closed, the scribe knows its done
if an_error isa InvalidStateException && an_error.state === :closed
break
else
rethrow(an_error)
end
end
put!(output_channel, scribe(buffer, input))
end
end
# convenience method
has_channels(something) = nchannels(something) > 0
# create the messanger, and start the scribe on a separate task
function messanger_task(
device_name,
buffer::Buffer{Sample},
scribe::Scribe,
debug_io
) where {Sample, Scribe}
Input = get_input_type(Scribe)
Output = get_output_type(Scribe)
input_channel = Channel{Input}(0)
output_channel = Channel{Output}(0)
# unbuffered channels so putting and taking will block till everyone's ready
messanger = Messanger{Sample, Scribe, Input, Output}(
device_name,
buffer,
scribe, scribe,
input_channel, input_channel,
output_channel, output_channel
debug_io
) )
# we will spawn new threads to read from and write to port audio # we will spawn new threads to read from and write to port audio
# while the reading thread is talking to PortAudio, the writing thread can be setting up, and vice versa # while the reading thread is talking to PortAudio, the writing thread can be setting up, and vice versa
# start the scribe thread when its created # start the scribe thread when its created
# if there's channels at all # if there's channels at all
# we can't make the task a field of the buffer, because the task uses the buffer # we can't make the task a field of the buffer, because the task uses the buffer
task = Task(let buffer = buffer task = Task(let messanger = messanger, debug_io = debug_io
# xruns will return an error code and send a duplicate warning to stderr # xruns will return an error code and send a duplicate warning to stderr
# since we handle the error codes, we don't need the duplicate warnings # since we handle the error codes, we don't need the duplicate warnings
# so we send them to a debug log # so we send them to a debug log
() -> redirect_stderr(buffer.debug_io) do () -> redirect_stderr(
run_scribe(buffer) let messanger = messanger
end () -> send(messanger)
end,
debug_io
)
end) end)
# makes it able to run on a separate thread # makes it able to run on a separate thread
task.sticky = false task.sticky = false
@ -428,23 +474,30 @@ function buffer_task(
close(input_channel) close(input_channel)
close(output_channel) close(output_channel)
end end
buffer, task messanger, task
end end
nchannels(buffer::Buffer) = buffer.number_of_channels function close_messanger_task(messanger, task)
name(buffer::Buffer) = name(buffer.device) if has_channels(messanger)
# this will shut down the channels, which will shut down the thread
close(messanger.input_channel)
# wait for tasks to finish to make sure any errors get caught
wait(task)
# output channel will close because it is bound to the task
end
end
# #
# PortAudioStream # PortAudioStream
# #
struct PortAudioStream{SinkBuffer, SourceBuffer} struct PortAudioStream{SinkMessanger, SourceMessanger}
sample_rate::Float64 sample_rate::Float64
# pointer to the c object # pointer to the c object
pointer_to::Ptr{PaStream} pointer_to::Ptr{PaStream}
sink_buffer::SinkBuffer sink_messanger::SinkMessanger
sink_task::Task sink_task::Task
source_buffer::SourceBuffer source_messanger::SourceMessanger
source_task::Task source_task::Task
debug_file::String debug_file::String
debug_io::IOStream debug_io::IOStream
@ -523,28 +576,6 @@ Please specify a sample rate.
input_sample_rate input_sample_rate
end end
# the scribe will be running on a separate thread in the background
# alternating transposing and
# waiting to pass inputs and outputs back and forth to PortAudio
function run_scribe(buffer)
scribe = buffer.scribe
inputs = buffer.inputs
outputs = buffer.outputs
while true
input = try
take!(inputs)
catch an_error
# if the input channel is closed, the scribe knows its done
if an_error isa InvalidStateException && an_error.state === :closed
break
else
rethrow(an_error)
end
end
put!(outputs, scribe(buffer, input))
end
end
# this is the top-level outer constructor that all the other outer constructors end up calling # this is the top-level outer constructor that all the other outer constructors end up calling
""" """
PortAudioStream(input_channels = 2, output_channels = 2; options...) PortAudioStream(input_channels = 2, output_channels = 2; options...)
@ -560,7 +591,7 @@ used.
Options: Options:
- `Sample`: Sample type of the audio stream (defaults to Float32) - `Sample`: Sample type of the audio stream (defaults to Float32)
- `sample_rate`: Sample rate (defaults to device sample rate) - `samplerate`: Sample rate (defaults to device sample rate)
- `latency`: Requested latency. Stream could underrun when too low, consider - `latency`: Requested latency. Stream could underrun when too low, consider
using provided device defaults using provided device defaults
- `warn_xruns`: Display a warning if there is a stream overrun or underrun, which - `warn_xruns`: Display a warning if there is a stream overrun or underrun, which
@ -575,13 +606,12 @@ function PortAudioStream(
output_channels = 2; output_channels = 2;
Sample = Float32, Sample = Float32,
# for several keywords, nothing means we will fill them with defaults # for several keywords, nothing means we will fill them with defaults
sample_rate = nothing, samplerate = nothing,
latency = nothing, latency = nothing,
warn_xruns = true, warn_xruns = true,
# these defaults are currently undocumented # these defaults are currently undocumented
# data is passed to and from portaudio in chunks with this many frames # data is passed to and from portaudio in chunks with this many frames
chunk_frames = 128, frames_per_buffer = 128,
frames_per_buffer = 0,
flags = paNoFlag, flags = paNoFlag,
call_back = C_NULL, call_back = C_NULL,
user_data = C_NULL, user_data = C_NULL,
@ -589,8 +619,8 @@ function PortAudioStream(
output_info = C_NULL, output_info = C_NULL,
stream_lock = ReentrantLock(), stream_lock = ReentrantLock(),
# this is where you can insert custom readers or writers instead # this is where you can insert custom readers or writers instead
writer = SampledSignalsWriter(; Sample = Sample, warn_xruns = warn_xruns), writer = SampledSignalsWriter{Sample}(),
reader = SampledSignalsReader(; Sample = Sample, warn_xruns = warn_xruns) reader = SampledSignalsReader{Sample}()
) )
debug_file, debug_io = mktemp() debug_file, debug_io = mktemp()
input_channels_filled = input_channels_filled =
@ -611,8 +641,8 @@ function PortAudioStream(
output_device.output_bounds.high_latency, output_device.output_bounds.high_latency,
) )
end end
if sample_rate === nothing if samplerate === nothing
sample_rate = combine_default_sample_rates( samplerate = combine_default_sample_rates(
input_device, input_device,
input_device.default_sample_rate, input_device.default_sample_rate,
output_device, output_device,
@ -623,8 +653,8 @@ function PortAudioStream(
if latency === nothing if latency === nothing
latency = input_device.input_bounds.high_latency latency = input_device.input_bounds.high_latency
end end
if sample_rate === nothing if samplerate === nothing
sample_rate = input_device.default_sample_rate samplerate = input_device.default_sample_rate
end end
end end
else else
@ -632,8 +662,8 @@ function PortAudioStream(
if latency === nothing if latency === nothing
latency = output_device.output_bounds.high_latency latency = output_device.output_bounds.high_latency
end end
if sample_rate === nothing if samplerate === nothing
sample_rate = output_device.default_sample_rate samplerate = output_device.default_sample_rate
end end
else else
throw(ArgumentError("Input or output must have at least 1 channel")) throw(ArgumentError("Input or output must have at least 1 channel"))
@ -657,7 +687,7 @@ function PortAudioStream(
Sample = Sample, Sample = Sample,
host_api_specific_stream_info = output_info, host_api_specific_stream_info = output_info,
), ),
sample_rate, samplerate,
frames_per_buffer, frames_per_buffer,
flags, flags,
call_back, call_back,
@ -667,29 +697,35 @@ function PortAudioStream(
pointer_to = mutable_pointer[] pointer_to = mutable_pointer[]
handle_status(Pa_StartStream(pointer_to)) handle_status(Pa_StartStream(pointer_to))
PortAudioStream( PortAudioStream(
sample_rate, samplerate,
pointer_to, pointer_to,
# we need to keep track of the tasks # we need to keep track of the tasks
# so we can wait for them to finish and catch errors # so we can wait for them to finish and catch errors
buffer_task( messanger_task(
stream_lock, output_device.name,
pointer_to, Buffer(
output_device, stream_lock,
output_channels_filled, pointer_to,
output_channels_filled;
Sample = Sample,
frames_per_buffer = frames_per_buffer,
warn_xruns = warn_xruns
),
writer, writer,
debug_io; debug_io
Sample = Sample,
chunk_frames = chunk_frames,
)..., )...,
buffer_task( messanger_task(
stream_lock, input_device.name,
pointer_to, Buffer(
input_device, stream_lock,
input_channels_filled, pointer_to,
input_channels_filled;
Sample = Sample,
frames_per_buffer = frames_per_buffer,
warn_xruns = warn_xruns
),
reader, reader,
debug_io; debug_io
Sample = Sample,
chunk_frames = chunk_frames,
)..., )...,
debug_file, debug_file,
debug_io debug_io
@ -745,22 +781,11 @@ function PortAudioStream(do_function::Function, arguments...; keywords...)
end end
function close(stream::PortAudioStream) function close(stream::PortAudioStream)
source_buffer = stream.source_buffer
sink_buffer = stream.sink_buffer
# closing is tricky, because we want to make sure we've read exactly as much as we've written # closing is tricky, because we want to make sure we've read exactly as much as we've written
# but we have don't know exactly what the tasks are doing # but we have don't know exactly what the tasks are doing
# for now, just close one and then the other # for now, just close one and then the other
if has_channels(source_buffer) close_messanger_task(stream.source_messanger, stream.source_task)
# this will shut down the channels, which will shut down the threads close_messanger_task(stream.sink_messanger, stream.sink_task)
close(source_buffer.inputs)
# wait for tasks to finish to make sure any errors get caught
wait(stream.source_task)
# output channels will close because they are bound to the task
end
if has_channels(sink_buffer)
close(sink_buffer.inputs)
wait(stream.sink_task)
end
pointer_to = stream.pointer_to pointer_to = stream.pointer_to
# only stop if it's not already stopped # only stop if it's not already stopped
if !Bool(handle_status(Pa_IsStreamStopped(pointer_to))) if !Bool(handle_status(Pa_IsStreamStopped(pointer_to)))
@ -770,9 +795,7 @@ function close(stream::PortAudioStream)
# close the debug log and then read the file # close the debug log and then read the file
# this will contain duplicate xrun warnings mentioned above # this will contain duplicate xrun warnings mentioned above
close(stream.debug_io) close(stream.debug_io)
debug_log = open(stream.debug_file, "r") do io debug_log = open(io -> read(io, String), stream.debug_file, "r")
read(io, String)
end
if !isempty(debug_log) if !isempty(debug_log)
@debug debug_log @debug debug_log
end end
@ -792,7 +815,7 @@ isopen(stream::PortAudioStream) = isopen(stream.pointer_to)
samplerate(stream::PortAudioStream) = stream.sample_rate samplerate(stream::PortAudioStream) = stream.sample_rate
function eltype( function eltype(
::Type{<:PortAudioStream{<:Buffer{Sample}, <:Buffer{Sample}}}, ::Type{<:PortAudioStream{<:Messanger{Sample}, <:Messanger{Sample}}},
) where {Sample} ) where {Sample}
Sample Sample
end end
@ -833,8 +856,8 @@ end
# If we had multiple inheritance, then PortAudioStreams could be both a sink and source # If we had multiple inheritance, then PortAudioStreams could be both a sink and source
# Since we don't, we have to make wrappers instead # Since we don't, we have to make wrappers instead
for (TypeName, Super) in ((:PortAudioSink, :SampleSink), (:PortAudioSource, :SampleSource)) for (TypeName, Super) in ((:PortAudioSink, :SampleSink), (:PortAudioSource, :SampleSource))
@eval struct $TypeName{InputBuffer, OutputBuffer} <: $Super @eval struct $TypeName{InputMessanger, OutputMessanger} <: $Super
stream::PortAudioStream{InputBuffer, OutputBuffer} stream::PortAudioStream{InputMessanger, OutputMessanger}
end end
end end
@ -842,8 +865,8 @@ end
# only defined for SampledSignals scribes # only defined for SampledSignals scribes
function getproperty( function getproperty(
stream::PortAudioStream{ stream::PortAudioStream{
<:Buffer{<:Any, <:SampledSignalsWriter}, <:Messanger{<:Any, <:SampledSignalsWriter},
<:Buffer{<:Any, <:SampledSignalsReader}, <:Messanger{<:Any, <:SampledSignalsReader},
}, },
property::Symbol, property::Symbol,
) )
@ -857,10 +880,10 @@ function getproperty(
end end
function nchannels(source_or_sink::PortAudioSource) function nchannels(source_or_sink::PortAudioSource)
nchannels(source_or_sink.stream.source_buffer) nchannels(source_or_sink.stream.source_messanger)
end end
function nchannels(source_or_sink::PortAudioSink) function nchannels(source_or_sink::PortAudioSink)
nchannels(source_or_sink.stream.sink_buffer) nchannels(source_or_sink.stream.sink_messanger)
end end
function samplerate(source_or_sink::Union{PortAudioSink, PortAudioSource}) function samplerate(source_or_sink::Union{PortAudioSink, PortAudioSource})
samplerate(source_or_sink.stream) samplerate(source_or_sink.stream)
@ -868,8 +891,8 @@ end
function eltype( function eltype(
::Type{ ::Type{
<:Union{ <:Union{
<:PortAudioSink{<:Buffer{Sample}, <:Buffer{Sample}}, <:PortAudioSink{<:Messanger{Sample}, <:Messanger{Sample}},
<:PortAudioSource{<:Buffer{Sample}, <:Buffer{Sample}}, <:PortAudioSource{<:Messanger{Sample}, <:Messanger{Sample}},
}, },
}, },
) where {Sample} ) where {Sample}
@ -878,8 +901,8 @@ end
function isopen(source_or_sink::Union{PortAudioSink, PortAudioSource}) function isopen(source_or_sink::Union{PortAudioSink, PortAudioSource})
isopen(source_or_sink.stream) isopen(source_or_sink.stream)
end end
name(source_or_sink::PortAudioSink) = name(source_or_sink.stream.sink_buffer) name(source_or_sink::PortAudioSink) = name(source_or_sink.stream.sink_messanger)
name(source_or_sink::PortAudioSource) = name(source_or_sink.stream.source_buffer) name(source_or_sink::PortAudioSource) = name(source_or_sink.stream.source_messanger)
# could show full type name, but the PortAudio part is probably redundant # could show full type name, but the PortAudio part is probably redundant
# because these will usually only get printed as part of show for PortAudioStream # because these will usually only get printed as part of show for PortAudioStream
@ -898,30 +921,30 @@ function show(io::IO, sink_or_source::Union{PortAudioSink, PortAudioSource})
end end
# both reading and writing will outsource to the readers or writers # both reading and writing will outsource to the readers or writers
# so we just need to pass inputs in and take outputs out # so we just need to pass input_channel in and take output_channel out
# SampledSignals can take care of this feeding for us # SampledSignals can take care of this feeding for us
function exchange(buffer, arguments...) function exchange(messanger, arguments...)
put!(buffer.inputs, arguments) put!(messanger.input_channel, arguments)
take!(buffer.outputs) take!(messanger.output_channel)
end end
# these will only work with sampledsignals scribes # these will only work with sampledsignals scribes
function unsafe_write( function unsafe_write(
sink::PortAudioSink{<:Buffer{<:Any, <:SampledSignalsWriter}}, sink::PortAudioSink{<:Messanger{<:Any, <:SampledSignalsWriter}},
julia_buffer::Array, julia_buffer::Array,
offset, already,
frame_count, frame_count,
) )
exchange(sink.stream.sink_buffer, julia_buffer, offset, frame_count) exchange(sink.stream.sink_messanger, julia_buffer, already, frame_count)
end end
function unsafe_read!( function unsafe_read!(
source::PortAudioSource{<:Any, <:Buffer{<:Any, <:SampledSignalsReader}}, source::PortAudioSource{<:Any, <:Messanger{<:Any, <:SampledSignalsReader}},
julia_buffer::Array, julia_buffer::Array,
offset, already,
frame_count, frame_count,
) )
exchange(source.stream.source_buffer, julia_buffer, offset, frame_count) exchange(source.stream.source_messanger, julia_buffer, already, frame_count)
end end
end # module PortAudio end # module PortAudio

View file

@ -145,15 +145,16 @@ if !isempty(devices())
@test sprint(show, source) == @test sprint(show, source) ==
"2 channel source: $(repr(default_output_device_name))" "2 channel source: $(repr(default_output_device_name))"
write(stream, stream, 5s) write(stream, stream, 5s)
sleep(1)
@test PaErrorCode(handle_status(Pa_StopStream(stream.pointer_to))) == paNoError @test PaErrorCode(handle_status(Pa_StopStream(stream.pointer_to))) == paNoError
@test isopen(stream) @test isopen(stream)
close(stream) close(stream)
sleep(1)
@test !isopen(stream) @test !isopen(stream)
@test !isopen(sink) @test !isopen(sink)
@test !isopen(source) @test !isopen(source)
println("done") println("done")
end end
sleep(1)
@testset "Samplerate-converting writing" begin @testset "Samplerate-converting writing" begin
PortAudioStream(0, 2) do stream PortAudioStream(0, 2) do stream
write( write(
@ -161,19 +162,14 @@ if !isempty(devices())
SinSource(eltype(stream), samplerate(stream) * 0.8, [220, 330]), SinSource(eltype(stream), samplerate(stream) * 0.8, [220, 330]),
3s, 3s,
) )
sleep(1)
write( write(
stream, stream,
SinSource(eltype(stream), samplerate(stream) * 1.2, [220, 330]), SinSource(eltype(stream), samplerate(stream) * 1.2, [220, 330]),
3s, 3s,
) )
sleep(1)
end
end
@testset "Open Device by name" begin
PortAudioStream(default_input_device_name, default_output_device_name) do stream
end end
end end
sleep(1)
# no way to check that the right data is actually getting read or written here, # no way to check that the right data is actually getting read or written here,
# but at least it's not crashing. # but at least it's not crashing.
@testset "Queued Writing" begin @testset "Queued Writing" begin
@ -211,6 +207,9 @@ if !isempty(devices())
PortAudioStream(default_input_device_name) do stream PortAudioStream(default_input_device_name) do stream
@test isopen(stream) @test isopen(stream)
end end
PortAudioStream(default_input_device_name, default_output_device_name) do stream
@test isopen(stream)
end
end end
@testset "Errors with sound" begin @testset "Errors with sound" begin
big = typemax(Int) big = typemax(Int)