Last active
November 3, 2023 11:02
-
-
Save MCRE-BE/f40daf732886d091b0886e071abf9e75 to your computer and use it in GitHub Desktop.
A simple Fourier Extrapolation implementation in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Fast Fourier Transform class. | |
A simple fourier based extrapolation or forecasting class. It is heavily inspired | |
by two gists or codes listed below. | |
See Also | |
-------- | |
https://gist.github.com/tartakynov/83f3cd8f44208a1856ce?permalink_comment_id=2311986 | |
https://github.com/unit8co/darts/blob/ea37dc979190dc2e8949514a6c22d22306597c05/darts/models/forecasting/fft.py#L213 | |
Notes | |
----- | |
Requires Python >= 3.11 as we use case match implementation | |
""" | |
import dataclasses | |
from typing import Callable, List, Optional, Self | |
import numpy as np | |
import pandas as pd | |
@dataclasses.dataclass | |
class FourierExtrapolation: | |
"""Fourier Fourcasting Model. | |
Model that performs forecasting or extrapolation of a chosen timeseries using FFT, with frequency filtering | |
controlled by 'n_harmonics' and the option to detrend controlled by 'trend'. | |
Note | |
---- | |
We assume that the Series has no missing values (NaN's, None, etc...) and that the index is a pd.DatetimeIndex. | |
Both are not checked. | |
We don't perform any corrections to match the length to a specific frequency. | |
Parameters | |
---------- | |
n_harmonics : int, by default=10 | |
The number of frequencies that will be used for forecasting | |
trend : Optional[str], by default=None | |
The type of trend we want to use to detrend the series before applying the DFT. | |
Possible values : {None, 'exp', 'poly', 'lin'} | |
trend_poly_degree : int, by default=3 | |
The degree of the polynomial that will be used for detrending, if `trend='poly'`. | |
n_predict : int, by default=365 | |
Number of steps to predict. Can be overwritten in predict. | |
Examples | |
-------- | |
Simply fit the class | |
>>> fft = FourierExtrapolation(n_harmonics=18, trend="poly") | |
A simple example where we fit the class on a random signal with linear, offset, noise and sinus parts | |
>>> from datetime import datetime | |
>>> import pandas as pd | |
>>> import numpy as np | |
>>> N = 20 | |
>>> ys = np.arange(N) * 2 + 4 + np.random.randn(N) + 4*np.sin(2*np.pi*np.arange(N)/5) | |
>>> index = pd.date_range(datetime.today().date(), periods=N, freq="D") | |
>>> data = pd.Series(ys, index=ubdex) | |
>>> fft = FourierExtrapolation(n_harmonics=18, trend="poly") | |
>>> fft = fft.fit(data) | |
>>> pred = fft.predict(3) | |
""" | |
n_harmonics: int = 10 | |
trend: Optional[str] = None | |
trend_poly_degree: int = 3 | |
n_predict: int = 365 | |
# ... Public API ... | |
def fit( | |
self: Self, | |
X: pd.Series, | |
y: Optional[pd.Series] = None, | |
) -> Self: | |
"""Fit the Discrete Fourier Transformation with the provided X. | |
Parameters | |
---------- | |
X : pd.Series | |
Raises | |
------ | |
ValueError | |
In case the provided X is not of a supported dtype. | |
""" | |
# Ability to handle pd.Series and Numpy Arrays. | |
if isinstance(X, pd.Series): | |
self.X = X.values | |
self.Y = X.index | |
elif isinstance(X, np.ndarray): | |
self.X = X | |
self.Y = np.arange(0, len(X)) | |
else: | |
raise ValueError("Not supported import type.") | |
# Save key elements | |
self.series = X.copy() | |
self.size = len(X) | |
# Detrend | |
self.t = np.arange(0, self.size) | |
self._compute_trend() | |
self.X_detrended = self.X - self.trend_func(self.t) | |
# perform dft and get the frequency domain | |
self.fft_values = np.fft.fft(self.X_detrended) | |
self.freq = np.fft.fftfreq(self.size) | |
# Get the chosen amount of harmonics | |
indexes = list(range(self.size)) | |
indexes.sort(key=lambda x: np.absolute(self.freq[x])) | |
self.indexes = indexes[:1 + self.n_harmonics * 2] | |
return self | |
def predict( | |
self: Self, | |
n_predict: Optional[int] = None, | |
) -> pd.Series: | |
"""Predict the chosen timesteps in the future. | |
Parameters | |
---------- | |
n_predict : Optional[int], by default None | |
Number of steps to predict if we want to overwrite the value provided in __init__ | |
Returns | |
------- | |
pd.Series | |
_description_ | |
""" | |
# Compute the final trend | |
if n_predict is not None: self.n_predict = n_predict | |
t = np.arange(0, self.size + self.n_predict) | |
trend = [self.trend_func(i) for i in t] | |
trend = np.array(trend) | |
# Compute the FFT forecast | |
yf = self._compute_fourier( | |
n=self.size, | |
n_predict=self.n_predict, | |
indexes=self.indexes, | |
x_freqdom=self.fft_values, | |
f=self.freq, | |
) | |
# Output to a pd.Series | |
index = pd.date_range(self.series.index[0], periods=t.size, freq="D") | |
out = pd.Series(yf + trend, index=index) | |
return out | |
def fit_predict( | |
self: Self, | |
X: pd.Series, | |
y: Optional[pd.Series], | |
) -> pd.Series: | |
"""Fit and predict in one single step.""" | |
return self.fit(X).predict() | |
def plot(self: Self) -> None: | |
"""Plot the provided training data and the extrapolated FFT result.""" | |
# --- Import --- | |
import matplotlib.pyplot as plt | |
# --- Script --- | |
plt.plot(self.series, 'b', label='x', linewidth=1) | |
plt.plot( | |
self.predict(self.n_predict), | |
'r', | |
label='extrapolation', | |
) | |
plt.legend() | |
plt.show() | |
# ... Private API ... | |
@staticmethod | |
def _compute_fourier( | |
n: int, | |
n_predict: int, | |
indexes: List[int], | |
x_freqdom: np.ndarray, | |
f: np.ndarray, | |
) -> np.ndarray: | |
"""Inverse the fourier decomposition manually. | |
Parameters | |
---------- | |
n : int | |
Number of samples in the training data | |
n_predict : int | |
Number of steps to forecast after the end of the training data | |
indexes : List[int] | |
The relevant harmonics to plot | |
x_freqdom : np.ndarray | |
The computed FFT by np.fft.fft | |
f : np.ndarray | |
The Discrete Fourier Transformation sample frequencies | |
Returns | |
------- | |
nd.array | |
""" | |
t = np.arange(0, n + n_predict) | |
restored_sig = np.zeros(t.size) | |
for i in indexes: | |
ampli = np.absolute(x_freqdom[i]) / n # amplitude | |
phase = np.angle(x_freqdom[i]) # phase | |
restored_sig += ampli * np.cos(2 * np.pi * f[i] * t + phase) | |
return restored_sig | |
# Trend computations | |
def _compute_trend(self: Self) -> Self: | |
"""Set the attributes for the trend computation. | |
Attributes | |
---------- | |
trend_coef : np.ndarray | |
trend_func : Callable | |
""" | |
y = self.series.astype(int) | |
y = y.values # pass it as a numpy array | |
match self.trend: | |
case "poly": | |
self.trend_coef = np.polyfit(self.t, y, self.trend_poly_degree) | |
self.trend_func = self._poly_trend(self.trend_coef) | |
case "exp": | |
self.trend_coef = np.polyfit(self.t, np.log(y), 1) | |
self.trend_func = self._exp_trend | |
case "lin": | |
self.trend_coef = np.polyfit(self.t, y, 1) | |
self.trend_func = self._poly_trend(self.trend_coef) | |
case _: | |
self.trend_coef = None | |
self.trend_func = self._null_trend | |
return self | |
def _trend_exp(self: Self, x: int) -> Callable: | |
"""Helper function for an exponential trend. | |
See Also | |
-------- | |
https://stackoverflow.com/a/3433503/20716078 | |
""" | |
coef = self.trend_coef | |
return np.exp(coef[1]) * np.exp(coef[0] * x) | |
@staticmethod | |
def _poly_trend(trend_coef, *args) -> Callable: | |
"""Helper function to simulate a polynomial trend with chosen degree.""" | |
return np.poly1d(trend_coef) | |
@staticmethod | |
def _null_trend(*args, **kwargs) -> Callable: | |
"""Helper function to simulate a flat trend.""" | |
return 0 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment