Example Programs

Chatty Client

chatty_client.py

#!/usr/bin/env python3

"""Create a JACK client that prints a lot of information.

This client registers all possible callbacks (except the process
callback and the timebase callback, which would be just too much noise)
and prints some information whenever they are called.

"""
from __future__ import print_function  # only needed for Python 2.x
import jack

print('setting error/info functions')


@jack.set_error_function
def error(msg):
    """Print an error message handed over by the JACK library."""
    prefix = 'Error:'
    print(prefix, msg)


@jack.set_info_function
def info(msg):
    """Print an informational message handed over by the JACK library."""
    prefix = 'Info:'
    print(prefix, msg)


print('starting chatty client')

client = jack.Client('Chatty-Client')

# Report what happened while the client was opened.
client_status = client.status
print('JACK server was started' if client_status.server_started
      else 'JACK server was already running')
if client_status.name_not_unique:
    print('unique client name generated:', client.name)


print('registering callbacks')


@client.set_shutdown_callback
def shutdown(status, reason):
    """Print the status and reason passed to the shutdown callback."""
    print('JACK shutdown!')
    for label, value in (('status:', status), ('reason:', reason)):
        print(label, value)


@client.set_freewheel_callback
def freewheel(starting):
    """Print whether freewheel mode is starting or stopping."""
    # *starting* is used as index: falsy -> 'stopping', truthy -> 'starting'.
    actions = ('stopping', 'starting')
    print(actions[starting], 'freewheel mode')


@client.set_blocksize_callback
def blocksize(blocksize):
    """Print the block size passed to the callback."""
    announcement = 'setting blocksize to'
    print(announcement, blocksize)


@client.set_samplerate_callback
def samplerate(samplerate):
    """Print the sampling rate passed to the callback."""
    announcement = 'setting samplerate to'
    print(announcement, samplerate)


@client.set_client_registration_callback
def client_registration(name, register):
    """Print that the client *name* was registered or unregistered."""
    quoted_name = repr(name)
    # *register* is used as index: falsy -> unregistered, truthy -> registered.
    outcome = ('unregistered', 'registered')[register]
    print('client', quoted_name, outcome)


@client.set_port_registration_callback
def port_registration(port, register):
    """Print that *port* was registered or unregistered."""
    port_text = repr(port)
    # *register* is used as index: falsy -> unregistered, truthy -> registered.
    outcome = ('unregistered', 'registered')[register]
    print(port_text, outcome)


@client.set_port_connect_callback
def port_connect(a, b, connect):
    """Print that the ports *a* and *b* were connected or disconnected."""
    # *connect* is used as index: falsy -> disconnected, truthy -> connected.
    verb = ('disconnected', 'connected')[connect]
    print(verb, a, 'and', b)


# Registering this callback may raise AttributeError, in which case a
# notice is printed instead and the script carries on.
try:
    @client.set_port_rename_callback
    def port_rename(port, old, new):
        """Print the old and the new name of a renamed port."""
        old_name, new_name = repr(old), repr(new)
        print('renamed', port, 'from', old_name, 'to', new_name)
except AttributeError:
    print('Could not register port rename callback (not available on JACK1).')


@client.set_graph_order_callback
def graph_order():
    """Print a notice whenever the graph order callback is invoked."""
    notice = 'graph order changed'
    print(notice)


@client.set_xrun_callback
def xrun(delay):
    """Print the delay value passed to the xrun callback."""
    unit = 'microseconds'
    print('xrun; delay', delay, unit)


print('activating JACK')

# In Python 2.x, input() evaluates the typed text as an expression, so
# just pressing Return would raise an exception instead of quitting.
# raw_input() only reads the line; it was renamed to input() in Python 3.x.
try:
    wait_for_return = raw_input  # Python 2.x
except NameError:
    wait_for_return = input  # Python 3.x

# The client is used as context manager for the time the script waits;
# all registered callbacks keep printing until Return is pressed.
with client:
    print('#' * 80)
    print('press Return to quit')
    print('#' * 80)
    wait_for_return()
    print('closing JACK')

Pass-Through Client

thru_client.py

#!/usr/bin/env python3

"""Create a JACK client that copies input audio directly to the outputs.

This is somewhat modeled after the "thru_client.c" example of JACK 2:
http://github.com/jackaudio/jack2/blob/master/example-clients/thru_client.c

If you have a microphone and loudspeakers connected, this might cause an
acoustical feedback!

"""
import sys
import signal
import os
import jack
import threading

if sys.version_info[0] < 3:
    # In Python 2.x, event.wait() cannot be interrupted with Ctrl+C.
    # Therefore, the whole KeyboardInterrupt mechanism is disabled by
    # restoring the default SIGINT action.  This will not close the JACK
    # client properly, but at least Ctrl+C can be used.
    # In Python 3.x nothing has to be done.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

# Command line: script path, then optional client name and server name.
argv = iter(sys.argv)
script_file = os.path.basename(next(argv))
# By default, use script name without extension as client name:
defaultclientname = os.path.splitext(script_file)[0]
clientname = next(argv, defaultclientname)
servername = next(argv, None)

client = jack.Client(clientname, servername=servername)

# Report what happened while the client was opened.
client_status = client.status
if client_status.server_started:
    print('JACK server started')
if client_status.name_not_unique:
    print('unique name %r assigned' % (client.name,))

# Set by the shutdown callback to release the waiting main thread.
event = threading.Event()


@client.set_process_callback
def process(frames):
    """Copy the buffer of each input port to the matching output port."""
    inports, outports = client.inports, client.outports
    assert len(inports) == len(outports)
    assert frames == client.blocksize
    for source, sink in zip(inports, outports):
        sink.get_buffer()[:] = source.get_buffer()


@client.set_shutdown_callback
def shutdown(status, reason):
    """Print status and reason of the shutdown and set the event."""
    print('JACK shutdown!')
    for label, value in (('status:', status), ('reason:', reason)):
        print(label, value)
    # The main thread blocks in event.wait(); let it continue.
    event.set()


# create two port pairs (input_1/output_1 and input_2/output_2)
for number in (1, 2):
    suffix = str(number)
    client.inports.register('input_' + suffix)
    client.outports.register('output_' + suffix)

with client:
    # Entering this with-statement calls client.activate(), which tells
    # the JACK server that we are ready to roll; from now on the
    # process() callback is running.

    # Ports can only be connected once the client is activated, because
    # connections to clients that aren't running cannot be made.
    # Mind the (confusing but necessary) orientation of the driver
    # backend ports: capture ports are "output" from the backend and
    # playback ports are "input" to it.

    capture = client.get_ports(is_physical=True, is_output=True)
    if not capture:
        raise RuntimeError('No physical capture ports')

    for hardware_out, own_in in zip(capture, client.inports):
        client.connect(hardware_out, own_in)

    playback = client.get_ports(is_physical=True, is_input=True)
    if not playback:
        raise RuntimeError('No physical playback ports')

    for own_out, hardware_in in zip(client.outports, playback):
        client.connect(own_out, hardware_in)

    print('Press Ctrl+C to stop')
    try:
        # Blocks until the shutdown callback sets the event.
        event.wait()
    except KeyboardInterrupt:
        print('\nInterrupted by user')

# When the above with-statement is left (either because the end of the
# code block is reached, or because an exception was raised inside),
# client.deactivate() and client.close() are called automatically.

Sound File Playback

play_file.py

#!/usr/bin/env python3

"""Play a sound file.

This only reads a certain number of blocks at a time into memory,
therefore it can handle very long files and also files with many
channels.

NumPy and the soundfile module (http://PySoundFile.rtfd.io/) must be
installed for this to work.

"""
from __future__ import division
from __future__ import print_function
import argparse
try:
    import queue  # Python 3.x
except ImportError:
    import Queue as queue  # Python 2.x
import sys
import threading

# The module docstring doubles as the description in the help text.
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('filename', help='audio file to be played back')
parser.add_argument(
    '-b', '--buffersize',
    type=int,
    default=20,
    help='number of blocks used for buffering (default: %(default)s)')
parser.add_argument(
    '-c', '--clientname',
    default='file player',
    help='JACK client name')
parser.add_argument(
    '-m', '--manual',
    action='store_true',
    help="don't connect to output ports automatically")
args = parser.parse_args()
if args.buffersize < 1:
    parser.error('buffersize must be at least 1')

# Blocks travel from the main thread to the process callback via this
# queue; the event is set once playback has ended or JACK has shut down.
q = queue.Queue(maxsize=args.buffersize)
event = threading.Event()


def print_error(*messages):
    """Print the given objects to standard error, like print() would."""
    print(*messages, file=sys.stderr)


def xrun(delay):
    """Print a hint about an xrun to standard error; *delay* is unused."""
    hint = "An xrun occured, increase JACK's period size?"
    print_error(hint)


def shutdown(status, reason):
    """Print status and reason of the shutdown and set the event."""
    print_error('JACK shutdown!')
    for label, value in (('status:', status), ('reason:', reason)):
        print_error(label, value)
    # The main thread blocks in event.wait(); let it continue.
    event.set()


def stop_callback(msg=''):
    """Silence the outputs, set the event and leave the callback.

    If *msg* is non-empty, it is printed to standard error first.
    This function never returns; it always raises jack.CallbackExit.

    """
    if msg:
        print_error(msg)
    # Zero every output buffer so that no stale audio is left in them.
    for outport in client.outports:
        outport.get_array().fill(0)
    event.set()
    raise jack.CallbackExit()


def process(frames):
    """Copy the next queued block of audio data to the output ports.

    Playback is stopped via stop_callback() (which raises) if the block
    size has changed, if the queue is empty, or if the end-of-file
    marker (None) is taken from the queue.

    """
    if frames != blocksize:
        stop_callback('blocksize must not be changed, I quit!')
    try:
        block = q.get_nowait()
    except queue.Empty:
        stop_callback('Buffer is empty: increase buffersize?')
    if block is None:
        stop_callback()  # Playback is finished
    # The transposed block is iterated channel by channel.
    for port, channel in zip(client.outports, block.T):
        port.get_array()[:] = channel


# Main part of the script: open the JACK client, pre-fill the queue,
# then keep feeding blocks from the sound file until it ends.
try:
    # Imported inside the try block, so a failing import is reported by
    # the generic "except Exception" handler below.
    import jack
    import soundfile as sf

    client = jack.Client(args.clientname)
    # Remembered once; process() stops playback if the block size changes.
    blocksize = client.blocksize
    samplerate = client.samplerate
    client.set_xrun_callback(xrun)
    client.set_shutdown_callback(shutdown)
    client.set_process_callback(process)

    with sf.SoundFile(args.filename) as f:
        # One output port per channel of the file, numbered from 1.
        for ch in range(f.channels):
            client.outports.register('out_{0}'.format(ch + 1))
        block_generator = f.blocks(blocksize=blocksize, dtype='float32',
                                   always_2d=True, fill_value=0)
        # zip() with range() takes at most args.buffersize blocks.
        for _, data in zip(range(args.buffersize), block_generator):
            q.put_nowait(data)  # Pre-fill queue
        with client:
            if not args.manual:
                target_ports = client.get_ports(
                    is_physical=True, is_input=True, is_audio=True)
                if len(client.outports) == 1 and len(target_ports) > 1:
                    # Connect mono file to stereo output
                    client.outports[0].connect(target_ports[0])
                    client.outports[0].connect(target_ports[1])
                else:
                    for source, target in zip(client.outports, target_ports):
                        source.connect(target)
            # Duration (in seconds) of a completely filled queue; if no
            # slot becomes free within that time, queue.Full is raised.
            timeout = blocksize * args.buffersize / samplerate
            for data in block_generator:
                q.put(data, timeout=timeout)
            q.put(None, timeout=timeout)  # Signal end of file
            event.wait()  # Wait until playback is finished
except KeyboardInterrupt:
    parser.exit('\nInterrupted by user')
except (queue.Full):
    # A timeout occurred, i.e. there was an error in the callback
    parser.exit(1)
except Exception as e:
    # Any other error is reported as "<exception type>: <message>".
    parser.exit(type(e).__name__ + ': ' + str(e))

“Showtime” Client

showtime.py

#!/usr/bin/env python3

"""Display information about time, transport state et cetera.

This is somewhat modeled after the "showtime.c" example of JACK.
https://github.com/jackaudio/example-clients/blob/master/showtime.c
https://github.com/jackaudio/jack2/blob/master/example-clients/showtime.c

"""
from contextlib import suppress
import time
import sys

import jack


# Opening the client fails with JackError; exit with a hint in that case.
try:
    client = jack.Client('showtime')
except jack.JackError:
    raise SystemExit('JACK server not running?')


def showtime():
    """Print one line with the current transport state and position.

    The frame counters and the transport state are always shown.  The
    remaining fields are looked up by key in the position returned by
    client.transport_query(); a field whose key is missing (KeyError)
    is silently left out.  All items are printed on one line, separated
    by tab characters.

    """
    state, pos = client.transport_query()
    items = []
    items.append('frame = {}  frame_time = {} usecs = {} '.format(
        pos['frame'], client.frame_time, pos['usecs']))
    items.append('state: {}'.format(state))
    # Each optional field gets its own suppress() block, so that one
    # missing key does not prevent the following fields from being shown.
    with suppress(KeyError):
        items.append('BBT: {bar:3}|{beat}|{tick:04}'.format(**pos))
    with suppress(KeyError):
        items.append('TC: ({frame_time:.6f}, {next_time:.6f})'.format(**pos))
    with suppress(KeyError):
        items.append('BBT offset: ({bbt_offset})'.format(**pos))
    with suppress(KeyError):
        items.append(
            'audio/video: ({audio_frames_per_video_frame})'.format(**pos))
    with suppress(KeyError):
        video_offset = pos['video_offset']
        if video_offset:
            items.append(' video@: ({})'.format(video_offset))
        else:
            items.append(' no video')
    print(*items, sep='\t')


@client.set_shutdown_callback
def shutdown(status, reason):
    """Raise SystemExit with a message; *status* and *reason* are unused."""
    raise SystemExit('JACK shut down, exiting ...')


with client:
    # Print the transport information over and over (with a very short
    # pause in between) until the user presses Ctrl+C.
    try:
        while True:
            time.sleep(0.00002)
            showtime()
    except KeyboardInterrupt:
        print('signal received, exiting ...', file=sys.stderr)
        raise SystemExit(0)

MIDI Monitor

midi_monitor.py

#!/usr/bin/env python3

"""JACK client that prints all received MIDI events."""

import jack
import binascii

# Open the JACK client and register its single MIDI input port.
client = jack.Client('MIDI-Monitor')
port = client.midi_inports.register('input')


@client.set_process_callback
def process(frames):
    """Print time stamp and hex dump of each incoming MIDI event."""
    for offset, data in port.incoming_midi_events():
        # The offset of the event is added to client.last_frame_time.
        timestamp = client.last_frame_time + offset
        payload = binascii.hexlify(data).decode()
        print('{0}: 0x{1}'.format(timestamp, payload))

# Events are printed by process() while the script waits for Return.
with client:
    banner = '#' * 80
    print(banner)
    print('press Return to quit')
    print(banner)
    input()

MIDI Chord Generator

midi_chords.py

#!/usr/bin/env python3

"""JACK client that creates minor triads from single MIDI notes.

All MIDI events are passed through.
Two additional events are created for each NoteOn and NoteOff event.

"""
import jack
import struct

# First 4 bits of status byte:
NOTEON = 0x9
NOTEOFF = 0x8

# Offsets that process() adds to the pitch of each incoming note:
INTERVALS = 3, 7  # minor triad

# Open the JACK client with one MIDI input and one MIDI output port.
client = jack.Client('MIDI-Chord-Generator')
inport = client.midi_inports.register('input')
outport = client.midi_outports.register('output')


@client.set_process_callback
def process(frames):
    """Pass all MIDI events through and add chord notes to note events.

    Every incoming event is written to the output unchanged.  For each
    3-byte NoteOn/NoteOff event, one additional event per entry of
    INTERVALS is written with the same status byte, velocity and offset
    but with the pitch shifted by that interval.  Chord notes whose
    pitch would fall outside the valid range 0..127 are left out.

    """
    outport.clear_buffer()
    for offset, indata in inport.incoming_midi_events():
        # Note: This may raise an exception:
        outport.write_midi_event(offset, indata)  # pass through
        if len(indata) != 3:
            continue
        status, pitch, vel = struct.unpack('3B', indata)
        if status >> 4 not in (NOTEON, NOTEOFF):
            continue
        for i in INTERVALS:
            chord_pitch = pitch + i
            if not 0 <= chord_pitch <= 127:
                # MIDI data bytes have only 7 bits; a larger value would
                # not be a valid note number, so this note is skipped.
                continue
            # Note: This may raise an exception:
            outport.write_midi_event(offset, (status, chord_pitch, vel))

# Chords are generated by process() while the script waits for Return.
with client:
    banner = '#' * 80
    print(banner)
    print('press Return to quit')
    print(banner)
    input()

Simple MIDI Synth

midi_sine.py

#!/usr/bin/env python3
"""Very basic MIDI synthesizer.

This only works in Python 3.x because it uses memoryview.cast() and a
few other sweet Python-3-only features.

This is inspired by the JACK example program "jack_midisine":
http://github.com/jackaudio/jack2/blob/master/example-clients/midisine.c

But it is actually better:

+ ASR envelope
+ unlimited polyphony (well, "only" limited by CPU and memory)
+ arbitrarily many MIDI events per block
+ can handle NoteOn and NoteOff events of the same pitch in one block

It is also worse:

- horribly inefficient (dynamic allocations, sample-wise processing)
- unpredictable because of garbage collection (?)

It sounds a little better than the original, but still quite boring.

"""
import jack
import math
import operator
import threading

# First 4 bits of status byte:
NOTEON = 0x9
NOTEOFF = 0x8

# Durations of the envelope ramps used by Voice.trigger():
attack = 0.01  # seconds
release = 0.2  # seconds

# Sampling rate; assigned by the samplerate callback.
fs = None
# Maps MIDI pitch to the Voice object playing that pitch.
voices = {}

# Open the JACK client with one MIDI input and one audio output port.
client = jack.Client('MIDI-Sine')
midiport = client.midi_inports.register('midi_in')
audioport = client.outports.register('audio_out')
# Set by the shutdown callback to release the waiting main thread.
event = threading.Event()


def m2f(note):
    """Return the frequency in Hertz of the given MIDI note number.

    Note number 69 is mapped to 440 Hz and every 12 note numbers double
    the frequency.

    See https://en.wikipedia.org/wiki/MIDI_Tuning_Standard.

    """
    semitones = note - 69
    return 440 * 2 ** (semitones / 12)


class Voice:
    """Sine oscillator state for one pitch with a linearly ramped weight."""

    def __init__(self, pitch):
        # Oscillator position and the amount it advances per sample.
        self.time = 0
        self.time_increment = m2f(pitch) / fs
        # Current weight and the ramp that moves it towards its target.
        self.weight = 0

        self.target_weight = 0
        self.weight_step = 0
        self.compare = None

    def trigger(self, vel):
        """Start a ramp from the current weight towards ``vel / 127``.

        A non-zero *vel* uses the attack time, zero uses the release
        time.

        """
        ramp_seconds = attack if vel else release
        dur = ramp_seconds * fs
        self.target_weight = vel / 127
        self.weight_step = (self.target_weight - self.weight) / dur
        # Rising ramps are finished at ">= target", all others at "<=".
        if self.weight_step > 0:
            self.compare = operator.ge
        else:
            self.compare = operator.le

    def update(self):
        """Advance the weight by one step and stop at the target."""
        if not self.weight_step:
            return
        self.weight += self.weight_step
        if self.compare(self.weight, self.target_weight):
            # Target reached (or overshot): clamp and end the ramp.
            self.weight = self.target_weight
            self.weight_step = 0


@client.set_process_callback
def process(frames):
    """Main callback.

    Collects the NoteOn/NoteOff events of the current block by sample
    offset, then renders the block sample by sample: events are applied
    at their offset, each voice's weight is updated and its sine value
    is added to the output.  Voices whose weight has dropped to zero are
    removed at the end of the block.

    """
    # Maps sample offset to a list of (pitch, velocity) pairs;
    # velocity 0 stands for NoteOff.
    events = {}
    buf = memoryview(audioport.get_buffer()).cast('f')
    for offset, data in midiport.incoming_midi_events():
        if len(data) != 3:
            continue  # ignore
        status, pitch, vel = bytes(data)
        # MIDI channel number is ignored!
        status >>= 4
        if status == NOTEON and vel > 0:
            events.setdefault(offset, []).append((pitch, vel))
        elif status in (NOTEON, NOTEOFF):
            # NoteOff velocity is ignored!
            events.setdefault(offset, []).append((pitch, 0))
    for i in range(len(buf)):
        buf[i] = 0
        for pitch, vel in events.get(i, ()):
            if pitch not in voices:
                if not vel:
                    # NoteOff for a pitch that is not sounding: skip only
                    # this event, later events at the same offset must
                    # still be handled.
                    continue
                voices[pitch] = Voice(pitch)
            voices[pitch].trigger(vel)
        for voice in voices.values():
            voice.update()
            if voice.weight > 0:
                buf[i] += voice.weight * math.sin(2 * math.pi * voice.time)
                voice.time += voice.time_increment
                if voice.time >= 1:
                    voice.time -= 1
    dead = [k for k, v in voices.items() if v.weight <= 0]
    for pitch in dead:
        del voices[pitch]


@client.set_samplerate_callback
def samplerate(samplerate):
    """Store the new sampling rate in *fs* and discard all voices."""
    global fs
    fs = samplerate
    # Existing voices were created for the previous value of fs.
    voices.clear()


@client.set_shutdown_callback
def shutdown(status, reason):
    """Print reason and status of the shutdown and set the event."""
    details = ('JACK shutdown:', reason, status)
    print(*details)
    # The main thread blocks in event.wait(); let it continue.
    event.set()


# Keep the client running until JACK shuts down or Ctrl+C is pressed.
with client:
    print('Press Ctrl+C to stop')
    try:
        # Blocks until the shutdown callback sets the event.
        event.wait()
    except KeyboardInterrupt:
        farewell = '\nInterrupted by user'
        print(farewell)

Simple MIDI Synth (NumPy Edition)

midi_sine_numpy.py

#!/usr/bin/env python3
"""Very basic MIDI synthesizer.

This does the same as midi_sine.py, but it uses NumPy and block
processing.  It is therefore much more efficient.  But there are still
many allocations and dynamically growing and shrinking data structures.

"""
import jack
import numpy as np
import threading

# First 4 bits of status byte:
NOTEON = 0x9
NOTEOFF = 0x8

# Durations of the envelope ramps, in seconds:
attack_seconds = 0.01
release_seconds = 0.2

# Ramp lengths in samples and the sampling rate; all three are assigned
# by the samplerate callback.
attack = None
release = None
fs = None
# Maps MIDI pitch to an (envelope, vel, target) tuple as returned by
# update_envelope().
voices = {}

# Open the JACK client with one MIDI input and one audio output port.
client = jack.Client('MIDI-Sine-NumPy')
midiport = client.midi_inports.register('midi_in')
audioport = client.outports.register('audio_out')
# Set by the shutdown callback to release the waiting main thread.
event = threading.Event()


def m2f(note):
    """Return the frequency in Hertz of the given MIDI note number.

    Note number 69 is mapped to 440 Hz and every 12 note numbers double
    the frequency.

    See https://en.wikipedia.org/wiki/MIDI_Tuning_Standard.

    """
    semitones = note - 69
    return 440 * 2 ** (semitones / 12)


def update_envelope(envelope, begin, target, vel):
    """Write a linear ramp towards *vel* into *envelope*.

    envelope: array of velocities, modified in place
    begin: sample index where the ramp starts
    target: sample index where *vel* shall be reached
    vel: velocity value at the end of the ramp

    A ramp that does not end within the block is written up to the end
    of the block and is meant to be continued in the next block;
    otherwise the rest of the block is filled with *vel*.

    Returns a tuple of the same *envelope* object, the unchanged *vel*
    and the target index relative to the following block (0 if the ramp
    ended within this block).

    """
    n = len(envelope)
    start_level = envelope[begin]
    step = (vel - start_level) / (target - begin + 1)
    ramp_end = min(target, n)
    # The slice is clipped to the block, so it has as many elements as
    # there are ramp values.
    envelope[begin:target] = start_level + step * np.arange(
        1, ramp_end - begin + 1)
    if target >= n:
        return envelope, vel, target - n
    envelope[target:] = vel
    return envelope, vel, 0


@client.set_process_callback
def process(blocksize):
    """Main callback.

    Continues or removes the voices left over from the previous block,
    applies the NoteOn/NoteOff events of the current block to the
    envelopes and finally adds one enveloped sine tone per voice to the
    output buffer.

    """

    # Step 1: Update/delete existing voices from previous block

    # Iterating over a copy because items may be deleted:
    for pitch in list(voices):
        envelope, vel, target = voices[pitch]
        if vel or target:
            # Continue from the last value of the previous block.
            envelope[0] = envelope[-1]
            voices[pitch] = update_envelope(envelope, 0, target, vel)
        else:
            # Velocity 0 has been reached and no ramp is pending.
            del voices[pitch]

    # Step 2: Create envelopes from the MIDI events of the current block

    for offset, data in midiport.incoming_midi_events():
        if len(data) != 3:
            continue  # ignore
        status, pitch, vel = bytes(data)
        # MIDI channel number is ignored!
        status >>= 4
        if status == NOTEON and vel > 0:
            try:
                envelope, _, _ = voices[pitch]
            except KeyError:
                envelope = np.zeros(blocksize)
            voices[pitch] = update_envelope(
                envelope, offset, offset + attack, vel)
        elif status in (NOTEON, NOTEOFF):
            # NoteOff velocity is ignored!
            try:
                envelope, _, _ = voices[pitch]
            except KeyError:
                print('NoteOff without NoteOn (ignored)')
                continue
            voices[pitch] = update_envelope(
                envelope, offset, offset + release, 0)

    # Step 3: Create sine tones, apply envelopes, add to output buffer

    buf = audioport.get_array()
    buf.fill(0)
    if voices:
        # The time axis is the same for all voices: compute it only once.
        t = (np.arange(blocksize) + client.last_frame_time) / fs
        for pitch, (envelope, _, _) in voices.items():
            tone = np.sin(2 * np.pi * m2f(pitch) * t)
            buf += tone * envelope / 127


@client.set_samplerate_callback
def samplerate(samplerate):
    """Store the new sampling rate and recompute the ramp lengths."""
    global fs, attack, release
    fs = samplerate
    # Attack and release durations converted from seconds to samples.
    attack = int(attack_seconds * fs)
    release = int(release_seconds * fs)
    voices.clear()


@client.set_shutdown_callback
def shutdown(status, reason):
    """Print reason and status of the shutdown and set the event."""
    details = ('JACK shutdown:', reason, status)
    print(*details)
    # The main thread blocks in event.wait(); let it continue.
    event.set()

# Keep the client running until JACK shuts down or Ctrl+C is pressed.
with client:
    print('Press Ctrl+C to stop')
    try:
        # Blocks until the shutdown callback sets the event.
        event.wait()
    except KeyboardInterrupt:
        farewell = '\nInterrupted by user'
        print(farewell)