Created
November 19, 2024 11:18
-
-
Save amacfie/cc32d857d9078f38a7b4c4b70c74ff51 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
r"""
requires:
    pydantic==1.*
    numpy==1.*
    scipy==1.*
"""
from abc import ABC, abstractmethod
import math
import random

import numpy
from pydantic import BaseModel
from scipy import stats
# https://stackoverflow.com/questions/18441779/how-to-specify-upper-and-lower-limits-when-using-numpy-random-normal | |
def trunc_norm_sample(mu, sigma, a=0, b=1, random_state=None):
    """Draw one sample from a normal distribution truncated to [a, b].

    Args:
        mu: Mean of the underlying (untruncated) normal distribution.
        sigma: Standard deviation of the underlying normal distribution.
        a: Lower bound of the returned sample.
        b: Upper bound of the returned sample.
        random_state: Optional seed or generator forwarded to SciPy so that
            draws can be reproduced; ``None`` keeps SciPy's default source.

    Returns:
        A single sample lying between ``a`` and ``b``.
    """
    # scipy expects the clip points in units of standard deviations from the
    # mean, not as absolute values, hence the rescaling of a and b.
    lower = (a - mu) / sigma
    upper = (b - mu) / sigma
    return stats.truncnorm.rvs(
        lower, upper, loc=mu, scale=sigma, random_state=random_state
    )
# https://stackoverflow.com/questions/6824681/get-a-random-boolean-in-python | |
def random_boolean(p: float) -> bool:
    """Return True with probability ``p`` (and False otherwise).

    Values of ``p`` at or below 0 always give False and values at or above 1
    always give True, because ``random.random()`` lies in [0, 1).
    """
    return random.random() < p
class Comment(BaseModel):
    """One posted comment together with its vote tallies and current score."""

    # Identifier of the comment's content; duplicates share the same ident.
    ident: int
    # Simulation time at which the comment was posted.
    created_at: float
    # Probability that a voter seeing this content for the first time upvotes it.
    upvote_prob: float
    # Probability that a voter seeing this content for the first time downvotes
    # it (only takes effect when the upvote draw fails).
    downvote_prob: float
    # Number of upvotes received so far.
    num_up: int
    # Number of downvotes received so far.
    num_down: int
    # Most recent ranking score assigned by the scorer.
    score: float
class Scorer(BaseModel, ABC):
    """Interface for ranking rules that assign a score to a comment."""

    @abstractmethod
    def score(self, comment: "Comment", t: float) -> float:
        """Return the ranking score of ``comment`` at simulation time ``t``.

        Higher scores are listed first by the simulation.
        """
class ModifiedBayes(Scorer):
    """Net-vote score smoothed by a prior that fades as the comment ages.

    score = (up - down + prior_votes * (age + 1) ** -decay_rate)
            / (prior_votes + (age + 1) ** gravity)

    where ``age = t - comment.created_at`` (same time units as ``t``).
    """

    # Weight of the prior, expressed as a number of pseudo-votes.
    prior_votes: int = 7
    # Exponent controlling how fast the value of each pseudo-vote shrinks with age.
    decay_rate: float = 1
    # Exponent controlling how fast the denominator grows with age.
    gravity: float = 1

    def score(self, comment: "Comment", t: float) -> float:
        """Return the score of ``comment`` at time ``t`` (see class docstring)."""
        age = t - comment.created_at
        # The "+ 1" keeps the base positive for a freshly posted comment.
        prior_vote_val = (age + 1) ** (-self.decay_rate)
        net_votes = comment.num_up - comment.num_down
        return (net_votes + self.prior_votes * prior_vote_val) / (
            self.prior_votes + (age + 1) ** self.gravity
        )
class Simulation(BaseModel):
    """Simulate 24 hours of visitors commenting and voting on a ranked thread.

    Visitors arrive at evenly spaced times. Before each visit every comment is
    re-scored by ``scorer`` and the list is sorted best-first. A visitor then
    either posts a comment or reads the list from the top, voting as they go
    and possibly leaving early.
    """

    class Config:
        arbitrary_types_allowed = True

    # Average number of visitors per hour; the run covers 24 hours.
    visitors_per_h: float
    # Probability that a visitor posts a comment instead of reading and voting.
    prob_comment: float
    # Probability that a reader stops reading right after casting a downvote.
    downvote_prob_leave: float
    # Probability that a reader stops reading right after not voting.
    novote_prob_leave: float
    # Comment idents are drawn uniformly from [0, max_unique_comments).
    max_unique_comments: int
    # Ranking rule used to score comments before each visit.
    scorer: Scorer
    # Current comment list; reset at the start of every run().
    comments: list[Comment] = []
    # If true, a comment whose ident is already present is not posted again.
    duplicates_removed: bool = False
    # If true, voters downvote all duplicates seen so far that don't have the
    # max diff score. If false, voters downvote duplicates after the first
    # unique one.
    downvote_nonmax_dups: bool = False

    def get_vote_probs(self):
        """Draw the (upvote, downvote) probabilities for a brand-new comment.

        The comment is first assigned a quality category (10% "stinker",
        80% "mediocre", 10% "great"); each probability is then sampled from a
        truncated normal on [0, 1] whose parameters depend on the category.

        Returns:
            Tuple ``(prob_upvote, prob_downvote)``.
        """
        category = numpy.random.choice(
            ["stinker", "mediocre", "great"],
            p=[0.1, 0.8, 0.1],
        )
        if category == "stinker":
            prob_upvote = trunc_norm_sample(mu=0.05, sigma=0.1)
            prob_downvote = trunc_norm_sample(mu=0.5, sigma=0.2)
        elif category == "mediocre":
            prob_upvote = trunc_norm_sample(mu=0.1, sigma=0.05)
            prob_downvote = trunc_norm_sample(mu=0.05, sigma=0.05)
        else:
            prob_upvote = trunc_norm_sample(mu=0.5, sigma=0.2)
            prob_downvote = trunc_norm_sample(mu=0.05, sigma=0.1)
        return (prob_upvote, prob_downvote)

    def run(self):
        """Run one 24-hour simulation from an empty thread.

        Returns:
            Total number of upvotes cast divided by the number of visitors,
            or 0.0 when the configured rate yields no visitors at all.
        """
        self.comments = []
        num_visitors = math.floor(24 * self.visitors_per_h)
        if num_visitors == 0:
            # Nobody visits, so nobody votes; avoids a 0 / 0 division below.
            return 0.0
        for t in numpy.linspace(0, 24, num_visitors):
            self._rank_comments(t)
            if random_boolean(self.prob_comment):
                self._post_comment(t)
            else:
                self._apply_votes(self._cast_votes())
        total_upvotes = sum(comment.num_up for comment in self.comments)
        return total_upvotes / num_visitors

    def _rank_comments(self, t):
        """Re-score every comment at time ``t`` and sort the list best-first."""
        for comment in self.comments:
            comment.score = self.scorer.score(comment=comment, t=t)
        self.comments.sort(key=lambda comment: comment.score, reverse=True)

    def _post_comment(self, t):
        """Post a comment with a random ident at time ``t``.

        A duplicate of an existing ident inherits that comment's vote
        probabilities, or is dropped when ``duplicates_removed`` is set. A new
        ident gets freshly drawn probabilities.
        """
        ident = numpy.random.randint(0, self.max_unique_comments)
        existing = next(
            (prev for prev in self.comments if prev.ident == ident), None
        )
        if existing is None:
            upvote_prob, downvote_prob = self.get_vote_probs()
            upvote_prob = round(upvote_prob, 3)
            downvote_prob = round(downvote_prob, 3)
        elif self.duplicates_removed:
            return
        else:
            upvote_prob = existing.upvote_prob
            downvote_prob = existing.downvote_prob
        self.comments.append(
            Comment(
                ident=ident,
                created_at=round(t, 3),
                upvote_prob=upvote_prob,
                downvote_prob=downvote_prob,
                num_up=0,
                num_down=0,
                score=0,
            )
        )

    def _cast_votes(self):
        """Let one reader walk the ranked list and decide their votes.

        Returns:
            Dict mapping comment index to 1 (upvote), -1 (downvote) or
            0 (no vote) for every comment the reader reached before leaving.
        """
        votes = {}
        # ident -> indexes of the comments with that ident read so far. The
        # reader never skips a comment, so this equals "all earlier indexes
        # with the same ident".
        seen_ixes = {}
        for i, comment in enumerate(self.comments):
            prev_ixes = seen_ixes.setdefault(comment.ident, [])
            if not prev_ixes:
                # First time the reader meets this content: vote on its merits.
                # Both draws are always made; an upvote wins if both succeed.
                upvote = random_boolean(comment.upvote_prob)
                downvote = random_boolean(comment.downvote_prob)
            elif self.downvote_nonmax_dups:
                # The reader's best opinion of this content so far.
                my_max_vote = max(votes[ix] for ix in prev_ixes)
                # Copy with the highest net votes among those read, including
                # this one; ties go to the later index.
                _, max_diff_ix = max(
                    (
                        self.comments[ix].num_up - self.comments[ix].num_down,
                        ix,
                    )
                    for ix in prev_ixes + [i]
                )
                if max_diff_ix == i:
                    # This copy is now the leader: move the reader's vote to
                    # it and downvote every earlier copy.
                    upvote = my_max_vote == 1
                    downvote = my_max_vote == -1
                    for ix in prev_ixes:
                        votes[ix] = -1
                else:
                    upvote = False
                    downvote = True
            else:
                upvote = False
                downvote = True
            prev_ixes.append(i)

            if upvote:
                votes[i] = 1
                leave = False
            elif downvote:
                votes[i] = -1
                leave = random_boolean(self.downvote_prob_leave)
            else:
                votes[i] = 0
                leave = random_boolean(self.novote_prob_leave)
            if leave:
                break
        return votes

    def _apply_votes(self, votes):
        """Add one reader's votes (index -> 1 / -1 / 0) to the comment tallies."""
        for i, vote in votes.items():
            if vote == 1:
                self.comments[i].num_up += 1
            elif vote == -1:
                self.comments[i].num_down += 1
if __name__ == "__main__":
    # Example experiment: a thread where duplicates are kept and voters
    # downvote every duplicate that is not the highest-rated copy.
    simulation = Simulation(
        visitors_per_h=10,
        prob_comment=0.1,
        downvote_prob_leave=0.5,
        novote_prob_leave=0.15,
        scorer=ModifiedBayes(),
        max_unique_comments=1_000,
        duplicates_removed=False,
        downvote_nonmax_dups=True,
    )
    # run() starts from an empty thread each time, so one Simulation object
    # can be reused; print the mean upvotes-per-visitor over 1,000 runs.
    vals = [simulation.run() for _ in range(1_000)]
    print(numpy.mean(vals))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment