import pyparsingfrom collections.abc import Hashablefrom rdflib import Graph, URIReftype DataType = Hashabletype TreeList = list[str | tuple[str | 'TreeList']]class TreeOld: """A tree data structure with RDF conversion and search capabilities. Parameters ---------- data : DataType The data stored at this node. children : list[TreeOld] The children of this node. """ RDF_TYPES = {} RDF_EDGES = {'is': URIRef('is-a'), 'parent': URIRef('is-the-parent-of'), 'child': URIRef('is-a-child-of'), 'sister': URIRef('is-a-sister-of')} LPAR = pyparsing.Suppress('(') RPAR = pyparsing.Suppress(')') DATA = pyparsing.Regex(r'[^\(\)\s]+') PARSER = pyparsing.Forward() SUBTREE = pyparsing.ZeroOrMore(PARSER) PARSERLIST = pyparsing.Group(LPAR + DATA + SUBTREE + RPAR) PARSER <<= DATA | PARSERLIST def __init__(self, data: DataType, children: list['TreeOld'] = []): self._data = data self._children = children self._validate() def __str__(self): if self._children: return ' '.join(c.__str__() for c in self._children) else: return str(self._data) def __repr__(self): return self.to_string() def to_string(self, depth: int = 0) -> str: """Render this tree as an indented string. Parameters ---------- depth : int Current depth for indentation. Returns ------- str Indented string representation of the tree. 
""" s = (depth - 1) * ' ' +\ int(depth > 0) * '--' +\ self._data + '\n' s += ''.join(c.to_string(depth+1) for c in self._children) return s def __contains__(self, data: DataType) -> bool: # pre-order depth-first search if self._data == data: return True else: for child in self._children: if data in child: return True return False def __getitem__(self, idx: tuple[int]) -> DataType: if isinstance(idx, int): return self._children[idx] elif len(idx) == 1: return self._children[idx[0]] elif idx: return self._children[idx[0]].__getitem__(idx[1:]) else: return self @property def data(self) -> DataType: """The data stored at this node.""" return self._data @property def children(self) -> list['TreeOld']: """The children of this node.""" return self._children def _validate(self): try: assert all(isinstance(c, Tree) for c in self._children) except AssertionError: msg = 'all children must be trees' raise TypeError(msg) def index(self, data: DataType, index_path: tuple[int] = tuple()): """Find all index paths to nodes matching the given data. Parameters ---------- data : DataType The data to search for. index_path : tuple[int] The current path prefix for recursive calls. Returns ------- list[tuple[int, ...]] List of index paths to matching nodes. """ indices = [index_path] if self._data==data else [] root_path = [] if index_path == -1 else index_path indices += [j for i, c in enumerate(self._children) for j in c.index(data, root_path+(i,))] return indices def to_rdf( self, graph: Graph | None=None, nodes: dict[int, URIRef] = {}, idx: tuple[int] = tuple() ) -> Graph: """Convert this tree to an RDF graph. Parameters ---------- graph : Graph | None An existing graph to add to, or None to create a new one. nodes : dict[int, URIRef] Mapping from index paths to RDF nodes. idx : tuple[int] Current index path. Returns ------- Graph The RDF graph representing this tree. 
""" graph = Graph() if graph is None else graph idxstr = '_'.join(str(i) for i in idx) nodes[idx] = URIRef(idxstr) if self._data not in self.RDF_TYPES: self.RDF_TYPES[self._data] = URIRef(self._data) typetriple = (nodes[idx], self.RDF_EDGES['is'], self.RDF_TYPES[self.data]) graph.add(typetriple) for i, child in enumerate(self._children): childidx = idx+(i,) child.to_rdf(graph, nodes, childidx) partriple = (nodes[idx], self.RDF_EDGES['parent'], nodes[childidx]) chitriple = (nodes[childidx], self.RDF_EDGES['child'], nodes[idx]) graph.add(partriple) graph.add(chitriple) for i, child1 in enumerate(self._children): for j, child2 in enumerate(self._children): child1idx = idx+(i,) child2idx = idx+(j,) sistriple = (nodes[child1idx], Tree.RDF_EDGES['sister'], nodes[child2idx]) graph.add(sistriple) self._rdf_nodes = nodes return graph @property def rdf(self) -> Graph: """The RDF graph for this tree.""" return self.to_rdf() def find(self, query): """Find nodes matching an RDF query. Parameters ---------- query An RDF query string. Returns ------- list[tuple[int, ...]] List of index paths to matching nodes. """ return [tuple([int(i) for i in str(res[0]).split('_')]) for res in self.rdf.query(query)] @classmethod def from_string(cls, treestr: str) -> 'TreeOld': """Parse a tree from a parenthesized string representation. Parameters ---------- treestr : str The parenthesized string to parse. Returns ------- TreeOld The parsed tree. """ treelist = cls.PARSER.parseString(treestr[2:-2])[0] return cls.from_list(treelist) @classmethod def from_list(cls, treelist: TreeList): """Build a tree from a nested list representation. Parameters ---------- treelist : TreeList The nested list to convert. Returns ------- TreeOld The constructed tree. """ if isinstance(treelist, str): return cls(treelist[0]) elif isinstance(treelist[1], str): return cls(treelist[0], [cls(treelist[1])]) else: return cls(treelist[0], [cls.from_list(l) for l in treelist[1:]])Assignments 5 and 6
Like Assignments 1/2 and 3/4, Assignments 5 and 6 are bundled together. You only need to do Task 1 for Assignment 5 and Tasks 2 and 3 for Assignment 6.
These assignments focus on implementing fuzzy tree search using edit distance and regular expressions. The appendix on Working with Annotated Corpora develops various tree search algorithms that look for an exact match between a query and the data contained in particular trees. You should read through it before starting these assignments, since the code used here is developed there. I’ve copied the relevant class below as TreeOld.
In fuzzy search, we allow this exact matching restriction to be loosened by instead allowing that matches be (i) within some fixed edit distance; and/or (ii) closest (in terms of edit distance) to the query among all pieces of data. I’ve copied a simplified version of the edit distance class that we developed in class below as EditDistance.
import numpy as npclass EditDistance: """Distance between strings computed via dynamic programming. Parameters ---------- insertion_cost : float Cost of inserting a character. deletion_cost : float Cost of deleting a character. substitution_cost : float | None Cost of substituting a character. Defaults to insertion_cost + deletion_cost. """ def __init__(self, insertion_cost: float = 1., deletion_cost: float = 1., substitution_cost: float | None = None): self._insertion_cost = insertion_cost self._deletion_cost = deletion_cost if substitution_cost is None: self._substitution_cost = insertion_cost + deletion_cost else: self._substitution_cost = substitution_cost def __call__(self, source: str | list[str], target: str | list[str]) -> float: """Compute the edit distance between source and target. Uses lists to enable digraphs to be identified. Parameters ---------- source : str | list[str] The source string or list of string segments. target : str | list[str] The target string or list of string segments. Returns ------- float The minimum edit distance between source and target. """ # coerce to list if not a list if isinstance(source, str): source = list(source) if isinstance(target, str): target = list(target) n, m = len(source), len(target) source, target = ['#']+source, ['#']+target distance = np.zeros([n+1, m+1], dtype=float) for i in range(1,n+1): distance[i,0] = distance[i-1,0]+self._deletion_cost for j in range(1,m+1): distance[0,j] = distance[0,j-1]+self._insertion_cost for i in range(1,n+1): for j in range(1,m+1): if source[i] == target[j]: substitution_cost = 0. else: substitution_cost = self._substitution_cost costs = np.array([distance[i-1,j]+self._deletion_cost, distance[i-1,j-1]+substitution_cost, distance[i,j-1]+self._insertion_cost]) distance[i,j] = costs.min() return distance[n,m]Task 1
Lines: 14
Define an instance method fuzzy_find. This method should take a piece of query data and optionally a distance and return all of the nodes that have data that is within edit distance distance from data; and/or, if closest is True, it should return all of the nodes closest to data among all nodes in the tree.
For instance:
- fuzzy_find('review', distance=3., closest=False) will return a tuple of every piece of data in the tree within edit distance 3 from review (e.g. view, reviewer, reviews, etc.), its distance to review, and its index; if there is nothing within that edit distance, an empty list will be returned
- fuzzy_find('review', distance=3., closest=True) will return a tuple of the closest pieces of data in the tree that are also within edit distance 3 from review (e.g. view, reviewer, reviews, etc.), its distance to review, and its index; if there is nothing within that edit distance, an empty list will be returned
- fuzzy_find('review', distance=np.inf, closest=True) will return a tuple of the closest pieces of data in the tree to review (e.g. view, reviewer, reviews, etc.), regardless of edit distance, its distance to review, and its index; this will always return something
- fuzzy_find('review', distance=np.inf, closest=False) will return a tuple of every piece of data in the tree to review (e.g. view, reviewer, reviews, etc.), regardless of edit distance, its distance to review, and its index; this will always return a list with as many elements as there are nodes in the tree
This method should also support only searching the terminal nodes (leaves) of the tree with the flag terminals_only.
Hint: you should look back at the methods we defined for searching and indexing the tree above. Specifically, to understand why you might want something like index_path defaulting to the empty tuple, look at the index method of TreeOld.
type FuzzyFindResult = tuple[tuple, str | list[str], float]


class Tree(TreeOld):
    """A tree with fuzzy search capabilities via edit distance."""

    # Shared distance function with insertion and deletion cost 1; no
    # substitution cost is passed, so EditDistance applies its own default.
    DIST = EditDistance(1., 1.)

    def fuzzy_find(self, data: str | list[str],
                   closest: bool = True,
                   distance: float = np.inf,
                   case_fold: bool = True,
                   terminals_only: bool = True,
                   index_path: tuple = tuple()) -> list[FuzzyFindResult]:
        """Find the (closest) strings within a certain distance.

        Defaults to computing the closest strings among the terminals and
        ignoring case.

        The format of the things returned is ``[((0,1,0), "view", 2.0), ...]``.
        Edit distance can be computed on either a ``str`` or ``list[str]``;
        that is why the middle element of each tuple might be either.

        Parameters
        ----------
        data : str | list[str]
            The data to match against.
        closest : bool
            Whether to return only the closest strings or all strings within
            distance.
        distance : float
            The distance within which a string must be.
        case_fold : bool
            Whether to lower-case the data.
        terminals_only : bool
            Whether to only search the terminals.
        index_path : tuple
            The current path prefix for recursive calls.

        Returns
        -------
        list[FuzzyFindResult]
            List of tuples of (index_path, data, distance).

        Raises
        ------
        NotImplementedError
            Always; this is the placeholder to be filled in for Task 1.
        """
        raise NotImplementedError
# Write tests that use the following tree as input data.
# Bracketed parse string used as the input data for the tests below.
treestr = '( (SBARQ (WHNP-1 (WP What)) (SQ (NP-SBJ-1 (-NONE- *T*)) (VP (VBZ is) (NP-PRD (NP (DT the) (JJS best) (NN place)) (SBAR (WHADVP-2 (-NONE- *0*)) (S (NP-SBJ (-NONE- *PRO*)) (VP (TO to) (VP (VB get) (NP (NP (NNS discounts)) (PP (IN for) (NP (NML (NNP San) (NNP Francisco)) (NNS restaurants)))) (ADVP-LOC-2 (-NONE- *T*))))))))) (. ?)) )'
# Parse the string into a Tree.
testtree = Tree.from_string(treestr)
testtree
SBARQ
--WHNP-1
 --WP
  --What
--SQ
 --NP-SBJ-1
  ---NONE-
   --*T*
 --VP
  --VBZ
   --is
  --NP-PRD
   --NP
    --DT
     --the
    --JJS
     --best
    --NN
     --place
   --SBAR
    --WHADVP-2
     ---NONE-
      --*0*
    --S
     --NP-SBJ
      ---NONE-
       --*PRO*
     --VP
      --TO
       --to
      --VP
       --VB
        --get
       --NP
        --NP
         --NNS
          --discounts
        --PP
         --IN
          --for
         --NP
          --NML
           --NNP
            --San
           --NNP
            --Francisco
          --NNS
           --restaurants
       --ADVP-LOC-2
        ---NONE-
         --*T*
--.
 --?
The tests should test the four combinations of distance and closest listed above with the same query data, with both terminals_only=True and terminals_only=False (eight tests in total). Two further tests should test distance=np.inf, closest=True, terminals_only=True for a case where only a single element should be returned and a case where multiple elements in the tree should be returned.
# write tests here
Remember that we talked in class about what it would mean to take the distance between a string and a collection of strings: basically, the minimum of the edit distances between the string and each string in that set.
We can use this concept in two ways here. The first is to view the tree as a container for some data and to compute the minimum distance between a query and any data contained in the tree. Alternatively, we can think of the query itself as determining a set and compute the minimum distance of each piece of data in the tree to that set. Task 2 will implement the former and Task 3 the latter.
I’ve copied the corpus reader we developed for the English Web Treebank in class below. We’ll make use of this for Task 2. (You’ll need to grab LDC2012T13.tgz from the course Google drive.)
# Notebook shell command that downloads the treebank archive; run it in a
# notebook cell (or drop the leading '!' and run it in a shell):
# !wget --no-check-certificate 'https://drive.google.com/uc?export=download&id=1ygMIl1w6wz6A24oxkLwirunSKXb9EW12' -O 'LDC2012T13.tgz'
import tarfile
from collections import defaultdict


class EnglishWebTreebankOld:
    """Corpus reader for the English Web Treebank.

    The archive is not opened until the corpus is iterated, and it is
    re-read on every iteration.

    Parameters
    ----------
    root : str
        Path to the treebank archive file.
    """

    def __init__(self, root='LDC2012T13.tgz'):
        self._root = root

    def _read_trees(self):
        """Yield ``(filename, tree)`` for each '.xml.tree' archive member."""
        with tarfile.open(self._root) as corpus:
            for fname in corpus.getnames():
                if '.xml.tree' in fname:
                    # Only the first line of each member is parsed.
                    with corpus.extractfile(fname) as treefile:
                        treestr = treefile.readline().decode()

                    yield fname, Tree.from_string(treestr)

    @property
    def _trees(self):
        # Kept so code that iterates ``self._trees`` still works. Each access
        # returns a fresh generator; the original stored a single generator
        # that was empty after its first pass.
        return self._read_trees()

    def items(self):
        """Iterate over filename-tree pairs in the corpus.

        Each call starts a new pass over the archive, so the corpus can be
        iterated more than once.

        Yields
        ------
        tuple[str, Tree]
            A tuple of the filename and the parsed tree.
        """
        yield from self._read_trees()
# Task 2
Lines: 3
Define an instance method fuzzy_find for the corpus reader class that computes the minimum distance between a query and a tree for all trees in the corpus. It should return a list of tuples with the first element a tree ID, the second an index in that tree, the third the data at that index and the fourth the distance between the query and that index. A tuple should be included in the returned list only if the distance is equal to the minimum across trees in the corpus.
Hint: this should be very straightforward using a particular parameterization for Tree.fuzzy_find. Which one?
class EnglishWebTreebank(EnglishWebTreebankOld):
    """Corpus reader with fuzzy search across the treebank."""

    def fuzzy_find(self, data: str | list[str]) -> list[FuzzyFindResult]:
        """Find the trees in the corpus closest to the query data.

        Parameters
        ----------
        data : str | list[str]
            The query data to search for.

        Returns
        -------
        list[FuzzyFindResult]
            List of tuples of (index_path, data, distance) for the closest
            matches across all trees.

            NOTE(review): the Task 2 description asks for tuples that also
            carry a leading tree ID, which neither this description nor the
            return annotation reflects -- confirm the intended shape.

        Raises
        ------
        NotImplementedError
            Always; this is the placeholder to be filled in for Task 2.
        """
        raise NotImplementedError
# Now, load this corpus.
# Create the corpus reader using its default archive path.
ewt = EnglishWebTreebank()
# Write a single test for a piece of data you know exists in some tree in the corpus. (Determiners or auxiliary verbs are good candidates.) Thus, the minimum distance will be zero and your method should return only trees that contain that element. Note that this test should use some already existing method to produce the correct set of trees.
Hint: such a method already exists in the TreeOld class.
# write test here
The next task will look at computing distance between the elements of a tree and a query set defined by a regular expression. Here is a regular expression class based on the formal definition of regular expressions I gave you in class.
from itertools import product


class Regex:
    """A regular expression that generates the set of matching strings.

    Iterating over an instance yields the strings the expression generates,
    bounded by ``maxlength``.

    Parameters
    ----------
    regex_parsed : list[str | list]
        The parsed regular expression structure.
    maxlength : int
        Maximum length of generated strings.
    """

    CHAR = pyparsing.Word(pyparsing.alphas, exact=1).setName("character")  # use 'exact', not 'max'

    LPAR = pyparsing.Suppress('(')
    RPAR = pyparsing.Suppress(')')

    PARSER = pyparsing.Forward()
    GROUP = pyparsing.Group(LPAR + PARSER + RPAR)
    QUANT = pyparsing.oneOf("* ? +")
    DSJ = '|'

    ITEM = pyparsing.Group((CHAR | GROUP) + QUANT) | pyparsing.Group(CHAR + DSJ + CHAR) | CHAR | GROUP
    ITEMSEQ = pyparsing.OneOrMore(ITEM)

    PARSER <<= pyparsing.delimitedList(ITEMSEQ, pyparsing.Empty())

    def __init__(self, regex_parsed: list[str | list], maxlength: int):
        self._regex_parsed = regex_parsed
        self._maxlength = maxlength

    @classmethod
    def from_string(cls, regexstr: str, maxlength: int = 30):
        """Parse a regular expression from a string.

        Parameters
        ----------
        regexstr : str
            The regular expression string to parse.
        maxlength : int
            Maximum length of generated strings.

        Returns
        -------
        Regex
            The parsed regular expression.
        """
        # Always wrap the whole expression in one group. Adding a parenthesis
        # only when the first/last character was not one left patterns such
        # as '(a|b)c' or 'a(b)' unbalanced.
        regex_parsed = cls.PARSER.parseString('(' + regexstr + ')')[0]

        return cls(regex_parsed, maxlength)

    def __iter__(self):
        self._gen = self._construct_regex_generator()

        return self

    def __next__(self):
        return next(self._gen)

    def _construct_regex_generator(self, regex=None):
        """Yield the strings generated by ``regex`` (default: the whole expression).

        A parsed node is a single character, ``[item, quantifier]``,
        ``[char, '|', char]``, or a sequence of nodes to concatenate.
        """
        if regex is None:
            regex = self._regex_parsed

        if isinstance(regex, str):
            # End the generator with ``return``: raising StopIteration inside
            # a generator is turned into RuntimeError (PEP 479).
            if len(regex) <= self._maxlength:
                yield regex
            return

        # Only a plain string in second position can be an operator; checking
        # the length first keeps one-element sequences from raising
        # IndexError.
        operator = None
        if len(regex) > 1 and isinstance(regex[1], str):
            operator = regex[1]

        if len(regex) == 2 and operator in ('*', '+'):
            first = 0 if operator == '*' else 1
            # As in the original loop, at least one repetition count is
            # always tried, and counts run up to ``maxlength``.
            last = max(first, self._maxlength)

            # NOTE: each generated string is repeated as a whole (s * count),
            # so mixed repetitions such as 'ab' for '(a|b)*' are not
            # produced.
            for count in range(first, last + 1):
                for s in self._construct_regex_generator(regex[0]):
                    repeated = s * count

                    if len(repeated) <= self._maxlength:
                        yield repeated

        elif len(regex) == 2 and operator == '?':
            yield ''
            # Expand the operand instead of yielding the raw parsed item,
            # which is a nested list when the operand is a group.
            yield from self._construct_regex_generator(regex[0])

        elif len(regex) == 3 and operator == '|':
            # Alternate between the two branches until both are exhausted;
            # calling ``next`` on an exhausted branch used to escape as
            # StopIteration.
            exhausted = object()
            branches = [self._construct_regex_generator(regex[0]),
                        self._construct_regex_generator(regex[2])]

            while branches:
                for branch in list(branches):
                    s = next(branch, exhausted)

                    if s is exhausted:
                        branches.remove(branch)
                    elif len(s) <= self._maxlength:
                        yield s

        else:
            # A sequence: concatenate one string from each element in turn.
            evaluated = [self._construct_regex_generator(r) for r in regex]

            for p in product(*evaluated):
                c = ''.join(p)

                if len(c) <= self._maxlength:
                    yield c
# The way to use this class to generate the set of strings associated with a regular expression is to treat an instance of the Regex class as a generator.
Importantly, I’ve included a way of only generating strings of less than some length threshold maxlength in the case that your regular expression evaluates to an infinite set.
# Print every string generated by the regex, with maxlength set to 20.
for s in Regex.from_string('co+lou?r', 20):
    print(s)
# Task 3
Lines: 15
Define a new version of fuzzy_find that behaves exactly the same as your version from Task 1 except that it allows the query data to be a regular expression parsable by Regex.from_string. Make sure that you correctly handle infinite sets.
Hint: your new fuzzy_find will be nearly identical to the old one. My implementation only has a single additional line.
class Tree(TreeOld):
    """A tree with fuzzy search via edit distance against regex-defined sets."""

    # Shared distance function with insertion, deletion and substitution
    # costs all set to 1.
    DIST = EditDistance(1., 1., 1.)

    def fuzzy_find(self, data: str | list[str],
                   closest: bool = True,
                   distance: float = np.inf,
                   case_fold: bool = True,
                   terminals_only: bool = True,
                   index_path: tuple[int] = tuple()) -> list[FuzzyFindResult]:
        """Find the (closest) strings within a distance of the set defined by a regex.

        Defaults to computing the closest strings among the terminals and
        ignoring case.

        Parameters
        ----------
        data : str | list[str]
            The regex to match against.
        closest : bool
            Whether to return only the closest strings or all strings within
            distance.
        distance : float
            The distance within which a string must be.
        case_fold : bool
            Whether to lower-case the data.
        terminals_only : bool
            Whether to only search the terminals.
        index_path : tuple[int]
            The current path prefix for recursive calls.

        Returns
        -------
        list[FuzzyFindResult]
            List of tuples of (index_path, data, distance).

        Raises
        ------
        NotImplementedError
            Always; this is the placeholder to be filled in for Task 3.
        """
        raise NotImplementedError
# Write tests analogous to the ones you wrote for Task 1.
# write tests here