Last active
October 22, 2022 00:13
-
-
Save lelandbatey/5d26a90f0068ab68d37a161790669090 to your computer and use it in GitHub Desktop.
Code for visualizing overlapping named spans of time
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Timespan is code for visualizing timelines of events. We can show spans of time
on a timeline, as well as individual events.

The following is example code and what it prints:

>>> example_timespans = [
...     TimeSpan(1647922287310, 1647922287564, 'A short thing happened'),
...     TimeSpan(1648123040908, 1648123109165, 'a span of time where things happening'),
... ]
>>> example_events = [
...     Event(1648121040908, 'just a single thing happened here'),
... ]
>>> print_timespans(example_timespans, example_events)
2022-03-22 04:11:22.000
2022-03-22 04:11:23.000
<snip>
2022-03-22 04:11:27.000 - A short thing happened
2022-03-22 04:11:28.000
2022-03-22 04:11:29.000
2022-03-22 04:11:30.000
2022-03-24 11:24:00.908 just a single thing happened here
<snip>
2022-03-24 11:57:20.000 ┐ a span of time where things happening
2022-03-24 11:57:21.000 │ a span of time where things happening
2022-03-24 11:57:22.000 │ a span of time where things happening
2022-03-24 11:57:23.000 │ a span of time where things happening
<snip> │ a span of time where things happening
2022-03-24 11:58:29.000 ┘ a span of time where things happening
2022-03-24 11:58:30.000
2022-03-24 11:58:31.000
2022-03-24 11:58:32.000
'''
from typing import List, Dict | |
from datetime import datetime, timezone | |
import time | |
import math | |
import sys | |
import os | |
class TimeSpan:
    '''A named span of time whose endpoints are epoch milliseconds.

    Attributes:
        start: moment the span begins.
        end: moment the span ends; never earlier than ``start``.
        middle: ``start`` plus half of the duration (integer division).
        duration: ``end - start``.
        name: optional label for the span.

    Two spans are equal when their start, end and name are equal; ordering
    is by start, then by end.
    '''

    def __init__(self, start_moment_ms, end_moment_ms, name=None):
        '''Create a span; raises ValueError if the start comes after the end.'''
        if start_moment_ms > end_moment_ms:
            raise ValueError(f"start cannot come after end; {start_moment_ms=} > {end_moment_ms=}")
        self.start = start_moment_ms
        self.end = end_moment_ms
        self.middle = self.start + ((self.end - self.start) // 2)
        self.duration = self.end - self.start
        self.name = name

    def _key(self):
        '''Return the tuple of fields that defines this span's identity.'''
        return (self.start, self.end, self.name)

    def contains(self, ts: int):
        '''Return True when ``start <= ts < end`` (the end is exclusive).'''
        return self.start <= ts < self.end

    def overlap(self, other: 'TimeSpan') -> bool:
        '''Return True when ``other`` overlaps this span.

        ``other`` does not overlap when it ends at or before this span's
        start, or when it starts strictly after this span's end. A span that
        starts exactly at this span's end therefore still counts as
        overlapping, which makes the test asymmetric for spans that merely
        touch: ``a.overlap(b)`` can differ from ``b.overlap(a)``.
        '''
        if other.end <= self.start:
            return False
        if other.start > self.end:
            return False
        return True

    def __str__(self):
        return repr(self)

    def __eq__(self, other):
        # Compare the actual fields rather than the string rendering: the
        # rendering cannot tell name=None from name='None', and would also
        # compare equal to an unrelated object that happens to print alike.
        if not isinstance(other, TimeSpan):
            return NotImplemented
        return self._key() == other._key()

    def __lt__(self, other):
        if not isinstance(other, TimeSpan):
            return NotImplemented
        return (self.start, self.end) < (other.start, other.end)

    def __hash__(self):
        # Must agree with __eq__, so hash the same fields it compares.
        return hash(self._key())

    def __repr__(self):
        return f"TimeSpan(start_moment_ms={self.start:>14}, end_moment_ms={self.end:>14}, name={self.name})"
class Event:
    '''A single named moment on the timeline.

    Attributes:
        moment: when the event happened, in epoch milliseconds.
        name: optional label for the event.
    '''

    def __init__(self, moment_ms, name=None):
        self.moment = moment_ms
        self.name = name

    def __repr__(self):
        # TimeSpan has a readable repr; give Event one too so that lists of
        # events are legible in error messages and debug output.
        return f"Event(moment_ms={self.moment}, name={self.name!r})"
def combine_timespans(a: TimeSpan, b: TimeSpan) -> TimeSpan:
    '''Return one span covering both ``a`` and ``b``.

    The result runs from the earlier start to the later end and keeps the
    name of ``a``. Raises ValueError when ``a.overlap(b)`` is false.
    '''
    if a.overlap(b):
        earliest_start = min(a.start, b.start)
        latest_end = max(a.end, b.end)
        return TimeSpan(earliest_start, latest_end, a.name)
    raise ValueError(f"{a=} does not overlap {b=}")
def coalesce_timespans(spans: List[TimeSpan]) -> List[TimeSpan]:
    '''Combine overlapping timespans until no two returned spans overlap.

    Spans are visited in order of start (then end, then name) and folded into
    the span before them whenever they start at or before that span's end, so
    spans that merely touch are merged as well. A merged span keeps the name
    of its earliest member. The result is sorted by start.

    This is a single sort-and-sweep pass. Repeatedly combining every
    overlapping pair never settles when the overlapping spans carry different
    names, because each pair yields one merged span per name and those merged
    spans overlap each other again.
    '''
    # Including the name in the key makes the choice of surviving name
    # deterministic even for spans with identical endpoints.
    ordered = sorted(set(spans), key=lambda s: (s.start, s.end, str(s.name)))
    merged: List[TimeSpan] = []
    for span in ordered:
        if merged and span.start <= merged[-1].end:
            current = merged[-1]
            # Only build a new span when this one actually extends the run.
            if span.end > current.end:
                merged[-1] = TimeSpan(current.start, span.end, current.name)
        else:
            merged.append(span)
    return merged
def find_necessary_buckets(
    spans: List[TimeSpan],
    events: List[Event],
    smallest_bucket_ms=1000,
    bucket_padding=5,
) -> List[TimeSpan]:
    '''Build the spans of time that are "relevant" enough to create buckets for.

    Only the moments where something happens matter: each event, plus the
    start and the end of each span. Everything in between is ignored, which
    lets the timeline keep fine-grained buckets (say one second each) over
    long periods (multiple days) without comparing against every bucket. This
    only pays off when spans and events are sparse relative to the time they
    cover.

        |  ┐ Only include these buckets
        |  ┘
        |
        |
        |  ┐ And these buckets
        |  ┘

    Returns one unnamed TimeSpan per interesting moment, reaching
    ``bucket_padding * smallest_bucket_ms`` milliseconds before and after it.
    Events come first, followed by the start and end of each span in order.
    '''
    padding_ms = bucket_padding * smallest_bucket_ms
    moments = [ev.moment for ev in events]
    for span in spans:
        moments.extend((span.start, span.end))
    return [TimeSpan(moment - padding_ms, moment + padding_ms) for moment in moments]
def normalize_span(span: TimeSpan, span_truncate_ms=1000) -> TimeSpan:
    '''Return a copy of ``span`` snapped outward to multiples of ``span_truncate_ms``.

    The start is rounded down and the end is rounded up, so the result always
    covers the original span. The name is carried over.
    '''
    step = span_truncate_ms
    snapped_start = (span.start // step) * step
    snapped_end = int(math.ceil(span.end / step)) * step
    return TimeSpan(snapped_start, snapped_end, name=span.name)
def calc_span_column(spans: List[TimeSpan]) -> Dict[TimeSpan, int]:
    '''Assign every span a display column so that overlapping spans never share one.

    Spans are placed in order of start, with longer spans first among those
    that start together, and each one goes into the lowest-numbered column
    whose current members do not overlap it. Returns a dict from span to
    column index, keyed in the order of ``spans``.
    '''
    # One stable sort on (start, -duration) gives the same order as sorting by
    # descending duration and then re-sorting by start.
    placement_order = sorted(spans, key=lambda s: (s.start, -s.duration))

    columns = []  # columns[i] is the set of spans already placed in column i
    for span in placement_order:
        for members in columns:
            clashes = any(m != span and m.overlap(span) for m in members)
            if not clashes:
                members.add(span)
                break
        else:
            # Every existing column has a conflicting span; open a new one.
            columns.append({span})

    span_column = dict.fromkeys(spans, -1)
    for index, members in enumerate(columns):
        for member in members:
            span_column[member] = index
    return span_column
def calc_span_widths(spans_to_column: 'Dict[TimeSpan, int]') -> List[int]:
    '''Return the width needed by each display column.

    - The list has exactly N entries, where N is ``len(spans_to_column)``.
    - Entry i is the length of the longest name among the spans assigned to
      column i, or 0 when no span uses that column.
    - A span whose name is None counts as having an empty name.
    - Column numbers outside ``0..N-1`` are ignored.
    '''
    widths = [0] * len(spans_to_column)
    # Single pass over the mapping instead of rescanning it once per column.
    for span, column in spans_to_column.items():
        if 0 <= column < len(widths):
            name_width = 0 if span.name is None else len(span.name)
            if name_width > widths[column]:
                widths[column] = name_width
    return widths
def fmt_utc_ts(z):
    '''Format epoch milliseconds ``z`` as ``YYYY-mm-dd HH:MM:SS.mmm`` in UTC.'''
    # datetime.utcfromtimestamp() is deprecated; build an aware UTC datetime
    # instead. The formatted text is the same.
    moment = datetime.fromtimestamp(z / 1000, tz=timezone.utc)
    # %f renders microseconds; drop the last three digits to get milliseconds.
    return moment.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
# The local UTC offset as it was when this module was imported. Kept as a
# module-level name for existing users; fmt_local_ts does not use it, because
# a fixed offset is wrong for timestamps on the other side of a DST change.
LOCAL_TIMEZONE = datetime.now(timezone.utc).astimezone().tzinfo


def fmt_local_ts(z):
    '''Format epoch milliseconds ``z`` as ``YYYY-mm-dd HH:MM:SS.mmm`` in local time.'''
    dt = datetime.fromtimestamp(z / 1000, tz=timezone.utc)
    # astimezone() with no argument converts to the system local time zone
    # using the offset that applies at that instant.
    localdt = dt.astimezone()
    # %f renders microseconds; drop the last three digits to get milliseconds.
    return localdt.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
def _timeline_buckets(spans, events, smallest_bucket_ms, mindiff):
    '''Return the sorted, de-duplicated buckets, one per candidate output line.'''
    # We narrow down the buckets a bunch, then snap the beginning and end of
    # each padded region to a multiple of mindiff before slicing it up.
    unique_buckets = set()
    for padded in find_necessary_buckets(spans, events, smallest_bucket_ms=smallest_bucket_ms):
        necb = normalize_span(padded, span_truncate_ms=mindiff)
        minispan_count = int(math.ceil(necb.duration / mindiff))
        minispan_durat = int(math.ceil(necb.duration / minispan_count))
        for x in range(minispan_count):
            unique_buckets.add(
                TimeSpan(necb.start + (x * minispan_durat), necb.start + ((x + 1) * minispan_durat))
            )
    return sorted(unique_buckets)


def print_timespans(spans: List[TimeSpan], events: List[Event], fmtfunc=None, smallest_bucket_ms=1000):
    '''Print a text timeline of ``spans`` and ``events`` to stdout.

    Each printed line stands for one bucket of time and shows, in one column
    per group of non-overlapping spans, every span that overlaps the bucket:
    ``┐`` where a span starts, ``┘`` where it ends, ``│`` in between and ``-``
    when it starts and ends inside the same bucket. Runs of identical lines
    are cut short and replaced by a single ``......`` line.

    Args:
        spans: the spans to draw; may be empty.
        events: single moments, drawn as spans two milliseconds long.
        fmtfunc: turns a bucket start (epoch ms) into the text shown at the
            start of the line. Defaults to ``fmt_local_ts``.
        smallest_bucket_ms: lower bound for the time covered by one line.

    Prints nothing when both ``spans`` and ``events`` are empty. Missing
    (None) names are drawn as empty labels.
    '''
    if not spans and not events:
        return
    if fmtfunc is None:
        fmtfunc = fmt_local_ts

    # mindiff is the time covered by each "bucket", i.e. each line printed on
    # the timeline: the shortest span duration, but no less than
    # smallest_bucket_ms, rounded down to a multiple of 250ms with a floor of
    # 1ms. With no spans at all, smallest_bucket_ms is the only input.
    shortest = min((s.duration for s in spans), default=smallest_bucket_ms)
    mindiff = max(shortest, smallest_bucket_ms)
    mindiff = max((mindiff // 250) * 250, 1)

    linefmt = "{bucket_start:^24} {timelines} {names}"
    buckets = _timeline_buckets(spans, events, smallest_bucket_ms, mindiff)

    # For ease of implementation, we describe "Events" as TimeSpans which end
    # 2 milliseconds (2 is not important) after they began. Unnamed spans and
    # events get an empty label so that they can be measured and padded.
    drawn = [s if s.name is not None else TimeSpan(s.start, s.end, '') for s in spans]
    drawn += [TimeSpan(ev.moment, ev.moment + 2, '' if ev.name is None else ev.name) for ev in events]

    spans_to_column = calc_span_column(drawn)
    span_widths = calc_span_widths(spans_to_column)

    # The spans have been arranged into columns such that the spans within a
    # column never overlap. That lets us pre-calculate each column's width and
    # what it looks like when blank; each line then only overrides the columns
    # that have something to show. A cell is a two-character marker, a space
    # and the padded name, hence width + 3.
    blank_fmtspans = [' ' * (width + 3) for width in span_widths]

    # Starts at 1 so that the first span boundary is preceded by a '......' line.
    repeated_lines_count = 1
    prior_timelines = list()
    clean_prior = list()
    for buck in buckets:
        # Each "line" on the screen visualizes a moment of time and all the
        # timespans which overlap with that brief moment of time.
        timelines = blank_fmtspans[::]
        for span in drawn:
            if not span.overlap(buck):
                continue
            column = spans_to_column[span]
            starts_here = buck.contains(span.start)
            ends_here = buck.contains(span.end)
            if starts_here and ends_here:
                newtl = ' -'
            elif starts_here:
                newtl = ' ┐'
            elif ends_here:
                newtl = ' ┘'
            else:
                # We overlap but are not at the top or bottom, so we must be
                # in the middle: draw a straight line.
                newtl = ' │'
            if (starts_here or ends_here) and repeated_lines_count:
                # A span boundary ends a run of repeated lines; mark the gap.
                print(linefmt.format(bucket_start='......', timelines=''.join(clean_prior), names=''))
                repeated_lines_count = 0
            timelines[column] = f"{newtl:<2} {span.name:<{span_widths[column]}}"
        if repeated_lines_count > 1:
            # Already printed this line twice in a row; skip further repeats.
            continue
        if timelines == prior_timelines:
            repeated_lines_count += 1
        alltimelines = ''.join(timelines)
        print(linefmt.format(bucket_start=fmtfunc(buck.start), timelines=alltimelines, names=''))
        prior_timelines = timelines
        # What the '......' line shows: spans that just ended vanish and spans
        # that just started continue as a plain line.
        clean_prior = [x.replace('┘', ' ').replace('┐', '│') for x in prior_timelines]
def create_parsefunc(fmt):
    '''Return a function that parses a timestamp string into a datetime.

    When ``fmt`` is ``'int'`` the returned parser reads a number of seconds
    since the epoch (or milliseconds, if the number is too large to plausibly
    be seconds) and returns an aware UTC datetime. Otherwise ``fmt`` is a
    ``strptime`` format; results carrying a time zone are converted to UTC
    and results without one are returned naive.
    '''
    if fmt == 'int':
        def parse_epoch(s):
            seconds = float(s)
            # if larger than this, probably an epoch timestamp of milliseconds
            # instead of the usual 'seconds'
            if seconds > 9999999999:
                seconds = seconds / 1000
            return datetime.fromtimestamp(seconds, timezone.utc)
        return parse_epoch

    def parse_formatted(s):
        parsed = datetime.strptime(s, fmt)
        if parsed.tzinfo is None:
            return parsed
        return parsed.astimezone(timezone.utc)
    return parse_formatted
def guess_format(value):
    '''Return the first known parser that can read ``value``.

    The candidate formats are tried in a fixed order; the first one that
    parses ``value`` wins, and a note describing the interpretation is written
    to stderr. Raises ValueError when no candidate can parse ``value``.
    '''
    candidates = (
        ("RFC3339", "%Y-%m-%dT%H:%M:%S.%f%z"),
        ("RFC3339", "%Y-%m-%dT%H:%M:%S%z"),
        ("YYYY-mm-dd HH:MM:SS.ms", "%Y-%m-%d %H:%M:%S.%f"),
        ("YYYY-mm-dd HH:MM:SS.ms TZ", "%Y-%m-%d %H:%M:%S.%f %z"),
        ("YYYY-mm-dd HH:MM:SS", "%Y-%m-%d %H:%M:%S"),
        ("YYYY-mm-dd HH:MM:SS TZ", "%Y-%m-%d %H:%M:%S %z"),
        ("YYYY-mm-dd", "%Y-%m-%d"),
        ('epoch second', 'int'),
    )
    for label, fmt in candidates:
        parser = create_parsefunc(fmt)
        try:
            parsed = parser(value)
            as_epoch_ms = int(parsed.timestamp() * 1000)
            as_iso = parsed.isoformat()
            print(f"# Interpreting timestamp '{value}' as format '{label}', apparently holds an epoch-ms formatted timestamp {as_epoch_ms}, which in ISO format is: {as_iso}", file=sys.stderr)
            print(f"# Proceeding with the assumption that all dates have format '{label}'", file=sys.stderr)
            return parser
        except Exception:
            # This candidate cannot read the value; move on to the next one.
            continue
    raise ValueError(f"cannot guess format of date value '{value}'")
def epoch_ms(dt: datetime) -> int:
    '''Return ``dt`` as whole milliseconds since the Unix epoch.

    A naive datetime is taken to be in the system local time zone, matching
    ``datetime.timestamp()``. Fractions of a millisecond are dropped.

    The value is computed with integer arithmetic. Multiplying the float from
    ``timestamp()`` by 1000 and truncating can land just below the intended
    integer and lose a millisecond.
    '''
    since_epoch = dt.astimezone(timezone.utc) - datetime(1970, 1, 1, tzinfo=timezone.utc)
    whole_seconds = since_epoch.days * 86400 + since_epoch.seconds
    return whole_seconds * 1000 + since_epoch.microseconds // 1000
def parse_input(infile, delim=",", strptime_fmt="guess"):
    '''Read delimited lines from ``infile`` and return them as TimeSpans.

    Blank lines are skipped. Every other line is one of:

        <moment><delim><name>         a span two milliseconds long
        <start><delim><end><delim><name>

    and any further delimiters are treated as part of the name.

    Args:
        infile: an iterable of text lines, such as an open file.
        delim: the field separator.
        strptime_fmt: a ``strptime`` format, ``'int'`` for epoch numbers, or
            ``'guess'`` to detect the format from the first timestamp seen.

    Raises ValueError when a line has no delimiter (reported with its 1-based
    line number), when the format cannot be guessed, when a timestamp does
    not match the format, or when a span starts after it ends.
    '''
    timeparse = None if strptime_fmt == "guess" else create_parsefunc(strptime_fmt)
    spans = list()
    for lineno, raw_line in enumerate(infile, start=1):
        line = raw_line.strip()
        if not line:
            continue
        pieces = line.split(delim)
        # Check the shape of the line before trying to interpret it, so a
        # missing delimiter is reported as such rather than as an
        # unrecognised date format.
        if len(pieces) < 2:
            raise ValueError(f"line #{lineno} is missing a delimiter '{delim}'")
        if timeparse is None:
            timeparse = guess_format(pieces[0])
        first = epoch_ms(timeparse(pieces[0]))
        if len(pieces) == 2:
            # 2 is not important, just a "nice" number; could be anything really
            secnd = first + 2
            name = pieces[1]
        else:
            secnd = epoch_ms(timeparse(pieces[1]))
            name = delim.join(pieces[2:])
        spans.append(TimeSpan(first, secnd, name))
    return spans
def main():
    '''Draw the timeline read from stdin, or run the built-in demonstration.

    When stdin is not a terminal, its lines are parsed as spans and printed
    as a timeline. Otherwise a table of ``overlap`` results is printed next to
    the expected values, followed by three example timelines.
    '''
    if not os.isatty(sys.stdin.fileno()):
        print_timespans(parse_input(sys.stdin), [])
        return

    samples = {
        'a': TimeSpan(1, 3, "a"),
        'b': TimeSpan(2, 4, "b"),
        'c': TimeSpan(3, 5, "c"),
        'd': TimeSpan(4, 7, "d"),
        'e': TimeSpan(1, 5, "e"),
    }
    # (left, right, expected result of left.overlap(right))
    overlap_checks = (
        ('a', 'b', True),
        ('a', 'c', True),
        ('a', 'd', False),
        ('a', 'e', True),
        ('b', 'd', True),
        ('d', 'b', False),
        ('d', 'a', False),
        ('e', 'a', True),
        ('e', 'b', True),
        ('e', 'c', True),
        ('e', 'd', True),
    )
    for left, right, expected in overlap_checks:
        actual = samples[left].overlap(samples[right])
        print(f"{left}.overlap({right})={actual} {expected}")

    letters = list(samples.values())

    # Millisecond-scale spans, labelled with the raw bucket start.
    print_timespans(
        [TimeSpan(x.start*1, x.end*1, x.name) for x in letters],
        [],
        fmtfunc=str,
        smallest_bucket_ms=100,
    )
    # The same spans stretched to seconds and shifted to a realistic moment.
    print_timespans(
        [TimeSpan(x.start*1000+1666130400, x.end*1000+1666130400, x.name) for x in letters],
        [],
    )

    example_timespans = [
        TimeSpan(1647922287310, 1647922287564, 'A short thing happened'),
        TimeSpan(1648123040908, 1648123109165, 'a span of time where things happening'),
    ]
    example_events = [
        Event(1648121040908, 'just a single thing happened here'),
    ]
    print_timespans(example_timespans, example_events)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment