Last active
October 2, 2024 22:22
-
-
Save panzi/595899fe31060256cf0090de2a041069 to your computer and use it in GitHub Desktop.
Python script to list and compare Message-IDs of MBOX files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from typing import Generator | |
import sys | |
import mailbox | |
import argparse | |
import hashlib | |
def hash_body(hasher, msg: mailbox.Message) -> None:
    """Feed the raw (undecoded) payload of *msg* into *hasher*.

    The payload is fetched with ``decode=False``.  A list payload is walked
    in order; ``str`` pieces are encoded with the default codec before being
    hashed, ``bytes`` pieces are hashed as-is, and any other piece is handed
    back to this function recursively.

    Args:
        hasher: Object exposing ``update(bytes)`` (e.g. a ``hashlib`` hash).
        msg: Message whose payload should be hashed.
    """
    body = msg.get_payload(decode=False)
    # Wrap a non-list payload so both shapes share the same loop below.
    chunks = body if isinstance(body, list) else [body]
    for chunk in chunks:
        if isinstance(chunk, bytes):
            hasher.update(chunk)
        elif isinstance(chunk, str):
            hasher.update(chunk.encode())
        else:
            # Neither text nor bytes: treat it as a nested message object.
            hash_body(hasher, chunk)
def lsmsgid(path: str) -> Generator[str, None, None]:
    """Yield one identifier per message stored in the mbox file at *path*.

    For a message with a non-blank ``Message-ID`` header the stripped header
    value is yielded.  Otherwise a synthetic identifier is built from the
    ``Date``, ``From`` and ``Subject`` headers plus a SHA-1 digest of the
    message body, so that such messages can still be compared.

    Args:
        path: Filesystem path of an existing mbox file.

    Yields:
        The (real or synthetic) message identifier of each message, in
        mailbox iteration order.

    Raises:
        mailbox.NoSuchMailboxError: If *path* does not exist.  It is raised
            when iteration starts, because this is a generator.
    """
    # create=False: the library default (create=True) would silently create
    # an empty mbox file for a mistyped path, which a read-only listing tool
    # must never do.
    box = mailbox.mbox(path, create=False)
    try:
        for email in box:
            raw_id = email['Message-ID']
            # A header holding raw 8-bit data is returned as a Header object
            # rather than str; str() makes .strip() safe in both cases.
            msg_id = '' if raw_id is None else str(raw_id).strip()
            if msg_id:
                yield msg_id
                continue

            hasher = hashlib.sha1()
            hash_body(hasher, email)
            digest = hasher.hexdigest()
            yield f'<MISSING-Message-ID: {email["Date"]}|{email["From"]}|{email["Subject"]}|{digest}>'
    finally:
        # Release the underlying file handle even if the consumer stops
        # iterating early or an error occurs.
        box.close()
# ANSI colour escape sequences used for the diff output.  They are only
# enabled when stdout is a terminal; otherwise every constant is the empty
# string so the output contains no escape codes.
RED, GREEN, BLUE, NORMAL = (
    ("\033[0;31m", "\033[0;32m", "\033[0;34m", "\033[0m")
    if sys.stdout.isatty()
    else ("", "", "", "")
)
def diff_msgids(path1: str, path2: str) -> None:
    """Print the message IDs that occur in only one of two mbox files.

    IDs found only in *path1* are printed with a ``+`` prefix (green), IDs
    found only in *path2* with a ``-`` prefix (red), all in one sorted
    list followed by the number of differing IDs.  If the two ID sets are
    equal, a single "identical" line is printed instead.

    Args:
        path1: Path of the first mbox file.
        path2: Path of the second mbox file.
    """
    print('Reading:', path1)
    ids1 = set(lsmsgid(path1))

    print('Reading:', path2)
    ids2 = set(lsmsgid(path2))

    print('Comparing...')
    left_only = ids1 - ids2
    right_only = ids2 - ids1

    # The two sets are disjoint, so their union holds every differing ID once.
    changed = sorted(left_only | right_only)
    if not changed:
        print(f'{BLUE}# The two files are identical{NORMAL}')
        return

    for msg_id in changed:
        colour, sign = (GREEN, '+') if msg_id in left_only else (RED, '-')
        print(f'{colour}{sign} {msg_id}{NORMAL}')

    print(len(changed), 'differences')
def mdiff_msgids(base_path: str, other_paths: list[str]) -> None:
    """Report message IDs of other mbox files that are missing from a base.

    For every file in *other_paths*, the IDs that are not present in
    *base_path* are printed in sorted order with a ``-`` prefix (red); a
    file with nothing missing prints a "No missing IDs" line (green).
    A final line prints the total number of missing IDs.  An ID that is
    missing from the base and appears in several other files is counted
    once per file.

    Args:
        base_path: Path of the reference mbox file.
        other_paths: Paths of the mbox files to check against the base.
    """
    print('Reading:', base_path)
    base_ids = set(lsmsgid(base_path))

    count = 0
    for path in other_paths:
        print('Reading:', path)
        missing_ids = sorted(set(lsmsgid(path)) - base_ids)
        if not missing_ids:
            print(f'{GREEN}# No missing IDs{NORMAL}')
            continue

        for msg_id in missing_ids:
            print(f'{RED}- {msg_id}{NORMAL}')
        # Accumulate the total; without this the summary always showed 0.
        count += len(missing_ids)

    print(count, 'differences')
def main() -> None:
    """Parse the command line and run the selected sub-command.

    Sub-commands:
        ls [PATH ...]           print the ID of every message in each mbox
        diff PATH1 PATH2        print IDs present in only one of two mboxes
        mdiff BASE OTHER [...]  print IDs of the other mboxes missing in BASE

    When no sub-command is given, an error and the usage line are written
    to stderr and the process exits with status 1.
    """
    parser = argparse.ArgumentParser()
    subparsers = parser.add_subparsers()

    cmd_ls = subparsers.add_parser('ls')
    cmd_ls.set_defaults(cmd='ls')
    cmd_ls.add_argument('paths', nargs='*')

    cmd_diff = subparsers.add_parser('diff')
    cmd_diff.set_defaults(cmd='diff')
    cmd_diff.add_argument('path1')
    cmd_diff.add_argument('path2')

    cmd_mdiff = subparsers.add_parser('mdiff')
    cmd_mdiff.set_defaults(cmd='mdiff')
    cmd_mdiff.add_argument('base_path')
    cmd_mdiff.add_argument('other_paths', nargs='+')

    args = parser.parse_args()
    # `cmd` is only set by a sub-parser; without a sub-command the attribute
    # does not exist, so read it defensively instead of raising AttributeError.
    cmd = getattr(args, 'cmd', None)

    if cmd == 'ls':
        # The argument is declared as `paths` (zero or more files).
        for path in args.paths:
            for msg_id in lsmsgid(path):
                print(msg_id)
    elif cmd == 'diff':
        diff_msgids(args.path1, args.path2)
    elif cmd == 'mdiff':
        mdiff_msgids(args.base_path, args.other_paths)
    else:
        print('Illegal arguments', file=sys.stderr)
        parser.print_usage(sys.stderr)
        sys.exit(1)
# Run the command-line interface only when executed as a script, not when
# the module is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment