"""DEMIX Python/NumPy implementation
Description
-----------
DEMIX is an algorithm that counts the number of sources in a
multichannel mixture, based on their spatial cues, and returns the
estimated mixing parameters: the relative amplitudes between the
channels and the relative time shifts. The full description is
given in::

    Arberet, S.; Gribonval, R. & Bimbot, F.
    A Robust Method to Count and Locate Audio Sources in
    a Multichannel Underdetermined Mixture
    IEEE Transactions on Signal Processing, 2010, 58, 121 - 133

This implementation is based on the MATLAB toolbox provided by the
authors of the above article. It additionally allows time-frequency
representations other than the short-term Fourier transform (STFT).
Copyright
---------
Jean-Louis Durrieu, EPFL-STI-IEL-LTS5
jean DASH louis AT durrieu DOT ch
2012-2013
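Example
-------
A minimal usage sketch (the filename is only illustrative; any stereo
file should do)::

    demix = DEMIX('mix.wav', nsources=3)
    demix.comp_clusters()    # find the TF clusters and their centroids
    demix.refine_clusters()  # keep the most plausible clusters
    estimates = demix.spatial_filtering()  # one stereo image per source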
"""
import numpy as np
import audioObject as ao
import tftransforms.tft as tft
tfts = {'stft': tft.STFT,
'cqt': tft.CQTransfo,
'mqt': tft.MinQTransfo,
'minqt': tft.MinQTransfo,
}
# to get the PCA for 2D vectors:
import tools.signalTools as st
import warnings
eps = 1e-10
# global variables
uVarBound = 2.33  # roughly the 99% quantile of the standard normal
uDistBound = 2.33
def decibel(val):
return 20. * np.log10(val)
# alias for above function:
db = decibel
def invdb(dbval):
return 10. ** (dbval / 20.)
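# quick sanity check for the helpers above (doctest-style sketch):
#     >>> db(10.)
#     20.0
#     >>> invdb(20.)
#     10.0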
def get_indices_peak(sequence, ind_max, threshold=0.8):
    """Returns the indices of the peak around ``ind_max``: the contiguous
    (circularly wrapped) region where the values stay above
    ``threshold * sequence[ind_max]``.
    """
thres_value = sequence[ind_max] * threshold
# considering sequence is circular, we put ind_max to the center,
# and find the bounds:
lenSeq = sequence.size
indSup = ind_max
indInf = ind_max
while sequence[indSup]>thres_value:
indSup += 1
indSup = np.remainder(indSup, lenSeq)
while sequence[indInf]>thres_value:
indInf -= 1
indInf = np.remainder(indInf, lenSeq)
indices = np.zeros(lenSeq, dtype=bool)
if indInf < indSup:
indices[(indInf+1):indSup] = True
else:
indices[:indSup] = True
indices[(indInf+1):] = True
return indices
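# Illustrative example for get_indices_peak: the mask covers the lobe
# around ``ind_max``, wrapping around the edges, since the sequence is
# treated as circular:
#     >>> seq = np.array([0.9, 0.2, 0.1, 0.2, 0.8, 1.0])
#     >>> get_indices_peak(seq, ind_max=5, threshold=0.7).nonzero()[0]
#     array([0, 4, 5])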
def confidenceFromVar(variance, neighbors):
    """Computes the confidence, in dB, corresponding to a given variance
    and number of neighbors.
    """
## JLD: TODO check how this confidence is computed and how it can
##be interpreted.
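    ## The formula below inverts the variance formula of
    ## DEMIX.estim_var_theta: it solves
    ##     variance = alpha / ((neighbors - 1) * (alpha - 1)**2)
    ## for alpha, keeping the larger root of the quadratic, and converts
    ## alpha back to dB.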
alpha = (
(2 * variance * (neighbors - 1) + 1 +
np.sqrt(1 + 4 * variance * (neighbors - 1)))
/ (2 * variance * (neighbors - 1))
)
return 20 * np.log10(alpha)
class DEMIX(object):
"""DEMIX algorithm, for 2 channels.
"""
def __init__(self, audio,
nsources=2,
wlen=2048,
hopsize=1024,
neighbors=20,
verbose=0,
maxclusters=100,
tfrepresentation='stft',
tffmin=25, tffmax=18000,
tfbpo=48,
winFunc=tft.sqrt_blackmanharris):
"""init function: DEMIX for 2 channels
"""
self.nsources = nsources
if nsources is None:
self.maxclusters = maxclusters
else:
self.maxclusters = np.maximum(maxclusters, self.nsources*10)
self.verbose = verbose
self.neighbors = neighbors
if isinstance(audio, ao.AudioObject):
self.audioObject = audio
elif isinstance(audio, str):
self.audioObject = ao.AudioObject(filename=audio)
else:
raise AttributeError("The provided audio parameter is"+
"not a supported format.")
self.sig_repr_params = {}
self.sig_repr_params['wlen'] = wlen # window length
self.sig_repr_params['fsize'] = ao.nextpow2(wlen) # Fourier length
self.sig_repr_params['hopsize'] = hopsize
self.sig_repr_params['tfrepresentation'] = tfrepresentation.lower()
self.sig_repr_params['hopfactor'] = (
1. * hopsize / self.sig_repr_params['wlen'])
self.sig_repr_params['tftkwargs'] = {
'fmin': tffmin, 'fmax': tffmax, 'bins': tfbpo,
'linFTLen': self.sig_repr_params['fsize'],
'fs': self.audioObject.samplerate,
'atomHopFactor': self.sig_repr_params['hopfactor'],
'winFunc': winFunc
}
self.tft = tfts[self.sig_repr_params['tfrepresentation']](
**self.sig_repr_params['tftkwargs'])
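    # A construction sketch (the filename and parameter values are only
    # illustrative): the same analysis with a constant-Q transform
    # instead of the STFT:
    #
    #     demix = DEMIX('mix.wav', nsources=2, tfrepresentation='cqt',
    #                   tffmin=25, tffmax=18000, tfbpo=48)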
    def comp_clusters(self, threshold=0.8):
"""Computes the time-frequency clusters, along with their centroids,
which contain the parameters of the mixing process - namely `theta`,
which parameterizes the relative amplitude, and `delta`, which is
homogeneous to a delay in samples between the two channels.
"""
self.comp_sig_repr() # the STFT
self.comp_pcafeatures() # PCA on the neighborhouds
self.comp_parameters() # extract some parameters from the PCA features
self.init_subpts_set() # initiate the mask of points to cluster to full
# now find the clusters...
self.clusterCentroids = []
self.clusters = []
nbRemainingPoints = self.subTFpointSet.sum()
if self.verbose:
print "Computing the clusters... ", nbRemainingPoints, "TF points"
while nbRemainingPoints and len(self.clusters) < self.maxclusters:
if self.verbose>4: #DEBUG: tends to be a bit cumbersome
print "Remaining points:", nbRemainingPoints
self.clusterCentroids.append(self.get_max_confidence_point())
if self.verbose>1:
## attention: no delta in centroid yet...
# print " centroid:", self.clusterCentroids[-1]
print " belongs to previous cluster:",
print np.any([ccc[self.clusterCentroids[-1].index_freq,
self.clusterCentroids[-1].index_time] for
ccc in self.clusters])
ind_cluster_theta_pts, distBound = (
self.getTFpointsNearTheta_OneScale(self.clusterCentroids[-1])
)
if self.verbose>1:
print "comp_clusters:"
print " ind_cluster_theta_pts, theta",
print ind_cluster_theta_pts.sum()
## already done in above getTFpointsNearTheta method:
# ind_cluster_theta_pts *= self.subTFpointSet
maxDelta = self.identify_deltaT(
ind_cluster_pts=ind_cluster_theta_pts,
centroid=self.clusterCentroids[-1],
threshold=threshold)
if maxDelta is not None:
self.clusterCentroids[-1].delta = maxDelta
ind_cluster_theta_pts = (
self.getTFPointsNearDelta(
centroid=self.clusterCentroids[-1]))
else:
self.clusterCentroids[-1].delta = np.NaN
# keeping only the centroid in that cluster, then:
ind_cluster_theta_pts *= False
ind_cluster_theta_pts[self.clusterCentroids[-1].index_freq,
self.clusterCentroids[-1].index_time] = True
if self.verbose>1:
print "comp_clusters:"
print " ind_cluster_theta_pts, delta",
print ind_cluster_theta_pts.sum()
print " ***theta***:", self.clusterCentroids[-1].theta
print " ***delta***:", maxDelta
self.clusters.append(ind_cluster_theta_pts)
if self.verbose>1:
print "comp_clusters:"
print " self.clusters[-1]", self.clusters[-1].sum()
# removing the points that were just classified:
            self.subTFpointSet *= (~ind_cluster_theta_pts)
nbRemainingPoints = self.subTFpointSet.sum()
    def refine_clusters(self):
        """Refines the clusters, checking that they are plausible.
        If self.nsources is defined, this method keeps only that number
        of clusters; otherwise the number is decided by removing the
        weakest centroids.
        """
self.reestimate_clusterCentroids()
if self.nsources is not None:
# keeping only the best clusters, according to self.nsources
self.keepBestClusters()
else:
# estimate the active number of clusters:
            self.remove_weak_clusters()
self.reestimate_clusterCentroids()
    def getTFpointsNearTheta_OneScale(self, centroid_tfpoint):
        """Returns the TF points whose theta is close to that of the
        centroid, among the points still to be classified
        (self.subTFpointSet).

        TODO: extend to several scales, as in the MATLAB toolbox.
        """
dist = np.abs(self.thetaAll - centroid_tfpoint.theta)
## globally set:
#uVarBound = 2.33
#uDistBound = 2.33
distBound = np.sqrt(
self.estim_bound_var_theta(centroid_tfpoint.confidence,
infOrSup='sup',
u=uVarBound)
) * uDistBound
        # conversion from chordal distance to the corresponding angle:
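        # (the identity used: for unit vectors at angle alpha, the chordal
        # distance d satisfies cos(alpha) = 1 - d**2 / 2, hence
        # cos(2*alpha) = 2 * (1 - d**2 / 2)**2 - 1 = (d**2 - 2)**2 / 2 - 1,
        # and alpha is recovered below as arccos(cos(2*alpha)) / 2)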
distBound = (
1 / 2. *
np.arccos(((distBound**2 - 2)**2) / 2. - 1)
)
return (dist<distBound) * self.subTFpointSet, distBound
    def identify_deltaT(self, ind_cluster_pts, centroid, threshold=0.8):
        """Returns the delay maxDelta, in samples, corresponding to the
        largest peak of the detection function over the given cluster,
        or None if that peak is ambiguous.
        """
distBound = centroid.estimDeltaPhi()
# max nb of samples in error interval:
nbMaxSamples = 2**7
# max zoom for oversampling
zoomMax = 2**4
if distBound != np.Inf:
zoom = 2 ** np.ceil(np.log2(nbMaxSamples / 2 * distBound))
zoom = max(1, zoom)
zoom = min(zoom, zoomMax)
zoom = int(zoom)
else:
zoom = 1
ind_candidates = centroid.getPeakCandidateIndices(zoom=zoom,
distBound=distBound)
# Computing the detection function, with peaks at the correct delta:
deltaDetFun = self.compute_temporal(ind_cluster_pts=ind_cluster_pts,
zoom=zoom)
# invalidating non candidate peaks:
        deltaDetFun[~ind_candidates] = 0
        # x-axis values:
Nft = self.sig_repr_params['fsize']
deltaAxis = (
np.concatenate([np.arange(start=0, stop=Nft*zoom/2),
np.arange(start=-Nft*zoom/2, stop=0)])
) * 1. / zoom
indMaxDelta = np.argmax(deltaDetFun)
peakDelta = deltaDetFun[indMaxDelta]
maxDelta = deltaAxis[indMaxDelta]
indicePeak = get_indices_peak(sequence=deltaDetFun,
ind_max=indMaxDelta,
threshold=threshold)
        if not np.isinf(distBound) and \
           np.any(deltaDetFun[~indicePeak] > threshold * peakDelta):
maxDelta = None
return maxDelta
    def getTFPointsNearDelta(self, centroid):
        """Returns a TF mask which is True where the corresponding value
        of `delta` is close enough to the delta of the `centroid`.
        """
# the reduced frequencies:
frequencies = self.freqs / self.audioObject.samplerate
        # at first this used self.egv0, but that is probably not a good
        # idea, because of phase indeterminacies:
        # steering vectors for the TF plane:
u0 = np.cos(self.thetaAll)
u1 = np.sin(self.thetaAll) * np.exp(1j * self.phiAll)
# conjugated v centroid : hence the + in the complex exp
v_centroid = (
np.vstack([np.ones(self.freqs.size) * np.cos(centroid.theta),
np.exp(1j * 2 * np.pi * centroid.delta * frequencies) *
np.sin(centroid.theta)])
)
dist = np.sqrt(2 *
(1 - np.abs(u0 * np.vstack(v_centroid[0]) +
u1 * np.vstack(v_centroid[1]) ))
)
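        # note: sqrt(2 * (1 - abs(<u, v>))) is the chordal distance
        # minimized over a global phase shift of u, which makes it
        # insensitive to the arbitrary phase of the eigenvectors (np.real
        # instead of np.abs would give the plain Euclidean distance).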
distBound = self.estimDAOBound(confidence=centroid.confidence)
# discarding points that have a higher confidence than centroid (ie
# previously identified centroids)
distBound[centroid.confidence<self.confidence] = 0
ind_cluster_pts = (dist < distBound)
ind_cluster_pts[centroid.index_freq, centroid.index_time] = True
return ind_cluster_pts
    def compute_temporal(self, ind_cluster_pts, zoom):
        """Computes the inverse Fourier transform of the estimated
        steering vectors, weighted by their inverse variances.
        The result is a detection function with peaks at the most
        likely delta, the delay in samples.
        """
thetaVars = np.copy(self.thetaVarAll)
thetaVars[thetaVars==0] = eps
y = (1/thetaVars) * np.exp(1j * self.phiAll)
# effectively removing points outside the set of cluster points:
        thetaVars[~ind_cluster_pts] = np.Inf
        y[~ind_cluster_pts] = 0
# normalizing each channel
normalisation = np.sum((1 / thetaVars), axis=1)
normalisation[normalisation==0] = eps
        # the inverse-FFT shortcut below only holds for the STFT; for
        # other representations, the Fourier kernel is built explicitly
        # at the transform's frequencies (slower):
        if self.sig_repr_params['tfrepresentation'] not in (
                'stft', 'stftold'):
K = np.vstack([
np.exp(2j*np.pi*
np.arange(self.sig_repr_params['fsize'] * zoom)
/ self.audioObject.samplerate * fp)
for fp in self.tft.freq_stamps])
deltaDetFun = np.abs(np.dot(y.sum(axis=1)/normalisation,
K))
else:
deltaDetFun = np.fft.irfft(y.sum(axis=1) / normalisation,
n=self.sig_repr_params['fsize'] * zoom)
return deltaDetFun
    def comp_sig_repr(self):
        """Computes the signal representation (STFT, or the chosen
        time-frequency transform).
        """
if not hasattr(self.audioObject, '_data'):
self.audioObject._read()
if self.verbose:
print ("Computing the chosen signal representation:")
nc = self.audioObject.channels
if nc != 2:
raise ValueError("This implementation only deals "+
"with stereo audio!")
self.sig_repr = {}
        # if the signal is longer than a given duration, only keep that
        # duration, taken from the middle of the signal
        # (a better way would be to sample segments randomly in the signal)
        lengthData = self.audioObject.data.shape[0]
        startData = 0
        endData = lengthData
        oneMinLenData = 60 * self.audioObject.samplerate
        oneMinLenData *= 2  # i.e. keeping 2 minutes, actually
        if lengthData > oneMinLenData:
            startData = (lengthData - oneMinLenData) / 2
            endData = startData + oneMinLenData
if self.sig_repr_params['tfrepresentation'] == 'stftold':
self.sig_repr[0], freqs, times = ao.stft(
self.audioObject.data[startData:endData,0],
window=np.hanning(self.sig_repr_params['wlen']),
hopsize=self.sig_repr_params['hopsize'],
nfft=self.sig_repr_params['fsize'],
fs=self.audioObject.samplerate
)
self.sig_repr[1], freqs, times = ao.stft(
self.audioObject.data[startData:endData,1],
window=np.hanning(self.sig_repr_params['wlen']),
hopsize=self.sig_repr_params['hopsize'],
nfft=self.sig_repr_params['fsize'],
fs=self.audioObject.samplerate
)
else:
self.tft.computeTransform(
self.audioObject.data[startData:endData,0],)
self.sig_repr[0] = self.tft.transfo
self.tft.computeTransform(
self.audioObject.data[startData:endData,1],)
self.sig_repr[1] = self.tft.transfo
freqs = self.tft.freq_stamps
# keeping the frequencies, not computing them each time
self.freqs = freqs
del self.audioObject.data
    def comp_pcafeatures(self):
"""Compute the PCA features
"""
        if not hasattr(self, 'sig_repr'):
            self.comp_sig_repr()
self.lbd0, self.lbd1, self.egv0, self.egv1 = st.prinComp2D(
X0=self.sig_repr[0],
X1=self.sig_repr[1],
neighborNb=self.neighbors)
self.confidence = 20. * np.log10(self.lbd0 /
np.maximum(self.lbd1, eps))
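        # a large eigenvalue ratio lbd0/lbd1 means the local scatter is
        # close to rank 1, i.e. a single source dominates the
        # neighborhood: this is the confidence measure of DEMIX.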
# issue with NaN ... :
self.confidence[np.isnan(self.confidence)] = 0. # is that ok?...
        # 20130208 JLD: adding a weighting scheme to lower the influence
        # of low-energy coefficients:
start = self.neighbors / 2
energy = np.mean(
np.array(self.sig_repr.values()) ** 2,
axis=0)[:,start:start+self.confidence.shape[1]]
self.confidence[energy<energy.max()*1e-6] = 0.
# no need for the signal representation anymore, for now:
del self.sig_repr
del self.lbd0
# del self.egv0 # needed in comp_parameters
del self.egv1
del self.lbd1
    def comp_parameters(self):
        """Computes the steering vector parameters (phi, theta, and the
        variance of theta) from the PCA features.
        """
        # removing the first 2 bands, to avoid the DC component
self.confidence[:2,:] = eps
# storing the data points following the chosen parameterization of the
# steering vectors:
#
# u_k = [cos(\theta_k) sin(\theta_k) exp(-j 2 \pi f \delta_k)]^T
#
# where \theta_k is an intensity parameter (IP),
# \delta_k is the time delay between the channels
# and k is the source index.
#
self.phiAll = np.angle(self.egv0[1] / self.egv0[0])
self.thetaAll = np.arctan(np.abs(self.egv0[1]) / np.abs(self.egv0[0]))
self.thetaVarAll = self.estim_var_theta(confidence=self.confidence)
del self.egv0
def estim_var_theta(self, confidence):
maxLim_confidence = 3073.
thetaVarAll = np.ones_like(confidence)
indNotZeroConf = np.where(confidence>eps)
if self.verbose>1 and False:
print "estim_var_theta: indNotZeroConf", indNotZeroConf
# The confusing equations are because the values are stored as dBs...
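        # writing r = 10 ** (confidence / 20.) = lbd0 / lbd1, this
        # computes var(theta) = r / ((neighbors - 1) * (r - 1)**2):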
thetaVarAll[indNotZeroConf[0], indNotZeroConf[1]] = (
10 ** (confidence[indNotZeroConf[0], indNotZeroConf[1]] / 20.) /
((self.neighbors-1) *
(10 ** (confidence[indNotZeroConf[0], indNotZeroConf[1]] / 20.)
- 1) ** 2)
)
thetaVarAll[confidence>=maxLim_confidence] = 0
thetaVarAll[confidence<=eps] = np.Inf
return thetaVarAll
def estim_bound_var_theta(self, confidence, infOrSup='sup', u=2.33):
minConfidence = 0.
sigma = np.sqrt(1600. / (self.neighbors - 1.))
signInfSup = {'inf': 1, 'sup': -1}
        # avoid modifying the caller's array in place:
        confidence = confidence + signInfSup[infOrSup] * u * sigma
confidence = np.maximum(confidence, minConfidence)
confidence = np.atleast_2d(confidence)
if self.verbose>1 and False:
print "confidence", confidence
# the method below returns a 2D array, and therefore,
# we only return the (only) element of it:
if np.size(confidence)>1:
return self.estim_var_theta(confidence)
else:
return self.estim_var_theta(confidence)[0,0]
    def estimDAOBound(self, confidence, confidenceVal=None):
        """Computes the maximum distance allowed between a centroid and
        the points of its cluster.
        """
maxErrorTheta = self.estim_bound_var_theta(confidence=confidence,
u=uVarBound)
if confidenceVal is not None:
thetaVarAll = self.estim_bound_var_theta(
confidence=confidenceVal,
u=uVarBound)
else:
thetaVarAll = self.thetaVarAll
if self.verbose>1 and False: # DEBUG, lots of stuff written!
print "daobound", (np.sqrt(thetaVarAll) +
np.sqrt(maxErrorTheta)) * uDistBound
return (np.sqrt(thetaVarAll) +
np.sqrt(maxErrorTheta)) * uDistBound
    def init_subpts_set(self, ):
        # sets the mask of TF points still to be clustered to all True
        try:
            self.subTFpointSet = np.ones(self.confidence.shape, dtype=bool)
        except AttributeError:
            raise AttributeError("no confidence array in object: "
                                 "run comp_pcafeatures first!")
def get_max_confidence_point(self, ):
tmpConfidence = np.copy(self.confidence)
        # self.subTFpointSet is a boolean array of the same shape as
        # self.confidence:
        tmpConfidence[~self.subTFpointSet] = 0
index = np.unravel_index(tmpConfidence.argmax(), tmpConfidence.shape)
del tmpConfidence
return TFPoint(demixinstance=self,
index_freq=index[0],
index_time=index[1])
    def reestimate_clusterCentroids(self):
        """Reestimates the cluster centroids: considering all the cluster
        masks, reestimates the centroids, discarding the clusters for
        which no well-defined delta was found.
        """
if not(hasattr(self, 'clusterCentroids')):
self.comp_clusters()
        # filtering out clusters for which we don't have a correct delay:
        ind_good_cluster = ~np.isnan(
            [c.delta for c in self.clusterCentroids])
if self.verbose>1: #DEBUG
print "reestimate_clusterCentroids"
print " ind_good_cluster", ind_good_cluster.sum()
print " [c.delta for c in self.clusterCentroids]"
print [c.delta for c in self.clusterCentroids]
print " cluster sizes:", [c.sum() for c in self.clusters]
if self.verbose:
print "reestimate_clusterCentroids"
print " constraining the clusters to the good ones"
self.clusterCentroids = (
[self.clusterCentroids[n] for n in np.where(ind_good_cluster)[0]])
self.clusters = (
[self.clusters[n] for n in np.where(ind_good_cluster)[0]])
if self.verbose:
print " reestimate_clusterCentroids: Computing exclusive clusters"
nbClusters = ind_good_cluster.sum()
# filtering out the Time-Freq points that are in several clusters:
self.create_exclusive_clusters()
if self.verbose:
print (" reestimate_clusterCentroids: "+
"Computing thresholded clusters")
# adaptive thresholding of clusters
thresholdedClusters = self.adaptive_thresholding_clusters()
if self.verbose:
print " reestimate_clusterCentroids: For each cluster, "
print " reestimate the centroid"
# estimation of centroids for each cluster
for n, cluster in enumerate(thresholdedClusters):
if self.verbose>1:
print " cluster", n+1, "of", len(thresholdedClusters)
if cluster.sum() != 0:
# only the confidences for the current cluster:
confidences = self.confidence[cluster]
thetas = self.thetaAll[cluster]
variances = self.thetaVarAll[cluster]
if self.verbose > 1 and False: # DEBUG
print "reestim_cluster: confidence", confidences
varBounds = self.estim_bound_var_theta(confidence=confidences,
infOrSup='sup',
u=uVarBound)
# normalisation coeff:
varP = 1. / (
np.sum(1./variances)
)
# weighted mean of thetas, weight is varP/variance
theta = varP * np.sum(thetas / variances)
# confidence of the estimation:
varBound = 1. / np.sum(1./varBounds)
clusterConfidence = confidenceFromVar(varBound, self.neighbors)
                # updating if the confidence of the current estimate is
                # better than the original one:
                # NB: even though we use thresholdedClusters, the centroids
                # should be the same as for self.clusters.
if clusterConfidence > self.clusterCentroids[n].confidence:
self.clusterCentroids[n].theta = theta
self.clusterCentroids[n].confidence = clusterConfidence
    def create_exclusive_clusters(self):
        """Reconfigures the cluster masks in self.clusters so that each
        time-frequency point belongs to at most one cluster: points
        appearing in several clusters are kept only in the first
        (highest-confidence) cluster they appear in.
        """
if not(hasattr(self, 'clusters')):
self.comp_clusters()
# JLD modifying this such that we keep the TF points in the cluster
# with best confidence.
if len(self.clusters):
maskOnlyOneCluster = np.array(self.clusters[0], dtype=int)
else:
warnings.warn("create_exclusive_clusters: no clusters anymore!")
for clustern, cluster in enumerate(self.clusters[1:]):
maskOnlyOneCluster += self.clusters[clustern+1]
cluster *= (maskOnlyOneCluster==1)
# original program : remove all common tf points
##maskOnlyOneCluster = (np.sum(self.clusters, axis=0)==1)
##for c in self.clusters:
## c *= maskOnlyOneCluster
        # JLD: adding a control to discard empty clusters
        # strange thing: this case never appeared in DEMIX in matlab...
self.remove_empty_clusters()
    def get_centroid_distance(self):
        """Computes the pairwise distances between the cluster centroids.
        """
nbClusters = len(self.clusters)
        nbFreqs = self.sig_repr_params['fsize'] / 2 + 1
distanceArray = np.zeros([nbClusters, nbClusters])
# frequencies vector:
f = np.arange(0, nbFreqs) * 1. / self.sig_repr_params['fsize']
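        # below, d2 = 2 * (1 - mean over f of |<u_n, u_m>|), i.e. the
        # squared chordal distance between the steering vectors of
        # centroids n and m, averaged over frequency; the term under the
        # inner square root is |<u_n, u_m>| expanded in double angles.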
for n in range(nbClusters):
for m in range(n+1, nbClusters):
theta = self.clusterCentroids[n].theta
alpha = self.clusterCentroids[m].theta
diffDelta = (self.clusterCentroids[n].delta -
self.clusterCentroids[m].delta)
d2 = (2. * (1. -
np.mean(1. / np.sqrt(2.) *
np.sqrt(1. +
np.cos(2*alpha) * np.cos(2*theta) +
np.sin(2*alpha) * np.sin(2*theta) *
np.cos(2.*np.pi * f * diffDelta)),
axis=0)
)
)
distanceArray[n,m] = np.sqrt(d2)
return distanceArray
    def adaptive_thresholding_clusters(self):
        """Computes, for each cluster in self.clusters, a threshold that
        depends on the other clusters, so as to keep only the points that
        are close to their own centroid and not close to the centroids of
        the other clusters.

        The returned clusters are the thresholded versions of the
        original clusters.
        """
distanceArray = self.get_centroid_distance()
nbClusters = len(self.clusters)
thresholdedClusters = []
confidenceThreshold = np.zeros(nbClusters)
distTocluster = np.ones(nbClusters) * 2
if self.verbose:
print "adaptive thresholding the clusters"
for n in range(nbClusters):
if self.verbose>1:
print " cluster", n, "of", nbClusters
clustersMinusN = range(nbClusters)
clustersMinusN.remove(n)
for m in clustersMinusN:
if self.verbose>2 and False: # DEBUG
print "[min(n,m), max(n,m)]", [min(n,m), max(n,m)]
dist_n_m = np.copy(distanceArray[min(n,m), max(n,m)])
if self.verbose>2 and False: # DEBUG
print "dist_n_m", dist_n_m
                # maximum errors on the centroid estimates:
dist_centroid_n = self.estimDAOBound(
confidence=self.clusterCentroids[n].confidence,
confidenceVal=np.Inf)
dist_centroid_m = self.estimDAOBound(
confidence=self.clusterCentroids[m].confidence,
confidenceVal=np.Inf)
if self.verbose>2 and False: # DEBUG
print " dist_centroid_n/m",dist_centroid_n,dist_centroid_m
dist_n_m -= (dist_centroid_n + dist_centroid_m)
dist_n_m = max(dist_n_m , 0.)
distTocluster[n] = min(dist_n_m, distTocluster[n])
if self.verbose>2 and False: # DEBUG
print "distTocluster[n]", distTocluster[n]
                    if distTocluster[n] == 0:
                        print " null distance for", [min(n,m), max(n,m)]
# estimation of threshold
confidenceThreshold[n] = confidenceFromVar((distTocluster[n]/2)**2,
self.neighbors)
# self.clusters[n] *= (self.confidence >= confidenceThreshold[n])
thresholdedClusters.append(
self.clusters[n] * (self.confidence >= confidenceThreshold[n]))
# to avoid errors when considering empty clusters...
self.remove_empty_clusters()
return thresholdedClusters
def keepBestClusters(self, nsources=None):
if nsources is None:
nsources = self.nsources
if nsources is None:
raise AttributeError('The nb of sources is not provided.')
nclusters = len(self.clusters)
        if nclusters < nsources:
            warnings.warn("The number of clusters %d " % nclusters +
                          "is smaller than the required\nnumber of sources"
                          " %d." % nsources)
else:
confidences = [cc.confidence for cc in self.clusterCentroids]
sortedIndices = np.argsort(confidences)
            # argsort gives the indices of increasing confidence: read
            # from the end to keep the nsources best clusters:
self.clusterCentroids = (
[self.clusterCentroids[n] for
n in sortedIndices[:-(nsources+1):-1]])
self.clusters = (
[self.clusters[n] for
n in sortedIndices[:-(nsources+1):-1]]
)
    def remove_empty_clusters(self):
        """remove_empty_clusters

        JLD: this never happened in the DEMIX Matlab version; the authors
        should be contacted for an explanation...
        """
clustersizes = np.array([c.sum() for c in self.clusters])
if self.verbose>1:
print "remove_empty_clusters"
print " cluster sizes:", clustersizes
print " Removing", np.sum(clustersizes==0), "clusters"
if self.verbose:
print " cluster centroids:", self.clusterCentroids
self.clusters = (
[self.clusters[n] for n in np.where(clustersizes>0)[0]])
self.clusterCentroids = (
[self.clusterCentroids[n] for n in np.where(clustersizes>0)[0]])
def remove_weak_clusters(self, distanceArray=None,
clusterDistBounds=None):
if distanceArray is None:
distanceArray = self.get_centroid_distance()
if clusterDistBounds is None:
clusterDistBounds = [cc.clusterDistBound() for
cc in self.clusterCentroids]
# removing clusters that are too close to the others:
nbClusters = len(self.clusters)
indGoodClusters = []
for n in range(nbClusters):
clustersMinusN = range(nbClusters)
clustersMinusN.remove(n)
isGoodCluster = True
for m in clustersMinusN:
if self.verbose>1 and False: # DEBUG
print "[min(n,m), max(n,m)]", [min(n,m), max(n,m)]
dist_n_m = np.copy(distanceArray[min(n,m), max(n,m)])
if (self.clusterCentroids[n].confidence>
self.clusterCentroids[m].confidence):
dist_n_m = np.Inf
isGoodCluster *= (dist_n_m>clusterDistBounds[n])
if isGoodCluster:
indGoodClusters.append(n)
self.clusters = [self.clusters[n] for n in indGoodClusters]
self.clusterCentroids = (
[self.clusterCentroids[n] for n in indGoodClusters])
    def spatial_filtering(self):
        """Uses optimal spatial filters (a beamformer) to obtain the
        separated signals: MVDR, or assuming the sources are Gaussian,
        independent and of equal variance (it is not clear whether that
        assumption precludes separating them...).

        From::

            Maazaoui, M.; Grenier, Y. & Abed-Meraim, K.
            ``Blind Source Separation for Robot Audition using
            Fixed Beamforming with HRTFs'',
            in proc. of INTERSPEECH, 2011.

        Per channel, the filter steering vector for source p:

        .. math::

            b(f,p) = \\frac{R_{aa,f}^{-1} a(f,p)}{a^{H}(f,p) R_{aa,f}^{-1} a(f,p)}
        """
A = self.steeringVectorsFromCentroids()
nsrc = len(self.clusterCentroids)
Raa_00 = np.mean(np.abs(A[:,:,0]) ** 2, axis=0)
Raa_11 = np.mean(np.abs(A[:,:,1]) ** 2, axis=0)
Raa_01 = np.mean(A[:,:,0] * np.conj(A[:,:,1]), axis=0)
# invert the matrices, in one pass, easy since 2D and hermitian:
invRaa_00, invRaa_01, invRaa_11 = st.invHermMat2D(Raa_00, Raa_01,
Raa_11)
# note: if all steering vectors are the same, then Raa is
# degenerate, and the result might be unstable...
if not hasattr(self, "sig_repr"):
self.comp_sig_repr()
sep_src = []
for p in range(nsrc):
# the beamformer filter B is nfreqs x 2
B = np.vstack([invRaa_00 * A[p,:,0] + invRaa_01 * A[p,:,1],
np.conj(invRaa_01) * A[p,:,0] + invRaa_11 * A[p,:,1]])
            B /= (np.conj(A[p,:,0]) * B[0] +
                  np.conj(A[p,:,1]) * B[1])
B = np.conj(B.T)
# this is not in the formula of the cited paper,
# but should be theoretically correct... TBC!
S = (np.vstack(B[:,0]) * self.sig_repr[0] +
np.vstack(B[:,1]) * self.sig_repr[1])
s = []
# ... and recreating the image given the mixing matrix A
if self.sig_repr_params['tfrepresentation'] == 'stftold':
s.append(ao.istft(
X=S * np.vstack(A[p,:,0]),
window=np.hanning(self.sig_repr_params['wlen']),
analysisWindow=None,
hopsize=self.sig_repr_params['hopsize'],
nfft=self.sig_repr_params['fsize']))
s.append(ao.istft(
X=S * np.vstack(A[p,:,1]),
window=np.hanning(self.sig_repr_params['wlen']),
analysisWindow=None,
hopsize=self.sig_repr_params['hopsize'],
nfft=self.sig_repr_params['fsize']))
else:
self.tft.transfo = S * np.vstack(A[p,:,0])
s.append(self.tft.invertTransform())
self.tft.transfo = S * np.vstack(A[p,:,1])
s.append(self.tft.invertTransform())
s = np.array(s).T
sep_src.append(s)
return sep_src
    def steeringVectorsFromCentroids(self):
        """Generates the steering vectors a(p,f,c) for source p,
        (reduced) frequency f and channel c:

        .. math::

            a[p,f,0] = \\cos(\\theta_p)

            a[p,f,1] = \\sin(\\theta_p) \\exp(- 2 j \\pi f \\delta_p)
        """
# TODO: should check that
# * clusterCentroids exists,
# * nchannels of audioObject is 2
# * etc.
if not hasattr(self, "freqs"):
self.comp_sig_repr()
thetas = [cc.theta for cc in self.clusterCentroids]
deltas = [cc.delta for cc in self.clusterCentroids]
# frequencies in reduced form (from 0 to 1/2)
freqs = self.freqs * 1. / self.audioObject.samplerate
nsrc = len(self.clusterCentroids)
A = np.zeros([nsrc, self.freqs.size,
self.audioObject.channels], dtype=np.complex)
if nsrc>0:
# left channel:
A[:,:,0] = np.vstack(np.cos(thetas))
# right channel:
A[:,:,1] = (np.vstack(np.sin(thetas)) *
np.exp(- 1j * 2. * np.pi *
np.outer(deltas,
freqs)))
else:
warnings.warn("Caution: the number of sources is %d." %nsrc)
return A
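# Note on shapes (the numbers are a hypothetical illustration): with 3
# estimated sources and a 2048-point Fourier size (hence 1025 frequency
# bins for the STFT), steeringVectorsFromCentroids returns an array A of
# shape (3, 1025, 2), where A[p, :, 0] is real (left channel) and
# A[p, :, 1] carries the inter-channel phase of source p.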
class TFPoint(object):
def __init__(self, demixinstance=None, thetaphidelta=None,
index_scale=0, index_freq=0, index_time=0,
verbose=2):
if demixinstance is None and thetaphidelta is not None:
self.theta = thetaphidelta[0]
self.phi = thetaphidelta[1]
self.delta = thetaphidelta[2]
self.confidence = 100.
self.index_scale = index_scale # for future integration
self.index_freq = index_freq
self.index_time = index_time
self.frequency = 0.
elif demixinstance is not None:
self.theta = demixinstance.thetaAll[index_freq,
index_time]
self.phi = demixinstance.phiAll[index_freq,
index_time]
self.confidence = demixinstance.confidence[index_freq,
index_time]
self.frequency = (
index_freq * 1. / demixinstance.sig_repr_params['fsize'])
self.index_scale = index_scale # for future integration
self.index_freq = index_freq
self.index_time = index_time
# to be able to use general purpose methods like:
# estim_bound_var_theta
self.demixInst = demixinstance
else:
print "TFPoint: generating dummy TFPoint"
self.verbose = verbose
def estimDeltaPhi(self):
"""Estimates a bound on the distance
confidence interval, in samples, to the TFpoint delta
"""
## globally set:
#uVarBound = 2.33
#uDistBound = 2.33
dmax = np.sqrt(
self.demixInst.estim_bound_var_theta(
confidence=self.confidence,
infOrSup='sup',
u=uVarBound)
) * uDistBound
if self.verbose>1 and False:
print "dmax", dmax
        # chordal distance to angle: cos_ds = cos(2 * alpha), where
        # cos(alpha) = 1 - dmax**2 / 2
        cos_ds = (dmax**2 - 2)**2 / 2 - 1
if np.arccos(cos_ds) <= min(2*self.theta, np.pi - 2*self.theta):
alpha = 0.5 * np.arccos(np.cos(2 * self.theta) / cos_ds)
deltaPhi = np.arccos(
(cos_ds - np.cos(2 * self.theta) * np.cos(2 * alpha)) /
(np.sin(2 * self.theta) * np.sin(2 * alpha))
)
return deltaPhi / (2 * np.pi * self.frequency)
else:
return np.Inf
def getPeakCandidateIndices(self, zoom=1, distBound=None):
"""computes the indices of compatible delta bins
"""
if distBound is None:
_, distBound = (
self.demixInst.getTFpointsNearTheta_OneScale(
centroid_tfpoint=self)
)
Nft = self.demixInst.sig_repr_params['fsize']
kmax = np.floor((2 * np.pi * self.frequency *
Nft / 2 -
np.abs(self.phi)) /
(2 * np.pi))
kmax = np.maximum(kmax, 0)
ks = np.arange(-kmax, kmax+1)
centroidDeltas = (
(- self.phi + 2 * np.pi * ks) /
(2 * np.pi * self.frequency)
)
deltaXiFFTmin = - (Nft * zoom / 2 - 1)
deltaXiFFTmax = (Nft * zoom / 2)
        # integer dtype: these arrays are used as slice indices below
        infBounds = np.zeros(len(ks), dtype=int)
        supBounds = np.zeros(len(ks), dtype=int)
bound1 = np.ceil((centroidDeltas - distBound) * zoom)
bound2 = np.floor((centroidDeltas + distBound) * zoom)
bound1 = np.maximum(deltaXiFFTmin, bound1)
bound1 = np.minimum(deltaXiFFTmax, bound1)
bound2 = np.maximum(deltaXiFFTmin, bound2)
bound2 = np.minimum(deltaXiFFTmax, bound2)
infBounds[bound1>=0] = bound1[bound1>=0]
infBounds[bound1<0] = (
bound1[bound1<0] + deltaXiFFTmax - deltaXiFFTmin + 1)
supBounds[bound2>=0] = bound2[bound2>=0]
supBounds[bound2<0] = (
bound2[bound2<0] + deltaXiFFTmax - deltaXiFFTmin + 1)
if np.isinf(distBound):
# return a True vector for all time bins:
self.indices_candidates = np.ones(Nft * zoom, dtype=bool)
else:
self.indices_candidates = np.zeros(Nft * zoom, dtype=bool)
for n in range(len(ks)):
if infBounds[n] <= supBounds[n]:
self.indices_candidates[infBounds[n]:(supBounds[n]+1)]=True
else:
self.indices_candidates[infBounds[n]:] = True
self.indices_candidates[:(supBounds[n]+1)] = True
return self.indices_candidates
def clusterDistBound(self, ):
# this is a bit strange....
#
        # u_nbpPoints[0] <=> 200000 pts, u_nbpPoints[1] <=> 400000 pts, ...
        u_nbpPoints = [2.33, 2.58, 2.72, 2.81, 2.88, 2.93, 2.98, 3.05]
        indNbPts = 7  # don't get why this choice
uVarBound = u_nbpPoints[indNbPts]
uDistBound = u_nbpPoints[indNbPts]
varBounds = self.demixInst.estim_bound_var_theta(
confidence=self.confidence,
infOrSup='sup',
u=uVarBound)
return np.sqrt(varBounds) * uDistBound # distBound
    def __str__(self):
        return ("[confidence: " + str(self.confidence) +
                ", delta: " + str(self.delta) +
                ", phi: " + str(self.phi) +
                ", theta: " + str(self.theta) +
                ", freq: " + str(self.frequency) + "]")
def __repr__(self):
return self.__str__()
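if __name__ == '__main__':
    # a minimal demonstration sketch, assuming a stereo file 'mix.wav'
    # exists in the working directory (the filename is illustrative):
    demix = DEMIX('mix.wav', nsources=2, verbose=1)
    demix.comp_clusters()
    demix.refine_clusters()
    for centroid in demix.clusterCentroids:
        print centroid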