diff --git a/src/PortAudio.jl b/src/PortAudio.jl index 1761aab..1c1bc44 100644 --- a/src/PortAudio.jl +++ b/src/PortAudio.jl @@ -143,9 +143,7 @@ function __init__() end end initialize() - atexit() do - terminate() - end + atexit(() -> terminate()) end function versioninfo(io::IO = stdout) @@ -224,18 +222,21 @@ end function devices() # need to use 0 indexing for C - map(get_device_info, (1:handle_status(Pa_GetDeviceCount())) .- 1) + map(get_device_info, 0:(handle_status(Pa_GetDeviceCount()) - 1)) end # we can handle reading and writing from buffers in a similar way -function read_or_write(a_function, buffer, args...) +function read_or_write(a_function, buffer, use_frames) handle_status( # because we're calling Pa_ReadStream and Pa_WriteStream from separate threads, # we put a lock around these calls - lock(buffer.stream_lock) do - a_function(buffer.pointer_to, buffer.data, args...) - end, - warn_xruns = buffer.scribe.warn_xruns, + lock( + let a_function = a_function, pointer_to = buffer.pointer_to, data = buffer.data, use_frames = use_frames + () -> a_function(pointer_to, data, use_frames) + end, + buffer.stream_lock + ), + warn_xruns = buffer.warn_xruns, ) end @@ -250,26 +251,18 @@ end # these will do the actual reading and writing # you can switch out the SampledSignalsReader/Writer defaults if you want more direct access # a Scribe must implement the following interface: -# must have a warn_xruns::Bool field # must have get_input_type and get_output_type methods # must overload call for 2 arguments: # 1) the buffer -# 2) a tuple of custom inputs +# 2) a tuple of custom input_channel # and return the output type # this call method can make use of read_buffer/write_buffer methods above abstract type Scribe end struct SampledSignalsReader{Sample} <: Scribe - warn_xruns::Bool -end -function SampledSignalsReader(; Sample = Float32, warn_xruns = true) - SampledSignalsReader{Sample}(warn_xruns) end + struct SampledSignalsWriter{Sample} <: Scribe - warn_xruns::Bool -end -function SampledSignalsWriter(; Sample = Float32, warn_xruns = true) - SampledSignalsWriter{Sample}(warn_xruns) end # define on types @@ -290,7 +283,7 @@ function get_output_type(::Thing) where {Thing} get_output_type(Thing) end -# SampledSignals inputs will be a triple of the last 3 arguments to unsafe_read/write +# SampledSignals input_channel will be a triple of the last 3 arguments to unsafe_read/write # we will already have access to the stream itself function get_input_type( ::Type{<:Union{<:SampledSignalsReader{Sample}, <:SampledSignalsWriter{Sample}}}, @@ -303,120 +296,173 @@ function get_output_type(::Type{<:Union{SampledSignalsReader, SampledSignalsWrit Int end -# write a full chunk, starting from the middle of the julia buffer -function full_write!(buffer, julia_buffer, already) - chunk_frames = buffer.chunk_frames - @inbounds transpose!( - buffer.data, - view(julia_buffer, (1:chunk_frames) .+ already, :), +# the julia buffer is bigger than the port audio buffer +# so we need to split it up into chunks +# we do this the same way for both reading and writing +function split_up(buffer, julia_buffer, already, frame_count, whole_function, partial_function) + frames_per_buffer = buffer.frames_per_buffer + # when we're done, we'll have written this many frames + goal = already + frame_count + # this is what we'll have left after doing all complete chunks + left = frame_count % frames_per_buffer + # this is how many we'll have written after doing all complete chunks + even = goal - left + foreach( + let whole_function = whole_function, buffer = buffer, julia_buffer = julia_buffer, frames_per_buffer = frames_per_buffer + already -> whole_function(buffer, julia_buffer, (already + 1):(already + frames_per_buffer), frames_per_buffer) + end, + # start at the already, keep going until there is less than a chunk left + already:frames_per_buffer:(even - frames_per_buffer), + # each time we loop, add chunk frames to already + # after the last loop, we'll reach "even" ) - write_buffer(buffer, chunk_frames) + # now we just have to read/write what's left + if left > 0 + partial_function(buffer, julia_buffer, (even + 1):goal, left) + end + frame_count end -function (writer::SampledSignalsWriter)(buffer, (julia_buffer, offset, frame_count)) - chunk_frames = buffer.chunk_frames - foreach( - let buffer = buffer, julia_buffer = julia_buffer - already -> full_write!(buffer, julia_buffer, already) - end, - # start at the offset, keep going until there is less than a chunk left - offset:chunk_frames:(offset + frame_count - chunk_frames), - ) - # write what's left - left = frame_count % chunk_frames +# the full version doesn't have to make a view, but the partial version does +function full_write!(buffer, julia_buffer, julia_range, frames) @inbounds transpose!( - view(buffer.data, :, 1:left), - # the last left frames of the julia buffer - view(julia_buffer, (1:left) .+ (offset + frame_count - left), :), + buffer.data, + view(julia_buffer, julia_range, :), ) - write_buffer(buffer, left) - frame_count + write_buffer(buffer, frames) +end + +function partial_write!(buffer, julia_buffer, julia_range, frames) + @inbounds transpose!( + view(buffer.data, :, 1:frames), + view(julia_buffer, julia_range, :), + ) + write_buffer(buffer, frames) +end + +function (writer::SampledSignalsWriter)(buffer, arguments) + split_up(buffer, arguments..., full_write!, partial_write!) end # similar to above -function full_read!(buffer, julia_buffer, already) - chunk_frames = buffer.chunk_frames - read_buffer(buffer, chunk_frames) +function full_read!(buffer, julia_buffer, julia_range, frames_per_buffer) + read_buffer(buffer, frames_per_buffer) @inbounds transpose!( - view(julia_buffer, (1:chunk_frames) .+ already, :), + view(julia_buffer, julia_range, :), buffer.data, ) end -function (reader::SampledSignalsReader)(buffer, (julia_buffer, offset, frame_count)) - chunk_frames = buffer.chunk_frames - foreach( - let buffer = buffer, julia_buffer = julia_buffer - already -> full_read!(buffer, julia_buffer, already) - end, - offset:chunk_frames:(offset + frame_count - chunk_frames), - ) - left = frame_count % chunk_frames +function partial_read!(buffer, julia_buffer, end_range, left) read_buffer(buffer, left) @inbounds transpose!( - view(julia_buffer, (1:left) .+ (offset + frame_count - left), :), + view(julia_buffer, end_range, :), view(buffer.data, :, 1:left), ) - frame_count end -# a Buffer contains not just a buffer -# but everything you might need for a source or sink -# hopefully, because it is immutable, the optimizer can just pass what it needs -struct Buffer{Sample, Scribe, InputType, OutputType} +function (reader::SampledSignalsReader)(buffer, arguments) + split_up(buffer, arguments..., full_read!, partial_read!) +end + +# a buffer is contains just what we need to do reading/writing +struct Buffer{Sample} stream_lock::ReentrantLock pointer_to::Ptr{PaStream} - device::PortAudioDevice data::Array{Sample} number_of_channels::Int - chunk_frames::Int - scribe::Scribe - inputs::Channel{InputType} - outputs::Channel{OutputType} - debug_io::IOStream + frames_per_buffer::Int + warn_xruns::Bool end -has_channels(something) = nchannels(something) > 0 - -function buffer_task( - stream_lock, - pointer_to, - device, - channels, - scribe::Scribe, - debug_io; - Sample = Float32, - chunk_frames = 128 -) where {Scribe} - InputType = get_input_type(scribe) - OutputType = get_output_type(scribe) - input_channel = Channel{InputType}(0) - output_channel = Channel{OutputType}(0) - # unbuffered channels so putting and taking will block till everyone's ready - buffer = Buffer{Sample, Scribe, InputType, OutputType}( +function Buffer(stream_lock, pointer_to, number_of_channels; Sample = Float32, frames_per_buffer = 128, warn_xruns = true) + Buffer{Sample}( stream_lock, pointer_to, - device, - zeros(Sample, channels, chunk_frames), - channels, - chunk_frames, + zeros(Sample, number_of_channels, frames_per_buffer), + number_of_channels, + frames_per_buffer, + warn_xruns + ) +end + +eltype(::Type{Buffer{Sample}}) where Sample = Sample +nchannels(buffer::Buffer) = buffer.number_of_channels + +# the messanger will send tasks to the scribe +# the scribe will read/write from the buffer +struct Messanger{Sample, Scribe, Input, Output} + device_name::String + buffer::Buffer{Sample} + scribe::Scribe + input_channel::Channel{Input} + output_channel::Channel{Output} +end + +eltype(::Type{Messanger{Sample}}) where {Sample} = Sample +name(messanger::Messanger) = messanger.device_name +nchannels(messanger::Messanger) = nchannels(messanger.buffer) + +# the scribe will be running on a separate thread in the background +# alternating transposing and +# waiting to pass inputs and outputs back and forth to PortAudio +function send(messanger) + buffer = messanger.buffer + scribe = messanger.scribe + input_channel = messanger.input_channel + output_channel = messanger.output_channel + while true + input = try + take!(input_channel) + catch an_error + # if the input channel is closed, the scribe knows its done + if an_error isa InvalidStateException && an_error.state === :closed + break + else + rethrow(an_error) + end + end + put!(output_channel, scribe(buffer, input)) + end +end + +# convenience method +has_channels(something) = nchannels(something) > 0 + +# create the messanger, and start the scribe on a separate task +function messanger_task( + device_name, + buffer::Buffer{Sample}, + scribe::Scribe, + debug_io +) where {Sample, Scribe} + Input = get_input_type(Scribe) + Output = get_output_type(Scribe) + input_channel = Channel{Input}(0) + output_channel = Channel{Output}(0) + # unbuffered channels so putting and taking will block till everyone's ready + messanger = Messanger{Sample, Scribe, Input, Output}( + device_name, + buffer, scribe, input_channel, - output_channel, - debug_io + output_channel ) # we will spawn new threads to read from and write to port audio # while the reading thread is talking to PortAudio, the writing thread can be setting up, and vice versa # start the scribe thread when its created # if there's channels at all # we can't make the task a field of the buffer, because the task uses the buffer - task = Task(let buffer = buffer + task = Task(let messanger = messanger, debug_io = debug_io # xruns will return an error code and send a duplicate warning to stderr # since we handle the error codes, we don't need the duplicate warnings # so we send them to a debug log - () -> redirect_stderr(buffer.debug_io) do - run_scribe(buffer) - end + () -> redirect_stderr( + let messanger = messanger + () -> send(messanger) + end, + debug_io + ) end) # makes it able to run on a separate thread task.sticky = false @@ -428,23 +474,30 @@ function buffer_task( close(input_channel) close(output_channel) end - buffer, task + messanger, task end -nchannels(buffer::Buffer) = buffer.number_of_channels -name(buffer::Buffer) = name(buffer.device) +function close_messanger_task(messanger, task) + if has_channels(messanger) + # this will shut down the channels, which will shut down the thread + close(messanger.input_channel) + # wait for tasks to finish to make sure any errors get caught + wait(task) + # output channel will close because it is bound to the task + end +end # # PortAudioStream # -struct PortAudioStream{SinkBuffer, SourceBuffer} +struct PortAudioStream{SinkMessanger, SourceMessanger} sample_rate::Float64 # pointer to the c object pointer_to::Ptr{PaStream} - sink_buffer::SinkBuffer + sink_messanger::SinkMessanger sink_task::Task - source_buffer::SourceBuffer + source_messanger::SourceMessanger source_task::Task debug_file::String debug_io::IOStream @@ -523,28 +576,6 @@ Please specify a sample rate. input_sample_rate end -# the scribe will be running on a separate thread in the background -# alternating transposing and -# waiting to pass inputs and outputs back and forth to PortAudio -function run_scribe(buffer) - scribe = buffer.scribe - inputs = buffer.inputs - outputs = buffer.outputs - while true - input = try - take!(inputs) - catch an_error - # if the input channel is closed, the scribe knows its done - if an_error isa InvalidStateException && an_error.state === :closed - break - else - rethrow(an_error) - end - end - put!(outputs, scribe(buffer, input)) - end -end - # this is the top-level outer constructor that all the other outer constructors end up calling """ PortAudioStream(input_channels = 2, output_channels = 2; options...) @@ -560,7 +591,7 @@ used. Options: - `Sample`: Sample type of the audio stream (defaults to Float32) - - `sample_rate`: Sample rate (defaults to device sample rate) + - `samplerate`: Sample rate (defaults to device sample rate) - `latency`: Requested latency. Stream could underrun when too low, consider using provided device defaults - `warn_xruns`: Display a warning if there is a stream overrun or underrun, which @@ -575,13 +606,12 @@ function PortAudioStream( output_channels = 2; Sample = Float32, # for several keywords, nothing means we will fill them with defaults - sample_rate = nothing, + samplerate = nothing, latency = nothing, warn_xruns = true, # these defaults are currently undocumented # data is passed to and from portaudio in chunks with this many frames - chunk_frames = 128, - frames_per_buffer = 0, + frames_per_buffer = 128, flags = paNoFlag, call_back = C_NULL, user_data = C_NULL, @@ -589,8 +619,8 @@ function PortAudioStream( output_info = C_NULL, stream_lock = ReentrantLock(), # this is where you can insert custom readers or writers instead - writer = SampledSignalsWriter(; Sample = Sample, warn_xruns = warn_xruns), - reader = SampledSignalsReader(; Sample = Sample, warn_xruns = warn_xruns) + writer = SampledSignalsWriter{Sample}(), + reader = SampledSignalsReader{Sample}() ) debug_file, debug_io = mktemp() input_channels_filled = @@ -611,8 +641,8 @@ function PortAudioStream( output_device.output_bounds.high_latency, ) end - if sample_rate === nothing - sample_rate = combine_default_sample_rates( + if samplerate === nothing + samplerate = combine_default_sample_rates( input_device, input_device.default_sample_rate, output_device, @@ -623,8 +653,8 @@ function PortAudioStream( if latency === nothing latency = input_device.input_bounds.high_latency end - if sample_rate === nothing - sample_rate = input_device.default_sample_rate + if samplerate === nothing + samplerate = input_device.default_sample_rate end end else @@ -632,8 +662,8 @@ function PortAudioStream( if latency === nothing latency = output_device.output_bounds.high_latency end - if sample_rate === nothing - sample_rate = output_device.default_sample_rate + if samplerate === nothing + samplerate = output_device.default_sample_rate end else throw(ArgumentError("Input or output must have at least 1 channel")) @@ -657,7 +687,7 @@ function PortAudioStream( Sample = Sample, host_api_specific_stream_info = output_info, ), - sample_rate, + samplerate, frames_per_buffer, flags, call_back, @@ -667,29 +697,35 @@ function PortAudioStream( pointer_to = mutable_pointer[] handle_status(Pa_StartStream(pointer_to)) PortAudioStream( - sample_rate, + samplerate, pointer_to, # we need to keep track of the tasks # so we can wait for them to finish and catch errors - buffer_task( - stream_lock, - pointer_to, - output_device, - output_channels_filled, + messanger_task( + output_device.name, + Buffer( + stream_lock, + pointer_to, + output_channels_filled; + Sample = Sample, + frames_per_buffer = frames_per_buffer, + warn_xruns = warn_xruns + ), writer, - debug_io; - Sample = Sample, - chunk_frames = chunk_frames, + debug_io )..., - buffer_task( - stream_lock, - pointer_to, - input_device, - input_channels_filled, + messanger_task( + input_device.name, + Buffer( + stream_lock, + pointer_to, + input_channels_filled; + Sample = Sample, + frames_per_buffer = frames_per_buffer, + warn_xruns = warn_xruns + ), reader, - debug_io; - Sample = Sample, - chunk_frames = chunk_frames, + debug_io )..., debug_file, debug_io @@ -745,22 +781,11 @@ function PortAudioStream(do_function::Function, arguments...; keywords...) end function close(stream::PortAudioStream) - source_buffer = stream.source_buffer - sink_buffer = stream.sink_buffer # closing is tricky, because we want to make sure we've read exactly as much as we've written # but we have don't know exactly what the tasks are doing # for now, just close one and then the other - if has_channels(source_buffer) - # this will shut down the channels, which will shut down the threads - close(source_buffer.inputs) - # wait for tasks to finish to make sure any errors get caught - wait(stream.source_task) - # output channels will close because they are bound to the task - end - if has_channels(sink_buffer) - close(sink_buffer.inputs) - wait(stream.sink_task) - end + close_messanger_task(stream.source_messanger, stream.source_task) + close_messanger_task(stream.sink_messanger, stream.sink_task) pointer_to = stream.pointer_to # only stop if it's not already stopped if !Bool(handle_status(Pa_IsStreamStopped(pointer_to))) @@ -770,9 +795,7 @@ function close(stream::PortAudioStream) # close the debug log and then read the file # this will contain duplicate xrun warnings mentioned above close(stream.debug_io) - debug_log = open(stream.debug_file, "r") do io - read(io, String) - end + debug_log = open(io -> read(io, String), stream.debug_file, "r") if !isempty(debug_log) @debug debug_log end @@ -792,7 +815,7 @@ isopen(stream::PortAudioStream) = isopen(stream.pointer_to) samplerate(stream::PortAudioStream) = stream.sample_rate function eltype( - ::Type{<:PortAudioStream{<:Buffer{Sample}, <:Buffer{Sample}}}, + ::Type{<:PortAudioStream{<:Messanger{Sample}, <:Messanger{Sample}}}, ) where {Sample} Sample end @@ -833,8 +856,8 @@ end # If we had multiple inheritance, then PortAudioStreams could be both a sink and source # Since we don't, we have to make wrappers instead for (TypeName, Super) in ((:PortAudioSink, :SampleSink), (:PortAudioSource, :SampleSource)) - @eval struct $TypeName{InputBuffer, OutputBuffer} <: $Super - stream::PortAudioStream{InputBuffer, OutputBuffer} + @eval struct $TypeName{InputMessanger, OutputMessanger} <: $Super + stream::PortAudioStream{InputMessanger, OutputMessanger} end end @@ -842,8 +865,8 @@ end # only defined for SampledSignals scribes function getproperty( stream::PortAudioStream{ - <:Buffer{<:Any, <:SampledSignalsWriter}, - <:Buffer{<:Any, <:SampledSignalsReader}, + <:Messanger{<:Any, <:SampledSignalsWriter}, + <:Messanger{<:Any, <:SampledSignalsReader}, }, property::Symbol, ) @@ -857,10 +880,10 @@ function getproperty( end function nchannels(source_or_sink::PortAudioSource) - nchannels(source_or_sink.stream.source_buffer) + nchannels(source_or_sink.stream.source_messanger) end function nchannels(source_or_sink::PortAudioSink) - nchannels(source_or_sink.stream.sink_buffer) + nchannels(source_or_sink.stream.sink_messanger) end function samplerate(source_or_sink::Union{PortAudioSink, PortAudioSource}) samplerate(source_or_sink.stream) @@ -868,8 +891,8 @@ end function eltype( ::Type{ <:Union{ - <:PortAudioSink{<:Buffer{Sample}, <:Buffer{Sample}}, - <:PortAudioSource{<:Buffer{Sample}, <:Buffer{Sample}}, + <:PortAudioSink{<:Messanger{Sample}, <:Messanger{Sample}}, + <:PortAudioSource{<:Messanger{Sample}, <:Messanger{Sample}}, }, }, ) where {Sample} @@ -878,8 +901,8 @@ end function isopen(source_or_sink::Union{PortAudioSink, PortAudioSource}) isopen(source_or_sink.stream) end -name(source_or_sink::PortAudioSink) = name(source_or_sink.stream.sink_buffer) -name(source_or_sink::PortAudioSource) = name(source_or_sink.stream.source_buffer) +name(source_or_sink::PortAudioSink) = name(source_or_sink.stream.sink_messanger) +name(source_or_sink::PortAudioSource) = name(source_or_sink.stream.source_messanger) # could show full type name, but the PortAudio part is probably redundant # because these will usually only get printed as part of show for PortAudioStream @@ -898,30 +921,30 @@ function show(io::IO, sink_or_source::Union{PortAudioSink, PortAudioSource}) end # both reading and writing will outsource to the readers or writers -# so we just need to pass inputs in and take outputs out +# so we just need to pass input_channel in and take output_channel out # SampledSignals can take care of this feeding for us -function exchange(buffer, arguments...) - put!(buffer.inputs, arguments) - take!(buffer.outputs) +function exchange(messanger, arguments...) + put!(messanger.input_channel, arguments) + take!(messanger.output_channel) end # these will only work with sampledsignals scribes function unsafe_write( - sink::PortAudioSink{<:Buffer{<:Any, <:SampledSignalsWriter}}, + sink::PortAudioSink{<:Messanger{<:Any, <:SampledSignalsWriter}}, julia_buffer::Array, - offset, + already, frame_count, ) - exchange(sink.stream.sink_buffer, julia_buffer, offset, frame_count) + exchange(sink.stream.sink_messanger, julia_buffer, already, frame_count) end function unsafe_read!( - source::PortAudioSource{<:Any, <:Buffer{<:Any, <:SampledSignalsReader}}, + source::PortAudioSource{<:Any, <:Messanger{<:Any, <:SampledSignalsReader}}, julia_buffer::Array, - offset, + already, frame_count, ) - exchange(source.stream.source_buffer, julia_buffer, offset, frame_count) + exchange(source.stream.source_messanger, julia_buffer, already, frame_count) end end # module PortAudio diff --git a/test/runtests.jl b/test/runtests.jl index 1b80c8d..7d20e43 100755 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -145,15 +145,16 @@ if !isempty(devices()) @test sprint(show, source) == "2 channel source: $(repr(default_output_device_name))" write(stream, stream, 5s) - sleep(1) @test PaErrorCode(handle_status(Pa_StopStream(stream.pointer_to))) == paNoError @test isopen(stream) close(stream) + sleep(1) @test !isopen(stream) @test !isopen(sink) @test !isopen(source) println("done") end + sleep(1) @testset "Samplerate-converting writing" begin PortAudioStream(0, 2) do stream write( @@ -161,19 +162,14 @@ if !isempty(devices()) SinSource(eltype(stream), samplerate(stream) * 0.8, [220, 330]), 3s, ) - sleep(1) write( stream, SinSource(eltype(stream), samplerate(stream) * 1.2, [220, 330]), 3s, ) - sleep(1) - end - end - @testset "Open Device by name" begin - PortAudioStream(default_input_device_name, default_output_device_name) do stream end end + sleep(1) # no way to check that the right data is actually getting read or written here, # but at least it's not crashing. @testset "Queued Writing" begin @@ -211,6 +207,9 @@ if !isempty(devices()) PortAudioStream(default_input_device_name) do stream @test isopen(stream) end + PortAudioStream(default_input_device_name, default_output_device_name) do stream + @test isopen(stream) + end end @testset "Errors with sound" begin big = typemax(Int)