Source code for sosfiltering

# -*- coding: utf-8 -*-
"""This module contains second order section filtering routines
implemented in c, cffi and numpy.

A bilinear transform converting sos analog weights to
sos digital weights is provided by :func:`bilinear_sos`.

There are different implementations of sos filtering routines:
    - A scipy.lfilter()-based :func:`sosfilter_py`
    - cffi implementations with float- and double-precision
      and a mimo-implementation:
          * :func:`sosfilter_c` (float)
          * :func:`sosfilter_double_c` (double)
          * :func:`sosfilter_double_mimo_c`
            (multi channel input and 3-dim output).

    - prototypes for the c-implementations
      (slowest, only for debugging)

The c-implementations are for real valued signals only.

With the function :func:`freqz` you can check the
frequency response of your second order section filters.

For the :mod:`cffi` you need :mod:`pycparser` being installed.

Compiling the c source
----------------------
Firstly i implemented a prototype-function in python
for easy debugging "sosfilter_cprototype_py()".
After that i translated this prototype into a c-function. By
compiling a shared library from it with the listed
steps below, one can use the python cffi to access this
shared library in python. ::
    $ gcc -c -std=c99 -O3 sosfilter.c
    $ gcc -shared -o sosfilter.so sosfilter.o
    $ or the last line for windows users:
    $ gcc -shared -o sosfilter.dll sosfilter.o

Functions
---------
"""
import os
from sys import platform
from platform import architecture

import numpy as np
from cffi import FFI
from scipy.signal import lfilter


ffi = FFI()
ffi.cdef("""
void sosfilter(float*, int, float*, int, float*);
void sosfilter_double(double*, int, double*, int, double*);
void sosfilter_double_mimo(double*, int, int, double*, int, int, double*);
""")

if platform == 'win32' and architecture()[0] == '64bit':
    _dl = 'sosfilt64.dll'
elif platform == 'win32' and architecture()[0] == '32bit':
    _dl = 'sosfilt32.dll'
else:
    _dl = 'sosfilt.so'


if __name__ != '__main__':
    _mylibpath = os.path.join('.', os.path.dirname(__file__))
else:
    _mylibpath = os.curdir

_mylibpath = os.path.join(_mylibpath, _dl)

_c = ffi.dlopen(_mylibpath)


[docs]def sosfilter_c(signal, sos, states=None): """Second order section filter function using cffi signal_out, states = sosfilter_c(signal_in, sos, states=None) Parameters ---------- signal : ndarray Input array of shape (N x 0). sos : ndarray Second order section coefficients array of shape (K*6 x 0). One biquad -> 6 coefficients: :code:`[b00, b01, b02, a00, a01, a02, ..., bK1, ..., aK2]` states : ndarray Array with filter states. Initial value can be None. Returns ------- signal : ndarray Filtered signal of shape (N x 0). states : ndarray Array with filter states. """ signal_c = ffi.new( 'char[]', np.array(signal, dtype=np.float32).flatten().tostring()) sos_c = ffi.new( 'char[]', np.array(sos, dtype=np.float32).flatten().tostring()) nsamp = int(len(signal)) ksos = int(sos.size/6) if isinstance(states, type(None)): states = np.zeros(ksos*2).astype(np.double) states_c = ffi.new( 'char[]', np.array(states, dtype=np.float32).flatten().tostring()) _c.sosfilter(ffi.cast("float*", signal_c), nsamp, ffi.cast("float*", sos_c), ksos, ffi.cast("float*", states_c)) out = np.fromstring( ffi.buffer(signal_c), dtype=np.float32, count=nsamp ) states = np.fromstring( ffi.buffer(states_c), dtype=np.float32, count=len(states) ) return out, states
[docs]def sosfilter_double_c(signal, sos, states=None): """Second order section filter function using cffi, double precision. signal_out, states = sosfilter_c(signal_in, sos, states=None) Parameters ---------- signal : ndarray Signal array of shape (N x 0). sos : ndarray Second order section coefficients array of shape (K*6 x 0). One biquad -> 6 coefficients: ``[b00, b01, b02, a00, a01, a02, ..., b10, bK1 ... , aK2]`` states : ndarray Filter states, initial value can be None. Returns ------- signal : Filtered signal array of shape (N x 0). states : ndarray Filter states, initial value can be None. """ signal_c = ffi.new( 'char[]', np.array(signal, dtype=np.double).flatten().tostring()) sos_c = ffi.new( 'char[]', np.array(sos, dtype=np.double).flatten().tostring()) nsamp = int(len(signal)) ksos = int(sos.size/6) if isinstance(states, type(None)): states = np.zeros(ksos*2).astype(np.double) states_c = ffi.new( 'char[]', np.array(states, dtype=np.double).flatten().tostring()) _c.sosfilter_double(ffi.cast("double*", signal_c), nsamp, ffi.cast("double*", sos_c), ksos, ffi.cast("double*", states_c)) out = np.fromstring( ffi.buffer(signal_c), dtype=np.double, count=nsamp) states = np.fromstring( ffi.buffer(states_c), dtype=np.double, count=len(states)) return out, states
[docs]def sosfilter_double_mimo_c(signal, sos, states=None): """Second order section filter function for multi channel input using cffi, double precision signal_out, states = sosfilter_c(signal_in, sos, states=None) Parameters ---------- signal : ndarray Signal array of shape (N x C). sos : ndarray Second order section filter coefficients (K*6 x B x C) np-array. states : ndarray Filter states, initial can be None. Otherwise shape is (K*2 x B x C) Returns ------- signal : ndarray Filtered signal of shape (N x B x C). Where N is the number of samples, B is th number of filter bands and C is the number of signal channels. states : ndarray Filter states of shape (K*2 x B x C). """ shape_signal = signal.shape nframes = int(shape_signal[0]) if len(shape_signal) > 1: nchan = int(shape_signal[1]) else: nchan = int(1) shape_sos = sos.shape ksos = int(shape_sos[0]/6) if len(shape_sos) > 1: kbands = int(shape_sos[1]) if len(shape_sos) == 2 and nchan > 1: sos = np.tile(sos.flatten('F'), (nchan)) else: kbands = int(1) if isinstance(states, type(None)): states = np.zeros(nchan*kbands*ksos*2).astype(np.double) states_c = ffi.new( 'char[]', np.array(states, dtype=np.double).flatten('F').tostring()) sos_c = ffi.new( 'char[]', np.array(sos, dtype=np.double).flatten('F').tostring()) if nchan > 1: signal = np.tile(signal, (kbands, 1)) else: signal = np.tile(signal, (kbands)) shape_signal = signal.shape signal_c = ffi.new( 'char[]', np.array(signal, dtype=np.double).T.flatten().tostring()) _c.sosfilter_double_mimo( ffi.cast("double*", signal_c), nframes, nchan, ffi.cast("double*", sos_c), ksos, kbands, ffi.cast("double*", states_c) ) out = np.fromstring( ffi.buffer(signal_c), dtype=np.double, count=signal.size ) states = np.fromstring( ffi.buffer(states_c), dtype=np.double, count=len(states) ) return out.reshape((nframes, kbands, nchan), order='F'), states
[docs]def sosfilter_mimo_cprototype_py(signal_in, sos_in, states_in=None): """Prototype for the mimo c-filter function. Implements a IIR DF-II biquad filter strucure. But with multiple input und multiple bands.""" signal = signal_in.copy().flatten('F') shape_signal = signal_in.shape print(len(signal)) nframes = int(signal_in.shape[0]) print(nframes) nchan = int(signal_in.shape[2]) print(nchan) sos = np.tile(sos_in.copy().flatten('F'), (nchan)) ksos = int(sos_in.shape[0]/6) print(ksos) kbands = int(sos_in.shape[1]) print(kbands) if not states_in: states = np.zeros(nchan*ksos*kbands*2) else: states = states_in ii = 0 for c in range(nchan): for b in range(kbands): for k in range(ksos): w1 = states[c*ksos*kbands*2 + b*ksos*2 + k*2] w2 = states[c*ksos*kbands*2 + b*ksos*2 + k*2 + 1] b0 = sos[ii] ii += 1 b1 = sos[ii] ii += 1 b2 = sos[ii] ii += 1 a0 = sos[ii] ii += 1 a1 = sos[ii] ii += 1 a2 = sos[ii] ii += 1 for n in range(nframes): w0 = signal[c*nframes*kbands + b*nframes + n] w0 = w0 - a1*w1 - a2*w2 yn = b0*w0 + b1*w1 + b2*w2 w2 = w1 w1 = w0 signal[c*nframes*kbands + b*nframes + n] = yn states[c*ksos*kbands*2 + b*ksos*2 + k*2] = w1 states[c*ksos*kbands*2 + b*ksos*2 + k*2 + 1] = w2 return signal.reshape(shape_signal), states
[docs]def sosfilter_cprototype_py(signal, sos, states): """Prototype for second order section filtering c function. Implements a IIR DF-II biquad filter strucure. """ N = int(len(signal)) K = int(sos.size/6) if isinstance(states, type(None)): states = np.zeros(K*2).astype(np.double) signal = signal.copy() # only python specific sos = sos.copy().flatten() yn = 0.0 # buffer for output w0 = 0.0 # signal states for k in range(K): # get coefficients of current biquad w1 = states[k*2] w2 = states[k*2+1] b0 = sos[k*6] b1 = sos[k*6+1] b2 = sos[k*6+2] a0 = sos[k*6+3] a1 = sos[k*6+4] a2 = sos[k*6+5] for n in range(N): # get a sample w0 = signal[n].copy() # recursive path w0 = w0 - a1*w1 - a2*w2 # transversal path yn = b0*w0 + b1*w1 + b2*w2 # delays w2 = w1 w1 = w0 # write output signal signal[n] = yn states[k*2] = w1 states[k*2+1] = w2 return signal, states
[docs]def sosfilter_py(x, sos, states=None): """Second order section filter routing with scipy lfilter. Parameters ---------- x : ndarray Input signal array. sos : ndarray Second order section coefficients array. states : ndarray or None Filter states, initial value can be None. Returns ------- signal : ndarray Filtered signal. states : ndarray Array with filter states. """ n = sos.shape[0] if isinstance(states, type(None)): states = dict() for i in np.arange(n): states[i] = np.zeros(2) for ii in np.arange(n): zi = states[ii] b = sos[ii, :3] a = sos[ii, 3:] x, zi = lfilter(b, a, x, 0, zi=zi) states[ii] = zi return x, states
[docs]def bilinear_sos(d, c): """Bilinear transformation of analog weights to digital weights. >>>>>>> 8d01abb1e1f252834c0666d50c645dd3d35a1f52 Bilinear transformation of analog weights to digital weights. Weights of IIR digital filter in cascade form with 2-pole sections; H(z)=H(z,1)H(z,2)...H(z,L/2) where L is the number of poles and each section is a ratio of quadratics. Parameters ---------- d : ndarray Numerator weights of analog filter in 1-pole sections. d is dimensioned (L/2 x 2). c : ndarray Denominator weights, dimensioned same as d. Returns ------- b : ndarray Digital numerator weights, dimensioned (L/2 x 3). a : ndarray Digital denominator weights, dimensioned the same. """ L2, ncd = d.shape nr, ncc = c.shape # Check for errors. if(nr != L2 or ncd != 2 or ncc != 2): raise Exception('Inputs d and c must both be L/2 x 2 arrays.') # Bilinear transformation of H(s) to H(z) using z and p vectors. a = np.zeros((L2, 3), dtype=np.double) a[:, 0] = np.abs(c[:, 0] + c[:, 1])**2 if np.min(a[:, 0]) == 0: raise Exception('"c" should not have a row of zeros.') a[:, 1] = 2*np.real((c[:, 0] + c[:, 1]) * np.conj(c[:, 1] - c[:, 0])) a[:, 2] = np.abs(c[:, 1] - c[:, 0])**2 b = np.zeros((L2, 3), dtype=np.double) b[:, 0] = np.abs(d[:, 0] + d[:, 1])**2 b[:, 1] = 2*np.real((d[:, 0] + d[:, 1]) * np.conj(d[:, 1] - d[:, 0])) b[:, 2] = np.abs(d[:, 1] - d[:, 0])**2 # Scale H(z) so a(:,1)=1: sa = np.kron(np.ones((3, 1)), a[:, 0]).T a = a / sa b = b / sa return b, a
[docs]def freqz(sosmat, nsamples=44100, sample_rate=44100, plot=True): """Plots Frequency response of sosmat.""" from pylab import np, plt, fft, fftfreq x = np.zeros(nsamples) x[nsamples/2] = 0.999 y, states = sosfilter_double_c(x, sosmat) Y = fft(y) f = fftfreq(len(x), 1.0/sample_rate) if plot: plt.grid(True) plt.axis([0, sample_rate / 2, -100, 5]) L = 20*np.log10(np.abs(Y[:len(x)/2]) + 1e-17) plt.semilogx(f[:len(x)/2], L, lw=0.5) plt.hold(True) plt.title(u'freqz sos filter') plt.xlabel('Frequency / Hz') plt.ylabel(u'Damping /dB(FS)') plt.xlim((10, sample_rate/2)) plt.hold(False) return x, y, f, Y