Created
July 1, 2022 10:34
-
-
Save shaypal5/23fa1932945ae3b08d6dff116a1d1eb3 to your computer and use it in GitHub Desktop.
A naive implementation of Funk's MF for collaborative filtering (commonly and wrongly called SVD for collaborative filtering). Might contain mistakes (let me know).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Tuple, Optional | |
import numpy as np | |
import pandas as pd | |
def train_val_split(
    training_df: pd.DataFrame,
    val_ratio: float,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split the input training dataset chronologically into train/val sets.

    Rows are ordered by their ``'date'`` column; the earliest rows make up the
    train set and the latest rows make up the validation set. The input
    dataframe is left untouched, and the returned dataframes are independent
    copies that can be modified freely.

    Parameters
    ----------
    training_df : pandas.DataFrame
        The training dataframe to split. Must contain a ``'date'`` column.
    val_ratio : float, between 0 and 1
        The ratio of the input dataframe the validation set should comprise.

    Returns
    -------
    pandas.DataFrame, pandas.DataFrame
        The resulting train and validation sets, respectively, as dataframes.

    Raises
    ------
    ValueError
        If `val_ratio` is not between 0 and 1.
    """
    if not 0 <= val_ratio <= 1:
        raise ValueError(
            f"val_ratio must be between 0 and 1, got {val_ratio!r}")
    # Sort into a new frame rather than in place, so the caller's dataframe
    # keeps its original row order.
    sorted_df = training_df.sort_values(by='date')
    split_ix = int(len(sorted_df) * (1 - val_ratio))
    # Copy both halves so that later column assignments on them neither warn
    # nor leak into each other.
    train_part = sorted_df.iloc[:split_ix].copy()
    val_part = sorted_df.iloc[split_ix:].copy()
    return train_part, val_part
class CategEncoder:
    """Maps categorical values to consecutive integer codes.

    The distinct input values are sorted, and each one is assigned its
    position in that sorted order as its code.

    Parameters
    ----------
    values : pandas.Series
        The categorical values to encode (do not have to be a unique set).
    def_ix: int
        The default integer encoding to give to unknown categorical values.
    """

    def __init__(self, values: pd.Series, def_ix: int) -> None:
        self._def_ix = def_ix
        # code -> value lookup: the sorted distinct values
        self.i2v = np.sort(values.unique())
        self.n = len(self.i2v)
        # value -> code lookup: the inverse of `i2v`
        self._v2i = {value: code for code, value in enumerate(self.i2v)}

    def v2i(self, v: object) -> int:
        """Return the integer code of a categorical value.

        Values that were not present when the encoder was initialized are
        mapped to the `def_ix` given on initialization.
        """
        return self._v2i.get(v, self._def_ix)
# Added to the uniform [0, 1) draw so that no generated magnitude is exactly 0.
EPSILON_SHIFT = 0.001
# Scales the shifted draws down to very small values.
EPSILON_SCALE = 0.00000001


def epsilons_ndarray(shape: Tuple[int, ...]) -> np.ndarray:
    """Return an array of tiny, non-zero random values with random signs.

    Each magnitude is drawn uniformly from [0, 1), shifted by
    ``EPSILON_SHIFT`` and multiplied by ``EPSILON_SCALE``; each sign is then
    set to +1 or -1 depending on an independent uniform draw.

    Parameters
    ----------
    shape : tuple of int
        The shape of the array to generate.

    Returns
    -------
    numpy.ndarray
        A float array of the requested shape.
    """
    # NOTE: the annotation is `Tuple[int, ...]` rather than a `|` union of
    # `typing.Tuple` aliases, because `|` on typing generics raises a
    # TypeError at definition time on Python versions older than 3.10.
    magnitudes = (np.random.rand(*shape) + EPSILON_SHIFT) * EPSILON_SCALE
    signs = np.where(np.random.rand(*shape) > 0.5, 1, -1)
    return magnitudes * signs
class FunkMfRecommender():
    """A naive implementation of Funk's MF for collaborative filtering.

    The method is commonly and wrongly called SVD for collaborative filtering.
    See here for Simon Funk's original post detailing this method (and
    originating the wrong name):
    https://sifter.org/simon/journal/20061211.html
    And this Wikipedia entry for some clarifications:
    https://en.wikipedia.org/wiki/Matrix_factorization_(recommender_systems)

    A rating is predicted as ``mu + bi[item] + bu[user] + q[item] . p[user]``,
    clipped to the configured rating range.

    Parameters
    ----------
    user_column_label : str
        The column label of the user ids column in input dataframes.
    item_column_label : str
        The column label of the item ids column in input dataframes.
    rating_column_label : str
        The column label of the ratings column in input dataframes.
    rating_range : (int, int)
        The minimum and maximum possible ratings. Predictions are clipped to
        this range.
    f : int
        The number of latent factors in the learned model.
    gamma : float, optional
        The learning rate to use. Defaults to 0.005.
    lamda : float, optional
        The regularization coefficient to use. Defaults to 0.02.
        Misspelled to avoid clashing with the reserved Python keyword
        `lambda`.
    use_baselines_while_fitting : bool, default False
        Whether to compute the errors of the first fitting iteration from the
        baseline predictions (average item rating plus average user offset)
        instead of from the freshly initialized parameters.
    """

    def __init__(
        self,
        user_column_label: str,
        item_column_label: str,
        rating_column_label: str,
        rating_range: Tuple[int, int],
        f: int,
        gamma: Optional[float] = 0.005,
        lamda: Optional[float] = 0.02,
        use_baselines_while_fitting: Optional[bool] = False,
    ) -> None:
        self.usr_colbl = user_column_label
        self.itm_colbl = item_column_label
        self.rtg_colbl = rating_column_label
        self.min_rtg = rating_range[0]
        self.max_rtg = rating_range[1]
        # the largest error a clipped prediction can possibly have
        self.max_rmse = self.max_rtg - self.min_rtg
        self.f = f
        self.base_gamma = gamma
        self.gamma = gamma
        # we misspell because 'lambda' is a reserved python word
        self.lamda = lamda
        # Remember the requested setting separately: fit() switches
        # `_use_baseline` off after the first iteration, and a later call to
        # fit() has to be able to restore it.
        self._use_baseline_init = use_baselines_while_fitting
        self._use_baseline = use_baselines_while_fitting

    def _trim_prediction(self, prediction: float) -> float:
        """Clip a raw prediction to the configured rating range."""
        return max(self.min_rtg, min(self.max_rtg, prediction))

    def _predict_baseline(self, user_id: object, item_id: object) -> float:
        """Predict as the item's average rating plus the user's avg offset.

        Raises
        ------
        RuntimeError
            If the recommender was not fitted yet.
        """
        try:
            raw = self.avg_rating[item_id] + self.avg_offset[user_id]
        except AttributeError as err:
            raise RuntimeError('FunkMfRecommender was not fitted!') from err
        return self._trim_prediction(raw)

    def _predict_from_ix(self, uix: int, iix: int) -> float:
        """Predict from the learned parameters, given encoded indices."""
        return self._trim_prediction(
            self.mu + self.bi[iix] + self.bu[uix] + np.matmul(
                self.q[iix], self.p[uix])
        )

    def _predict_parameterized(
        self, user_id: object, item_id: object,
    ) -> float:
        """Predict from the learned parameters of a known user and item."""
        return self._predict_parameterized_return_ix(user_id, item_id)[0]

    def _predict_parameterized_return_ix(
        self, user_id: object, item_id: object,
    ) -> Tuple[float, int, int]:
        """Like `_predict_parameterized`, but also return the encoded indices.

        Returns
        -------
        float, int, int
            The predicted rating, the user index and the item index.
        """
        uix = self.user_encoder.v2i(user_id)
        iix = self.item_encoder.v2i(item_id)
        return self._predict_from_ix(uix, iix), uix, iix

    def _predict_while_fitting(
        self, user_id: object, item_id: object,
    ) -> float:
        """Predict with the baselines if they are enabled, else parameterized."""
        if self._use_baseline:
            return self._predict_baseline(user_id, item_id)
        return self._predict_parameterized(user_id, item_id)

    def predict(self, user_id: object, item_id: object) -> float:
        """Predict the rating the given user would give the given item.

        Users and/or items that were not part of the set the model was fitted
        on are handled by falling back to average-based estimates.
        """
        uix = self.user_encoder.v2i(user_id)
        iix = self.item_encoder.v2i(item_id)
        # If this is a never-before-seen user...
        if uix == self.missing_uid:
            # ..and a never-before-seen item...
            if iix == self.missing_iid:
                # ..just return the global avg rating, as it is
                # the best estimator we have.
                return self.mu
            # ..we've seen this item, so return its avg rating
            return self.avg_rating[item_id]
        # If we know the user, but this is a never-before-seen item...
        if iix == self.missing_iid:
            # ..offset the global avg rating with this user's avg offset
            return self._trim_prediction(self.mu + self.avg_offset[user_id])
        # Otherwise, we've seen both this item and this user,
        # and so we have their baseline parameters and their p and q,
        # and so we can predict the rating in a parameterized manner
        return self._predict_from_ix(uix, iix)

    def rmse_on_test(self, test_df: pd.DataFrame) -> float:
        """Compute the RMSE of the model's predictions on a set of ratings.

        Parameters
        ----------
        test_df : pandas.DataFrame
            Rows of (user, item, rating), accessed by the column labels given
            on initialization.

        Returns
        -------
        float
            The root mean squared error, or NaN if `test_df` has no rows.
        """
        n_rows = len(test_df)
        if n_rows == 0:
            return float('nan')
        sum_squared_errors = 0.0
        # Iterate over the column values directly instead of label-based
        # `.loc` lookups, which break when index labels are duplicated.
        rows = zip(
            test_df[self.usr_colbl],
            test_df[self.itm_colbl],
            test_df[self.rtg_colbl],
        )
        for user_id, item_id, r_ui in rows:
            e_ui = r_ui - self.predict(user_id=user_id, item_id=item_id)
            sum_squared_errors += e_ui ** 2
        return np.sqrt(sum_squared_errors / n_rows)

    def fit(
        self,
        training_df: pd.DataFrame,
        max_steps: Optional[int] = 20,
        gamma_decay: Optional[float] = 1,
        val_ratio: Optional[float] = 0.2,
        verbose: Optional[bool] = False,
    ) -> None:
        """Fit the MF recommender on an input training set.

        Parameters
        ----------
        training_df : pandas.DataFrame
            The training set as (user, item, rating) rows. As the initialized
            column labels are used to access this, order is not important, and
            any additional columns are ignored (`train_val_split` is used to
            split it, so whatever columns that function needs must be there).
        max_steps : int, default 20
            The maximal number of passes over the training set to perform.
            Fitting stops earlier once the validation RMSE stops improving.
        gamma_decay : float, default 1
            The rate at which to decay gamma, the learning rate, at each
            iteration of the stochastic gradient descent performed. By
            default, no decay is introduced.
        val_ratio : float, default 0.2
            The ratio of the input training set to use as a validation set for
            early stopping. The rest of the input training set is used as the
            set the algorithm is fitted on. If the validation set comes out
            empty, the train set RMSE is used for early stopping instead.
        verbose : bool, default False
            If set to True, some informative messages are printed.

        Raises
        ------
        ValueError
            If no rows are left to fit on after the train/validation split.
        """
        def _print(string: str) -> None:
            if verbose:
                print(string)

        # train-val split
        train_df, val_df = train_val_split(training_df, val_ratio)
        num_rows = len(train_df)
        if num_rows == 0:
            raise ValueError(
                'No training rows are left after the train/validation split.')
        users = train_df[self.usr_colbl]
        items = train_df[self.itm_colbl]
        ratings = train_df[self.rtg_colbl]

        # preliminaries
        self.gamma = self.base_gamma
        self._use_baseline = self._use_baseline_init
        self.n_items: int = items.nunique()
        self.n_users: int = users.nunique()
        # out-of-range indices marking ids that are unknown to the encoders
        self.missing_uid = 2 * self.n_users
        self.missing_iid = 2 * self.n_items
        self.user_encoder = CategEncoder(
            values=users, def_ix=self.missing_uid)
        self.item_encoder = CategEncoder(
            values=items, def_ix=self.missing_iid)

        # learning baselines: the avg rating of each item, and the avg offset
        # of each user's ratings from the avg ratings of the rated items
        self.avg_rating = ratings.groupby(items).mean()
        offsets = ratings - items.map(self.avg_rating)
        self.avg_offset = offsets.groupby(users).mean().rename('offset')
        self.mu = ratings.mean()
        _print(f"mu (mean rating): {self.mu}")

        # randomly init q, p, bi, bu
        _print(f"n_items: {self.n_items}")
        _print(f"n_users: {self.n_users}")
        self.q_shape = (self.n_items, self.f)
        self.p_shape = (self.n_users, self.f)
        self.bi_shape = (self.n_items,)
        self.bu_shape = (self.n_users,)
        self.q = epsilons_ndarray(self.q_shape)
        self.p = epsilons_ndarray(self.p_shape)
        self.bi = epsilons_ndarray(self.bi_shape)
        self.bu = epsilons_ndarray(self.bu_shape)

        # prep rating series to iterate: indexed by (user, item), grouped by
        # user
        sorted_df = train_df.sort_values(self.usr_colbl)
        ix_cols = [self.usr_colbl, self.itm_colbl]
        ratings_ser = sorted_df[self.rtg_colbl]
        ratings_ser.index = pd.MultiIndex.from_frame(
            sorted_df[ix_cols], names=ix_cols)
        groupby_obj = ratings_ser.groupby(level=0)

        # SGD
        has_val_rows = len(val_df) > 0
        before_last_error = self.max_rmse + 1
        last_error = self.max_rmse
        # stays NaN if not even a single iteration is performed
        train_rmse = float('nan')
        completed_iters = 0
        _print("Starting to train...")
        while last_error < before_last_error and completed_iters < max_steps:
            sum_squared_errors = 0.0
            for uid, user_ratings_series in groupby_obj:
                uix = self.user_encoder.v2i(uid)
                for (_, iid), r_ui in user_ratings_series.items():
                    iix = self.item_encoder.v2i(iid)
                    if self._use_baseline:
                        r_ui_pred = self._predict_baseline(uid, iid)
                    else:
                        r_ui_pred = self._predict_from_ix(uix, iix)
                    e_ui = r_ui - r_ui_pred
                    sum_squared_errors += e_ui ** 2
                    self.bu[uix] += self.gamma * (
                        e_ui - self.lamda * self.bu[uix])
                    self.bi[iix] += self.gamma * (
                        e_ui - self.lamda * self.bi[iix])
                    # Copy: indexing returns a view of q, which the in-place
                    # update below would change before p gets to use it.
                    prev_qi = self.q[iix].copy()
                    self.q[iix] += self.gamma * (
                        e_ui * self.p[uix] - self.lamda * prev_qi)
                    self.p[uix] += self.gamma * (
                        e_ui * prev_qi - self.lamda * self.p[uix])
            completed_iters += 1
            train_rmse = np.sqrt(sum_squared_errors / num_rows)
            if has_val_rows:
                val_rmse = self.rmse_on_test(val_df)
            else:
                # nothing to validate on; monitor the train error instead
                val_rmse = train_rmse
            _print((
                f"Step {completed_iters:02d}| γ: {self.gamma:.5f}| "
                f"train set RMSE: {train_rmse:.6f} | "
                f"val set RMSE: {val_rmse:.6f}"
            ))
            # with at least one iteration done we move from using
            # the baseline predictions to the parameterized predictions
            self._use_baseline = False
            # update errors
            before_last_error = last_error
            last_error = val_rmse
            # gamma decay
            self.gamma = gamma_decay * self.gamma
        self._last_train_rmse = train_rmse
        self._last_val_rmse = last_error
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment