Created
March 17, 2021 02:01
-
-
Save Cologler/479c11284b209bd050777d24552e8bc4 to your computer and use it in GitHub Desktop.
A JSON+ decoder that supports regex literals
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# | |
# Copyright (c) 2021~2999 - Cologler <[email protected]> | |
# ---------- | |
# | |
# ---------- | |
import json | |
import re | |
def _parse_regex(s: str, end: int, strict=True, *, memo: dict=None):
    """
    Scan the string s for a regex literal of the form ``/pattern/flags``.

    End is the index of the character in s after the ``/`` that started
    the regex literal.  Inside the pattern a backslash escapes the next
    character, so an escaped slash does not terminate the literal; the
    pattern text (escapes included) is handed to ``re.compile`` unchanged.
    The flag letters ``i``, ``m``, ``x`` and ``s`` may follow the closing
    slash; scanning stops at the first character that is not one of them.

    ``strict`` is accepted for signature parity with the string parser and
    is not used.  ``memo`` optionally caches compiled patterns, keyed by
    ``(pattern, flags)``; a pattern that failed to compile is cached as
    ``None``.

    Returns a tuple of the compiled regex and the index of the character
    in s after the closing slash and any flags.

    Raises ``json.JSONDecodeError`` if the literal is not terminated or the
    pattern does not compile.
    """
    s_len = len(s)
    begin = end - 1
    index = end
    if memo is None:
        memo = {}

    # read pattern
    while index < s_len:
        char = s[index]
        if char == '\\':
            # escape: skip the backslash together with the escaped character
            index += 2
        elif char == '/':
            # unescaped slash: end of pattern
            break
        else:
            index += 1
    if index >= s_len:
        # Ran off the end of the input (possibly in the middle of an escape)
        # without ever seeing the closing slash.
        raise json.JSONDecodeError("Unterminated regex starting at", s, begin)
    pattern = s[end:index]
    index += 1  # step over the closing slash

    # read flags
    flag_values = {'i': re.I, 'm': re.M, 'x': re.X, 's': re.S}
    flags = 0
    while index < s_len:
        flag = flag_values.get(s[index])
        if flag is None:
            break
        flags |= flag
        index += 1

    key = (pattern, flags)
    try:
        regex = memo[key]
    except KeyError:
        try:
            regex = re.compile(pattern, flags)
        except re.error:
            # Remember the failure as well, so the same bad pattern is not
            # compiled again while this memo is alive.
            regex = None
        memo[key] = regex
    if regex is None:
        raise json.JSONDecodeError("Invalid regex pattern starting at", s, begin)
    return (regex, index)
# Matches a JSON number at the given position.  Capture groups, in order:
# the integer part (with optional leading minus), the optional fraction
# (including the dot) and the optional exponent (including the e/E).
_NUMBER_RE = re.compile(
    r'(-?(?:0|[1-9]\d*))(\.\d+)?([eE][-+]?\d+)?',
    flags=re.X | re.M | re.S,
)
def _make_scanner(context):
    """
    Build a ``scan_once(string, idx)`` function that also accepts regex
    literals.

    ``context`` supplies the parsing callbacks and settings read below
    (``parse_object``, ``parse_array``, ``parse_string``, ``strict``,
    ``parse_float``, ``parse_int``, ``parse_constant``, ``object_hook``,
    ``object_pairs_hook`` and ``memo``); ``JSONpDecoder`` passes itself.

    The returned function parses the single value that starts at
    ``string[idx]`` and returns ``(value, end_index)``.  It raises
    ``StopIteration(idx)`` when no value starts at ``idx``, and clears
    ``context.memo`` after every top-level call.
    """
    parse_object = context.parse_object
    parse_array = context.parse_array
    parse_string = context.parse_string
    match_number = _NUMBER_RE.match
    strict = context.strict
    parse_float = context.parse_float
    parse_int = context.parse_int
    parse_constant = context.parse_constant
    object_hook = context.object_hook
    object_pairs_hook = context.object_pairs_hook
    memo = context.memo

    def _scan_once(string, idx):
        # Dispatch on the first character of the value.
        try:
            nextchar = string[idx]
        except IndexError:
            raise StopIteration(idx) from None

        if nextchar == '"':
            return parse_string(string, idx + 1, strict)
        elif nextchar == '{':
            return parse_object((string, idx + 1), strict,
                _scan_once, object_hook, object_pairs_hook, memo)
        elif nextchar == '[':
            return parse_array((string, idx + 1), _scan_once)
        elif nextchar == 'n' and string[idx:idx + 4] == 'null':
            return None, idx + 4
        elif nextchar == 't' and string[idx:idx + 4] == 'true':
            return True, idx + 4
        elif nextchar == 'f' and string[idx:idx + 5] == 'false':
            return False, idx + 5
        elif nextchar == '/': # added
            # Share the scanner's memo so a pattern repeated within one
            # document is looked up instead of compiled again.  The memo
            # keys used here are (pattern, flags) tuples.
            return _parse_regex(string, idx + 1, strict, memo=memo)

        m = match_number(string, idx)
        if m is not None:
            integer, frac, exp = m.groups()
            if frac or exp:
                res = parse_float(integer + (frac or '') + (exp or ''))
            else:
                res = parse_int(integer)
            return res, m.end()
        elif nextchar == 'N' and string[idx:idx + 3] == 'NaN':
            return parse_constant('NaN'), idx + 3
        elif nextchar == 'I' and string[idx:idx + 8] == 'Infinity':
            return parse_constant('Infinity'), idx + 8
        elif nextchar == '-' and string[idx:idx + 9] == '-Infinity':
            return parse_constant('-Infinity'), idx + 9
        else:
            raise StopIteration(idx)

    def scan_once(string, idx):
        # Top-level entry point: always drop the memo afterwards, whether
        # parsing succeeded or raised.
        try:
            return _scan_once(string, idx)
        finally:
            memo.clear()

    return scan_once
class JSONpDecoder(json.JSONDecoder):
    """
    A JSON+ decoder: a ``json.JSONDecoder`` that additionally supports
    regex literals.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Build the regex-aware scanner only after the base initializer has
        # run, because _make_scanner reads its settings from this instance.
        regex_aware_scanner = _make_scanner(self)
        self.scan_once = regex_aware_scanner
if __name__ == '__main__':
    # Self-test; runs only when the module is executed as a script.

    # The standard decoder must reject a regex literal.
    try:
        json.JSONDecoder().decode('{ "r": /^avv$/i }')
    except json.JSONDecodeError:
        pass
    else:
        assert False

    # Flags that every compiled pattern carries by default.
    base_flags = re.compile('').flags

    # A bare regex literal with a single flag.
    single = JSONpDecoder().decode('/^axX$/i')
    assert isinstance(single, re.Pattern)
    assert single.pattern == '^axX$'
    assert single.flags == base_flags | re.I

    # All supported flags, see
    # https://docs.mongodb.com/manual/reference/operator/query/regex/
    multi = JSONpDecoder().decode('/^axX$/imxs')
    assert isinstance(multi, re.Pattern)
    assert multi.pattern == '^axX$'
    assert multi.flags == base_flags | re.I | re.M | re.X | re.S

    # A regex literal embedded in an object next to an ordinary value.
    document = JSONpDecoder().decode('{ "r": /^axX$/i, "i": 15 }')
    assert document['i'] == 15
    embedded = document['r']
    assert isinstance(embedded, re.Pattern)
    assert embedded.pattern == '^axX$'
    assert embedded.flags == base_flags | re.I
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment