Assignments 5 and 6

Like Assignments 1/2 and 3/4, Assignments 5 and 6 are bundled together. You only need to do Task 1 for Assignment 5 and Tasks 2 and 3 for Assignment 6.

These assignments focus on implementing fuzzy tree search using edit distance and regular expressions. The appendix on Working with Annotated Corpora develops various tree search algorithms that look for an exact match between a query and the data contained in particular trees. You should read through it before starting these assignments, since the code used here is developed there. I’ve copied the relevant class below as TreeOld.

import pyparsingfrom collections.abc import Hashablefrom rdflib import Graph, URIReftype DataType = Hashabletype TreeList = list[str | tuple[str | 'TreeList']]class TreeOld:    """A tree data structure with RDF conversion and search capabilities.        Parameters    ----------    data : DataType        The data stored at this node.    children : list[TreeOld]        The children of this node.    """        RDF_TYPES = {}    RDF_EDGES = {'is': URIRef('is-a'),                 'parent': URIRef('is-the-parent-of'),                 'child': URIRef('is-a-child-of'),                 'sister': URIRef('is-a-sister-of')}        LPAR = pyparsing.Suppress('(')    RPAR = pyparsing.Suppress(')')    DATA = pyparsing.Regex(r'[^\(\)\s]+')    PARSER = pyparsing.Forward()    SUBTREE = pyparsing.ZeroOrMore(PARSER)    PARSERLIST = pyparsing.Group(LPAR + DATA + SUBTREE + RPAR)    PARSER <<= DATA | PARSERLIST        def __init__(self, data: DataType, children: list['TreeOld'] = []):        self._data = data        self._children = children                self._validate()      def __str__(self):        if self._children:            return ' '.join(c.__str__() for c in self._children)        else:            return str(self._data)            def __repr__(self):        return self.to_string()         def to_string(self, depth: int = 0) -> str:        """Render this tree as an indented string.        Parameters        ----------        depth : int            Current depth for indentation.        Returns        -------        str            Indented string representation of the tree.        """        s = (depth - 1) * '  ' +\            int(depth > 0) * '--' +\            self._data + '\n'        s += ''.join(c.to_string(depth+1)                     for c in self._children)                return s    def __contains__(self, data: DataType) -> bool:        # pre-order depth-first search        if self._data == data:            return True        else:            for child in self._children:                if data in child:                    return True                            return False            def __getitem__(self, idx: tuple[int]) -> DataType:        if isinstance(idx, int):            return self._children[idx]        elif len(idx) == 1:            return self._children[idx[0]]        elif idx:            return self._children[idx[0]].__getitem__(idx[1:])        else:            return self            @property    def data(self) -> DataType:        """The data stored at this node."""        return self._data         @property    def children(self) -> list['TreeOld']:        """The children of this node."""        return self._children            def _validate(self):        try:            assert all(isinstance(c, Tree)                       for c in self._children)        except AssertionError:            msg = 'all children must be trees'            raise TypeError(msg)                def index(self, data: DataType, index_path: tuple[int] = tuple()):        """Find all index paths to nodes matching the given data.        Parameters        ----------        data : DataType            The data to search for.        index_path : tuple[int]            The current path prefix for recursive calls.        Returns        -------        list[tuple[int, ...]]            List of index paths to matching nodes.        """        indices = [index_path] if self._data==data else []        root_path = [] if index_path == -1 else index_path                indices += [j                     for i, c in enumerate(self._children)                     for j in c.index(data, root_path+(i,))]        return indices                def to_rdf(        self,         graph: Graph | None=None,         nodes: dict[int, URIRef] = {},         idx: tuple[int] = tuple()    ) -> Graph:         """Convert this tree to an RDF graph.        Parameters        ----------        graph : Graph | None            An existing graph to add to, or None to create a new one.        nodes : dict[int, URIRef]            Mapping from index paths to RDF nodes.        idx : tuple[int]            Current index path.        Returns        -------        Graph            The RDF graph representing this tree.        """        graph = Graph() if graph is None else graph                idxstr = '_'.join(str(i) for i in idx)        nodes[idx] = URIRef(idxstr)                    if self._data not in self.RDF_TYPES:            self.RDF_TYPES[self._data] = URIRef(self._data)        typetriple = (nodes[idx],                       self.RDF_EDGES['is'],                      self.RDF_TYPES[self.data])        graph.add(typetriple)        for i, child in enumerate(self._children):            childidx = idx+(i,)            child.to_rdf(graph, nodes, childidx)                            partriple = (nodes[idx],                          self.RDF_EDGES['parent'],                         nodes[childidx])            chitriple = (nodes[childidx],                          self.RDF_EDGES['child'],                         nodes[idx])                        graph.add(partriple)            graph.add(chitriple)                    for i, child1 in enumerate(self._children):            for j, child2 in enumerate(self._children):                child1idx = idx+(i,)                child2idx = idx+(j,)                sistriple = (nodes[child1idx],                              Tree.RDF_EDGES['sister'],                             nodes[child2idx])                                graph.add(sistriple)                self._rdf_nodes = nodes                return graph        @property    def rdf(self) -> Graph:        """The RDF graph for this tree."""        return self.to_rdf()        def find(self, query):        """Find nodes matching an RDF query.        Parameters        ----------        query            An RDF query string.        Returns        -------        list[tuple[int, ...]]            List of index paths to matching nodes.        """        return [tuple([int(i)                        for i in str(res[0]).split('_')])                 for res in self.rdf.query(query)]        @classmethod    def from_string(cls, treestr: str) -> 'TreeOld':        """Parse a tree from a parenthesized string representation.        Parameters        ----------        treestr : str            The parenthesized string to parse.        Returns        -------        TreeOld            The parsed tree.        """        treelist = cls.PARSER.parseString(treestr[2:-2])[0]                return cls.from_list(treelist)        @classmethod    def from_list(cls, treelist: TreeList):        """Build a tree from a nested list representation.        Parameters        ----------        treelist : TreeList            The nested list to convert.        Returns        -------        TreeOld            The constructed tree.        """        if isinstance(treelist, str):            return cls(treelist[0])        elif isinstance(treelist[1], str):            return cls(treelist[0], [cls(treelist[1])])        else:            return cls(treelist[0], [cls.from_list(l) for l in treelist[1:]])

In fuzzy search, we allow this exact matching restriction to be loosened by instead allowing that matches be (i) within some fixed edit distance; and/or (ii) closest (in terms of edit distance) to the query among all pieces of data. I’ve copied simplified version of the edit distance class that we developed in class below as EditDistance.

import numpy as npclass EditDistance:    """Distance between strings computed via dynamic programming.    Parameters    ----------    insertion_cost : float        Cost of inserting a character.    deletion_cost : float        Cost of deleting a character.    substitution_cost : float | None        Cost of substituting a character. Defaults to insertion_cost + deletion_cost.    """        def __init__(self, insertion_cost: float = 1.,                  deletion_cost: float = 1.,                  substitution_cost: float | None = None):        self._insertion_cost = insertion_cost        self._deletion_cost = deletion_cost        if substitution_cost is None:            self._substitution_cost = insertion_cost + deletion_cost        else:            self._substitution_cost = substitution_cost    def __call__(self, source: str | list[str], target: str | list[str]) ->  float:        """Compute the edit distance between source and target.                Uses lists to enable digraphs to be identified.                Parameters        ----------        source : str | list[str]            The source string or list of string segments.        target : str | list[str]            The target string or list of string segments.        Returns        -------        float            The minimum edit distance between source and target.        """                # coerce to list if not a list        if isinstance(source, str):            source = list(source)                    if isinstance(target, str):            target = list(target)                n, m = len(source), len(target)        source, target = ['#']+source, ['#']+target        distance = np.zeros([n+1, m+1], dtype=float)                for i in range(1,n+1):            distance[i,0] = distance[i-1,0]+self._deletion_cost        for j in range(1,m+1):            distance[0,j] = distance[0,j-1]+self._insertion_cost                    for i in range(1,n+1):            for j in range(1,m+1):                if source[i] == target[j]:                    substitution_cost = 0.                else:                    substitution_cost = self._substitution_cost                                    costs = np.array([distance[i-1,j]+self._deletion_cost,                                  distance[i-1,j-1]+substitution_cost,                                  distance[i,j-1]+self._insertion_cost])                                    distance[i,j] = costs.min()                        return distance[n,m]

Task 1

Lines: 14

Define an instance method fuzzy_find. This method should take a piece of query data and optionally a distance and return all of the nodes that have data that is within edit distance distance from data; and/or if , closest is True, it should return all of the nodes closest to data among all nodes in the tree.

For instance:

  • fuzzy_find('review', distance=3., closest=False) will return a tuple of every piece of data in the tree within edit distance 3 from review (e.g. view, reviewer, reviews, etc.), its distance to review, and its index; if there is nothing within that edit distance, an empty list will be returned
  • fuzzy_find('review', distance=3., closest=True) will return a tuple of the closest pieces of data in the tree that are also within edit distance 3 from review (e.g. view, reviewer, reviews, etc.), its distance to review, and its index; if there is nothing within that edit distance, an empty list will be returned
  • fuzzy_find('review', distance=np.inf, closest=True) will return a tuple of the closest pieces of data in the tree to review (e.g. view, reviewer, reviews, etc.), regardless of edit distance, its distance to review, and its index; this will always return something
  • fuzzy_find('review', distance=np.inf, closest=False) will return a tuple of every piece of data in the tree to review (e.g. view, reviewer, reviews, etc.), regardless of edit distance, its distance to review, and its index; this will always return a list with as many elements as there are nodes in the tree

This method should also support only searching the terminal nodes (leaves) of the tree with the flag terminals_only.

Hint: you should look back at the methods we defined for searching and indexing the tree above. Specifically, to understand why you might want something like index_path defaulting to the empty tuple, look at the index method of TreeOld.

type FuzzyFindResult = tuple[tuple, str | list[str], float]class Tree(TreeOld):    """A tree with fuzzy search capabilities via edit distance."""        DIST = EditDistance(1., 1.)        def fuzzy_find(self, data: str | list[str],                    closest: bool = True,                    distance: float = np.inf,                   case_fold: bool = True,                   terminals_only: bool = True,                   index_path: tuple = tuple()) -> list[FuzzyFindResult]:        """Find the (closest) strings within a certain distance.                Defaults to computing the closest strings among the terminals        and ignoring case.                The format of the things returned is ``[((0,1,0), "view", 2.0), ...]``.        Edit distance can be computed on either a ``str`` or ``list[str]``;        that is why the middle element of each tuple might be either.                Parameters        ----------        data : str | list[str]            The data to match against.        closest : bool            Whether to return only the closest strings or all strings            within distance.        distance : float            The distance within which a string must be.        case_fold : bool            Whether to lower-case the data.        terminals_only : bool            Whether to only search the terminals.        index_path : tuple            The current path prefix for recursive calls.        Returns        -------        list[FuzzyFindResult]            List of tuples of (index_path, data, distance).        """        raise NotImplementedError

Write tests that use the following tree as input data.

treestr = '( (SBARQ (WHNP-1 (WP What)) (SQ (NP-SBJ-1 (-NONE- *T*)) (VP (VBZ is) (NP-PRD (NP (DT the) (JJS best) (NN place)) (SBAR (WHADVP-2 (-NONE- *0*)) (S (NP-SBJ (-NONE- *PRO*)) (VP (TO to) (VP (VB get) (NP (NP (NNS discounts)) (PP (IN for) (NP (NML (NNP San) (NNP Francisco)) (NNS restaurants)))) (ADVP-LOC-2 (-NONE- *T*))))))))) (. ?)) )'

testtree = Tree.from_string(treestr)

testtree
SBARQ
--WHNP-1
  --WP
    --What
--SQ
  --NP-SBJ-1
    ---NONE-
      --*T*
  --VP
    --VBZ
      --is
    --NP-PRD
      --NP
        --DT
          --the
        --JJS
          --best
        --NN
          --place
      --SBAR
        --WHADVP-2
          ---NONE-
            --*0*
        --S
          --NP-SBJ
            ---NONE-
              --*PRO*
          --VP
            --TO
              --to
            --VP
              --VB
                --get
              --NP
                --NP
                  --NNS
                    --discounts
                --PP
                  --IN
                    --for
                  --NP
                    --NML
                      --NNP
                        --San
                      --NNP
                        --Francisco
                    --NNS
                      --restaurants
              --ADVP-LOC-2
                ---NONE-
                  --*T*
--.
  --?

The tests should test the four combinations of distance and closest listed above with the same query data, both with and without terminals_only=True and terminals_only=False (eight tests in total). Two further tests should test distance=np.inf, closest=True, terminals_only=True for a case where only a single element should be returned and a case where multiple elements in the tree should be returned.

# write tests here

Remember that we talked in class about what it would mean to take the distance between a string and a collection of strings: basically, the minimum of the edit distances between the string and each string in that set.

We can use this concept in two ways here. The first is to view the tree as a container for some data and to compute the minimum distance between a query and any data contained in the tree. Alternatively, we can think of the query itself as determining a set and compute the minimum distance of each piece of data in the tree to that set. Task 2 will implement the former and Task 3 the latter.

I’ve copied the corpus reader we developed for the English Web Treebank in class below. We’ll make use of this for Task 2. (You’ll need to grab LDC2012T13.tgz from the course Google drive.)

!wget --no-check-certificate 'https://drive.google.com/uc?export=download&id=1ygMIl1w6wz6A24oxkLwirunSKXb9EW12' -O 'LDC2012T13.tgz'
import tarfilefrom collections import defaultdictclass EnglishWebTreebankOld:    """Corpus reader for the English Web Treebank.    Parameters    ----------    root : str        Path to the treebank archive file.    """        def __init__(self, root='LDC2012T13.tgz'):                def trees():            with tarfile.open(root) as corpus:                for fname in corpus.getnames():                    if '.xml.tree' in fname:                        with corpus.extractfile(fname) as treefile:                            treestr = treefile.readline().decode()                            yield fname, Tree.from_string(treestr)                                self._trees = trees()                            def items(self):        """Iterate over filename-tree pairs in the corpus.        Yields        ------        tuple[str, Tree]            A tuple of the filename and the parsed tree.        """        for fn, tlist in self._trees:              yield fn, tlist

Task 2

Lines: 3

Define an instance method fuzzy_find for the corpus reader class that computes the minimum distance between a query and a tree for all trees in the corpus. It should return a list of tuples with the first element a tree ID, the second an index in that tree, the third the data at that index and the fourth the distance between the query and that index. A tuple should be included in the returned list only if the distance is equal to the minimum across trees in the corpus.

Hint: this should be very straightforward using a particular parameterization for Tree1.fuzzy_find. Which one?

class EnglishWebTreebank(EnglishWebTreebankOld):    """Corpus reader with fuzzy search across the treebank."""        def fuzzy_find(self, data: str | list[str]) -> list[FuzzyFindResult]:        """Find the trees in the corpus closest to the query data.                Parameters        ----------        data : str | list[str]            The query data to search for.        Returns        -------        list[FuzzyFindResult]            List of tuples of (index_path, data, distance) for the            closest matches across all trees.        """        raise NotImplementedError

Now, load this corpus.

ewt = EnglishWebTreebank()

Write a single test for a piece of data you know exists in some tree in the corpus. (Determiners or auxiliary verbs are good candidates.) Thus, the minimum distance will be zero and your method should return only trees that contain that element. Note that this test should use some already existing method to produce the correct set of trees.

Hint: such a method already exists in the TreeOld class.

# write test here

The next task will look at computing distance between the elements of a tree and a query set defined by a regular expression. Here is a regular expression class based on the formal definition of regular expressions I gave you in class.

from itertools import productclass Regex:    """A regular expression that generates the set of matching strings.    Parameters    ----------    regex_parsed : list[str | list]        The parsed regular expression structure.    maxlength : int        Maximum length of generated strings.    """    CHAR = pyparsing.Word(pyparsing.alphas, exact=1).setName("character")  # use 'exact', not 'max'    LPAR = pyparsing.Suppress('(')    RPAR = pyparsing.Suppress(')')    PARSER = pyparsing.Forward()    GROUP = pyparsing.Group(LPAR + PARSER + RPAR)    QUANT = pyparsing.oneOf("* ? +")    DSJ = '|'    ITEM = pyparsing.Group((CHAR | GROUP) + QUANT) | pyparsing.Group(CHAR + DSJ + CHAR) | CHAR | GROUP    ITEMSEQ = pyparsing.OneOrMore(ITEM)    PARSER <<= pyparsing.delimitedList(ITEMSEQ, pyparsing.Empty())        def __init__(self, regex_parsed: list[str | list], maxlength: int):        self._regex_parsed = regex_parsed        self._maxlength = maxlength        @classmethod    def from_string(cls, regexstr: str, maxlength: int = 30):        """Parse a regular expression from a string.        Parameters        ----------        regexstr : str            The regular expression string to parse.        maxlength : int            Maximum length of generated strings.        Returns        -------        Regex            The parsed regular expression.        """        if regexstr[0] != '(':            regexstr = '(' + regexstr                    if regexstr[-1] != ')':            regexstr = regexstr +')'                    regex_parsed = cls.PARSER.parseString(regexstr)[0]                return cls(regex_parsed, maxlength)        def __iter__(self):        self._gen = self._construct_regex_generator()        return self        def __next__(self):        return next(self._gen)            def _construct_regex_generator(self, regex=None):        if regex is None:            regex = self._regex_parsed                if isinstance(regex, str):            if len(regex) > self._maxlength:                raise StopIteration            yield regex                elif regex[1] in ['*', '+']:            i = 0 if regex[1] == '*' else 1            while True:                for s in self._construct_regex_generator(regex[0]):                    yield s*i                i += 1                                if i > self._maxlength:                    break                            elif regex[1] == '?':            yield ''            yield regex[0]        elif regex[1] == '|':            left = self._construct_regex_generator(regex[0])            right = self._construct_regex_generator(regex[2])                        s1 = s2 = ''                        while True:                if len(s1) <= self._maxlength:                                    s1 = next(left)                    yield s1                                if len(s2) <= self._maxlength:                    s2 = next(right)                    yield s2                if len(s1) > self._maxlength and len(s2) > self._maxlength:                    break                else:            evaluated = [self._construct_regex_generator(r) for r in regex]            for p in product(*evaluated):                c = ''.join(p)                                if len(c) <= self._maxlength:                    yield c

The way to use this class to generate the set of strings associated with a regular expression is tree an instance of the Regex class as a generator.

Importantly, I’ve include a way of only generating strings of less than some length threshold maxlength in the case that your regular expression evaluates to an infinite set.

for s in Regex.from_string('co+lou?r', 20):
    print(s)

Task 3

Lines: 15

Define a new version of fuzzy_find that behaves exactly the same as your version from Task 1 except that it allows the query data to be a regular expression parsable by Regex.from_string. Make sure that you correctly handle infinite sets.

Hint: your new fuzzy_find will be nearly identical to the old one. My implementation only has a single additional line.

class Tree(TreeOld):    """A tree with fuzzy search via edit distance against regex-defined sets."""        DIST = EditDistance(1., 1., 1.)        def fuzzy_find(self, data: str | list[str],                    closest: bool = True,                    distance: float = np.inf,                   case_fold: bool = True,                   terminals_only: bool = True,                   index_path: tuple[int] = tuple()) -> list[FuzzyFindResult]:        """Find the (closest) strings within a distance of the set defined by a regex.                Defaults to computing the closest strings among the terminals        and ignoring case.                Parameters        ----------        data : str | list[str]            The regex to match against.        closest : bool            Whether to return only the closest strings or all strings            within distance.        distance : float            The distance within which a string must be.        case_fold : bool            Whether to lower-case the data.        terminals_only : bool            Whether to only search the terminals.        index_path : tuple[int]            The current path prefix for recursive calls.        Returns        -------        list[FuzzyFindResult]            List of tuples of (index_path, data, distance).        """        raise NotImplementedError

Write tests analogous to the ones you wrote for Task 1.

# write tests here