Created
April 4, 2022 00:17
-
-
Save toolness/0aff778914e8c9ce29e0ec94d4d7c0c4 to your computer and use it in GitHub Desktop.
Script to download metadata about your Gmail account into a sqlite database.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
This script can be used to create a sqlite database | |
that contains metadata about all your Gmail messages. | |
You will want to modify IMAP_PASSWORD and IMAP_USERNAME below. | |
See CREATE_TABLE_SQL for the schema of the database. | |
""" | |
import sys | |
import imaplib | |
import re | |
import sqlite3 | |
import unittest | |
import time | |
import pprint | |
from typing import NamedTuple | |
from datetime import datetime | |
from email.parser import BytesParser | |
from email.policy import default | |
IMAP_PASSWORD = "Put your password here" | |
IMAP_USERNAME = "[email protected]" | |
IMAP_SERVER = "imap.gmail.com" | |
IMAP_PORT = 993 | |
MAILBOX = "\"[Gmail]/All Mail\"" | |
RFC822_SIZE_RE = re.compile(br".*RFC822\.SIZE (\d+)") | |
UID_RE = re.compile(br".*UID (\d+)") | |
BATCH_SIZE = 1000 | |
EMAIL_DB_FILENAME = "email.db" | |
CREATE_TABLE_SQL = """ | |
CREATE TABLE IF NOT EXISTS email ( | |
uid INTEGER PRIMARY KEY NOT NULL, | |
date TIMESTAMP NOT NULL, | |
is_flagged INTEGER NOT NULL CHECK(is_flagged IN (0, 1)), | |
is_seen INTEGER NOT NULL CHECK(is_seen IN (0, 1)), | |
is_answered INTEGER NOT NULL CHECK(is_answered IN (0, 1)), | |
is_deleted INTEGER NOT NULL CHECK(is_deleted IN (0, 1)), | |
size INTEGER NOT NULL, | |
from_email TEXT NOT NULL, | |
from_display_name TEXT NOT NULL, | |
to_email TEXT NOT NULL, | |
to_display_name TEXT NOT NULL, | |
subject TEXT NOT NULL | |
) | |
""" | |
INSERT_TABLE_ROW_SQL = """ | |
INSERT INTO email( | |
uid, | |
date, | |
is_flagged, | |
is_seen, | |
is_answered, | |
is_deleted, | |
size, | |
from_email, | |
from_display_name, | |
to_email, | |
to_display_name, | |
subject | |
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) | |
""" | |
class EmailMetadata(NamedTuple): | |
uid: int | |
date: datetime | |
is_flagged: bool | |
is_seen: bool | |
is_answered: bool | |
is_deleted: bool | |
size: int | |
from_email: str | |
from_display_name: str | |
to_email: str | |
to_display_name: str | |
subject: str | |
def parse_size(msg): | |
match = RFC822_SIZE_RE.match(msg) | |
if not match: | |
raise ValueError(f"Unable to find size from {msg}") | |
return int(match.group(1)) | |
def parse_uid(msg): | |
match = UID_RE.match(msg) | |
if not match: | |
raise ValueError(f"Unable to find UID from {msg}") | |
return int(match.group(1)) | |
def safe_string(value: str): | |
try: | |
value.encode('utf-8') | |
return value | |
except UnicodeEncodeError: | |
return value.encode('utf-8', 'ignore').decode('utf-8') | |
def parse_msg(msg): | |
assert isinstance(msg[-1], bytes) | |
assert isinstance(msg[0], tuple) | |
assert isinstance(msg[0][1], bytes) | |
date = datetime.fromtimestamp(time.mktime(imaplib.Internaldate2tuple(msg[-1]))) | |
flags = set(imaplib.ParseFlags(msg[-1])) | |
is_flagged = b"\\Flagged" in flags | |
is_seen = b"\\Seen" in flags | |
is_answered = b"\\Answered" in flags | |
is_deleted = b"\\Deleted" in flags | |
# https://docs.python.org/3/library/email.examples.html | |
headers = BytesParser(policy=default).parsebytes(msg[0][1], headersonly=True) | |
if headers["to"] and headers["to"].addresses: | |
to = headers["to"].addresses[0] | |
to_email = f"{to.username}@{to.domain}".lower() | |
to_display_name = to.display_name | |
else: | |
to_email = '' | |
to_display_name = '' | |
if headers["from"] and headers["from"].addresses: | |
sender = headers['from'].addresses[0] | |
from_email = f"{sender.username}@{sender.domain}".lower() | |
from_display_name = sender.display_name | |
else: | |
from_email = '' | |
from_display_name = '' | |
subject = headers.get('subject', '') | |
assert isinstance(date, datetime) | |
return EmailMetadata( | |
uid=parse_uid(msg[0][0]), | |
date=date, | |
is_flagged=is_flagged, | |
is_seen=is_seen, | |
is_answered=is_answered, | |
is_deleted=is_deleted, | |
size=parse_size(msg[0][0]), | |
from_email=safe_string(from_email), | |
from_display_name=safe_string(from_display_name), | |
to_email=safe_string(to_email), | |
to_display_name=safe_string(to_display_name), | |
subject=safe_string(subject), | |
) | |
def main(): | |
with imaplib.IMAP4_SSL(IMAP_SERVER, port=IMAP_PORT) as M: | |
M.login(IMAP_USERNAME, IMAP_PASSWORD) | |
print(f"Logged into {IMAP_SERVER} as {IMAP_USERNAME}.") | |
status, messages = M.select(MAILBOX) | |
assert status == "OK" | |
# Sequence numbers vs. UIDs are confusing, see: | |
# https://dev.to/kehers/imap-new-messages-since-last-check-44gm | |
max_seq_num = int(messages[0]) | |
con = sqlite3.connect(EMAIL_DB_FILENAME) | |
con.execute(CREATE_TABLE_SQL) | |
max_uid = con.execute("SELECT MAX(uid) FROM email").fetchone()[0] | |
if max_uid is not None: | |
status, response = M.search(None, f"UID {max_uid}") | |
assert status == "OK" | |
next_seq_num = int(response[0]) + 1 | |
else: | |
next_seq_num = 1 | |
while next_seq_num <= max_seq_num: | |
batch_start = next_seq_num | |
batch_end = next_seq_num + (BATCH_SIZE - 1) | |
if batch_end > max_seq_num: | |
batch_end = max_seq_num | |
print(f"Fetching messages {batch_start} to {batch_end}.") | |
# https://datatracker.ietf.org/doc/html/rfc3501#section-6.4.5 | |
status, batch_msg = M.fetch(f"{batch_start}:{batch_end}", "(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER UID)") | |
assert status == "OK" | |
assert len(batch_msg) % 2 == 0 | |
rows = [] | |
for msg_num in range(len(batch_msg) // 2): | |
try: | |
i = msg_num * 2 | |
msg = [batch_msg[i], batch_msg[i + 1]] | |
email_metadata = parse_msg(msg) | |
rows.append(email_metadata) | |
print(f"Parsed #{email_metadata.uid} {email_metadata.date.month}-{email_metadata.date.year}: {repr(email_metadata.subject)}.") | |
except: | |
print("Error processing message:") | |
pprint.pprint(msg) | |
raise | |
pct = batch_end / max_seq_num * 100 | |
print(f"{pct:.2f}% done. Writing rows to database...") | |
con.executemany(INSERT_TABLE_ROW_SQL, rows) | |
con.commit() | |
next_seq_num = batch_end + 1 | |
print(f"Done writing {EMAIL_DB_FILENAME}.") | |
con.close() | |
class TestCases(unittest.TestCase): | |
def test_email_metadata(self): | |
reply = [(b'1 (UID 1 RFC822.SIZE 2103 RFC822.HEADER {220}', b"To: Boop Jones <[email protected]>\r\nFrom: Gmail Team <[email protected]>\r\nSubject: Gmail is different. Here's what you need to know.\r\nContent-Type: text/html; charset=ISO-8859-1\r\nContent-Transfer-Encoding: 7bit\r\n\r\n"), b' INTERNALDATE "18-Jun-2004 03:08:35 +0000" FLAGS (\\Flagged \\Seen))'] | |
m = parse_msg(reply) | |
self.assertEqual(m.date, datetime(2004, 6, 17, 23, 8, 35)) | |
self.assertEqual(m.is_flagged, True) | |
self.assertEqual(m.is_seen, True) | |
self.assertEqual(m.uid, 1) | |
self.assertEqual(m.size, 2103) | |
self.assertEqual(m.subject, "Gmail is different. Here's what you need to know.") | |
self.assertEqual(m.from_display_name, "Gmail Team") | |
self.assertEqual(m.from_email, "[email protected]") | |
self.assertEqual(m.to_display_name, "Boop Jones") | |
self.assertEqual(m.to_email, "[email protected]") | |
def test_parse_size(self): | |
self.assertEqual(parse_size(b'1 (UID 1 RFC822.SIZE 2103 RFC822.HEADER {220}'), 2103) | |
def test_parse_uid(self): | |
self.assertEqual(parse_uid(b'1 (UID 1 RFC822.SIZE 2103 RFC822.HEADER {220}'), 1) | |
if __name__ == '__main__': | |
if len(sys.argv) >= 2 and sys.argv[1] == "test": | |
del sys.argv[1] | |
unittest.main() | |
else: | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment