Diffstat (limited to 'earlpy/earlpy.py')
-rw-r--r--  earlpy/earlpy.py  300
1 file changed, 300 insertions, 0 deletions
diff --git a/earlpy/earlpy.py b/earlpy/earlpy.py
new file mode 100644
index 0000000..d48c1be
--- /dev/null
+++ b/earlpy/earlpy.py
@@ -0,0 +1,300 @@
+import tempfile
+import subprocess
+import hashlib
+from glob import glob
+import pathlib
+import struct
+
+DIR = pathlib.Path(__file__).parent.resolve()
+
+class Parser:
+ def __init__(self, parser_dir):
+ assert parser_dir and parser_dir[0] != '/'
+ files = sorted([f"{parser_dir}/grammar.txt",
+ *glob(f"{parser_dir}/*.c"),
+ f"{DIR}/parser.c"])
+ if f"{parser_dir}/parser.c" in files:
+ files.remove(f"{parser_dir}/parser.c")
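+        # Hash the grammar and every C source that feeds the generated lexer;
+        # the digest line is embedded in parser.l so the build can be skipped
+        # when nothing has changed.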
+ hashes = ' '.join(
+ hashlib.sha256(b''.join(open(f, "rb").readlines())).hexdigest()
+ for f in files)
+
+ already_built = False
+ lex_path = f"{parser_dir}/parser.l"
+ if glob(lex_path) and glob(f"{parser_dir}/parser"):
+ if open(lex_path, "r").readline()[3:][:-3].strip() == hashes:
+ already_built = True
+
+ lines = self.parse_grammar(f"{parser_dir}/grammar.txt")
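+        # Stale or missing: regenerate parser.l from the grammar, run flex on
+        # it, and compile the result with gcc.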
+ if not already_built:
+ if glob(f"{parser_dir}/parser"):
+ subprocess.run(f"rm {parser_dir}/parser", shell=True)
+ with open(f"{parser_dir}/parser.l", "w") as f:
+ f.write(f"/* {hashes} */\n")
+ for line in lines:
+ f.write(line + "\n")
+ f.write(open(f"{DIR}/parser.c", "r").read())
+ for path in glob(f"{parser_dir}/*.c"):
+ if path == f"{parser_dir}/parser.c": continue
+ f.write(open(path, "r").read())
+ res = subprocess.run(f"flex -o {parser_dir}/parser.c {parser_dir}/parser.l",
+ shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ if res.returncode: print(res.stderr.decode("utf-8"))
+ assert res.returncode == 0
+ res = subprocess.run(f"gcc -O3 {parser_dir}/parser.c -o {parser_dir}/parser",
+ shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ if res.returncode: print(res.stderr.decode("utf-8"))
+ assert res.returncode == 0
+
+ self.parser = f"{parser_dir}/parser"
+
+ def parse_string(self, string):
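+        # The compiled parser reads its input from a file path, so spool the
+        # string into a temporary file first.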
+ with tempfile.NamedTemporaryFile() as f:
+ f.write(string.encode("utf-8"))
+ f.flush()
+ result = self.parse_file(f.name)
+ return result
+
+ def parse_file(self, path):
+ res = subprocess.run([self.parser, path], stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ if res.returncode:
+ print("FAIL:", res.stderr.decode("utf-8"))
+ raise ValueError
+
+ contents = open(path, "r").read()
+
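+        # stdout layout (native-endian u64s): a token count, then one
+        # (symbol id, start index, length) triple per token, then one
+        # production id per nonterminal node, in expansion order.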
+ n_tokens, = struct.unpack("Q", res.stdout[:8])
+ # symbol id, start idx, length
+ tokens = list(struct.iter_unpack("QQQ", res.stdout[8:8+(8*3*n_tokens)]))
+ tokens = [(token, contents[token[1]:token[1]+token[2]])
+ for token in tokens]
+ # production id
+ nodes = [t[0] for t in struct.iter_unpack("Q", res.stdout[8+(8*3*n_tokens):])]
+
+ # REPARSE the nodes
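+        # Rebuild the tree with an explicit stack: a node is complete once it
+        # has as many children as its production has symbols; otherwise its
+        # next child is either a fresh nonterminal node (consuming the next
+        # production id) or the next token.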
+ root = Node(self.productions[nodes[0]][1],
+ self.productions[nodes[0]][0])
+ nodes = nodes[1:]
+ stack = [root]
+ while stack:
+ node = stack[-1]
+ if (isinstance(node, tuple)
+ or len(node.production) == len(node.contents)):
+ # COMPLETE!
+ stack.pop()
+ if stack: stack[-1].contents.append(node)
+ else:
+ symbol = node.production[len(node.contents)]
+ if symbol.kind == "nonterm":
+ prod_id = nodes.pop(0)
+ stack.append(Node(self.productions[prod_id][1],
+ self.productions[prod_id][0]))
+ else:
+ stack.append(tokens.pop(0))
+ return root
+
+ def parse_grammar(self, grammar_file):
+ grammar = open(grammar_file, "r")
+
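+        # Grammar format: an unindented line declares a symbol as
+        #     <name> <kind> [.start]
+        # where <kind> is 'list', 'regex', or 'nonterm'; the indented lines
+        # that follow supply its contents, and '#' lines are comments.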
+ # (0) PARSE symbols from the grammar file
+ self.name_to_symbol = dict()
+ ordered_symbols = []
+ last_symbol = None
+ for line in grammar:
+ if line[0] in ' \t':
+ if last_symbol.kind == "list":
+ last_symbol.contents.extend(line.split())
+ elif last_symbol.kind == "regex":
+ assert not last_symbol.contents
+ last_symbol.contents = line.strip()
+ elif last_symbol.kind == "nonterm":
+ last_symbol.contents.append(line.split())
+ else: raise NotImplementedError
+ elif line.strip().startswith("#"):
+ continue
+ elif line.strip():
+ last_symbol = Symbol(line)
+ self.name_to_symbol[last_symbol.name] = last_symbol
+ ordered_symbols.append(last_symbol)
+
+ # We allow mixing of concrete tokens and symbol names in the nonterminal
+ # patterns; this undoes that.
+ # (1a) map each concrete token to the list it belongs to
+ concrete_to_symbol = dict()
+ for symbol in ordered_symbols:
+ if symbol.kind != "list": continue
+ for token in symbol.contents:
+ assert token not in concrete_to_symbol
+ concrete_to_symbol[token] = symbol
+ # (1b) rewrite any rule involving concrete 'x' from list 'y' to 'y::x'
+ used_concretes = set()
+ for symbol in ordered_symbols:
+ if symbol.kind != "nonterm": continue
+ new_contents = []
+ for rule in symbol.contents:
+ new_rule = []
+ for token in rule:
+ if token in self.name_to_symbol:
+ new_rule.append(token)
+ else:
+ assert token in concrete_to_symbol, f"Token '{token}' is not in a list"
+ new_rule.append(f"{concrete_to_symbol[token].name}::{token}")
+ used_concretes.add(token)
+ new_contents.append(new_rule)
+ symbol.contents = new_contents
+ # (1c) if 'y::x' appeared, turn 'y' into a nonterminal
+ new_ordered_symbols = []
+ for symbol in ordered_symbols.copy():
+ if symbol.kind != "list":
+ new_ordered_symbols.append(symbol)
+ continue
+ split_out = set(symbol.contents) & used_concretes
+ if not split_out:
+ new_ordered_symbols.append(symbol)
+ continue
+ new_rule = []
+ for token in split_out:
+ name = f"{symbol.name}::{token}"
+ self.name_to_symbol[name] = Symbol(name + " list")
+ self.name_to_symbol[name].contents = [token]
+ new_ordered_symbols.append(self.name_to_symbol[name])
+ new_rule.append([name])
+
+ left_in = set(symbol.contents) - used_concretes
+ if left_in:
+ name = f"{symbol.name}::__rest__"
+ self.name_to_symbol[name] = Symbol(name + " list")
+ self.name_to_symbol[name].contents = sorted(left_in)
+ new_ordered_symbols.append(self.name_to_symbol[name])
+ new_rule.append([name])
+
+ symbol.kind = "nonterm"
+ symbol.contents = new_rule
+ symbol.is_pseudo_node = True
+ new_ordered_symbols.append(symbol)
+ ordered_symbols = new_ordered_symbols
+ # Done!
+
+ ##### DESCRIBE the lexer and the symbols
+ lines = []
+ def put(x): lines[-1] += x
+ def putl(*x): lines.extend(x)
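+        # Emit the flex prologue: scanner options plus the constants, tables,
+        # and symbol ids that the bundled parser.c driver is expected to use.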
+ putl("%option noyywrap",
+ "%option reentrant",
+ "%{",
+ "typedef size_t prod_id_t;",
+ "typedef size_t symbol_id_t;",
+ "int OFFSET;",
+ # https://stackoverflow.com/questions/47094667/getting-the-current-index-in-the-input-string-flex-lexer
+ "#define YY_USER_ACTION OFFSET += yyleng;",
+ )
+ self.max_n_productions = max(len(symbol.contents) + 1
+ for symbol in ordered_symbols
+ if symbol.kind == "nonterm")
+ putl(f"#define MAX_N_PRODUCTIONS {self.max_n_productions}")
+ self.max_production_len = max(max(map(len, symbol.contents)) + 1
+ for symbol in ordered_symbols
+ if symbol.kind == "nonterm")
+ putl(f"#define MAX_PRODUCTION_LEN {self.max_production_len}")
+ n_nonterms = len([symbol for symbol in ordered_symbols
+ if symbol.kind == "nonterm"])
+ putl(f"#define N_NONTERMS {n_nonterms}")
+ putl(f"#define N_SYMBOLS {len(ordered_symbols) + 1}")
+ putl(f"#define DONE_SYMBOL 0")
+ # 0, nonterm1, nonterm2, ..., nontermN, term, ...
+ putl(f"#define IS_NONTERM(x) ((0 < (x)) && ((x) <= N_NONTERMS))")
+
+ # put all the nonterminals at the beginning
+ ordered_symbols = sorted(ordered_symbols,
+ key=lambda s: (s.kind == "nonterm"),
+ reverse=True)
+ self.id_to_symbol = dict()
+ putl("char *SYMBOL_ID_TO_NAME[] = { \"DONE\"")
+ for i, symbol in enumerate(ordered_symbols):
+ symbol.id = i + 1
+ self.id_to_symbol[symbol.id] = symbol
+ put(", \"" + symbol.name + "\"")
+ put(" };")
+ for symbol in ordered_symbols:
+ if symbol.name.isalnum():
+ putl(f"#define SYMBOL_{symbol.name} {symbol.id}")
+ if symbol.is_start:
+ putl(f"#define START_SYMBOL {symbol.id}")
+
+ putl("prod_id_t SYMBOL_ID_TO_PRODUCTION_IDS[N_SYMBOLS][MAX_N_PRODUCTIONS] = { {0}")
+ # [(production, Symbol), ...]
+ self.productions = [([], None)]
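+        # Index 0 is a dummy entry so real production ids start at 1, matching
+        # the zero-filled first rows of the generated C tables.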
+ for symbol in ordered_symbols:
+ if symbol.kind == "nonterm":
+ start_idx = len(self.productions)
+ assert isinstance(symbol.contents[0], list)
+ for rule in symbol.contents:
+ rule = [self.name_to_symbol[x] for x in rule]
+ self.productions.append((rule, symbol))
+ prods = ', '.join(map(str, range(start_idx, len(self.productions))))
+ put(", {" + prods + ", 0}")
+ else:
+ put(", {0}")
+ put(" };")
+ putl(f"#define N_PRODUCTIONS {len(self.productions)}")
+
+ putl("symbol_id_t PRODUCTION_ID_TO_PRODUCTION[N_PRODUCTIONS][MAX_PRODUCTION_LEN] = { {0}")
+ for i, (production, _) in enumerate(self.productions):
+ if i == 0: continue
+            # the array is typed symbol_id_t, so emit numeric symbol ids
+            production = ', '.join(str(symbol.id) for symbol in production)
+ put(", {" + production + ", 0}")
+ put(" };")
+
+ putl("symbol_id_t PRODUCTION_ID_TO_SYMBOL[N_PRODUCTIONS] = { 0")
+ for i, (_, symbol) in enumerate(self.productions):
+ if i != 0: put(f", {symbol.id}")
+ put(" };")
+
+ putl("void lex_symbol(symbol_id_t);")
+ putl("%}")
+ putl("%%")
+
+ # Spit out the lexer!
+ def escape_literal(lit):
+ return '"' + lit.replace('\\', '\\\\') + '"'
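+        # Every 'list' token becomes a quoted-literal rule and every 'regex'
+        # symbol a pattern rule; each match reports its symbol id through
+        # lex_symbol(), and the trailing catch-all rules discard anything else.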
+ for symbol in ordered_symbols:
+ if symbol.kind == "nonterm": continue
+ if symbol.kind == "list":
+ for token in symbol.contents:
+ putl(escape_literal(token) + f" {{ lex_symbol({symbol.id}); }}")
+ elif symbol.kind == "regex":
+ putl(symbol.contents + f" {{ lex_symbol({symbol.id}); }}")
+ else: raise NotImplementedError
+ putl(". { }")
+ putl("\\n { }")
+ putl("%%")
+
+ return lines
+
+class Symbol:
+ def __init__(self, declaration):
+ parts = declaration.split()
+ self.name = parts[0]
+ self.kind = parts[1]
+ self.is_start = ".start" in parts[2:]
+ self.contents = []
+ self.id = None
+ self.is_pseudo_node = False
+
+class Node:
+ def __init__(self, symbol, production):
+ self.symbol = symbol
+ self.production = production
+ self.contents = []
+
+ def pprint(self):
+ def pprint(other):
+ if isinstance(other, Node):
+ return other.pprint()
+ return other[1]
+ if len(self.contents) == 1:
+ return pprint(self.contents[0])
+ return '(' + ' '.join(map(pprint, self.contents)) + ')'
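+
+# A minimal usage sketch (assumes a hypothetical directory "mylang/" containing
+# grammar.txt and any supporting .c files):
+#
+#     p = Parser("mylang")
+#     tree = p.parse_string("1 + 2 * 3")
+#     print(tree.pprint())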