Skip to content

Instantly share code, notes, and snippets.

@amacfie
Created November 19, 2024 11:18
Show Gist options
  • Save amacfie/cc32d857d9078f38a7b4c4b70c74ff51 to your computer and use it in GitHub Desktop.
Save amacfie/cc32d857d9078f38a7b4c4b70c74ff51 to your computer and use it in GitHub Desktop.
r"""
requires:
pydantic==1.*
numpy==1.*
scipy==1.*
"""
from abc import ABC, abstractmethod
import math
import random
from pydantic import BaseModel
from scipy import stats
import numpy
# https://stackoverflow.com/questions/18441779/how-to-specify-upper-and-lower-limits-when-using-numpy-random-normal
def trunc_norm_sample(mu, sigma, a=0, b=1):
    """Draw one sample from a normal(mu, sigma) truncated to [a, b].

    ``scipy.stats.truncnorm`` takes its clipping bounds in standardized
    form (offsets from ``loc`` measured in units of ``scale``), so the
    raw bounds ``a`` and ``b`` are converted before sampling.
    """
    lower_std = (a - mu) / sigma
    upper_std = (b - mu) / sigma
    return stats.truncnorm.rvs(lower_std, upper_std, loc=mu, scale=sigma)
# https://stackoverflow.com/questions/6824681/get-a-random-boolean-in-python
def random_boolean(p):
    """Return True with probability ``p`` (a single Bernoulli trial)."""
    draw = random.random()  # uniform in [0.0, 1.0)
    return draw < p
class Comment(BaseModel):
    """State of a single comment tracked by the simulation."""

    # Identifier of the comment's content; comments with equal idents are
    # treated as duplicates of each other.
    ident: int
    # Simulation time at which the comment was posted.
    created_at: float
    # Probability that a voter reading this comment upvotes it.
    upvote_prob: float
    # Probability that a voter reading this comment downvotes it.
    downvote_prob: float
    # Running tallies of votes received so far.
    num_up: int
    num_down: int
    # Most recent ranking score assigned to the comment.
    score: float
class Scorer(BaseModel, ABC):
    """Interface for strategies that rank comments.

    Concrete scorers subclass this model and implement ``score``.
    """

    @abstractmethod
    def score(self, comment: "Comment", t: float):
        """Return the ranking score of ``comment`` at simulation time ``t``."""
class ModifiedBayes(Scorer):
    """Score = (net votes + decaying prior) / (prior weight + age penalty).

    With ``aged = age + 1``:

        (num_up - num_down + prior_votes * aged ** -decay_rate)
        / (prior_votes + aged ** gravity)
    """

    # Weight of the prior, expressed as a number of pseudo-votes.
    prior_votes: int = 7
    # Exponent controlling how fast the prior's value shrinks with age.
    decay_rate: float = 1
    # Exponent controlling how fast the denominator grows with age.
    gravity: float = 1

    def score(self, comment: "Comment", t: float):
        """Return the score of ``comment`` at time ``t``."""
        # Shift by one so a brand-new comment (age 0) has a base of 1.
        aged = (t - comment.created_at) + 1
        prior_vote_val = aged ** (-self.decay_rate)
        numerator = (
            comment.num_up - comment.num_down + self.prior_votes * prior_vote_val
        )
        denominator = self.prior_votes + aged ** self.gravity
        return numerator / denominator
class Simulation(BaseModel):
    """Monte-Carlo simulation of 24 hours of visitors to a comment thread.

    Visitors arrive evenly spaced over the interval [0, 24]. Before each
    visit every comment is re-scored with ``scorer`` and the list is
    sorted best-first. The visitor then either posts a comment (with
    probability ``prob_comment``) or reads the list from the top, voting
    on each comment until they decide to leave.
    """

    class Config:
        arbitrary_types_allowed = True

    # Arrival rate; the run simulates floor(24 * visitors_per_h) visitors.
    visitors_per_h: float
    # Probability that a visitor posts a comment instead of voting.
    prob_comment: float
    # Probability that a voter stops reading after downvoting a comment.
    downvote_prob_leave: float
    # Probability that a voter stops reading after not voting on a comment.
    novote_prob_leave: float
    # New comment idents are drawn uniformly from [0, max_unique_comments).
    max_unique_comments: int
    scorer: Scorer
    comments: list[Comment] = []
    # If true, a comment whose ident already exists is not added at all.
    duplicates_removed: bool = False
    # If true, voters downvote all duplicates seen so far that don't have the
    # max diff score (num_up - num_down). If false, voters downvote every
    # duplicate they encounter after the first comment with that ident.
    downvote_nonmax_dups: bool = False

    def get_vote_probs(self):
        """Sample ``(upvote_prob, downvote_prob)`` for a brand-new comment.

        A quality category is drawn first (10% stinker, 80% mediocre,
        10% great); both probabilities are then sampled from truncated
        normals on [0, 1] whose parameters depend on that category.
        """
        category = numpy.random.choice(
            ["stinker", "mediocre", "great"],
            p=[0.1, 0.8, 0.1],
        )
        if category == "stinker":
            prob_upvote = trunc_norm_sample(mu=0.05, sigma=0.1)
            prob_downvote = trunc_norm_sample(mu=0.5, sigma=0.2)
        elif category == "mediocre":
            prob_upvote = trunc_norm_sample(mu=0.1, sigma=0.05)
            prob_downvote = trunc_norm_sample(mu=0.05, sigma=0.05)
        else:
            prob_upvote = trunc_norm_sample(mu=0.5, sigma=0.2)
            prob_downvote = trunc_norm_sample(mu=0.05, sigma=0.1)
        return (prob_upvote, prob_downvote)

    def run(self):
        """Run one simulation from scratch and return upvotes per visitor.

        ``self.comments`` is reset at the start and holds the final
        comment list afterwards. Returns ``0.0`` when the arrival rate is
        so low that no visitor arrives (instead of dividing by zero).
        """
        self.comments = []
        num_visitors = math.floor(24 * self.visitors_per_h)
        if num_visitors == 0:
            # No visitors means no votes; avoid 0 / 0 below.
            return 0.0
        for t in numpy.linspace(0, 24, num_visitors):
            # Re-rank before every visit so the visitor sees current scores.
            for comment in self.comments:
                comment.score = self.scorer.score(comment=comment, t=t)
            self.comments.sort(key=lambda comment: comment.score, reverse=True)
            if random_boolean(self.prob_comment):
                self._add_comment(t)
            else:
                self._cast_votes()
        total_upvotes = sum(comment.num_up for comment in self.comments)
        return total_upvotes / num_visitors

    def _add_comment(self, t):
        """Post one comment at time ``t`` with a randomly drawn ident."""
        ident = numpy.random.randint(0, self.max_unique_comments)
        # NOTE(review): the source's indentation was lost; this assumes the
        # original loop stopped at the first comment with a matching ident
        # and used for/else for the "no match" case -- confirm.
        original = next(
            (prev for prev in self.comments if prev.ident == ident), None
        )
        if original is None:
            # First comment with this ident: sample its vote probabilities.
            upvote_prob, downvote_prob = self.get_vote_probs()
            self.comments.append(
                Comment(
                    ident=ident,
                    created_at=round(t, 3),
                    upvote_prob=round(upvote_prob, 3),
                    downvote_prob=round(downvote_prob, 3),
                    num_up=0,
                    num_down=0,
                    score=0,
                )
            )
        elif not self.duplicates_removed:
            # A duplicate inherits the vote probabilities of the existing
            # comment with the same ident.
            self.comments.append(
                Comment(
                    ident=ident,
                    created_at=round(t, 3),
                    upvote_prob=original.upvote_prob,
                    downvote_prob=original.downvote_prob,
                    num_up=0,
                    num_down=0,
                    score=0,
                )
            )

    def _cast_votes(self):
        """Simulate one voter reading ``self.comments`` from the top."""
        # Index in self.comments -> this voter's vote (1, -1 or 0).
        votes = {}
        # ident -> indexes of comments with that ident already read by this
        # voter, in reading order (replaces rescanning the list prefix).
        seen_ixes = {}
        for i, comment in enumerate(self.comments):
            prev_ixes = seen_ixes.setdefault(comment.ident, [])
            if not prev_ixes:
                # First time this voter sees this ident: vote on its merits.
                upvote = random_boolean(comment.upvote_prob)
                downvote = random_boolean(comment.downvote_prob)
            elif self.downvote_nonmax_dups:
                my_max_vote = max(votes[ix] for ix in prev_ixes)
                # Duplicate with the highest net score so far; ties go to
                # the larger index, i.e. to the current comment.
                _, max_diff_ix = max(
                    (
                        self.comments[ix].num_up - self.comments[ix].num_down,
                        ix,
                    )
                    for ix in prev_ixes + [i]
                )
                if max_diff_ix == i:
                    # The current copy wins: it takes over the voter's best
                    # earlier vote and every earlier copy is downvoted.
                    upvote = my_max_vote == 1
                    downvote = my_max_vote == -1
                    for ix in prev_ixes:
                        votes[ix] = -1
                else:
                    upvote = False
                    downvote = True
            else:
                upvote = False
                downvote = True
            prev_ixes.append(i)

            if upvote:
                votes[i] = 1
                leave = False
            elif downvote:
                votes[i] = -1
                leave = random_boolean(self.downvote_prob_leave)
            else:
                votes[i] = 0
                leave = random_boolean(self.novote_prob_leave)
            if leave:
                break

        # Votes are applied only after the voter has finished reading,
        # because earlier votes may be rewritten while reading.
        for i, vote in votes.items():
            if vote == 1:
                self.comments[i].num_up += 1
            elif vote == -1:
                self.comments[i].num_down += 1
if __name__ == "__main__":
    # Report the mean upvotes-per-visitor over 1,000 independent runs.
    settings = dict(
        visitors_per_h=10,
        prob_comment=0.1,
        downvote_prob_leave=0.5,
        novote_prob_leave=0.15,
        scorer=ModifiedBayes(),
        max_unique_comments=1_000,
        duplicates_removed=False,
        downvote_nonmax_dups=True,
    )
    simulation = Simulation(**settings)
    upvote_rates = []
    for _ in range(1_000):
        upvote_rates.append(simulation.run())
    print(numpy.mean(upvote_rates))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment