Created: June 25, 2022 19:41
Save 0scarB/bfeda580ea6c1a219df26caf256eaf63 to your computer and use it in GitHub Desktop.
Simple file watcher
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from functools import lru_cache | |
import sys | |
import os.path | |
import time | |
# Seconds between successive scans of one watched root path.
WATCH_INTERVAL = 1
# How often the scheduler loop wakes up to check for due work.
SCHEDULER_POLL_INTERVAL = 0.1
# How often stale uids are dropped, expressed in watch intervals.
PURGE_OLD_UIDS_INTERVAL = 100 * WATCH_INTERVAL
# File names beginning with any of these characters are never reported.
IGNORE_FILES_WITH_PREFIXES = {"~"}
class CacheFormat:
    """Names and separators that define the on-disk uid cache layout."""

    # Cache file kept inside a watched directory.
    DIRS_FILE_NAME = ".watch-cache"
    # Prefix of the sibling cache file created for a single watched file.
    FILES_FILE_PREFIX = ".watch-cache."
    # Separates the modification time from the path within one uid line.
    UID_SEPARATOR = ":"
def main():
    """Watch every path given on the command line, printing each new or
    modified file to stdout until interrupted with Ctrl-C, at which point
    the seen-uid sets are persisted to per-path cache files."""
    if len(sys.argv) < 2:
        raise ValueError(f"Please provide paths to watch as arguments: {sys.argv[0]} [PATHS]")
    watched_paths = [os.path.abspath(arg) for arg in sys.argv[1:]]
    # Seed each path's uid set from its cache so old files are not re-reported.
    uids_by_path = {path: extract_cached_uids(path) for path in watched_paths}
    try:
        for operation, path in schedule(watched_paths):
            uids = uids_by_path[path]
            if operation == "purge_old_uids":
                uids_by_path[path] and purge_old_uids(uids) if False else purge_old_uids(uids)
            elif operation == "watch":
                for changed_file in watch(path, uids):
                    sys.stdout.write(f"{changed_file}\n")
                    sys.stdout.flush()
    except KeyboardInterrupt:
        # On Ctrl-C, compact and persist what we learned for the next run.
        for path, uids in uids_by_path.items():
            purge_old_uids(uids)
            create_cache_file(path, uids)
def watch(root_path, uids):
    """Yield each file under ``root_path`` whose uid is not yet in ``uids``,
    adding the new uid to the set after the path has been yielded."""
    for candidate in iter_files_in_tree(root_path):
        candidate_uid = create_uid(candidate)
        # Short-circuits: the ignore test only runs for unseen uids,
        # matching the original check order.
        if candidate_uid not in uids and not should_ignore_file(root_path, candidate):
            yield candidate
            uids.add(candidate_uid)
def purge_old_uids(uids):
    """Keep only the newest uid per path, mutating ``uids`` in place.

    A uid has the form ``"<mtime><sep><path>"``. The previous version
    sorted the raw uid strings and relied on lexicographic order matching
    numeric mtime order; that breaks whenever the stringified float
    timestamps differ in length (e.g. "99.5" sorts after "100.2").
    Sort on the parsed timestamp instead so the newest entry always wins.
    """
    def _mtime(uid):
        # The modification time is everything before the first separator.
        return float(uid.split(CacheFormat.UID_SEPARATOR, maxsplit=1)[0])

    visited_paths = set()
    # Newest first: the first uid seen for a path is kept, later ones dropped.
    for uid in sorted(uids, key=_mtime, reverse=True):
        _, path = uid.split(CacheFormat.UID_SEPARATOR, maxsplit=1)
        if path in visited_paths:
            uids.remove(uid)
        else:
            visited_paths.add(path)
def schedule(root_paths):
    """Endlessly yield ``(operation, root_path)`` pairs.

    Watch and purge operations each fire at a fixed overall interval,
    rotating round-robin through ``root_paths``; watches take priority
    when both are due in the same poll.
    """
    path_count = len(root_paths)
    watch_step = WATCH_INTERVAL / path_count
    purge_step = PURGE_OLD_UIDS_INTERVAL / path_count
    next_watch = time.time()
    # Offset purge time from watch time by half a watch step to avoid delays.
    next_purge = time.time() + purge_step - 0.5 * watch_step
    counters = {"watch": 0, "purge_old_uids": 0}
    while True:
        # Advance the deadline before yielding so elapsed work time
        # cannot cause drift.
        now = time.time()
        operation = None
        if now >= next_watch:
            next_watch += watch_step
            operation = "watch"
        elif now >= next_purge:
            next_purge += purge_step
            operation = "purge_old_uids"
        if operation is not None:
            index = counters[operation]
            yield operation, root_paths[index % path_count]
            counters[operation] = index + 1
        time.sleep(SCHEDULER_POLL_INTERVAL)
def should_ignore_file(root_path, file_path):
    """Return True for files the watcher should never report.

    Skips the watcher's own cache file and any file whose *name* starts
    with one of ``IGNORE_FILES_WITH_PREFIXES``. The previous version
    tested ``file_path[-1]`` — the last character of the whole path, i.e.
    a suffix check — despite the constant being named ``..._PREFIXES``;
    this now tests the basename's prefix as the name implies.
    """
    file_name = os.path.basename(file_path)
    return (
        file_name.startswith(tuple(IGNORE_FILES_WITH_PREFIXES))
        or file_path == get_cache_file_path(root_path)
    )
def create_uid(path):
    """Build the uid ``"<mtime><sep><path>"`` from the file's current
    modification time, so a touched file produces a fresh uid."""
    mtime = os.stat(path).st_mtime
    return CacheFormat.UID_SEPARATOR.join((str(mtime), path))
# Setup | |
# ===== | |
def extract_cached_uids(root_path):
    """Return the uid set persisted for ``root_path``, or an empty set
    when no cache file exists yet (e.g. on the first run)."""
    try:
        return extract_cached_uids_from_cache_file(root_path)
    except FileNotFoundError:
        return set()
def extract_cached_uids_from_cache_file(root_path):
    """Read ``root_path``'s cache file and return its non-blank lines,
    stripped, as a set of uid strings.

    Raises FileNotFoundError when no cache file exists yet.
    """
    cache_file_path = get_cache_file_path(root_path)
    with open(cache_file_path, "r") as f:
        # Iterate the file lazily; the previous f.readlines() built a
        # throwaway list of the whole file first.
        return {line.strip() for line in f if line.strip()}
@lru_cache(maxsize=100)
def get_cache_file_path(root_path):
    """Return where the uid cache for ``root_path`` is stored.

    A watched directory keeps a ".watch-cache" file inside itself; a
    watched single file gets a sibling named ".watch-cache.<name>".
    Results are memoized since repeated lookups are frequent.

    Raises ValueError when ``root_path`` is neither file nor directory.
    """
    if os.path.isdir(root_path):
        return f"{root_path}/{CacheFormat.DIRS_FILE_NAME}"
    if os.path.isfile(root_path):
        parent = os.path.dirname(root_path)
        name = os.path.basename(root_path)
        return f"{parent}/{CacheFormat.FILES_FILE_PREFIX}{name}"
    raise ValueError(f"Path {root_path} is not a file or directory")
# Teardown | |
# ======== | |
def create_cache_file(root_path, uids):
    """Persist ``uids`` to ``root_path``'s cache file, one uid per line,
    replacing any previous contents."""
    cache_file_path = get_cache_file_path(root_path)
    # Mode "w" already truncates on open, so the previous explicit
    # f.seek(0) / f.truncate() calls were dead code and have been removed.
    with open(cache_file_path, "w") as f:
        f.write("\n".join(uids))
# Helpers | |
# ======= | |
def iter_files_in_tree(path):
    """Recursively yield every regular file at or below ``path``.

    Raises ValueError when ``path`` is neither a file nor a directory
    (nonexistent, broken symlink, socket, ...).
    """
    if os.path.isdir(path):
        for entry_name in os.listdir(path):
            yield from iter_files_in_tree(f"{path}/{entry_name}")
    elif os.path.isfile(path):
        yield path
    else:
        raise ValueError(f"Path {path} is not a file or directory")
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment