@shaypal5
Created July 1, 2022 10:34
A naive implementation of Funk's MF for collaborative filtering (commonly and wrongly called SVD for collaborative filtering). Might contain mistakes (let me know).
from typing import Optional, Tuple, Union
import numpy as np
import pandas as pd
def train_val_split(
training_df: pd.DataFrame,
val_ratio: float,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
"""Splits the input training dataset into train/val set.
Parameters
----------
    training_df : pandas.DataFrame
        The training dataframe to split. Must contain a 'date' column,
        by which rows are sorted (ascending) before the split.
val_ratio : float, between 0 and 1
The ratio of the input dataframe the validation set should comprise.
Returns
-------
pandas.DataFrame, pandas.DataFrame
The resulting train and validation sets, respectively, as dataframes.
"""
    # sort chronologically so the validation set is the latest slice
    training_df = training_df.sort_values(by='date')
    split_ix = int(len(training_df) * (1 - val_ratio))
    # return copies so later in-place edits don't mutate the caller's frame
    return training_df.iloc[:split_ix].copy(), training_df.iloc[split_ix:].copy()
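
# For example, with a 10-row dataframe and val_ratio=0.2, the eight
# earliest rows (by 'date') become the train set and the two latest
# rows the validation set.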
class CategEncoder:
"""A simple encoder of categorical data.
Parameters
----------
values : pandas.Series
The categorical values to encode (do not have to be a unique set).
    def_ix : int
The default integer encoding to give to unknown categorical values.
"""
def __init__(self, values: pd.Series, def_ix: int) -> None:
self._def_ix = def_ix
self.i2v = np.sort(values.unique())
self.n = len(self.i2v)
        self._v2i = {v: i for i, v in enumerate(self.i2v)}
def v2i(self, v: object) -> int:
"""Get the integer encoding of an input categorical value.
        If the value was not seen on CategEncoder initialization, the
        `def_ix` given at initialization is returned instead.
"""
try:
return self._v2i[v]
except KeyError:
return self._def_ix
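
# CategEncoder usage sketch (illustrative values):
#   enc = CategEncoder(pd.Series(['b', 'a', 'b']), def_ix=2)
#   enc.v2i('a')  # -> 0 (unique values are sorted, so 'a' encodes to 0)
#   enc.v2i('b')  # -> 1
#   enc.v2i('c')  # -> 2 (never seen, so def_ix is returned)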
EPSILON_SHIFT = 0.001
EPSILON_SCALE = 1e-8
def epsilons_ndarray(shape: Union[Tuple[int], Tuple[int, int]]) -> np.ndarray:
    """Returns an ndarray of the given shape, filled with tiny values.

    Values are drawn uniformly from (0, 1), shifted away from zero by
    EPSILON_SHIFT, scaled down by EPSILON_SCALE and given a random sign,
    yielding near-zero entries suitable for symmetry-breaking
    initialization.
    """
vals = np.random.rand(*shape) + EPSILON_SHIFT
vals = vals * EPSILON_SCALE
sign = np.random.rand(*shape) > 0.5
sign = (sign.astype(int) * 2) - 1
return np.multiply(vals, sign)
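
# e.g. epsilons_ndarray((2, 3)) yields a 2x3 array of randomly-signed
# values of magnitude at most ~1e-8, used below to initialize q, p, bi, bu.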
class FunkMfRecommender:
"""A naive implementation of Funk's MF for collaborative filtering.
The method is commonly and wrongly called SVD for collaborative filtering.
See here for Simon Funk's original post detailing this method (and
originating the wrong name):
https://sifter.org/simon/journal/20061211.html
    And this Wikipedia entry for some clarifications:
https://en.wikipedia.org/wiki/Matrix_factorization_(recommender_systems)
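    The model predicts the rating of user u for item i as

        r̂_ui = mu + b_i + b_u + q_i · p_u

    where mu is the global mean rating, b_i and b_u are the item and user
    biases, and q_i and p_u are f-dimensional latent factor vectors, all
    learned by stochastic gradient descent.
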
Parameters
----------
user_column_label : str
The column label of the user ids column in input dataframes.
item_column_label : str
The column label of the item ids column in input dataframes.
rating_column_label : str
The column label of the ratings column in input dataframes.
rating_range : (int, int)
The minimum and maximum possible ratings.
f : int
The number of latent factors in the learned model.
gamma : float, optional
The learning rate to use. Defaults to 0.005.
lamda : float, optional
The regularization coefficient to use. Defaults to 0.02.
        Misspelled to avoid clashing with the reserved Python keyword
        `lambda`.
    use_baselines_while_fitting : bool, default False
        Whether to use baseline predictions (item mean rating plus user
        offset) instead of the randomly initialized parameters during the
        first fitting iteration.
"""
def __init__(
self,
user_column_label: str,
item_column_label: str,
rating_column_label: str,
rating_range: Tuple[int, int],
f: int,
        gamma: float = 0.005,
        lamda: float = 0.02,
        use_baselines_while_fitting: bool = False,
) -> None:
self.usr_colbl = user_column_label
self.itm_colbl = item_column_label
self.rtg_colbl = rating_column_label
self.min_rtg = rating_range[0]
self.max_rtg = rating_range[1]
self.max_rmse = self.max_rtg - self.min_rtg
self.f = f
self.base_gamma = gamma
self.gamma = gamma
        # misspelled because 'lambda' is a reserved Python keyword
self.lamda = lamda
self._use_baseline = use_baselines_while_fitting
    def _trim_prediction(self, prediction: float) -> float:
        # clip the prediction to the configured rating range
        return max(self.min_rtg, min(self.max_rtg, prediction))
def _predict_baseline(self, user_id: object, item_id: object) -> float:
try:
return self._trim_prediction(
self.avg_rating[item_id] + self.avg_offset[user_id])
except AttributeError:
raise RuntimeError('FunkMfRecommender was not fitted!')
def _predict_parameterized(
self, user_id: object, item_id: object,
) -> float:
uix = self.user_encoder.v2i(user_id)
iix = self.item_encoder.v2i(item_id)
pred_rating = self._trim_prediction(
self.mu + self.bi[iix] + self.bu[uix] + np.matmul(
self.q[iix], self.p[uix])
)
return pred_rating
def _predict_parameterized_return_ix(
self, user_id: object, item_id: object,
) -> Tuple[float, int, int]:
uix = self.user_encoder.v2i(user_id)
iix = self.item_encoder.v2i(item_id)
pred_rating = self._trim_prediction(
self.mu + self.bi[iix] + self.bu[uix] + np.matmul(
self.q[iix], self.p[uix])
)
return pred_rating, uix, iix
def _predict_while_fitting(
self, user_id: object, item_id: object,
) -> float:
if self._use_baseline:
return self._predict_baseline(user_id, item_id)
return self._predict_parameterized(user_id, item_id)
def predict(self, user_id: object, item_id: object) -> float:
uix = self.user_encoder.v2i(user_id)
iix = self.item_encoder.v2i(item_id)
# If this is a never-before-seen user...
if uix == self.missing_uid:
# ..and a never-before-seen item...
if iix == self.missing_iid:
                # ..just return the global mean rating, as it is
                # the best estimator we have.
                return self.mu
else:
# ..we've seen this item, so return its avg rating
return self.avg_rating[item_id]
# If we know the user, but this is a never-before-seen item...
if iix == self.missing_iid:
            # ..offset the avg item rating with this user's avg offset
return self._trim_prediction(self.mu + self.avg_offset[user_id])
# Otherwise, we've seen both this item and this user,
# and so we have their baseline parameters and their p and q,
# and so we can predict the rating in a parameterized manner
return self._predict_parameterized(user_id, item_id)
def rmse_on_test(self, test_df: pd.DataFrame) -> float:
sum_squared_errors = 0
for row_ix in test_df.index:
r_ui = test_df.loc[row_ix, self.rtg_colbl]
r_ui_pred = self.predict(
user_id=test_df.loc[row_ix, self.usr_colbl],
item_id=test_df.loc[row_ix, self.itm_colbl],
)
e_ui = r_ui - r_ui_pred
sum_squared_errors += e_ui ** 2
rmse = np.sqrt(sum_squared_errors / len(test_df))
return rmse
def fit(
self,
training_df: pd.DataFrame,
        max_steps: int = 20,
        gamma_decay: float = 1.0,
        val_ratio: float = 0.2,
        verbose: bool = False,
) -> None:
"""Fit the MF recommender on an input training set.
Parameters
----------
training_df : pandas.DataFrame
The training set as (user, item, rating) rows. As the initialized
column labels are used to access this, order is not important, and
any additional columns are ignored.
        max_steps : int, default 20
            The maximum number of stochastic gradient descent passes over
            the training set to perform; early stopping on validation RMSE
            may end fitting sooner.
        gamma_decay : float, default 1
            The multiplicative factor by which gamma, the learning rate,
            is decayed after each iteration of the stochastic gradient
            descent performed. By default, no decay is introduced.
val_ratio : float, default 0.2
The ratio of the input training set to use as a validation set for
early stopping. The rest of the input training set is used as the
set the algorithm is fitted on.
verbose : bool, default False
If set to True, some informative messages are printed.
"""
def _print(string: str) -> None:
pass
if verbose:
def _print(string: str) -> None:
print(string)
# train-val split
train_df, val_df = train_val_split(training_df, val_ratio)
# preliminaries
self.gamma = self.base_gamma
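        # sentinel encodings for never-before-seen users/items,
        # chosen outside the valid index range of the encoders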
self.missing_uid = 2 * train_df[self.usr_colbl].nunique()
self.missing_iid = 2 * train_df[self.itm_colbl].nunique()
self.user_encoder = CategEncoder(
values=train_df[self.usr_colbl], def_ix=self.missing_uid)
self.item_encoder = CategEncoder(
values=train_df[self.itm_colbl], def_ix=self.missing_iid)
# learning baselines
        self.avg_rating = (train_df[[self.itm_colbl, self.rtg_colbl]].groupby(
            self.itm_colbl).mean())[self.rtg_colbl]
train_df['avg_rating'] = train_df[self.itm_colbl].map(self.avg_rating)
        train_df['offset'] = train_df[self.rtg_colbl] - train_df['avg_rating']
        self.avg_offset = (train_df[[self.usr_colbl, 'offset']].groupby(
            self.usr_colbl).mean())['offset']
        train_df = train_df.drop(['avg_rating', 'offset'], axis=1)
        # calculate mu, the global mean rating
        self.mu = train_df[self.rtg_colbl].mean()
_print(f"mu (mean rating): {self.mu}")
        # randomly initialize q, p, bi and bu
self.n_items: int = train_df[self.itm_colbl].nunique()
_print(f"n_items: {self.n_items}")
self.n_users: int = train_df[self.usr_colbl].nunique()
_print(f"n_users: {self.n_users}")
        self.q_shape = (self.n_items, self.f)
        self.p_shape = (self.n_users, self.f)
        self.bi_shape = (self.n_items,)
        self.bu_shape = (self.n_users,)
self.q = epsilons_ndarray(self.q_shape)
self.p = epsilons_ndarray(self.p_shape)
self.bi = epsilons_ndarray(self.bi_shape)
self.bu = epsilons_ndarray(self.bu_shape)
# prep rating series to iterate
train_df: pd.DataFrame = train_df.sort_values(self.usr_colbl)
ix_cols = [self.usr_colbl, self.itm_colbl]
train_df.index = pd.MultiIndex.from_frame(
train_df[ix_cols], names=ix_cols)
        ratings_ser = train_df[self.rtg_colbl]
groupby_obj = ratings_ser.groupby(level=0)
# SGD
num_rows = len(train_df)
before_last_error = self.max_rmse + 1
last_error = self.max_rmse
completed_iters = 0
_print("Starting to train...")
while last_error < before_last_error and completed_iters < max_steps:
sum_squared_errors = 0
            for uid, user_ratings_series in groupby_obj:
                for ix, r_ui in user_ratings_series.items():
                    iid = ix[1]
                    uix = self.user_encoder.v2i(uid)
                    iix = self.item_encoder.v2i(iid)
                    # during the first iteration this may fall back to the
                    # baseline prediction (see use_baselines_while_fitting)
                    r_ui_pred = self._predict_while_fitting(uid, iid)
e_ui = r_ui - r_ui_pred
sum_squared_errors += e_ui ** 2
self.bu[uix] += self.gamma * (
e_ui - self.lamda * self.bu[uix])
self.bi[iix] += self.gamma * (
e_ui - self.lamda * self.bi[iix])
prev_qi = self.q[iix]
self.q[iix] += self.gamma * (
e_ui * self.p[uix] - self.lamda * self.q[iix])
self.p[uix] += self.gamma * (
e_ui * prev_qi - self.lamda * self.p[uix])
completed_iters += 1
train_rmse = np.sqrt(sum_squared_errors / num_rows)
val_rmse = self.rmse_on_test(val_df)
_print((
f"Step {completed_iters:02d}| γ: {self.gamma:.5f}| "
f"train set RMSE: {train_rmse:.6f} | "
f"val set RMSE: {val_rmse:.6f}"
))
            # after at least one iteration is done, we move from the
            # baseline predictions to the parameterized predictions
self._use_baseline = False
# update errors
before_last_error = last_error
last_error = val_rmse
# gamma decay
self.gamma = gamma_decay * self.gamma
self._last_train_rmse = train_rmse
self._last_val_rmse = last_error
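

if __name__ == '__main__':
    # Minimal usage sketch on synthetic data. Column labels and
    # hyperparameter values here are illustrative, not mandated by the
    # class; note the 'date' column required by train_val_split.
    rng = np.random.default_rng(0)
    n = 2000
    demo_df = pd.DataFrame({
        'user': rng.integers(0, 50, size=n),
        'item': rng.integers(0, 30, size=n),
        'rating': rng.integers(1, 6, size=n),
        'date': np.arange(n),
    })
    rec = FunkMfRecommender(
        user_column_label='user',
        item_column_label='item',
        rating_column_label='rating',
        rating_range=(1, 5),
        f=10,
    )
    rec.fit(demo_df, max_steps=5, verbose=True)
    print(rec.predict(user_id=0, item_id=0))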