new C-based ringbuffer handling seems to be mostly working

This commit is contained in:
Spencer Russell 2017-05-09 11:39:42 -04:00
parent ba5f60e097
commit 5c40329df6
9 changed files with 555 additions and 135 deletions

1
.gitignore vendored
View file

@ -6,3 +6,4 @@ deps/deps.jl
*.flac *.flac
*.cov *.cov
coverage coverage
deps/usr

View file

@ -80,3 +80,14 @@ julia> buf = read(stream, 10s)
julia> save(joinpath(homedir(), "Desktop", "myvoice.ogg"), buf) julia> save(joinpath(homedir(), "Desktop", "myvoice.ogg"), buf)
``` ```
## Building the shim library
Because PortAudio calls its callback from a separate audio thread, we can't handle it in Julia directly. To work around this we've included a small shim library written in C that uses ring buffers to pass audio data between the callback context and the main Julia context. To build the shim you'll need a few prerequisites:
* libportaudio
* make
* a C compiler (gcc on linux/macOS, mingw64 on Windows)
* The `RingBuffers` julia package, installed in a folder next to this one. The portaudio shim links against the `pa_ringbuffer` library that comes with `RingBuffers`.
To build the shim, go into the `deps/src` directory and type `make`.

View file

@ -1,8 +1,7 @@
julia 0.4 julia 0.6
Compat 0.8.8
BinDeps BinDeps
Devectorize
SampledSignals 0.3.0 SampledSignals 0.3.0
RingBuffers 0.1.0 RingBuffers 0.1.0
Suppressor
@osx Homebrew @osx Homebrew
@windows WinRPM @windows WinRPM

66
deps/src/Makefile vendored Normal file
View file

@ -0,0 +1,66 @@
# Makefile lifted from Clang.jl
# Copyright (c) 2012-: Isaiah Norton and [contributors](https://github.com/ihnorton/Clang.jl/graphs/contributors)
ifeq (exists, $(shell [ -e Make.user ] && echo exists ))
include Make.user
endif
#check-env:
#ifndef JULIA_INC
# $(error Environment variable JULIA_INC is not set.)
#endif
#INC =-I"$(JULIA_INC)"
CFLAGS =-Wall -Wno-strict-aliasing -fno-omit-frame-pointer -fPIC -g
LDFLAGS =-L../../../RingBuffers/deps/usr/lib -lpa_ringbuffer
# LINUX_LIBS =-lrt
# LINUX_LDFLAGS =-rdynamic
# add the Homebrew.jl tree to the include dirs in case we used it for
# portaudio and libsndfile
# DARWIN_LDFLAGS =-L../../../Homebrew/deps/usr/lib
# DARWIN_INC =-I../../../Homebrew/deps/usr/include
TARGETDIR=../usr/lib
OBJS = pa_shim.o
# Figure out OS and architecture
OS = $(shell uname)
ifneq (,$(findstring MINGW,$(OS)))
OS=WINNT
endif
# file extensions and platform-specific libs
ifeq ($(OS), WINNT)
LIBS += $(WINNT_LIBS)
LDFLAGS += $(WINNT_LDFLAGS)
INC += $(WINNT_INC)
SHLIB_EXT = dll
else ifeq ($(OS), Darwin)
INC += $(DARWIN_INC)
LDFLAGS += $(DARWIN_LDFLAGS)
SHLIB_EXT = dylib
else
LIBS += $(LINUX_LIBS)
LDFLAGS += $(LINUX_LDFLAGS)
INC += $(LINUX_INC)
SHLIB_EXT = so
endif
TARGET=$(TARGETDIR)/pa_shim.$(SHLIB_EXT)
.PHONY: clean default
default: $(TARGET)
%.o: %.c Makefile
$(CC) $< -fPIC -c -o $@ $(INC) $(CFLAGS)
$(TARGETDIR):
mkdir -p $@
$(TARGET): $(OBJS) $(TARGETDIR) Makefile
$(CC) $(OBJS) -shared -o $@ $(LDFLAGS) $(LIBS)
clean:
rm -f $(OBJS)
rm -f $(TARGET)

236
deps/src/pa_ringbuffer.h vendored Normal file
View file

@ -0,0 +1,236 @@
#ifndef PA_RINGBUFFER_H
#define PA_RINGBUFFER_H
/*
* $Id$
* Portable Audio I/O Library
* Ring Buffer utility.
*
* Author: Phil Burk, http://www.softsynth.com
* modified for SMP safety on OS X by Bjorn Roche.
* also allowed for const where possible.
* modified for multiple-byte-sized data elements by Sven Fischer
*
* Note that this is safe only for a single-thread reader
* and a single-thread writer.
*
* This program is distributed with the PortAudio Portable Audio Library.
* For more information see: http://www.portaudio.com
* Copyright (c) 1999-2000 Ross Bencina and Phil Burk
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights to use, copy, modify, merge,
* publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR
* ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
* WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
/*
* The text above constitutes the entire PortAudio license; however,
* the PortAudio community also makes the following non-binding requests:
*
* Any person wishing to distribute modifications to the Software is
* requested to send the modifications to the original developer so that
* they can be incorporated into the canonical version. It is also
* requested that these non-binding requests be included along with the
* license above.
*/
/** @file
@ingroup common_src
@brief Single-reader single-writer lock-free ring buffer
PaUtilRingBuffer is a ring buffer used to transport samples between
different execution contexts (threads, OS callbacks, interrupt handlers)
without requiring the use of any locks. This only works when there is
a single reader and a single writer (ie. one thread or callback writes
to the ring buffer, another thread or callback reads from it).
The PaUtilRingBuffer structure manages a ring buffer containing N
elements, where N must be a power of two. An element may be any size
(specified in bytes).
The memory area used to store the buffer elements must be allocated by
the client prior to calling PaUtil_InitializeRingBuffer() and must outlive
the use of the ring buffer.
@note The ring buffer functions are not normally exposed in the PortAudio libraries.
If you want to call them then you will need to add pa_ringbuffer.c to your application source code.
*/
#if defined(__APPLE__)
#include <sys/types.h>
typedef int32_t ring_buffer_size_t;
#elif defined( __GNUC__ )
typedef long ring_buffer_size_t;
#elif (_MSC_VER >= 1400)
typedef long ring_buffer_size_t;
#elif defined(_MSC_VER) || defined(__BORLANDC__)
typedef long ring_buffer_size_t;
#else
typedef long ring_buffer_size_t;
#endif
#ifdef __cplusplus
extern "C"
{
#endif /* __cplusplus */
typedef struct PaUtilRingBuffer
{
ring_buffer_size_t bufferSize; /**< Number of elements in FIFO. Power of 2. Set by PaUtil_InitRingBuffer. */
volatile ring_buffer_size_t writeIndex; /**< Index of next writable element. Set by PaUtil_AdvanceRingBufferWriteIndex. */
volatile ring_buffer_size_t readIndex; /**< Index of next readable element. Set by PaUtil_AdvanceRingBufferReadIndex. */
ring_buffer_size_t bigMask; /**< Used for wrapping indices with extra bit to distinguish full/empty. */
ring_buffer_size_t smallMask; /**< Used for fitting indices to buffer. */
ring_buffer_size_t elementSizeBytes; /**< Number of bytes per element. */
char *buffer; /**< Pointer to the buffer containing the actual data. */
}PaUtilRingBuffer;
/** Initialize Ring Buffer to empty state ready to have elements written to it.
@param rbuf The ring buffer.
@param elementSizeBytes The size of a single data element in bytes.
@param elementCount The number of elements in the buffer (must be a power of 2).
@param dataPtr A pointer to a previously allocated area where the data
will be maintained. It must be elementCount*elementSizeBytes long.
@return -1 if elementCount is not a power of 2, otherwise 0.
*/
ring_buffer_size_t PaUtil_InitializeRingBuffer( PaUtilRingBuffer *rbuf, ring_buffer_size_t elementSizeBytes, ring_buffer_size_t elementCount, void *dataPtr );
/** Reset buffer to empty. Should only be called when buffer is NOT being read or written.
@param rbuf The ring buffer.
*/
void PaUtil_FlushRingBuffer( PaUtilRingBuffer *rbuf );
/** Retrieve the number of elements available in the ring buffer for writing.
@param rbuf The ring buffer.
@return The number of elements available for writing.
*/
ring_buffer_size_t PaUtil_GetRingBufferWriteAvailable( const PaUtilRingBuffer *rbuf );
/** Retrieve the number of elements available in the ring buffer for reading.
@param rbuf The ring buffer.
@return The number of elements available for reading.
*/
ring_buffer_size_t PaUtil_GetRingBufferReadAvailable( const PaUtilRingBuffer *rbuf );
/** Write data to the ring buffer.
@param rbuf The ring buffer.
@param data The address of new data to write to the buffer.
@param elementCount The number of elements to be written.
@return The number of elements written.
*/
ring_buffer_size_t PaUtil_WriteRingBuffer( PaUtilRingBuffer *rbuf, const void *data, ring_buffer_size_t elementCount );
/** Read data from the ring buffer.
@param rbuf The ring buffer.
@param data The address where the data should be stored.
@param elementCount The number of elements to be read.
@return The number of elements read.
*/
ring_buffer_size_t PaUtil_ReadRingBuffer( PaUtilRingBuffer *rbuf, void *data, ring_buffer_size_t elementCount );
/** Get address of region(s) to which we can write data.
@param rbuf The ring buffer.
@param elementCount The number of elements desired.
@param dataPtr1 The address where the first (or only) region pointer will be
stored.
@param sizePtr1 The address where the first (or only) region length will be
stored.
@param dataPtr2 The address where the second region pointer will be stored if
the first region is too small to satisfy elementCount.
@param sizePtr2 The address where the second region length will be stored if
the first region is too small to satisfy elementCount.
@return The room available to be written or elementCount, whichever is smaller.
*/
ring_buffer_size_t PaUtil_GetRingBufferWriteRegions( PaUtilRingBuffer *rbuf, ring_buffer_size_t elementCount,
void **dataPtr1, ring_buffer_size_t *sizePtr1,
void **dataPtr2, ring_buffer_size_t *sizePtr2 );
/** Advance the write index to the next location to be written.
@param rbuf The ring buffer.
@param elementCount The number of elements to advance.
@return The new position.
*/
ring_buffer_size_t PaUtil_AdvanceRingBufferWriteIndex( PaUtilRingBuffer *rbuf, ring_buffer_size_t elementCount );
/** Get address of region(s) from which we can read data.
@param rbuf The ring buffer.
@param elementCount The number of elements desired.
@param dataPtr1 The address where the first (or only) region pointer will be
stored.
@param sizePtr1 The address where the first (or only) region length will be
stored.
@param dataPtr2 The address where the second region pointer will be stored if
the first region is too small to satisfy elementCount.
@param sizePtr2 The address where the second region length will be stored if
the first region is too small to satisfy elementCount.
@return The number of elements available for reading.
*/
ring_buffer_size_t PaUtil_GetRingBufferReadRegions( PaUtilRingBuffer *rbuf, ring_buffer_size_t elementCount,
void **dataPtr1, ring_buffer_size_t *sizePtr1,
void **dataPtr2, ring_buffer_size_t *sizePtr2 );
/** Advance the read index to the next location to be read.
@param rbuf The ring buffer.
@param elementCount The number of elements to advance.
@return The new position.
*/
ring_buffer_size_t PaUtil_AdvanceRingBufferReadIndex( PaUtilRingBuffer *rbuf, ring_buffer_size_t elementCount );
#ifdef __cplusplus
}
#endif /* __cplusplus */
#endif /* PA_RINGBUFFER_H */

94
deps/src/pa_shim.c vendored Normal file
View file

@ -0,0 +1,94 @@
#include <portaudio.h>
#include "pa_ringbuffer.h"
#include <stdio.h>
#include <unistd.h>
#define SHIM_VERSION 3
#define MIN(x, y) ((x) < (y) ? (x) : (y))
typedef enum {
PA_SHIM_ERRMSG_OVERFLOW, // input overflow
PA_SHIM_ERRMSG_UNDERFLOW, // output underflow
PA_SHIM_ERRMSG_ERR_OVERFLOW, // error buffer overflowed
} pa_shim_errmsg_t;
// this callback type is used to notify the Julia side that the portaudio
// callback has run
typedef void (*pa_shim_notifycb_t)(void *userdata);
// This struct is shared between the Julia side and C
typedef struct {
PaUtilRingBuffer *inputbuf; // ringbuffer for input
PaUtilRingBuffer *outputbuf; // ringbuffer for output
PaUtilRingBuffer *errorbuf; // ringbuffer to send error notifications
int sync; // keep input/output ring buffers synchronized (0/1)
pa_shim_notifycb_t notifycb; // Julia callback to notify conditions
void *inputhandle; // condition to notify on new input
void *outputhandle; // condition to notify when ready for output
void *errorhandle; // condition to notify on new error
} pa_shim_info_t;
void senderr(pa_shim_info_t *info, pa_shim_errmsg_t msg) {
if(PaUtil_GetRingBufferWriteAvailable(info->errorbuf) < 2) {
// we've overflowed our error buffer! notify the host.
msg = PA_SHIM_ERRMSG_ERR_OVERFLOW;
}
PaUtil_WriteRingBuffer(info->errorbuf, &msg, 1);
if(info->notifycb) {
info->notifycb(info->errorhandle);
}
}
// return the version of the shim so we can make sure things are in sync
int pa_shim_getversion(void)
{
return SHIM_VERSION;
}
/*
* This routine will be called by the PortAudio engine when audio is needed.
* It may called at interrupt level on some machines so don't do anything that
* could mess up the system like calling malloc() or free().
*/
int pa_shim_processcb(const void *input, void *output,
unsigned long frameCount,
const PaStreamCallbackTimeInfo* timeInfo,
PaStreamCallbackFlags statusFlags,
void *userData)
{
pa_shim_info_t *info = userData;
if(info->notifycb == NULL) {
fprintf(stderr, "pa_shim ERROR: notifycb is NULL\n");
}
int nwrite = PaUtil_GetRingBufferWriteAvailable(info->inputbuf);
int nread = PaUtil_GetRingBufferReadAvailable(info->outputbuf);
nwrite = MIN(frameCount, nwrite);
nread = MIN(frameCount, nread);
if(info->sync) {
// to keep the buffers synchronized, set readable and writable to
// their minimum value
nread = MIN(nread, nwrite);
nwrite = nread;
}
// read/write from the ringbuffers
PaUtil_WriteRingBuffer(info->inputbuf, input, nwrite);
if(info->notifycb) {
info->notifycb(info->inputhandle);
}
PaUtil_ReadRingBuffer(info->outputbuf, output, nread);
if(info->notifycb) {
info->notifycb(info->outputhandle);
}
if(nwrite < frameCount) {
senderr(info, PA_SHIM_ERRMSG_OVERFLOW);
}
if(nread < frameCount) {
senderr(info, PA_SHIM_ERRMSG_UNDERFLOW);
// we didn't fill the whole output buffer, so zero it out
memset(output+nread*info->outputbuf->elementSizeBytes, 0,
(frameCount - nread)*info->outputbuf->elementSizeBytes);
}
return paContinue;
}

View file

@ -3,51 +3,50 @@ __precompile__()
module PortAudio module PortAudio
using SampledSignals using SampledSignals
using Devectorize
using RingBuffers using RingBuffers
using Compat using Suppressor
import Compat: UTF8String, view
using Base: AsyncCondition
# Get binary dependencies loaded from BinDeps # Get binary dependencies loaded from BinDeps
include( "../deps/deps.jl") include("../deps/deps.jl")
include("pa_shim.jl")
include("libportaudio.jl") include("libportaudio.jl")
function __init__()
init_pa_shim()
global const notifycb_c = cfunction(notifycb, Cint, (Ptr{Void}, ))
# initialize PortAudio on module load
@suppress_err Pa_Initialize()
end
export PortAudioStream export PortAudioStream
# These sizes are all in frames # These sizes are all in frames
# the block size is what we request from portaudio if no blocksize is given. # the block size is what we request from portaudio if no blocksize is given.
# The ringbuffer and pre-fill will be twice the blocksize # The ringbuffer and pre-fill will be twice the blocksize
const DEFAULT_BLOCKSIZE=4096 const DEFAULT_BLOCKSIZE=4096
# data is passed to and from the ringbuffer in chunks with this many frames # data is passed to and from the ringbuffer in chunks with this many frames
# it should be at most the ringbuffer size, and must evenly divide into the # it should be at most the ringbuffer size, and must evenly divide into the
# the underlying portaudio buffer size. E.g. if PortAudio is running with a # the underlying portaudio buffer size. E.g. if PortAudio is running with a
# 2048-frame buffer period, the chunk size can be 2048, 1024, 512, 256, etc. # 2048-frame buffer period, the chunk size can be 2048, 1024, 512, 256, etc.
const CHUNKSIZE=128 const CHUNKSIZE=128
function __init__() # ringbuffer to receive errors from the audio processing thread
# initialize PortAudio on module load const ERR_BUFSIZE=512
swallow_stderr() do
Pa_Initialize()
end
# the portaudio callbacks are parametric on the sample type
global const pa_callbacks = Dict{Type, Ptr{Void}}()
for T in (Float32, Int32, Int16, Int8, UInt8)
pa_callbacks[T] = cfunction(portaudio_callback, Cint,
(Ptr{T}, Ptr{T}, Culong, Ptr{Void}, Culong,
Ptr{CallbackInfo{T}}))
end
end
function versioninfo(io::IO=STDOUT) function versioninfo(io::IO=STDOUT)
println(io, Pa_GetVersionText()) println(io, Pa_GetVersionText())
println(io, "Version Number: ", Pa_GetVersion()) println(io, "Version: ", Pa_GetVersion())
println(io, "Shim Version: ", shimversion())
end end
type PortAudioDevice type PortAudioDevice
name::UTF8String name::String
hostapi::UTF8String hostapi::String
maxinchans::Int maxinchans::Int
maxoutchans::Int maxoutchans::Int
defaultsamplerate::Float64 defaultsamplerate::Float64
@ -71,23 +70,9 @@ end
# not for external use, used in error message printing # not for external use, used in error message printing
devnames() = join(["\"$(dev.name)\"" for dev in devices()], "\n") devnames() = join(["\"$(dev.name)\"" for dev in devices()], "\n")
"""Give a pointer to the given field within a Julia object""" ##################
function fieldptr{T}(obj::T, field::Symbol) # PortAudioStream
fieldnum = findfirst(fieldnames(T), field) ##################
offset = fieldoffset(T, fieldnum)
FT = fieldtype(T, field)
Ptr{FT}(pointer_from_objref(obj) + offset)
end
# we want this to be immutable so we can stack allocate it
immutable CallbackInfo{T}
inchannels::Int
inbuf::LockFreeRingBuffer{T}
outchannels::Int
outbuf::LockFreeRingBuffer{T}
synced::Bool
end
type PortAudioStream{T} type PortAudioStream{T}
samplerate::Float64 samplerate::Float64
@ -95,12 +80,13 @@ type PortAudioStream{T}
stream::PaStream stream::PaStream
sink # untyped because of circular type definition sink # untyped because of circular type definition
source # untyped because of circular type definition source # untyped because of circular type definition
bufinfo::CallbackInfo{T} # immutable data used in the portaudio callback errbuf::RingBuffer{pa_shim_errmsg_t} # used to send errors from the portaudio callback
bufinfo::pa_shim_info_t # data used in the portaudio callback
# this inner constructor is generally called via the top-level outer # this inner constructor is generally called via the top-level outer
# constructor below # constructor below
function PortAudioStream(indev::PortAudioDevice, outdev::PortAudioDevice, function PortAudioStream{T}(indev::PortAudioDevice, outdev::PortAudioDevice,
inchans, outchans, sr, blocksize, synced) inchans, outchans, sr, blocksize, synced) where {T}
inchans = inchans == -1 ? indev.maxinchans : inchans inchans = inchans == -1 ? indev.maxinchans : inchans
outchans = outchans == -1 ? outdev.maxoutchans : outchans outchans = outchans == -1 ? outdev.maxoutchans : outchans
inparams = (inchans == 0) ? inparams = (inchans == 0) ?
@ -113,18 +99,25 @@ type PortAudioStream{T}
finalizer(this, close) finalizer(this, close)
this.sink = PortAudioSink{T}(outdev.name, this, outchans, blocksize*2) this.sink = PortAudioSink{T}(outdev.name, this, outchans, blocksize*2)
this.source = PortAudioSource{T}(indev.name, this, inchans, blocksize*2) this.source = PortAudioSource{T}(indev.name, this, inchans, blocksize*2)
this.errbuf = RingBuffer{pa_shim_errmsg_t}(1, ERR_BUFSIZE)
if synced && inchans > 0 && outchans > 0 if synced && inchans > 0 && outchans > 0
# we've got a synchronized duplex stream. initialize with the output buffer full # we've got a synchronized duplex stream. initialize with the output buffer full
write(this.sink, SampleBuf(zeros(T, blocksize*2, outchans), sr)) write(this.sink, SampleBuf(zeros(T, blocksize*2, outchans), sr))
end end
this.bufinfo = CallbackInfo(inchans, this.source.ringbuf, this.bufinfo = pa_shim_info_t(bufpointer(this.source),
outchans, this.sink.ringbuf, synced) bufpointer(this.sink),
this.stream = swallow_stderr() do pointer(this.errbuf),
Pa_OpenStream(inparams, outparams, float(sr), blocksize, synced, notifycb_c,
paNoFlag, pa_callbacks[T], fieldptr(this, :bufinfo)) getnotifyhandle(this.sink),
end getnotifyhandle(this.source),
getnotifyhandle(this.errbuf))
this.stream = @suppress_err Pa_OpenStream(inparams, outparams,
float(sr), blocksize,
paNoFlag, shim_processcb_c,
this.bufinfo)
Pa_StartStream(this.stream) Pa_StartStream(this.stream)
@async handle_errors(this)
this this
end end
@ -219,28 +212,58 @@ function Base.show(io::IO, stream::PortAudioStream)
println(io, " Samplerate: ", samplerate(stream), "Hz") println(io, " Samplerate: ", samplerate(stream), "Hz")
print(io, " Buffer Size: ", stream.blocksize, " frames") print(io, " Buffer Size: ", stream.blocksize, " frames")
if nchannels(stream.sink) > 0 if nchannels(stream.sink) > 0
print(io, "\n ", nchannels(stream.sink), " channel sink: \"", stream.sink.name, "\"") print(io, "\n ", nchannels(stream.sink), " channel sink: \"", name(stream.sink), "\"")
end end
if nchannels(stream.source) > 0 if nchannels(stream.source) > 0
print(io, "\n ", nchannels(stream.source), " channel source: \"", stream.source.name, "\"") print(io, "\n ", nchannels(stream.source), " channel source: \"", name(stream.source), "\"")
end end
end end
"""
handle_errors(stream::PortAudioStream)
Handle errors coming over the error stream from PortAudio. This is run as an
independent task while the stream is active.
"""
function handle_errors(stream::PortAudioStream)
err = Vector{pa_shim_errmsg_t}(1)
while true
nread = read!(stream.errbuf, err)
nread == 1 || break
if err[1] == PA_SHIM_ERRMSG_ERR_OVERFLOW
warn("Error buffer overflowed on stream $(stream.name)")
elseif err[1] == PA_SHIM_ERRMSG_OVERFLOW
# warn("Input overflowed from $(name(stream.source))")
elseif err[1] == PA_SHIM_ERRMSG_UNDERFLOW
# warn("Output underflowed to $(name(stream.sink))")
else
error("""
Got unrecognized error code $(err[1]) from audio thread for
stream "$(stream.name)". Please file an issue at
https://github.com/juliaaudio/portaudio.jl/issues""")
end
end
end
##################################
# PortAudioSink & PortAudioSource
##################################
# Define our source and sink types # Define our source and sink types
for (TypeName, Super) in ((:PortAudioSink, :SampleSink), for (TypeName, Super) in ((:PortAudioSink, :SampleSink),
(:PortAudioSource, :SampleSource)) (:PortAudioSource, :SampleSource))
@eval type $TypeName{T} <: $Super @eval type $TypeName{T} <: $Super
name::UTF8String name::String
stream::PortAudioStream{T} stream::PortAudioStream{T}
chunkbuf::Array{T, 2} chunkbuf::Array{T, 2}
ringbuf::LockFreeRingBuffer{T} ringbuf::RingBuffer{T}
nchannels::Int nchannels::Int
function $TypeName(name, stream, channels, ringbufsize) function $TypeName{T}(name, stream, channels, ringbufsize) where {T}
# portaudio data comes in interleaved, so we'll end up transposing # portaudio data comes in interleaved, so we'll end up transposing
# it back and forth to julia column-major # it back and forth to julia column-major
chunkbuf = zeros(T, channels, CHUNKSIZE) chunkbuf = zeros(T, channels, CHUNKSIZE)
ringbuf = LockFreeRingBuffer(T, ringbufsize * channels) ringbuf = RingBuffer{T}(channels, ringbufsize)
new(name, stream, chunkbuf, ringbuf, channels) new(name, stream, chunkbuf, ringbuf, channels)
end end
end end
@ -249,35 +272,35 @@ end
SampledSignals.nchannels(s::Union{PortAudioSink, PortAudioSource}) = s.nchannels SampledSignals.nchannels(s::Union{PortAudioSink, PortAudioSource}) = s.nchannels
SampledSignals.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.stream) SampledSignals.samplerate(s::Union{PortAudioSink, PortAudioSource}) = samplerate(s.stream)
SampledSignals.blocksize(s::Union{PortAudioSink, PortAudioSource}) = s.stream.blocksize SampledSignals.blocksize(s::Union{PortAudioSink, PortAudioSource}) = s.stream.blocksize
Base.eltype{T}(::Union{PortAudioSink{T}, PortAudioSource{T}}) = T Base.eltype(::Union{PortAudioSink{T}, PortAudioSource{T}}) where {T} = T
Base.close(s::Union{PortAudioSink, PortAudioSource}) = close(s.ringbuf) Base.close(s::Union{PortAudioSink, PortAudioSource}) = close(s.ringbuf)
Base.isopen(s::Union{PortAudioSink, PortAudioSource}) = isopen(s.ringbuf)
RingBuffers.getnotifyhandle(s::Union{PortAudioSink, PortAudioSource}) = getnotifyhandle(s.ringbuf)
bufpointer(s::Union{PortAudioSink, PortAudioSource}) = pointer(s.ringbuf)
name(s::Union{PortAudioSink, PortAudioSource}) = s.name
function Base.show{T <: Union{PortAudioSink, PortAudioSource}}(io::IO, stream::T) function Base.show(io::IO, stream::T) where {T <: Union{PortAudioSink, PortAudioSource}}
println(io, T, "(\"", stream.name, "\")") println(io, T, "(\"", stream.name, "\")")
print(io, nchannels(stream), " channels") print(io, nchannels(stream), " channels")
end end
function Base.flush(sink::PortAudioSink) # function Base.flush(sink::PortAudioSink)
while nwritable(sink.ringbuf) < length(sink.ringbuf) # while nwritable(sink.ringbuf) < length(sink.ringbuf)
wait(sink.ringbuf) # wait(sink.ringbuf)
end # end
end # end
function SampledSignals.unsafe_write(sink::PortAudioSink, buf::Array, frameoffset, framecount) function SampledSignals.unsafe_write(sink::PortAudioSink, buf::Array, frameoffset, framecount)
nwritten = 0 nwritten = 0
while nwritten < framecount while nwritten < framecount
while nwritable(sink.ringbuf) == 0 towrite = min(framecount-nwritten, CHUNKSIZE)
wait(sink.ringbuf)
end
# in 0.4 transpose! throws an error if the range is a UInt
writable = div(nwritable(sink.ringbuf), nchannels(sink))
towrite = Int(min(writable, CHUNKSIZE, framecount-nwritten))
# make a buffer of interleaved samples # make a buffer of interleaved samples
transpose!(view(sink.chunkbuf, :, 1:towrite), transpose!(view(sink.chunkbuf, :, 1:towrite),
view(buf, (1:towrite)+nwritten+frameoffset, :)) view(buf, (1:towrite)+nwritten+frameoffset, :))
write(sink.ringbuf, sink.chunkbuf, towrite*nchannels(sink)) n = write(sink.ringbuf, sink.chunkbuf, towrite)
nwritten += n
nwritten += towrite # break early if the stream is closed
n < towrite && break
end end
nwritten nwritten
@ -286,66 +309,23 @@ end
function SampledSignals.unsafe_read!(source::PortAudioSource, buf::Array, frameoffset, framecount) function SampledSignals.unsafe_read!(source::PortAudioSource, buf::Array, frameoffset, framecount)
nread = 0 nread = 0
while nread < framecount while nread < framecount
while nreadable(source.ringbuf) == 0 toread = min(framecount-nread, CHUNKSIZE)
wait(source.ringbuf) n = read!(source.ringbuf, source.chunkbuf, toread)
end
# in 0.4 transpose! throws an error if the range is a UInt
readable = div(nreadable(source.ringbuf), nchannels(source))
toread = Int(min(readable, CHUNKSIZE, framecount-nread))
read!(source.ringbuf, source.chunkbuf, toread*nchannels(source))
# de-interleave the samples # de-interleave the samples
transpose!(view(buf, (1:toread)+nread+frameoffset, :), transpose!(view(buf, (1:toread)+nread+frameoffset, :),
view(source.chunkbuf, :, 1:toread)) view(source.chunkbuf, :, 1:toread))
nread += toread nread += toread
# break early if the stream is closed
n < toread && break
end end
nread nread
end end
# This is the callback function that gets called directly in the PortAudio # this is called by the shim process callback to notify that there is new data.
# audio thread, so it's critical that it not interact with the Julia GC # it's run in the audio context so don't do anything besides wake up the
function portaudio_callback{T}(inptr::Ptr{T}, outptr::Ptr{T}, # AsyncCondition
nframes, timeinfo, flags, userdata::Ptr{CallbackInfo{T}}) notifycb(handle) = ccall(:uv_async_send, Cint, (Ptr{Void}, ), handle)
info = unsafe_load(userdata)
# if there are no channels, treat it as if we can write as many 0-frame channels as we want
framesreadable = info.outchannels > 0 ? div(nreadable(info.outbuf), info.outchannels) : nframes
frameswritable = info.inchannels > 0 ? div(nwritable(info.inbuf), info.inchannels) : nframes
if info.synced
framesreadable = min(framesreadable, frameswritable)
frameswritable = framesreadable
end
towrite = min(frameswritable, nframes) * info.inchannels
toread = min(framesreadable, nframes) * info.outchannels
read!(info.outbuf, outptr, toread)
write(info.inbuf, inptr, towrite)
if framesreadable < nframes
outsamples = nframes * info.outchannels
# xrun, copy zeros to outbuffer
# TODO: send a notification to an error msg ringbuf
memset(outptr+sizeof(T)*toread, 0, sizeof(T)*(outsamples-toread))
end
paContinue
end
"""Call the given function and discard stdout and stderr"""
function swallow_stderr(f)
origerr = STDERR
(errread, errwrite) = redirect_stderr()
result = f()
redirect_stderr(origerr)
close(errwrite)
close(errread)
result
end
memset(buf, val, count) = ccall(:memset, Ptr{Void},
(Ptr{Void}, Cint, Csize_t),
buf, val, count)
end # module PortAudio end # module PortAudio

View file

@ -1,17 +1,17 @@
# Low-level wrappers for Portaudio calls # Low-level wrappers for Portaudio calls
# General type aliases # General type aliases
typealias PaTime Cdouble const PaTime = Cdouble
typealias PaError Cint const PaError = Cint
typealias PaSampleFormat Culong const PaSampleFormat = Culong
typealias PaDeviceIndex Cint const PaDeviceIndex = Cint
typealias PaHostApiIndex Cint const PaHostApiIndex = Cint
typealias PaHostApiTypeId Cint const PaHostApiTypeId = Cint
# PaStream is always used as an opaque type, so we're always dealing # PaStream is always used as an opaque type, so we're always dealing
# with the pointer # with the pointer
typealias PaStream Ptr{Void} const PaStream = Ptr{Void}
typealias PaStreamCallback Void const PaStreamCallback = Void
typealias PaStreamFlags Culong const PaStreamFlags = Culong
const paNoFlag = PaStreamFlags(0x00) const paNoFlag = PaStreamFlags(0x00)
@ -37,7 +37,7 @@ const type_to_fmt = Dict{Type, PaSampleFormat}(
UInt8 => 3 UInt8 => 3
) )
typealias PaStreamCallbackResult Cint const PaStreamCallbackResult = Cint
# Callback return values # Callback return values
const paContinue = PaStreamCallbackResult(0) const paContinue = PaStreamCallbackResult(0)
const paComplete = PaStreamCallbackResult(1) const paComplete = PaStreamCallbackResult(1)
@ -69,7 +69,7 @@ end
# all the host APIs on the system by iterating through those values. # all the host APIs on the system by iterating through those values.
# PaHostApiTypeId values # PaHostApiTypeId values
const pa_host_api_names = Dict{PaHostApiTypeId, UTF8String}( const pa_host_api_names = Dict{PaHostApiTypeId, String}(
0 => "In Development", # use while developing support for a new host API 0 => "In Development", # use while developing support for a new host API
1 => "Direct Sound", 1 => "Direct Sound",
2 => "MME", 2 => "MME",

33
src/pa_shim.jl Normal file
View file

@ -0,0 +1,33 @@
const libpa_shim = Libdl.find_library(
["pa_shim"],
[joinpath(dirname(@__FILE__), "..", "deps", "usr", "lib")])
function init_pa_shim()
shim_dlib = Libdl.dlopen(libpa_shim)
# pointer to the shim's process callback
global const shim_processcb_c = Libdl.dlsym(shim_dlib, :pa_shim_processcb)
if shim_processcb_c == C_NULL
error("Got NULL pointer loading `pa_shim_processcb`")
end
end
const pa_shim_errmsg_t = Cint
const PA_SHIM_ERRMSG_OVERFLOW = Cint(0) # input overflow
const PA_SHIM_ERRMSG_UNDERFLOW = Cint(1) # output underflow
const PA_SHIM_ERRMSG_ERR_OVERFLOW = Cint(2) # error buffer overflowed
# This struct is shared with pa_shim.c
mutable struct pa_shim_info_t
inputbuf::Ptr{PaUtilRingBuffer} # ringbuffer for input
outputbuf::Ptr{PaUtilRingBuffer} # ringbuffer for output
errorbuf::Ptr{PaUtilRingBuffer} # ringbuffer to send error notifications
sync::Cint # keep input/output ring buffers synchronized (0/1)
notifycb::Ptr{Void} # Julia callback to notify on updates (called from audio thread)
inputhandle::Ptr{Void} # condition to notify on new input data
outputhandle::Ptr{Void} # condition to notify when ready for output
errorhandle::Ptr{Void} # condition to notify on new errors
end
shimversion() = ccall((:pa_shim_getversion, libpa_shim), Cint, ())
Base.unsafe_convert(::Type{Ptr{Void}}, info::pa_shim_info_t) = pointer_from_objref(info)