Last active
December 6, 2022 15:36
-
-
Save mkyt/7b8348451d6c6b0d62851fb75ab65d4a to your computer and use it in GitHub Desktop.
Fix merge conflicts in the `pages-metadata.edn` file of a Logseq data directory
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
from pathlib import Path | |
from typing import Union, Tuple, List, NamedTuple, Dict | |
import sys | |
class Block(NamedTuple):
    """One page entry read from ``pages-metadata.edn``.

    Instances are built from the ``:block/name``, ``:block/created-at`` and
    ``:block/updated-at`` keys of a parsed map.
    """

    # Value of the ``:block/name`` key; used as the identity when merging.
    name: str
    # Value of the ``:block/created-at`` key.
    created: int
    # Value of the ``:block/updated-at`` key.
    updated: int
def skip_space(s: str, pos: int) -> int:
    """Return the index of the first non-whitespace character at or after ``pos``.

    Args:
        s: Text being scanned.
        pos: Index to start scanning from.

    Returns:
        The index of the first character for which ``str.isspace`` is false,
        or ``len(s)`` when only whitespace (or nothing) remains.
    """
    end = len(s)
    # The bounds check keeps trailing whitespace from running off the end of
    # the string and raising IndexError.
    while pos < end and s[pos].isspace():
        pos += 1
    return pos
def consume(s: str, pos: int, c: str) -> Tuple[bool, int]:
    """Consume the single character ``c`` at ``pos`` if it is there.

    Args:
        s: Text being scanned.
        pos: Index at which ``c`` is expected.
        c: The character to match.

    Returns:
        ``(True, pos + 1)`` when ``s[pos] == c``; otherwise ``(False, pos)``,
        including when ``pos`` is already at or past the end of ``s``.
    """
    # End of input is a plain "no match" rather than an IndexError.
    if pos < len(s) and s[pos] == c:
        return (True, pos + 1)
    return (False, pos)
def parse_key(s: str, pos: int) -> Tuple[Union[str, None], int]:
    """Parse a ``:keyword`` starting at ``pos``.

    The keyword is the run of non-whitespace characters that follows the
    leading colon; the colon itself is not part of the returned key.

    Args:
        s: Text being scanned.
        pos: Index at which the leading ``:`` is expected.

    Returns:
        ``(key, end)`` where ``end`` is the index just past the keyword, or
        ``(None, pos)`` when there is no ``:`` at ``pos`` (including end of
        input). A colon followed directly by whitespace or end of input
        yields an empty key.
    """
    size = len(s)
    if pos >= size or s[pos] != ":":
        return (None, pos)
    start = pos + 1
    end = start
    # Stop at whitespace or at the end of the input, whichever comes first.
    while end < size and not s[end].isspace():
        end += 1
    return (s[start:end], end)
def parse_str(s: str, pos: int) -> Tuple[Union[str, None], int]:
    """Parse a double-quoted string literal starting at ``pos``.

    Escape sequences are NOT decoded: a backslash and the character after it
    are copied to the result verbatim, so the returned text can be written
    back between quotes unchanged.

    Args:
        s: Text being scanned.
        pos: Index at which the opening ``"`` is expected.

    Returns:
        ``(text, end)`` where ``text`` is the raw content between the quotes
        and ``end`` is the index just past the closing quote. Returns
        ``(None, pos)`` when there is no opening quote at ``pos`` or when the
        closing quote is missing.
    """
    size = len(s)
    if pos >= size or s[pos] != '"':
        return (None, pos)
    start = pos
    pos += 1
    res: List[str] = []
    while pos < size:
        ch = s[pos]
        if ch == '"':
            return ("".join(res), pos + 1)
        if ch == "\\" and pos + 1 < size:
            # Keep the escape pair as-is; this also stops an escaped quote
            # from being mistaken for the end of the string.
            res.append(s[pos:pos + 2])
            pos += 2
        else:
            res.append(ch)
            pos += 1
    # Ran out of input before the closing quote: report failure without
    # consuming anything instead of raising IndexError.
    return (None, start)
def parse_int(s: str, pos: int) -> Tuple[Union[int, None], int]:
    """Parse an optionally negative decimal integer starting at ``pos``.

    Args:
        s: Text being scanned.
        pos: Index at which the number (or its ``-`` sign) is expected.

    Returns:
        ``(value, end)`` where ``end`` is the index just past the last digit,
        or ``(None, pos)`` when no integer starts at ``pos``. On failure
        nothing is consumed, not even a lone ``-``.
    """
    size = len(s)
    start = pos
    sgn = 1
    if pos < size and s[pos] == "-":
        sgn = -1
        pos += 1
    digits_from = pos
    # Only ASCII digits count: str.isdigit() also accepts characters such as
    # superscript two, which int() cannot convert.
    while pos < size and s[pos] in "0123456789":
        pos += 1
    if pos == digits_from:
        return (None, start)
    return (sgn * int(s[digits_from:pos]), pos)
def parse_value(s: str, pos: int) -> Tuple[Union[str, int, None], int]:
    """Parse a map value: a string literal or, failing that, an integer.

    Leading whitespace is skipped first. Returns ``(value, end)`` from the
    first parser that succeeds, or ``(None, pos)`` with the position reported
    by the last parser tried when neither matches.
    """
    pos = skip_space(s, pos)
    # Try each literal parser in turn; order matters only for clarity, since
    # a string starts with '"' and an integer never does.
    for parser in (parse_str, parse_int):
        parsed, pos = parser(s, pos)
        if parsed is not None:
            return (parsed, pos)
    return (None, pos)
def parse_block(s: str, pos: int) -> Tuple[Union[Block, None], int]:
    """Parse one ``{:key value, ...}`` map into a ``Block``.

    The map must contain ``:block/name`` (string), ``:block/created-at`` and
    ``:block/updated-at`` (integers); any other keys are parsed and ignored.

    Args:
        s: Text being scanned.
        pos: Index at which the map (after optional whitespace) is expected.

    Returns:
        ``(block, end)`` with ``end`` just past the closing ``}``. When no
        map starts here, ``(None, p)`` with ``p`` at the first non-whitespace
        character. When the map is malformed or lacks a required field,
        ``(None, p)`` with ``p`` at the map's opening ``{``, so that a caller
        expecting a closing ``]`` next cannot mistake a broken map for the
        end of the list.
    """
    kv: Dict[str, Union[str, int]] = {}
    pos = skip_space(s, pos)
    block_start = pos
    ok, pos = consume(s, pos, "{")
    if not ok:
        return (None, pos)
    while True:
        pos = skip_space(s, pos)
        # k-v pair
        key, pos = parse_key(s, pos)
        if not key:
            return (None, block_start)
        pos = skip_space(s, pos)
        val, pos = parse_value(s, pos)
        # Compare against None explicitly: 0 and "" are legitimate values
        # and must not be treated as a parse failure.
        if val is None:
            return (None, block_start)
        kv[key] = val
        pos = skip_space(s, pos)
        # end of block
        ok, pos = consume(s, pos, "}")
        if ok:
            break
        # next k-v pair
        ok, pos = consume(s, pos, ",")
        if not ok:
            return (None, block_start)
    name = kv.get("block/name")
    created = kv.get("block/created-at")
    updated = kv.get("block/updated-at")
    # A missing or wrongly typed field is a parse failure, not a KeyError.
    if (
        not isinstance(name, str)
        or not isinstance(created, int)
        or not isinstance(updated, int)
    ):
        return (None, block_start)
    return (Block(name, created, updated), pos)
def parse_pages_metadata(s: str) -> Union[List[Block], None]:
    """Parse a ``[ {...} {...} ... ]`` vector of page maps.

    Returns the list of parsed blocks (possibly empty), or ``None`` when the
    opening ``[`` is missing or the text after the last successfully parsed
    map is not a closing ``]``.
    """
    pos = skip_space(s, 0)
    opened, pos = consume(s, pos, "[")
    if not opened:
        return None

    blocks: List[Block] = []
    # Keep reading maps until parse_block reports that none starts here.
    block, pos = parse_block(s, pos)
    while block is not None:
        blocks.append(block)
        block, pos = parse_block(s, pos)

    pos = skip_space(s, pos)
    closed, pos = consume(s, pos, "]")
    return blocks if closed else None
def parse_diff(s: str) -> Tuple[bool, Union[str, None], Union[str, None]]:
    """Split text containing merge-conflict markers into its two sides.

    Lines outside any conflict region go to both sides. Inside a region,
    lines between ``<<<<<<< `` and ``=======`` go to side A and lines between
    ``=======`` and ``>>>>>>> `` go to side B. A diff3-style base section
    (introduced by a ``|||||||`` line inside side A) is discarded.

    Args:
        s: Full file content, possibly containing conflict markers.

    Returns:
        ``(has_conflicts, a, b)``. ``has_conflicts`` is true once a
        ``<<<<<<< `` marker has been seen. ``a`` and ``b`` are the
        reconstructed sides, or both ``None`` when the markers are out of
        order or a conflict region is still open at the end of the input.
    """
    a: List[str] = []
    b: List[str] = []
    state = "both"
    has_conflicts = False
    for line in s.split("\n"):
        if line.startswith("<<<<<<< "):
            has_conflicts = True
            if state != "both":
                return (has_conflicts, None, None)
            state = "a"
        elif state == "a" and line.startswith("|||||||"):
            # diff3 conflict style: the common-ancestor text follows and
            # belongs to neither side.
            state = "base"
        elif line.startswith("======="):
            if state not in ("a", "base"):
                return (has_conflicts, None, None)
            state = "b"
        elif line.startswith(">>>>>>> "):
            if state != "b":
                return (has_conflicts, None, None)
            state = "both"
        elif state == "a":
            a.append(line)
        elif state == "b":
            b.append(line)
        elif state == "both":
            a.append(line)
            b.append(line)
        # state == "base": line is dropped on purpose.
    if state != "both":
        # A region that never reached its closing marker is malformed.
        return (has_conflicts, None, None)
    return (has_conflicts, "\n".join(a), "\n".join(b))
def merge_block(a: Block, b: Block) -> Block:
    """Combine two entries for the same page into one.

    The result keeps ``a``'s name, the smaller of the two creation values and
    the larger of the two update values.
    """
    return Block(
        name=a.name,
        created=min(a.created, b.created),
        updated=max(a.updated, b.updated),
    )
def merge_blocks(blk_a: List[Block], blk_b: List[Block]) -> List[Block]:
    """Merge two block lists by page name and return them sorted by name.

    Entries present on both sides are combined with ``merge_block``; entries
    present on only one side are carried over unchanged. When a name occurs
    more than once in ``blk_b``, the last occurrence is the one used for
    merging.
    """
    b_by_name: Dict[str, Block] = {entry.name: entry for entry in blk_b}
    a_by_name: Dict[str, Block] = {entry.name: entry for entry in blk_a}

    # Side A first, merged with its counterpart from side B where one exists.
    merged: List[Block] = [
        merge_block(entry, b_by_name[entry.name]) if entry.name in b_by_name else entry
        for entry in blk_a
    ]
    # Then whatever exists only on side B.
    merged.extend(entry for entry in blk_b if entry.name not in a_by_name)

    return sorted(merged, key=lambda entry: entry.name)
def print_blocks(blks: List[Block]) -> str:
    """Render blocks as the text of a ``pages-metadata.edn`` vector.

    Despite the name, nothing is printed; the formatted text is returned.
    Each block becomes a three-line map, consecutive maps are separated by a
    newline plus one space, and the closing ``]`` follows the last map
    directly. An empty list renders as ``[]``.

    Args:
        blks: Blocks to render, in output order.

    Returns:
        The EDN text, terminated by a newline.
    """
    entries = [
        f'{{:block/name "{blk.name}",\n :block/created-at {blk.created},\n :block/updated-at {blk.updated}}}'
        for blk in blks
    ]
    # Joining gives every list length the same layout; a single-element list
    # no longer gets a stray newline before the closing bracket.
    return "[" + "\n ".join(entries) + "]\n"
if __name__ == "__main__":
    # Optional first argument: the graph's root directory (defaults to the
    # current directory). The metadata file lives under its `logseq` folder.
    if len(sys.argv) > 1:
        fp = Path(sys.argv[1])
    else:
        fp = Path(".")
    fp = fp / "logseq/pages-metadata.edn"
    if not fp.exists():
        print("`pages-metadata.edn` not found")
        sys.exit(1)
    with fp.open(encoding="utf-8") as f:
        content = f.read()
    # Split the conflicted file into the two sides of the merge.
    conf, a, b = parse_diff(content)
    if not conf:
        print("No conflicts found")
        sys.exit(0)
    if not a or not b:
        print("Could not parse diff")
        sys.exit(1)
    blk_a = parse_pages_metadata(a)
    blk_b = parse_pages_metadata(b)
    # Only None signals a parse failure; an empty list is a valid (empty)
    # metadata file and must still be merged.
    if blk_a is None or blk_b is None:
        print("Could not parse `pages-metadata.edn`")
        sys.exit(1)
    blk = merge_blocks(blk_a, blk_b)
    s = print_blocks(blk)
    # Opening with "w" already truncates the file, so it is not deleted
    # first; the context manager guarantees the handle is flushed and closed.
    with fp.open("w", encoding="utf-8") as f:
        f.write(s)
    print("Merged `pages-metadata.edn` successfully")
    sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment