Skip to content

Instantly share code, notes, and snippets.

@toolness
Created April 4, 2022 00:17
Show Gist options
  • Save toolness/0aff778914e8c9ce29e0ec94d4d7c0c4 to your computer and use it in GitHub Desktop.
Save toolness/0aff778914e8c9ce29e0ec94d4d7c0c4 to your computer and use it in GitHub Desktop.
Script to download metadata about your Gmail account into a SQLite database.
"""
This script creates (or incrementally updates) a SQLite database
containing metadata about all of your Gmail messages.
Before running it, set IMAP_USERNAME and IMAP_PASSWORD below.
See CREATE_TABLE_SQL for the schema of the database.
Run it with the single argument "test" to run the unit tests instead.
"""
import sys
import imaplib
import re
import sqlite3
import unittest
import time
import pprint
from typing import NamedTuple
from datetime import datetime
from email.parser import BytesParser
from email.policy import default
# Credentials and server used by main() to log in. Both IMAP_PASSWORD and
# IMAP_USERNAME are placeholders and must be edited before running.
# NOTE(review): these are stored in plain text in the source file; take care
# not to commit or share a copy that contains real credentials.
IMAP_PASSWORD = "Put your password here"
IMAP_USERNAME = "[email protected]"
IMAP_SERVER = "imap.gmail.com"
IMAP_PORT = 993
# Mailbox name handed to IMAP SELECT in main(). The embedded double quotes are
# part of the value -- presumably required because the name contains a space;
# confirm before changing.
MAILBOX = "\"[Gmail]/All Mail\""
# Bytes patterns that capture the decimal number following "RFC822.SIZE" and
# "UID" in a FETCH response line; used by parse_size() and parse_uid().
RFC822_SIZE_RE = re.compile(br".*RFC822\.SIZE (\d+)")
UID_RE = re.compile(br".*UID (\d+)")
# Maximum number of messages requested per FETCH command in main().
BATCH_SIZE = 1000
# Path of the sqlite database file that main() creates or appends to.
EMAIL_DB_FILENAME = "email.db"
# Schema of the single "email" table: one row per message, keyed by IMAP UID.
# The boolean flag columns are stored as 0/1 integers and enforced by CHECKs.
CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS email (
uid INTEGER PRIMARY KEY NOT NULL,
date TIMESTAMP NOT NULL,
is_flagged INTEGER NOT NULL CHECK(is_flagged IN (0, 1)),
is_seen INTEGER NOT NULL CHECK(is_seen IN (0, 1)),
is_answered INTEGER NOT NULL CHECK(is_answered IN (0, 1)),
is_deleted INTEGER NOT NULL CHECK(is_deleted IN (0, 1)),
size INTEGER NOT NULL,
from_email TEXT NOT NULL,
from_display_name TEXT NOT NULL,
to_email TEXT NOT NULL,
to_display_name TEXT NOT NULL,
subject TEXT NOT NULL
)
"""
# Positional INSERT used with executemany() in main(). The column order here
# matches the field order of EmailMetadata, whose tuples are passed directly
# as the parameter rows -- keep the two in sync.
INSERT_TABLE_ROW_SQL = """
INSERT INTO email(
uid,
date,
is_flagged,
is_seen,
is_answered,
is_deleted,
size,
from_email,
from_display_name,
to_email,
to_display_name,
subject
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"""
class EmailMetadata(NamedTuple):
    """Metadata extracted from one message's FETCH response.

    Instances are passed as positional parameter rows to
    INSERT_TABLE_ROW_SQL, so the field order below must match the column
    order of that statement.
    """

    # Identity and server-side timestamp.
    uid: int
    date: datetime

    # IMAP system flags.
    is_flagged: bool
    is_seen: bool
    is_answered: bool
    is_deleted: bool

    # Size reported by the server via RFC822.SIZE.
    size: int

    # First address of the From and To headers (empty strings when absent).
    from_email: str
    from_display_name: str
    to_email: str
    to_display_name: str

    # Subject header (empty string when absent).
    subject: str
def parse_size(msg):
    """Return the integer that follows ``RFC822.SIZE`` in *msg*.

    *msg* is a bytes FETCH response line.  Raises ValueError when
    RFC822_SIZE_RE does not match it.
    """
    found = RFC822_SIZE_RE.match(msg)
    if found is None:
        raise ValueError(f"Unable to find size from {msg}")
    (digits,) = found.groups()
    return int(digits)
def parse_uid(msg):
    """Return the integer that follows ``UID`` in *msg*.

    *msg* is a bytes FETCH response line.  Raises ValueError when UID_RE
    does not match it.
    """
    found = UID_RE.match(msg)
    if found is None:
        raise ValueError(f"Unable to find UID from {msg}")
    (digits,) = found.groups()
    return int(digits)
def safe_string(value: str):
    """Return *value* in a form that can always be encoded as UTF-8.

    A string that already encodes cleanly is returned unchanged (the very
    same object).  Otherwise the characters that cannot be encoded are
    dropped and the remainder is returned.
    """
    try:
        value.encode('utf-8')
    except UnicodeEncodeError:
        # Round-trip through UTF-8, silently discarding whatever is
        # unencodable.
        lossy_bytes = value.encode('utf-8', 'ignore')
        return lossy_bytes.decode('utf-8')
    else:
        return value
def _first_address(header):
    """Return ``(email, display_name)`` for the first address in *header*.

    *header* is an address header value (e.g. ``To`` or ``From``) from a
    message parsed with ``policy=default``, or None when the header is
    missing.  The email is lower-cased.  Both strings are empty when the
    header is missing, empty, or contains no addresses.
    """
    if not header or not header.addresses:
        return '', ''
    first = header.addresses[0]
    return f"{first.username}@{first.domain}".lower(), first.display_name


def parse_msg(msg):
    """Convert one message's FETCH response into an EmailMetadata.

    Args:
        msg: A two-item sequence as produced by imaplib for a FETCH that
            includes a header literal: ``msg[0]`` is a
            ``(response_line, raw_headers)`` tuple of bytes and ``msg[-1]``
            is the bytes trailer.  The UID and RFC822.SIZE are read from
            the response line; INTERNALDATE and FLAGS are read from the
            trailer.

    Returns:
        EmailMetadata.  ``date`` is a naive datetime expressed in the local
        timezone of the machine running the script.  Only the first address
        of the To and From headers is recorded.

    Raises:
        TypeError: If *msg* does not have the shape described above.
        ValueError: If the UID, size or INTERNALDATE cannot be found.
    """
    trailer = msg[-1]
    envelope = msg[0]
    # Explicit checks rather than ``assert`` so they still run under -O.
    if not isinstance(trailer, bytes):
        raise TypeError(f"Expected bytes trailer, got {type(trailer).__name__}")
    if not isinstance(envelope, tuple):
        raise TypeError(f"Expected tuple response, got {type(envelope).__name__}")
    if not isinstance(envelope[1], bytes):
        raise TypeError(f"Expected bytes headers, got {type(envelope[1]).__name__}")
    response_line, raw_headers = envelope[0], envelope[1]

    # Internaldate2tuple returns None when the text has no well-formed
    # INTERNALDATE; report that the same way parse_uid/parse_size do
    # instead of letting time.mktime fail with an unrelated TypeError.
    local_time = imaplib.Internaldate2tuple(trailer)
    if local_time is None:
        raise ValueError(f"Unable to find INTERNALDATE from {trailer}")
    date = datetime.fromtimestamp(time.mktime(local_time))

    flags = set(imaplib.ParseFlags(trailer))

    # https://docs.python.org/3/library/email.examples.html
    headers = BytesParser(policy=default).parsebytes(raw_headers, headersonly=True)
    to_email, to_display_name = _first_address(headers["to"])
    from_email, from_display_name = _first_address(headers["from"])
    subject = headers.get('subject', '')

    return EmailMetadata(
        uid=parse_uid(response_line),
        date=date,
        is_flagged=b"\\Flagged" in flags,
        is_seen=b"\\Seen" in flags,
        is_answered=b"\\Answered" in flags,
        is_deleted=b"\\Deleted" in flags,
        size=parse_size(response_line),
        from_email=safe_string(from_email),
        from_display_name=safe_string(from_display_name),
        to_email=safe_string(to_email),
        to_display_name=safe_string(to_display_name),
        subject=safe_string(subject),
    )
def _require_ok(status, command):
    """Raise RuntimeError unless the IMAP *command* returned status "OK"."""
    if status != "OK":
        raise RuntimeError(f"IMAP {command} failed with status {status!r}")


def _resume_seq_num(M, max_uid, max_seq_num):
    """Return the sequence number of the first message not yet stored.

    *max_uid* is the highest UID already in the database.  Normally that
    message is still in the mailbox and the answer is its sequence number
    plus one.  If it has since been removed, search for higher UIDs
    instead; ``max_seq_num + 1`` is returned when there is nothing new.
    """
    status, response = M.search(None, f"UID {max_uid}")
    _require_ok(status, "SEARCH")
    found = response[0].split() if response and response[0] else []
    if found:
        return int(found[0]) + 1

    # The newest stored message is gone from the mailbox, so look for the
    # first message with a higher UID.
    status, response = M.search(None, f"UID {max_uid + 1}:*")
    _require_ok(status, "SEARCH")
    candidates = [int(n) for n in response[0].split()] if response and response[0] else []
    if not candidates:
        return max_seq_num + 1
    first = min(candidates)
    # Per RFC 3501 a "n:*" UID range always includes the last message, even
    # when its UID is below n, so confirm the candidate really is new.
    status, data = M.fetch(str(first), "(UID)")
    _require_ok(status, "FETCH")
    if parse_uid(data[0]) > max_uid:
        return first
    return max_seq_num + 1


def main():
    """Download metadata for messages not yet stored into the database.

    Logs into IMAP_SERVER, selects MAILBOX, creates the ``email`` table in
    EMAIL_DB_FILENAME if needed, and then fetches messages in batches of
    BATCH_SIZE by sequence number, starting after the highest UID already
    stored.  Each batch is committed before the next one is fetched, so an
    interrupted run can be resumed by running the script again.

    Raises:
        RuntimeError: If an IMAP command does not return "OK" or a FETCH
            response does not have the expected shape.
    """
    with imaplib.IMAP4_SSL(IMAP_SERVER, port=IMAP_PORT) as M:
        M.login(IMAP_USERNAME, IMAP_PASSWORD)
        print(f"Logged into {IMAP_SERVER} as {IMAP_USERNAME}.")
        status, messages = M.select(MAILBOX)
        _require_ok(status, "SELECT")
        # Sequence numbers vs. UIDs are confusing, see:
        # https://dev.to/kehers/imap-new-messages-since-last-check-44gm
        max_seq_num = int(messages[0])
        con = sqlite3.connect(EMAIL_DB_FILENAME)
        # try/finally so the database is closed even if a fetch or parse
        # fails part-way through.
        try:
            con.execute(CREATE_TABLE_SQL)
            max_uid = con.execute("SELECT MAX(uid) FROM email").fetchone()[0]
            if max_uid is None:
                next_seq_num = 1
            else:
                next_seq_num = _resume_seq_num(M, max_uid, max_seq_num)
            while next_seq_num <= max_seq_num:
                batch_start = next_seq_num
                batch_end = min(next_seq_num + (BATCH_SIZE - 1), max_seq_num)
                print(f"Fetching messages {batch_start} to {batch_end}.")
                # https://datatracker.ietf.org/doc/html/rfc3501#section-6.4.5
                status, batch_msg = M.fetch(f"{batch_start}:{batch_end}", "(FLAGS INTERNALDATE RFC822.SIZE RFC822.HEADER UID)")
                _require_ok(status, "FETCH")
                # Each message is expected to contribute two consecutive
                # items: a (response line, headers) tuple and a trailer.
                if len(batch_msg) % 2 != 0:
                    raise RuntimeError(
                        f"Unexpected FETCH response: {len(batch_msg)} items is not a whole number of messages"
                    )
                rows = []
                for i in range(0, len(batch_msg), 2):
                    msg = batch_msg[i:i + 2]
                    try:
                        email_metadata = parse_msg(msg)
                    except Exception:
                        # Show the raw response to aid debugging, then let
                        # the error propagate.
                        print("Error processing message:")
                        pprint.pprint(msg)
                        raise
                    rows.append(email_metadata)
                    print(f"Parsed #{email_metadata.uid} {email_metadata.date.month}-{email_metadata.date.year}: {repr(email_metadata.subject)}.")
                pct = batch_end / max_seq_num * 100
                print(f"{pct:.2f}% done. Writing rows to database...")
                con.executemany(INSERT_TABLE_ROW_SQL, rows)
                con.commit()
                next_seq_num = batch_end + 1
            print(f"Done writing {EMAIL_DB_FILENAME}.")
        finally:
            con.close()
class TestCases(unittest.TestCase):
    """Unit tests for the FETCH-response parsing helpers.

    Run them by invoking the script with the single argument ``test``.
    """

    def test_email_metadata(self):
        # The addresses are example.com placeholders.
        reply = [
            (
                b'1 (UID 1 RFC822.SIZE 2103 RFC822.HEADER {220}',
                b"To: Boop Jones <boop@example.com>\r\n"
                b"From: Gmail Team <gmail-team@example.com>\r\n"
                b"Subject: Gmail is different. Here's what you need to know.\r\n"
                b"Content-Type: text/html; charset=ISO-8859-1\r\n"
                b"Content-Transfer-Encoding: 7bit\r\n\r\n",
            ),
            b' INTERNALDATE "18-Jun-2004 03:08:35 +0000" FLAGS (\\Flagged \\Seen))',
        ]
        m = parse_msg(reply)
        # parse_msg reports INTERNALDATE as a naive datetime in the local
        # timezone, so derive the expected value from the UTC instant
        # instead of hard-coding one timezone's wall-clock time.
        utc_instant = datetime.strptime("2004-06-18 03:08:35 +0000", "%Y-%m-%d %H:%M:%S %z")
        self.assertEqual(m.date, datetime.fromtimestamp(utc_instant.timestamp()))
        self.assertEqual(m.is_flagged, True)
        self.assertEqual(m.is_seen, True)
        self.assertEqual(m.is_answered, False)
        self.assertEqual(m.is_deleted, False)
        self.assertEqual(m.uid, 1)
        self.assertEqual(m.size, 2103)
        self.assertEqual(m.subject, "Gmail is different. Here's what you need to know.")
        self.assertEqual(m.from_display_name, "Gmail Team")
        self.assertEqual(m.from_email, "gmail-team@example.com")
        self.assertEqual(m.to_display_name, "Boop Jones")
        self.assertEqual(m.to_email, "boop@example.com")

    def test_parse_size(self):
        self.assertEqual(parse_size(b'1 (UID 1 RFC822.SIZE 2103 RFC822.HEADER {220}'), 2103)

    def test_parse_uid(self):
        self.assertEqual(parse_uid(b'1 (UID 1 RFC822.SIZE 2103 RFC822.HEADER {220}'), 1)
if __name__ == '__main__':
    # "<script> test" runs the unit tests; anything else runs the download.
    wants_tests = sys.argv[1:2] == ["test"]
    if not wants_tests:
        main()
    else:
        # Drop the "test" argument so unittest does not try to interpret it
        # as the name of a test to run.
        del sys.argv[1]
        unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment