Python audio signal generator controlled by D-BUS

As an exercise to learn both how to use D-Bus and how to generate simple audio signals using Python, I created the following pair of programs. They are heavily based on the example programs from the D-Bus documentation.

The synthesizer has a loop that writes signal frames iteratively. The signal is a PWM wave, and the control parameters are the frequency and the duty cycle. A signal handler function receives new values for these parameters via D-Bus, and changes them, so the other process can control the signal being generated. The second program is just a client that reads the values from the console and sends them to the synth process.

My plan is to some day use this to control DC motors from the audio output of a simple hand-held device like a Nokia tablet or a Beagle Board. Since the audio is stereo, we can control a couple of wheels and do differential driving. But of course, you can use this same program for anything else. (Tramchan is the name of the robot I was going to make with an N900 before my dreams were crushed by the opaque impenetrability of reality.)

Here is the synth:
==============

#!/usr/bin/env python

# standard library
import array
import sys
import traceback

# audio, numerics, D-Bus and the GLib main loop
import alsaaudio
import dbus
import dbus.mainloop.glib
import gobject
import numpy
import pylab

class Synth:
    """Rectangular (PWM) wave generator that streams 16-bit PCM blocks.

    Attributes:
        time: Index of the next frame to generate; advances by the block
            length on every write so consecutive blocks are phase-continuous.
        freq: Wave frequency in Hz. May be reassigned between blocks.
        duty: Fraction of each wave period spent at the LOW level (a sample
            is high only while its phase exceeds ``duty * period``).
        sample_rate: Frames per second, or None to use the module-level
            ``fs`` that the script defines before creating the synth.
    """

    # Sample values emitted for the two levels of the wave.
    LEVEL_HIGH = 32001
    LEVEL_LOW = -32000

    def __init__(self, freq, duty, sample_rate=None):
        """Create a generator starting at frame 0.

        Args:
            freq: Initial frequency in Hz.
            duty: Initial duty value (see the class docstring).
            sample_rate: Optional sample rate in Hz. When omitted, the
                module-level ``fs`` is looked up at write time.
        """
        self.time = 0
        self.freq = freq
        self.duty = duty
        self.sample_rate = sample_rate

    def write_signal_period(self, out, period_size):
        """Generate the next ``period_size`` frames and write them to ``out``.

        Args:
            out: Object with a ``write(data)`` method accepting the raw
                little-endian signed 16-bit sample bytes.
            period_size: Number of frames to generate.

        Returns:
            Always True, so a GLib timeout using this as its callback keeps
            being rescheduled.
        """
        print(" *** Begin period %s" % (self.time,))

        rate = self.sample_rate
        if rate is None:
            # Backward compatible path: fall back to the script's global.
            rate = fs

        if self.freq > 0:
            # Float division keeps the period exact even when both the rate
            # and the frequency are integers.
            wave_period = float(rate) / self.freq
            t = numpy.arange(self.time, self.time + period_size)
            high = (t % wave_period) > (self.duty * wave_period)
            samples = numpy.where(high, self.LEVEL_HIGH, self.LEVEL_LOW)
        else:
            # A zero, negative or NaN frequency has no usable period. Emit
            # silence instead of raising, because an exception here would
            # end the periodic callback that feeds the sound card.
            samples = numpy.zeros(period_size)

        # '<i2' is explicitly little-endian, independent of the host CPU.
        out.write(samples.astype('<i2').tobytes())

        self.time += period_size

        return True

def handle_reply(msg):
    """Print the reply ``msg`` of an asynchronous call to standard output."""
    print(msg)

def handle_error(e):
    """Print the string form of the error ``e`` to standard output."""
    text = str(e)
    print(text)

def pwm_signal_handler(freq, duty):
    """Apply a PWM update received over D-Bus to the global ``synth``.

    Args:
        freq: Requested frequency in Hz.
        duty: Requested duty value.

    The values are coerced to plain floats. An update whose frequency is not
    strictly positive (zero, negative or NaN) is ignored as a whole, because
    the generator divides the sample rate by the frequency.
    """
    freq = float(freq)
    duty = float(duty)
    print("Received values from emitter [%f] [%f]." % (freq, duty))

    if not freq > 0:
        print("Ignoring PWM update: frequency must be positive.")
        return

    synth.freq = freq
    synth.duty = duty

if __name__ == '__main__':
    # Make the GLib main loop the default for dbus-python before any bus
    # connection is created.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    ## Audio parameters
    fs = 48000  # Sample rate in Hz (read as a module global by Synth)
    period_size = 4800  # Length of block (number of frames)

    ## Open the device in playback mode.
    out = alsaaudio.PCM(alsaaudio.PCM_PLAYBACK)
    ## Set attributes: mono, fs Hz, 16 bit little endian frames
    out.setchannels(1)
    out.setrate(fs)
    out.setformat(alsaaudio.PCM_FORMAT_S16_LE)
    out.setperiodsize(period_size)

    synth = Synth(150, 0.00)

    ## Schedule the audio-filling callback on the GLib main loop. This is a
    ## timeout, not a thread. The 70 ms interval is shorter than the
    ## period_size / fs = 100 ms of audio that each call writes.
    gobject.timeout_add(70, Synth.write_signal_period, synth, out, period_size)

    bus = dbus.SessionBus()
    try:
        # Named remote_object so the builtin ``object`` is not shadowed.
        remote_object = bus.get_object("tramchan.VisionMaster",
                                       "/tramchan/VisionMaster/object")
        remote_object.connect_to_signal("PwmSignal", pwm_signal_handler,
                                        dbus_interface="tramchan.VisionMaster")
    except dbus.DBusException:
        traceback.print_exc()
        sys.exit(1)

    ## Start the GLib main loop
    loop = gobject.MainLoop()
    loop.run()

==============
And the controller:
==============

#!/usr/bin/env python

# Help text printed once when the controller starts.
# NOTE(review): the script names and the --exit-service option below do not
# correspond to anything else visible in this file; they look carried over
# from the dbus-python example programs. Confirm and update the wording.
usage = """Usage:
python example-signal-emitter.py &
python example-signal-recipient.py
python example-signal-recipient.py --exit-service
"""

import gobject

import dbus
import dbus.service
import dbus.mainloop.glib

import string


class VisionMaster(dbus.service.Object):
    """D-Bus object that broadcasts PWM settings typed on the console."""

    def __init__(self, conn, object_path='/tramchan/VisionMaster/object'):
        """Export this object on ``conn`` at ``object_path``."""
        dbus.service.Object.__init__(self, conn, object_path)

    @dbus.service.signal('tramchan.VisionMaster')
    def PwmSignal(self, freq, duty):
        """Signal carrying a new frequency and duty value."""
        # The signal is emitted when this method exits
        # You can have code here if you wish
        pass

    @dbus.service.method('tramchan.VisionMaster')
    def emitPwmSignal(self):
        """Read one '<freq> <duty>' line from the console and emit it.

        Returns:
            True when a signal was emitted, False when the line did not
            contain exactly two numbers.

        Raises:
            EOFError: Propagated from ``raw_input`` at end of input.
        """
        in_st = raw_input('osc>')
        fields = in_st.split()

        if len(fields) != 2:
            print("Wrong number of parameters. Should be '<freq> <duty>'.")
            return False

        try:
            freq = float(fields[0])
            duty = float(fields[1])
        except ValueError:
            # Report a typo and let the caller prompt again. An uncaught
            # exception here would escape the main-loop callback that
            # drives the prompt.
            print("Parameters must be numbers. Should be '<freq> <duty>'.")
            return False

        self.PwmSignal(freq, duty)
        print('Signal emitted: [%f] [%f]' % (freq, duty))
        return True

    @dbus.service.method("tramchan.VisionMaster",
                         in_signature='', out_signature='')
    def Exit(self):
        """Stop the module-level GLib main loop ``loop``."""
        loop.quit()

def signal_proxy():
    """Run one console prompt cycle on the global ``object``.

    Returns:
        True after a completed cycle. False after quitting the global
        ``loop`` because the prompt raised EOFError or KeyboardInterrupt.
    """
    try:
        object.emitPwmSignal()
    except (EOFError, KeyboardInterrupt):
        # End of input and Ctrl-C both shut the main loop down.
        loop.quit()
        return False
    else:
        return True

if __name__ == '__main__':
    # Route D-Bus traffic through the GLib main loop created below.
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

    bus = dbus.SessionBus()
    # NOTE(review): ``name`` is never read again; presumably the reference
    # is what keeps the bus name claimed. Confirm before removing it.
    name = dbus.service.BusName('tramchan.VisionMaster', bus)
    # signal_proxy() looks this instance up through the global ``object``.
    object = VisionMaster(bus)

    # VisionMaster.Exit() and signal_proxy() stop this loop via the global.
    loop = gobject.MainLoop()

    # Zero delay: the console prompt runs again after each True return.
    gobject.timeout_add(0, signal_proxy)

    print("Running example signal emitter service.")
    print(usage)
    loop.run()
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s