This repository has been archived by the owner on Oct 24, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
harmonic_analysis.py
230 lines (201 loc) · 7.93 KB
/
harmonic_analysis.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import numpy as np
import logging
LOGGER = logging.getLogger(__name__)
PI2I = 2 * np.pi * complex(0, 1)
ZERO_PAD_DEF = False
HANN_DEF = False
SUBTR_ORBIT_DEF = False
class HarmonicAnalysis(object):
def __init__(self, samples, zero_pad=ZERO_PAD_DEF, hann=HANN_DEF,
subtr_orbit=SUBTR_ORBIT_DEF):
self._samples = samples
self._compute_orbit()
if subtr_orbit:
self._samples = self._samples - self.closed_orbit
if zero_pad:
self._pad_signal()
self._length = len(self._samples)
self._int_range = np.arange(self._length)
self._hann_window = None
if hann:
self._hann_window = np.hanning(self._length)
def laskar_method(self, num_harmonics):
samples = self._samples[:] # Copy the samples array.
n = self._length
coefficients = []
frequencies = []
for _ in range(num_harmonics):
# Compute this harmonic frequency and coefficient.
dft_data = HarmonicAnalysis._fft(samples)
frequency = self._jacobsen(dft_data)
coefficient = HarmonicAnalysis._compute_coef(
samples,
frequency * n
) / n
# Store frequency and amplitude
coefficients.append(coefficient)
frequencies.append(frequency)
# Subtract the found pure tune from the signal
new_signal = coefficient * np.exp(PI2I * frequency * self._int_range)
samples = samples - new_signal
coefficients, frequencies = zip(*sorted(zip(coefficients, frequencies),
key=lambda tuple: np.abs(tuple[0]),
reverse=True))
return frequencies, coefficients
def fft_method(self, num_harmonics):
dft_data = HarmonicAnalysis._fft(self._samples)
n = float(len(self._samples))
indices = np.argsort(np.abs(dft_data))[::-1][:num_harmonics]
frequencies = indices / n
coefficients = dft_data[indices] / n
return frequencies, coefficients
def get_signal(self):
if self._hann_window is not None:
return self._samples * self._hann_window
else:
return self._samples
def get_coefficient_for_freq(self, freq):
return self._compute_coef(self._samples, freq * self._length) / self._length
def _pad_signal(self):
"""
Pads the signal with zeros to a "good" FFT size.
"""
length = len(self._samples)
# TODO Think proper pad size
pad_length = (1 << (length - 1).bit_length()) - length
# pad_length = 6600 - length
self._samples = np.pad(
self._samples,
(0, pad_length),
'constant'
)
# self._samples = self._samples[:6000]
def _jacobsen(self, dft_values):
"""
This method interpolates the real frequency of the
signal using the three highest peaks in the FFT.
"""
k = np.argmax(np.abs(dft_values))
n = self._length
r = dft_values
delta = np.tan(np.pi / n) / (np.pi / n)
kp = (k + 1) % n
km = (k - 1) % n
delta = delta * np.real((r[km] - r[kp]) / (2 * r[k] - r[km] - r[kp]))
return (k + delta) / n
def _newton(self, kprime, samples):
# TODO: Properly do this
import scipy
new_kprime = scipy.optimize.newton(self._compute_coef_for_newton, kprime, args=(samples, ))
return new_kprime
def _compute_coef_for_newton(self, kprime, samples):
# TODO: Properly do this
n = len(samples)
freq = kprime / n
coef = self._compute_coef(samples, kprime)
ints = np.arange(n)
exponents = np.exp(-PI2I * freq * ints)
derivative_coef = complex(0, 1) * np.sum(exponents * ints * samples)
result = coef.real * derivative_coef.real + coef.imag * derivative_coef.imag
return result
@staticmethod
def _compute_coef_simple(samples, kprime):
"""
Computes the coefficient of the Discrete Time Fourier
Transform corresponding to the given frequency (kprime).
"""
n = len(samples)
freq = kprime / n
exponents = np.exp(-PI2I * freq * np.arange(n))
coef = np.sum(exponents * samples)
return coef
def _compute_coef_goertzel(samples, kprime):
"""
Computes the coefficient of the Discrete Time Fourier
Transform corresponding to the given frequency (kprime).
This function is faster than the previous one if compiled
with Numba.
"""
n = len(samples)
a = 2 * np.pi * (kprime / n)
b = 2 * np.cos(a)
c = np.exp(-complex(0, 1) * a)
d = np.exp(-complex(0, 1) *
((2 * np.pi * kprime) / n) *
(n - 1))
s0 = 0.
s1 = 0.
s2 = 0.
for i in range(n - 1):
s0 = samples[i] + b * s1 - s2
s2 = s1
s1 = s0
s0 = samples[n - 1] + b * s1 - s2
y = s0 - s1 * c
return y * d
def _compute_orbit(self):
self.closed_orbit = np.mean(self._samples)
self.closed_orbit_rms = np.std(self._samples) / np.sqrt(len(self._samples))
self.peak_to_peak = np.max(self._samples) - np.min(self._samples)
@staticmethod
def _conditional_import_compute_coef():
"""
Checks if Numba is installed.
If it is, it sets the compiled goertzel algorithm as the
coefficient function to use. If it isn't, it uses the
normal Numpy one.
"""
try:
from numba import jit
LOGGER.debug("Using compiled Numba functions.")
return jit(HarmonicAnalysis._compute_coef_goertzel,
nopython=True, nogil=True)
except ImportError:
print("Numba not found, using numpy functions.")
LOGGER.debug("Numba not found, using numpy functions.")
return HarmonicAnalysis._compute_coef_simple
@staticmethod
def _conditional_import_fft():
"""
If SciPy is installed, it will set its fft as the one
to use as it is slightly faster. Otherwise it will use
the Numpy one.
"""
try:
from scipy.fftpack import fft as scipy_fft
fft = staticmethod(scipy_fft)
LOGGER.debug("Scipy found, using scipy FFT.")
except ImportError:
from numpy.fft import fft as numpy_fft
print("Scipy not found, using numpy FFT.")
LOGGER.debug("Scipy not found, using numpy FFT.")
fft = staticmethod(numpy_fft)
return fft
# Set up conditional functions on load ##############################################
HarmonicAnalysis._compute_coef = staticmethod(HarmonicAnalysis._conditional_import_compute_coef())
HarmonicAnalysis._fft = HarmonicAnalysis._conditional_import_fft()
#####################################################################################
class HarmonicPlotter(object):
def __init__(self, harmonic_analisys):
import matplotlib.pyplot as plt
plt.rcParams['backend'] = "Qt4Agg"
self._plt = plt
self._harmonic_analisys = harmonic_analisys
def plot_laskar(self, num_harmonics):
(frequencies,
coefficients) = self._harmonic_analisys.laskar_method(num_harmonics)
self._plt.scatter(frequencies, coefficients)
self._show_and_reset()
def plot_windowed_signal(self):
signal = self._harmonic_analisys.get_signal()
self._plt.plot(range(len(signal)), signal)
self._show_and_reset()
def plot_fft(self):
fft_func = HarmonicAnalysis._fft
signal = self._harmonic_analisys.get_signal()
fft = fft_func(signal)
self._plt.plot(range(len(fft)), np.abs(fft))
self._show_and_reset()
def _show_and_reset(self):
self._plt.show()
self._plt.clf()