Skip to content

Instantly share code, notes, and snippets.

@0scarB
Created June 25, 2022 19:41
Show Gist options
  • Save 0scarB/bfeda580ea6c1a219df26caf256eaf63 to your computer and use it in GitHub Desktop.
Save 0scarB/bfeda580ea6c1a219df26caf256eaf63 to your computer and use it in GitHub Desktop.
Simple file watcher
#!/usr/bin/env python3
from functools import lru_cache
import sys
import os.path
import time
# Seconds between two consecutive scans of the same root path.
WATCH_INTERVAL = 1
# Seconds the scheduler sleeps between two checks for due operations.
SCHEDULER_POLL_INTERVAL = 0.1
# Seconds between two consecutive uid purges of the same root path.
PURGE_OLD_UIDS_INTERVAL = WATCH_INTERVAL * 100
# A path ending in one of these markers is never reported
# (see should_ignore_file).
IGNORE_FILES_WITH_PREFIXES = {"~"}
class CacheFormat:
    """Naming and layout constants of the on-disk uid cache."""

    # Separates the modification time from the path inside a uid.
    UID_SEPARATOR = ":"
    # Name of the cache file placed inside a watched directory.
    DIRS_FILE_NAME = ".watch-cache"
    # Prepended to a watched file's basename to name its cache file.
    FILES_FILE_PREFIX = ".watch-cache."
def main():
    """Watch the paths given on the command line and print changed files.

    Every command-line argument is resolved to an absolute root path. The
    uids cached for each root are loaded, then the scheduler is followed
    forever: "watch" operations print one newly seen or modified file
    path per line to stdout, "purge_old_uids" operations drop superseded
    uids from memory.

    The uid sets are purged and written back to their cache files on
    every way out of the loop, not only on Ctrl-C, so a closed stdout
    pipe or an unexpected error does not discard what was already
    reported.

    Raises:
        ValueError: If no path argument was supplied.
    """
    if len(sys.argv) < 2:
        raise ValueError(f"Please provide paths to watch as arguments: {sys.argv[0]} [PATHS]")

    root_paths = [os.path.abspath(rel_path) for rel_path in sys.argv[1:]]
    root_paths_to_uids = {
        root_path: extract_cached_uids(root_path)
        for root_path in root_paths
    }

    try:
        for operation, root_path in schedule(root_paths):
            uids = root_paths_to_uids[root_path]
            if operation == "watch":
                for file_path in watch(root_path, uids):
                    sys.stdout.write(f"{file_path}\n")
                    # Flush per line: watch() records the uid only when it
                    # is resumed, i.e. after the line has left the buffer.
                    sys.stdout.flush()
            elif operation == "purge_old_uids":
                purge_old_uids(uids)
    except KeyboardInterrupt:
        # Ctrl-C is the regular way to stop the watcher; exit quietly.
        pass
    finally:
        for root_path, uids in root_paths_to_uids.items():
            purge_old_uids(uids)
            create_cache_file(root_path, uids)
def watch(root_path, uids):
    """Yield every file below ``root_path`` whose uid is not yet known.

    Args:
        root_path: File or directory to scan.
        uids: Set of already reported uids. It is updated in place: the
            uid of a yielded file is added once the consumer resumes the
            generator, i.e. after it has handled the path.

    Yields:
        Paths of files that are new or whose modification time changed
        since they were last reported.
    """
    for file_path in iter_files_in_tree(root_path):
        # Cheap string checks first; ignored files need no stat call.
        if should_ignore_file(root_path, file_path):
            continue
        try:
            uid = create_uid(file_path)
        except FileNotFoundError:
            # The file was deleted between listing and stat; there is
            # nothing to report and the scan must not crash.
            continue
        if uid in uids:
            continue
        yield file_path
        uids.add(uid)
def purge_old_uids(uids):
    """Remove, in place, every uid superseded by a newer uid of the same path.

    A uid has the form ``<mtime><separator><path>``. For each path only
    the uid with the greatest modification time is kept. Modification
    times are compared numerically; comparing the uid strings would order
    "999999999.0" after "1000000000.0" and keep the stale entry.

    Entries without a separator or without a numeric modification time
    (e.g. a damaged cache line) can never match a generated uid and are
    dropped as well.

    Args:
        uids: Set of uid strings; mutated in place.
    """
    parsed = []
    # Iterate over a snapshot because the set is mutated below.
    for uid in list(uids):
        mtime_text, separator, path = uid.partition(CacheFormat.UID_SEPARATOR)
        try:
            if not separator:
                raise ValueError(uid)
            mtime = float(mtime_text)
        except ValueError:
            uids.discard(uid)
            continue
        parsed.append((mtime, uid, path))

    # Newest first, so the first uid seen for a path is the one to keep.
    parsed.sort(reverse=True)
    visited_paths = set()
    for _, uid, path in parsed:
        if path in visited_paths:
            uids.remove(uid)
        else:
            visited_paths.add(path)
def schedule(root_paths):
    """Yield an endless round-robin stream of operations for the root paths.

    Each root path is scheduled for a "watch" once per WATCH_INTERVAL
    seconds and for a "purge_old_uids" once per PURGE_OLD_UIDS_INTERVAL
    seconds; the operations of the individual roots are spread evenly
    over those intervals.

    Deadlines are tracked on the monotonic clock, so adjustments of the
    system time neither stall the schedule nor trigger a burst of
    catch-up operations. The two operation kinds are checked
    independently, so a watch that is permanently due cannot starve the
    purge.

    Args:
        root_paths: Non-empty sequence of paths to schedule.

    Yields:
        ``(operation, root_path)`` tuples where ``operation`` is either
        ``"watch"`` or ``"purge_old_uids"``.

    Raises:
        ValueError: If ``root_paths`` is empty.
    """
    if not root_paths:
        raise ValueError("schedule() needs at least one root path")

    root_count = len(root_paths)
    watch_step = WATCH_INTERVAL / root_count
    purge_step = PURGE_OLD_UIDS_INTERVAL / root_count

    next_watch = time.monotonic()
    # Offset the purge deadline by half a watch step so that, when the
    # scheduler keeps up, a purge does not fall on a watch tick.
    next_purge = next_watch + purge_step - 0.5 * watch_step

    watch_count = 0
    purge_count = 0
    while True:
        now = time.monotonic()
        if now >= next_watch:
            # Advance by a fixed step (not from "now") to avoid drift.
            next_watch += watch_step
            yield "watch", root_paths[watch_count % root_count]
            watch_count += 1
        if now >= next_purge:
            next_purge += purge_step
            yield "purge_old_uids", root_paths[purge_count % root_count]
            purge_count += 1
        time.sleep(SCHEDULER_POLL_INTERVAL)
def should_ignore_file(root_path, file_path):
    """Return True if ``file_path`` must not be reported by the watcher.

    A file is ignored when its path ends with one of the markers in
    IGNORE_FILES_WITH_PREFIXES (e.g. ``notes.txt~``) or when it is the
    cache file belonging to ``root_path``.

    NOTE(review): despite the constant's name the markers are matched at
    the END of the path, as before; confirm whether a prefix match on the
    file name was intended.

    Args:
        root_path: Watched root the file was found under.
        file_path: Path of the candidate file.

    Returns:
        bool: Whether the file is to be skipped.
    """
    # endswith() with a tuple handles markers of any length and an empty
    # path; empty markers are dropped because they would match everything.
    ignored_endings = tuple(
        marker for marker in IGNORE_FILES_WITH_PREFIXES if marker
    )
    if file_path.endswith(ignored_endings):
        return True
    return file_path == get_cache_file_path(root_path)
def create_uid(path):
    """Build the uid of ``path``: modification time, separator, path.

    The modification time is read with ``os.stat`` at call time, so the
    uid changes whenever the file is modified.
    """
    modified_at = os.stat(path).st_mtime
    return CacheFormat.UID_SEPARATOR.join((str(modified_at), str(path)))
# Setup
# =====
def extract_cached_uids(root_path):
    """Return the uids cached for ``root_path``, or an empty set.

    A missing cache file is the normal state on a first run and simply
    yields no uids.
    """
    try:
        cached_uids = extract_cached_uids_from_cache_file(root_path)
    except FileNotFoundError:
        cached_uids = set()
    return cached_uids
def extract_cached_uids_from_cache_file(root_path):
    """Read the cache file of ``root_path`` into a set of uids.

    Each non-blank line of the file is one uid; surrounding whitespace is
    stripped. Opening a missing cache file raises FileNotFoundError.
    """
    cached_uids = set()
    with open(get_cache_file_path(root_path), "r") as cache_file:
        for raw_line in cache_file:
            entry = raw_line.strip()
            if entry:
                cached_uids.add(entry)
    return cached_uids
@lru_cache(maxsize=100)
def get_cache_file_path(root_path):
    """Return the path of the cache file that belongs to ``root_path``.

    For a directory the cache file lives inside it; for a file it lives
    next to it, named after the file. Results are memoized, so the file
    system is only consulted the first time a root path is seen.

    Raises:
        ValueError: If ``root_path`` is neither a file nor a directory.
    """
    if os.path.isdir(root_path):
        return f"{root_path}/{CacheFormat.DIRS_FILE_NAME}"
    if os.path.isfile(root_path):
        parent_dir, file_name = os.path.split(root_path)
        return f"{parent_dir}/{CacheFormat.FILES_FILE_PREFIX}{file_name}"
    raise ValueError(f"Path {root_path} is not a file or directory")
# Teardown
# ========
def create_cache_file(root_path, uids):
    """Write ``uids`` to the cache file of ``root_path``, one uid per line.

    The uids are written in sorted order so that the file content is
    deterministic for a given set. An existing cache file is replaced.

    Args:
        root_path: Watched root whose cache file is written.
        uids: Iterable of uid strings to persist.
    """
    cache_file_path = get_cache_file_path(root_path)
    # Mode "w" already truncates the file and starts at offset 0, so no
    # explicit seek()/truncate() is needed.
    with open(cache_file_path, "w") as cache_file:
        cache_file.writelines(f"{uid}\n" for uid in sorted(uids))
# Helpers
# =======
def iter_files_in_tree(path):
    """Yield the path of every regular file at or below ``path``.

    A file path yields itself; a directory is walked recursively in
    ``os.listdir`` order, with child paths built as ``<parent>/<name>``.

    Entries below the root that are neither a file nor a directory
    (dangling symlinks, sockets, entries deleted while the scan is
    running) are skipped instead of aborting the walk.

    Args:
        path: File or directory to enumerate.

    Raises:
        ValueError: If ``path`` itself is neither a file nor a directory.
    """
    if os.path.isfile(path):
        yield path
    elif os.path.isdir(path):
        yield from _iter_files_below(path)
    else:
        raise ValueError(f"Path {path} is not a file or directory")


def _iter_files_below(dir_path):
    """Yield the files below ``dir_path``, skipping anything else."""
    try:
        child_names = os.listdir(dir_path)
    except (FileNotFoundError, NotADirectoryError):
        # The directory disappeared after it was detected.
        return
    for child_name in child_names:
        child_path = f"{dir_path}/{child_name}"
        if os.path.isfile(child_path):
            yield child_path
        elif os.path.isdir(child_path):
            yield from _iter_files_below(child_path)
# Start the watcher only when the file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment