Skip to content

Instantly share code, notes, and snippets.

@lelandbatey
Last active October 22, 2022 00:13
Show Gist options
  • Save lelandbatey/5d26a90f0068ab68d37a161790669090 to your computer and use it in GitHub Desktop.
Save lelandbatey/5d26a90f0068ab68d37a161790669090 to your computer and use it in GitHub Desktop.
Code for visualizing overlapping named spans of time
'''
Timespan is code for visualizing timelines of events. We can show spans of time
on a timeline, as well as individual events.
The following is example code and what it prints:
>>> example_timespans = [
... TimeSpan(1647922287310, 1647922287564, 'A short thing happened'),
... TimeSpan(1648123040908, 1648123109165, 'a span of time where things happening'),
... ]
>>> example_events = [
... Event(1648121040908, 'just a single thing happened here'),
... ]
>>> print_timespans(example_timespans, example_events)
2022-03-22 04:11:22.000
2022-03-22 04:11:23.000
<snip>
2022-03-22 04:11:27.000 - A short thing happened
2022-03-22 04:11:28.000
2022-03-22 04:11:29.000
2022-03-22 04:11:30.000
2022-03-24 11:24:00.908 just a single thing happened here
<snip>
2022-03-24 11:57:20.000 ┐ a span of time where things happening
2022-03-24 11:57:21.000 │ a span of time where things happening
2022-03-24 11:57:22.000 │ a span of time where things happening
2022-03-24 11:57:23.000 │ a span of time where things happening
<snip> │ a span of time where things happening
2022-03-24 11:58:29.000 ┘ a span of time where things happening
2022-03-24 11:58:30.000
2022-03-24 11:58:31.000
2022-03-24 11:58:32.000
'''
from typing import List, Dict
from datetime import datetime, timezone
import time
import math
import sys
import os
class TimeSpan:
    '''A named span of time running from ``start`` up to ``end``.

    The constructor parameter names suggest both bounds are millisecond
    timestamps. Besides ``start``, ``end`` and ``name``, each instance exposes
    ``duration`` (``end - start``) and ``middle`` (the floor-divided midpoint).

    Equality and hashing are both derived from ``repr()``, so two spans with
    the same start, end and name are interchangeable as dict keys and set
    members. Ordering is by start, with ties broken by end.

    Raises ValueError when ``start_moment_ms`` is greater than
    ``end_moment_ms``.
    '''

    def __init__(self, start_moment_ms, end_moment_ms, name=None):
        if start_moment_ms > end_moment_ms:
            raise ValueError(f"start cannot come after end; {start_moment_ms=} > {end_moment_ms=}")
        self.start = start_moment_ms
        self.end = end_moment_ms
        self.middle = self.start + ((self.end - self.start) // 2)
        self.duration = self.end - self.start
        self.name = name

    def contains(self, ts: int):
        '''Return True when ``ts`` lies in the half-open range [start, end).'''
        return self.start <= ts < self.end

    def overlap(self, other: 'TimeSpan') -> bool:
        '''Return True when ``other`` overlaps this span.

        The boundaries are deliberately not symmetric: a span that ends
        exactly where this one starts does NOT overlap, while a span that
        starts exactly where this one ends DOES.
        '''
        if other.end <= self.start:
            return False
        if other.start > self.end:
            return False
        return True

    def __str__(self):
        return repr(self)

    def __eq__(self, other):
        # Only compare against other TimeSpans. Without this guard a plain
        # string that happens to match our repr would compare equal.
        if not isinstance(other, TimeSpan):
            return NotImplemented
        return str(self) == str(other)

    def __lt__(self, other):
        if self.start == other.start:
            return self.end < other.end
        return self.start < other.start

    def __hash__(self):
        # Kept in step with __eq__: equal spans share a repr, hence a hash.
        return hash(str(self))

    def __repr__(self):
        return f"TimeSpan(start_moment_ms={self.start:>14}, end_moment_ms={self.end:>14}, name={self.name})"
class Event:
    '''A single named moment on the timeline.

    ``moment`` holds the value passed as ``moment_ms`` and ``name`` is an
    optional label (None when omitted).
    '''

    def __init__(self, moment_ms, name=None):
        self.moment = moment_ms
        self.name = name

    def __repr__(self):
        # Mirrors the TimeSpan repr so events are readable in debug output
        # and in error messages.
        return f"Event(moment_ms={self.moment}, name={self.name})"
def combine_timespans(a: TimeSpan, b: TimeSpan) -> TimeSpan:
    '''Return one TimeSpan covering both ``a`` and ``b``.

    The result runs from the earlier start to the later end and carries the
    name of ``a``; the name of ``b`` is discarded.

    Raises ValueError when ``a.overlap(b)`` is false.
    '''
    if a.overlap(b):
        earliest_start = a.start if a.start <= b.start else b.start
        latest_end = a.end if a.end >= b.end else b.end
        return TimeSpan(earliest_start, latest_end, a.name)
    raise ValueError(f"{a=} does not overlap {b=}")
def coalesce_timespans(spans: List[TimeSpan]) -> List[TimeSpan]:
    '''Merge overlapping timespans until none of the results overlap.

    Spans are swept in order of (start, end). A span whose start is at or
    before the end of the span currently being built is folded into it, so
    spans that merely touch are merged as well. Each merged span keeps the
    name of its earliest-starting member.

    Returns a new list ordered by start; an empty input gives an empty list.
    '''
    merged: List[TimeSpan] = []
    for span in sorted(spans, key=lambda s: (s.start, s.end)):
        if merged and span.start <= merged[-1].end:
            # Extend the span under construction instead of combining every
            # pair repeatedly; that approach never converged to disjoint
            # spans when the names differed.
            current = merged[-1]
            merged[-1] = TimeSpan(current.start, max(current.end, span.end), current.name)
        else:
            merged.append(span)
    return merged
def find_necessary_buckets(
    spans: List[TimeSpan],
    events: List[Event],
    smallest_bucket_ms=1000,
    bucket_padding=5,
) -> List[TimeSpan]:
    '''Build the "interesting" windows of time that deserve buckets.

    Rather than bucketing the whole range covered by the input, only a small
    window around every event moment and around the start and the end of
    every span is kept. This keeps fine-grained buckets (say one second each)
    affordable over long ranges (several days), provided the spans and events
    are sparse compared to the time they cover.

        | ┐ Only include these buckets
        | ┘
        |
        |
        | ┐ And these buckets
        | ┘

    Each window reaches ``bucket_padding * smallest_bucket_ms`` before and
    after its moment. The windows are returned unnamed, events first and then
    the start/end pair of each span; overlapping windows are NOT merged.
    '''
    moments = [ev.moment for ev in events]
    for span in spans:
        moments.extend((span.start, span.end))
    tpad = bucket_padding * smallest_bucket_ms
    return [TimeSpan(moment - tpad, moment + tpad) for moment in moments]
def normalize_span(span: TimeSpan, span_truncate_ms=1000) -> TimeSpan:
    '''Return a copy of ``span`` snapped outward to ``span_truncate_ms``.

    The start is rounded down and the end is rounded up to the nearest
    multiple of ``span_truncate_ms``; the name is carried over unchanged.
    '''
    whole_units_before_start = span.start // span_truncate_ms
    whole_units_up_to_end = int(math.ceil(span.end / span_truncate_ms))
    return TimeSpan(
        whole_units_before_start * span_truncate_ms,
        whole_units_up_to_end * span_truncate_ms,
        name=span.name,
    )
def calc_span_column(spans: List[TimeSpan]) -> Dict[TimeSpan, int]:
    '''Assign every span a display column so overlapping spans never share one.

    Spans are visited by start time, longest first among equal starts, and
    each one is placed in the lowest-numbered column whose current members do
    not overlap it.

    Returns a dict mapping each span to its column index, with keys in the
    same order as the input.
    '''
    span_column = {s: -1 for s in spans}
    # columns[i] holds the spans already placed in column i; new columns are
    # opened only when every existing one is blocked.
    columns = []
    # Start time matters most for layout, duration only breaks ties.
    placement_order = sorted(spans, key=lambda s: (s.start, -s.duration))
    for span in placement_order:
        for index, members in enumerate(columns):
            blocked = any(m != span and m.overlap(span) for m in members)
            if not blocked:
                members.add(span)
                span_column[span] = index
                break
        else:
            columns.append({span})
            span_column[span] = len(columns) - 1
    return span_column
def calc_span_widths(spans_to_column: Dict['TimeSpan', int]) -> List[int]:
    '''Return the widest span name found in each column.

    - The list has exactly N entries, where N is len(spans_to_column), so an
      empty mapping gives an empty list.
    - Entry i is the length of the longest name among the spans assigned to
      column i, or 0 when no span uses that column.
    - A span whose name is None counts as width 0 rather than raising.
    - Column numbers outside 0..N-1 (such as -1) are ignored.
    '''
    column_count = len(spans_to_column)
    span_widths = [0] * column_count
    # One pass over the mapping instead of re-scanning it once per column.
    for span, column in spans_to_column.items():
        if not 0 <= column < column_count:
            continue
        name_width = 0 if span.name is None else len(str(span.name))
        if name_width > span_widths[column]:
            span_widths[column] = name_width
    return span_widths
def fmt_utc_ts(z):
    '''Format epoch milliseconds ``z`` as 'YYYY-mm-dd HH:MM:SS.mmm' in UTC.'''
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; an aware
    # UTC datetime produces the same text.
    moment = datetime.fromtimestamp(z / 1000, tz=timezone.utc)
    # %f yields six digits of microseconds; drop the last three to show ms.
    return moment.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
# Snapshot of the local timezone taken when the module is imported. It is a
# fixed UTC offset, so it is kept only as a module-level name for existing
# users; fmt_local_ts no longer relies on it.
LOCAL_TIMEZONE = datetime.now(timezone.utc).astimezone().tzinfo


def fmt_local_ts(z):
    '''Format epoch milliseconds ``z`` as 'YYYY-mm-dd HH:MM:SS.mmm' local time.

    The local UTC offset is resolved for the timestamp being formatted, not
    for the moment the module was imported, so timestamps on the other side
    of a daylight-saving change are shown with the offset that applied then.
    '''
    utc_moment = datetime.fromtimestamp(z / 1000, tz=timezone.utc)
    # astimezone() with no argument converts to the system local timezone.
    local_moment = utc_moment.astimezone()
    # %f yields six digits of microseconds; drop the last three to show ms.
    return local_moment.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
def _build_timeline_buckets(spans, events, smallest_bucket_ms):
    '''Return the sorted, de-duplicated buckets, one per candidate output line.'''
    # mindiff is the width of one bucket: the larger of the shortest span
    # duration and smallest_bucket_ms, rounded down to a multiple of 250 and
    # never less than 1. With no spans at all, smallest_bucket_ms alone
    # decides it.
    shortest_span = min((s.duration for s in spans), default=smallest_bucket_ms)
    mindiff = max(shortest_span, smallest_bucket_ms)
    mindiff = max((mindiff // 250) * 250, 1)

    buckets = []
    seen_buckets = set()
    # Only the windows around span boundaries and events are bucketed; each
    # window is first snapped to a multiple of mindiff.
    for window in find_necessary_buckets(spans, events, smallest_bucket_ms=smallest_bucket_ms):
        necb = normalize_span(window, span_truncate_ms=mindiff)
        minispan_count = int(math.ceil(necb.duration / mindiff))
        if minispan_count == 0:
            # A zero-length window has nothing to bucket (and would divide by
            # zero below).
            continue
        minispan_durat = int(math.ceil(necb.duration / minispan_count))
        for x in range(minispan_count):
            bucket = TimeSpan(necb.start + (x * minispan_durat), necb.start + ((x + 1) * minispan_durat))
            if bucket not in seen_buckets:
                seen_buckets.add(bucket)
                buckets.append(bucket)
    return sorted(buckets)


def print_timespans(spans: List[TimeSpan], events: List[Event], fmtfunc=None, smallest_bucket_ms=1000):
    '''Print a text timeline of ``spans`` and ``events`` to stdout.

    Every printed line stands for one bucket of time and shows, in a column
    per group of non-overlapping spans, the spans that overlap that bucket:
    ' ┐' where a span starts, ' ┘' where it ends, ' │' in between and ' -'
    when it starts and ends inside the same bucket. Runs of identical lines
    are cut short and a '......' line is printed before the next line on
    which a span starts or ends.

    spans: TimeSpans to draw.
    events: Events to draw; each is drawn as a 2 ms long span.
    fmtfunc: callable turning a bucket start into the text of the first
        column; defaults to fmt_local_ts.
    smallest_bucket_ms: lower bound for the width of a bucket.

    Nothing is printed when both ``spans`` and ``events`` are empty. Spans
    and events without a name are drawn with an empty label.
    '''
    if not spans and not events:
        return
    if fmtfunc is None:
        fmtfunc = fmt_local_ts

    # Buckets are derived from the spans and events as given, before events
    # are turned into short spans below.
    buckets = _build_timeline_buckets(spans, events, smallest_bucket_ms)
    linefmt = "{bucket_start:^24} {timelines} {names}"

    # For ease of implementation, events are drawn as TimeSpans that end 2
    # milliseconds (2 is not important) after they began. A missing name is
    # replaced by '' because None can neither be measured nor padded.
    evspans = [
        TimeSpan(ev.moment, ev.moment + 2, '' if ev.name is None else ev.name)
        for ev in events
    ]
    spans = [s if s.name is not None else TimeSpan(s.start, s.end, '') for s in spans] + evspans

    # The spans are arranged into columns such that the spans within one
    # column never overlap. That makes it possible to pre-compute the width
    # of each column and what it looks like when blank; each line then only
    # overrides the columns that have something to show.
    spans_to_column = calc_span_column(spans)
    span_widths = calc_span_widths(spans_to_column)
    blank_fmtspans = [f"{'':<{width+2+1}}" for width in span_widths]

    prior_timelines = []
    clean_prior = []
    repeated_lines_count = 1
    for buck in buckets:
        timelines = blank_fmtspans[:]
        for span in spans:
            if not span.overlap(buck):
                continue
            column = spans_to_column[span]
            starts_here = buck.contains(span.start)
            ends_here = buck.contains(span.end)
            if (starts_here or ends_here) and repeated_lines_count:
                # Something changes on this line: mark the gap left by the
                # lines that were (or could have been) skipped.
                print(linefmt.format(bucket_start='......', timelines=''.join(clean_prior), names=''))
                repeated_lines_count = 0
            if starts_here and ends_here:
                marker = ' -'
            elif starts_here:
                marker = ' ┐'
            elif ends_here:
                marker = ' ┘'
            else:
                # Overlapping but neither at the top nor at the bottom, so we
                # are in the middle of the span: draw a straight line.
                marker = ' │'
            timelines[column] = f"{marker:<2} {span.name:<{span_widths[column]}}"
        if repeated_lines_count > 1:
            # Already printed this line twice in a row; skip further repeats.
            continue
        if timelines == prior_timelines:
            repeated_lines_count += 1
        print(linefmt.format(bucket_start=fmtfunc(buck.start), timelines=''.join(timelines), names=''))
        prior_timelines = timelines
        # What the columns look like on the lines after this one: an ended
        # span disappears and a started span continues as a straight line.
        clean_prior = [x.replace('┘', ' ').replace('┐', '│') for x in prior_timelines]
def create_parsefunc(fmt):
    '''Return a function that parses a timestamp string into a datetime.

    When ``fmt`` is the literal 'int' the returned function reads the string
    as an epoch number and returns an aware UTC datetime. Any other ``fmt``
    is used as a ``datetime.strptime`` format: timezone-aware results are
    converted to UTC, naive results are returned as they are.
    '''
    if fmt == 'int':
        def parse_epoch_number(s):
            seconds = float(s)
            # if larger than this, probably an epoch timestamp of milliseconds
            # instead of the usual 'seconds'
            if seconds > 9999999999:
                seconds = seconds / 1000
            return datetime.fromtimestamp(seconds, timezone.utc)
        return parse_epoch_number

    def parse_with_format(s):
        parsed = datetime.strptime(s, fmt)
        if parsed.tzinfo is None:
            return parsed
        return parsed.astimezone(timezone.utc)
    return parse_with_format
def guess_format(value):
    '''Return the first known parse function able to parse ``value``.

    The candidate formats are tried in a fixed order. For the first one that
    parses ``value``, two explanatory lines are written to stderr and its
    parse function is returned.

    Raises ValueError when no candidate can parse ``value``.
    '''
    candidate_formats = (
        ("RFC3339", "%Y-%m-%dT%H:%M:%S.%f%z"),
        ("RFC3339", "%Y-%m-%dT%H:%M:%S%z"),
        ("YYYY-mm-dd HH:MM:SS.ms", "%Y-%m-%d %H:%M:%S.%f"),
        ("YYYY-mm-dd HH:MM:SS.ms TZ", "%Y-%m-%d %H:%M:%S.%f %z"),
        ("YYYY-mm-dd HH:MM:SS", "%Y-%m-%d %H:%M:%S"),
        ("YYYY-mm-dd HH:MM:SS TZ", "%Y-%m-%d %H:%M:%S %z"),
        ("YYYY-mm-dd", "%Y-%m-%d"),
        ('epoch second', 'int'),
    )
    for name, fmt in candidate_formats:
        parsefunc = create_parsefunc(fmt)
        try:
            parsed = parsefunc(value)
            as_millis = int(parsed.timestamp() * 1000)
            as_iso = parsed.isoformat()
            print(f"# Interpreting timestamp '{value}' as format '{name}', apparently holds an epoch-ms formatted timestamp {as_millis}, which in ISO format is: {as_iso}", file=sys.stderr)
            print(f"# Proceeding with the assumption that all dates have format '{name}'", file=sys.stderr)
            return parsefunc
        except Exception:
            # This candidate does not fit the value; try the next one.
            continue
    raise ValueError(f"cannot guess format of date value '{value}'")
def epoch_ms(dt: datetime):
    '''Return ``dt`` as whole milliseconds since the Unix epoch.

    Sub-millisecond precision is dropped. The whole seconds and the
    milliseconds are combined with integer arithmetic, because multiplying
    the float timestamp by 1000 and truncating can come out one millisecond
    short (1.001 s became 1000 instead of 1001).
    '''
    whole_seconds = int(dt.replace(microsecond=0).timestamp())
    return whole_seconds * 1000 + dt.microsecond // 1000
def parse_input(infile, delim=",", strptime_fmt="guess"):
    '''Parse delimited text lines into a list of TimeSpans.

    Blank lines are skipped. Every other line must have one of these shapes:

    - ``<time><delim><name>``: a single moment, stored as a span that ends
      2 ms after it starts.
    - ``<start><delim><end><delim><name...>``: a span; any further
      delimiters are kept as part of the name.

    infile: iterable of text lines, for example an open file.
    delim: field separator.
    strptime_fmt: a ``datetime.strptime`` format or 'int' (both handed to
        create_parsefunc), or 'guess' to detect the format from the first
        timestamp encountered.

    Raises ValueError when a line has no delimiter; the message carries the
    1-based line number.
    '''
    timeparse = None if strptime_fmt == "guess" else create_parsefunc(strptime_fmt)
    spans = []
    for lineno, raw_line in enumerate(infile, start=1):
        line = raw_line.strip()
        if not line:
            continue
        pieces = line.split(delim)
        # Check the shape before guessing the format, so a line without a
        # delimiter is reported as such rather than as an unparseable date.
        if len(pieces) < 2:
            raise ValueError(f"line #{lineno} is missing a delimiter '{delim}'")
        if timeparse is None:
            timeparse = guess_format(pieces[0])
        first = epoch_ms(timeparse(pieces[0]))
        if len(pieces) == 2:
            # 2 is not important, just a "nice" number; could be anything really
            secnd = first + 2
            name = pieces[1]
        else:
            secnd = epoch_ms(timeparse(pieces[1]))
            name = delim.join(pieces[2:])
        spans.append(TimeSpan(first, secnd, name))
    return spans
def main():
    '''Render stdin as a timeline, or run the built-in demo on a terminal.

    When stdin is not a terminal its lines are parsed with parse_input and
    printed with print_timespans. Otherwise a list of overlap checks is
    printed next to the expected answers, followed by three demo timelines.
    '''
    if not os.isatty(sys.stdin.fileno()):
        print_timespans(parse_input(sys.stdin), list())
        return

    demo = {
        "a": TimeSpan(1, 3, "a"),
        "b": TimeSpan(2, 4, "b"),
        "c": TimeSpan(3, 5, "c"),
        "d": TimeSpan(4, 7, "d"),
        "e": TimeSpan(1, 5, "e"),
    }
    # (left, right, expected value of left.overlap(right))
    overlap_checks = [
        ("a", "b", True),
        ("a", "c", True),
        ("a", "d", False),
        ("a", "e", True),
        ("b", "d", True),
        ("d", "b", False),
        ("d", "a", False),
        ("e", "a", True),
        ("e", "b", True),
        ("e", "c", True),
        ("e", "d", True),
    ]
    for left, right, expected in overlap_checks:
        actual = demo[left].overlap(demo[right])
        print(f"{left}.overlap({right})={actual} {expected}")

    def scaled_demo(scale, offset):
        # Copies of the demo spans with both bounds scaled and shifted.
        return [
            TimeSpan(s.start * scale + offset, s.end * scale + offset, s.name)
            for s in demo.values()
        ]

    print_timespans(
        scaled_demo(1, 0),
        list(),
        fmtfunc=str,
        smallest_bucket_ms=100,
    )
    print_timespans(
        scaled_demo(1000, 1666130400),
        list(),
    )

    example_timespans = [
        TimeSpan(1647922287310, 1647922287564, 'A short thing happened'),
        TimeSpan(1648123040908, 1648123109165, 'a span of time where things happening'),
    ]
    example_events = [
        Event(1648121040908, 'just a single thing happened here'),
    ]
    print_timespans(example_timespans, example_events)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment