Assignments 3 and 4

Like Assignments 1 and 2, Assignments 3 and 4 are bundled together. You only need to do Task 1 for Assignment 3 and Tasks 2 and 3 for Assignment 4.

These assignments focus on implementing fuzzy tree search. In class, we developed various tree search algorithms that look for an exact match between a query and the data contained in particular trees. I’ve copied in the relevant class below as TreeOld.

import pyparsing
from typing import TypeVar, Union
from rdflib import Graph, URIRef

DataType = TypeVar("DataType")
TreeList = list[str | tuple[Union[str, 'TreeList']]]

class TreeOld:
    """A tree whose nodes each hold a piece of data and an ordered list of subtrees
    
    Parameters
    ----------
    data
        the data stored at the root of this (sub)tree
    children
        the subtrees immediately below this node; omitted or None means no children
    """
    
    # cache from node data to the URIRef used as its RDF type; shared by every tree
    RDF_TYPES = {}
    RDF_EDGES = {'is': URIRef('is-a'),
                 'parent': URIRef('is-the-parent-of'),
                 'child': URIRef('is-a-child-of'),
                 'sister': URIRef('is-a-sister-of')}
    
    LPAR = pyparsing.Suppress('(')
    RPAR = pyparsing.Suppress(')')
    DATA = pyparsing.Regex(r'[^\(\)\s]+')

    PARSER = pyparsing.Forward()
    SUBTREE = pyparsing.ZeroOrMore(PARSER)
    PARSERLIST = pyparsing.Group(LPAR + DATA + SUBTREE + RPAR)
    PARSER <<= DATA | PARSERLIST
    
    def __init__(self, data: DataType, children: list['TreeOld'] | None = None):
        self._data = data
        # a list literal as the default would be one object shared by every
        # childless node, so build a fresh list per instance instead
        self._children = [] if children is None else children
        
        self._validate()
  
    def __str__(self):
        # the yield of the tree: the data at its leaves, left to right
        if self._children:
            return ' '.join(str(c) for c in self._children)
        else:
            return str(self._data)
        
    def __repr__(self):
        return self.to_string()
     
    def to_string(self, depth: int = 0) -> str:
        """Render the tree with one node per line, indented by depth
        
        Parameters
        ----------
        depth
            the depth of this node below the node the rendering started from
        """
        # a negative repeat count yields '', so the root gets no prefix at all
        prefix = (depth - 1) * '  ' + int(depth > 0) * '--'
        
        # formatting (rather than `+`) lets non-string data be rendered too
        s = f'{prefix}{self._data}\n'
        s += ''.join(c.to_string(depth+1)
                     for c in self._children)
        
        return s

    def __contains__(self, data: DataType) -> bool:
        # pre-order depth-first search
        if self._data == data:
            return True
        
        return any(data in child for child in self._children)
        
    def __getitem__(self, idx: int | tuple[int, ...]) -> 'TreeOld':
        """Get the subtree at a child position or at a path of child positions
        
        Parameters
        ----------
        idx
            a single child position or a tuple of child positions to follow
            from this node; the empty tuple refers to this node itself
        """
        if isinstance(idx, int):
            return self._children[idx]
        
        if not idx:
            return self
        
        return self._children[idx[0]][idx[1:]]
        
    @property
    def data(self) -> DataType:
        return self._data 
    
    @property
    def children(self) -> list['TreeOld']:
        return self._children
        
    def _validate(self):
        """Check that every child is a tree
        
        Raises
        ------
        TypeError
            if some child is not an instance of TreeOld (or a subclass)
        """
        # checked against this class rather than a subclass defined later in
        # the file, and without `assert`, which is stripped under `python -O`
        if not all(isinstance(c, TreeOld) for c in self._children):
            msg = 'all children must be trees'
            raise TypeError(msg)
            
    def index(self, data: DataType, 
              index_path: tuple[int, ...] = tuple()) -> list[tuple[int, ...]]:
        """Find the index paths of every node whose data equals the query
        
        Parameters
        ----------
        data
            the data to look for
        index_path
            the path of child positions leading from the root of the search
            to this node; -1 is accepted as a synonym for the empty path
        """
        # the -1 sentinel previously produced a list here, which then failed
        # when concatenated with a tuple below
        if index_path == -1:
            index_path = tuple()
        
        indices = [index_path] if self._data == data else []
        
        for i, child in enumerate(self._children):
            indices.extend(child.index(data, index_path+(i,)))

        return indices
            
    def to_rdf(
        self, 
        graph: Graph | None=None, 
        nodes: dict[tuple[int, ...], URIRef] | None = None, 
        idx: tuple[int, ...] = tuple()
    ) -> Graph: 
        """Encode the tree as an RDF graph
        
        Every node becomes a URIRef named by its index path joined with '_'
        and is linked to its data by an 'is' edge, to its children by
        'parent'/'child' edges and to the children of its parent by 'sister'
        edges.
        
        Parameters
        ----------
        graph
            the graph to add triples to; a new one is created if omitted
        nodes
            the mapping from index paths to node URIRefs built up during the
            recursion; a new one is created if omitted
        idx
            the index path of this node
        """
        graph = Graph() if graph is None else graph
        
        # a dict literal as the default would persist between calls and leak
        # the nodes of previously converted trees into `_rdf_nodes`
        nodes = {} if nodes is None else nodes
        
        idxstr = '_'.join(str(i) for i in idx)
        nodes[idx] = URIRef(idxstr)
            
        if self._data not in self.RDF_TYPES:
            self.RDF_TYPES[self._data] = URIRef(self._data)

        typetriple = (nodes[idx], 
                      self.RDF_EDGES['is'],
                      self.RDF_TYPES[self._data])

        graph.add(typetriple)

        for i, child in enumerate(self._children):
            childidx = idx+(i,)
            child.to_rdf(graph, nodes, childidx)
                
            partriple = (nodes[idx], 
                         self.RDF_EDGES['parent'],
                         nodes[childidx])
            chitriple = (nodes[childidx], 
                         self.RDF_EDGES['child'],
                         nodes[idx])
            
            graph.add(partriple)
            graph.add(chitriple)
        
        childnodes = [nodes[idx+(i,)] for i in range(len(self._children))]
        
        # NOTE(review): as before, every child is also recorded as a sister of
        # itself; confirm whether queries rely on that before excluding it
        for child1node in childnodes:
            for child2node in childnodes:
                sistriple = (child1node, 
                             self.RDF_EDGES['sister'],
                             child2node)
                
                graph.add(sistriple)
        
        self._rdf_nodes = nodes
        
        return graph
    
    @property
    def rdf(self) -> Graph:
        # rebuilt on every access
        return self.to_rdf()
    
    def find(self, query) -> list[tuple[int, ...]]:
        """Run a query against the RDF encoding and return node index paths
        
        Parameters
        ----------
        query
            passed unchanged to the `query` method of the RDF graph; the
            first element of each result is read as a node name
        """
        # the root is named by the empty string, and ''.split('_') is [''],
        # which int() rejects; dropping empty pieces maps the root to ()
        return [tuple(int(i) 
                      for i in str(res[0]).split('_') 
                      if i) 
                for res in self.rdf.query(query)]
    
    @classmethod
    def from_string(cls, treestr: str) -> 'TreeOld':
        """Build a tree from a bracketed string
        
        Parameters
        ----------
        treestr
            a bracketed tree; its first two and last two characters are
            discarded before parsing (e.g. the outer '( ' and ' )' wrapper)
        """
        treelist = cls.PARSER.parseString(treestr[2:-2])[0]
        
        return cls.from_list(treelist)
    
    @classmethod
    def from_list(cls, treelist: TreeList):
        """Build a tree from nested lists of the form [data, child, child, ...]
        
        Parameters
        ----------
        treelist
            either a bare string (a leaf) or a sequence whose first element
            is the node's data and whose remaining elements are its children
        """
        if isinstance(treelist, str):
            # the whole string is the leaf's data, not just its first character
            return cls(treelist)
        
        # one uniform case covers string children, subtree children, any mix
        # of the two, and nodes without children
        return cls(treelist[0], [cls.from_list(l) for l in treelist[1:]])

In fuzzy search, we allow this exact matching restriction to be loosened by instead allowing that matches be (i) within some fixed edit distance; and/or (ii) closest (in terms of edit distance) to the query among all pieces of data. I’ve copied the relevant edit distance class that we developed in class below as EditDistance.

import numpy as np

class EditDistance:
    '''Distance between strings


    Parameters
    ----------
    insertion_cost
    deletion_cost
    substitution_cost
    '''
    
    def __init__(self, insertion_cost: float = 1., 
                 deletion_cost: float = 1., 
                 substitution_cost: float | None = None):
        self._insertion_cost = insertion_cost
        self._deletion_cost = deletion_cost

        if substitution_cost is None:
            self._substitution_cost = insertion_cost + deletion_cost
        else:
            self._substitution_cost = substitution_cost

    def __call__(self, source: str | list[str], target: str | list[str]) ->  float:
        '''The edit distance between the source and target
        
        The use of lists enables digraphs to be identified
        
        Parameters
        ----------
        source
        target
        '''
        
        # coerce to list if not a list
        if isinstance(source, str):
            source = list(source)
            
        if isinstance(target, str):
            target = list(target)
        
        n, m = len(source), len(target)
        source, target = ['#']+source, ['#']+target

        distance = np.zeros([n+1, m+1], dtype=float)
        
        for i in range(1,n+1):
            distance[i,0] = distance[i-1,0]+self._deletion_cost

        for j in range(1,m+1):
            distance[0,j] = distance[0,j-1]+self._insertion_cost
            
        for i in range(1,n+1):
            for j in range(1,m+1):
                if source[i] == target[j]:
                    substitution_cost = 0.
                else:
                    substitution_cost = self._substitution_cost
                    
                costs = np.array([distance[i-1,j]+self._deletion_cost,
                                  distance[i-1,j-1]+substitution_cost,
                                  distance[i,j-1]+self._insertion_cost])
                    
                distance[i,j] = costs.min()
                
        return distance[n,m]

Task 1

Lines: 14

Define an instance method fuzzy_find. This method should take a piece of query data and optionally a distance and return all of the nodes whose data is within edit distance distance of data; and/or, if closest is True, it should return all of the nodes closest to data among all nodes in the tree.

For instance:

  • fuzzy_find('review', distance=3., closest=False) will return a list with one tuple for every piece of data in the tree within edit distance 3 from review (e.g. view, reviewer, reviews, etc.), each containing its index, the data itself, and its distance to review; if there is nothing within that edit distance, an empty list will be returned
  • fuzzy_find('review', distance=3., closest=True) will return a list with one tuple for each of the closest pieces of data in the tree that are also within edit distance 3 from review (e.g. view, reviewer, reviews, etc.), each containing its index, the data itself, and its distance to review; if there is nothing within that edit distance, an empty list will be returned
  • fuzzy_find('review', distance=np.inf, closest=True) will return a list with one tuple for each of the closest pieces of data in the tree to review (e.g. view, reviewer, reviews, etc.), regardless of edit distance, each containing its index, the data itself, and its distance to review; this will always return something
  • fuzzy_find('review', distance=np.inf, closest=False) will return a list with one tuple for every piece of data in the tree (e.g. view, reviewer, reviews, etc.), regardless of its edit distance to review, each containing its index, the data itself, and its distance to review; this will always return a list with as many elements as there are nodes being searched in the tree

This method should also support only searching the terminal nodes (leaves) of the tree with the flag terminals_only.

Hint: you should look back at the methods we defined for searching and indexing the tree above. Specifically, to understand why you might want something like index_path defaulting to the empty tuple, look at the index method of TreeOld.

FuzzyFindResult = tuple[tuple, str | list[str], float]

class Tree(TreeOld):
    '''A tree that supports fuzzy search over the data at its nodes'''
    
    # with no substitution cost given, EditDistance uses the sum of the
    # insertion and deletion costs, so a substitution costs 2 here
    DIST = EditDistance(1., 1.)
    
    def fuzzy_find(self, data: Union[str, list[str]], 
                   closest: bool = True, 
                   distance: float = np.inf,
                   case_fold: bool = True,
                   terminals_only: bool = True,
                   index_path: tuple = tuple()) -> list[FuzzyFindResult]:
        
        '''Find the (closest) strings within a certain distance
        
        Defaults to computing the closest strings among the terminals and ignoring case
        
        The format of the things returned is [((0,1,0), "view", 2.0), ...]. Note that 
        edit distance can be computed on either a `str` or `List[str]`; that's why
        the middle element of each tuple might be either.
        
        Parameters
        ----------
        data
            the data to match against
        closest
            whether to return only the closest strings or all strings within distance
        distance
            the distance within which a string must be
        case_fold
            whether to lower-case the data
        terminals_only
            whether to only search the terminals
        index_path
            presumably the path of child positions from the root of the search
            to this node, accumulated during recursion in the same way as in
            `TreeOld.index` (to be confirmed by the implementation)
        
        Returns
        -------
        list[FuzzyFindResult]
            one (index, data, distance) tuple per matching node
        
        Raises
        ------
        NotImplementedError
            always; the body is left to be written for Task 1
        '''
        raise NotImplementedError

Write tests that use the following tree as input data.

treestr = '( (SBARQ (WHNP-1 (WP What)) (SQ (NP-SBJ-1 (-NONE- *T*)) (VP (VBZ is) (NP-PRD (NP (DT the) (JJS best) (NN place)) (SBAR (WHADVP-2 (-NONE- *0*)) (S (NP-SBJ (-NONE- *PRO*)) (VP (TO to) (VP (VB get) (NP (NP (NNS discounts)) (PP (IN for) (NP (NML (NNP San) (NNP Francisco)) (NNS restaurants)))) (ADVP-LOC-2 (-NONE- *T*))))))))) (. ?)) )'

testtree = Tree.from_string(treestr)

testtree
SBARQ
--WHNP-1
  --WP
    --What
--SQ
  --NP-SBJ-1
    ---NONE-
      --*T*
  --VP
    --VBZ
      --is
    --NP-PRD
      --NP
        --DT
          --the
        --JJS
          --best
        --NN
          --place
      --SBAR
        --WHADVP-2
          ---NONE-
            --*0*
        --S
          --NP-SBJ
            ---NONE-
              --*PRO*
          --VP
            --TO
              --to
            --VP
              --VB
                --get
              --NP
                --NP
                  --NNS
                    --discounts
                --PP
                  --IN
                    --for
                  --NP
                    --NML
                      --NNP
                        --San
                      --NNP
                        --Francisco
                    --NNS
                      --restaurants
              --ADVP-LOC-2
                ---NONE-
                  --*T*
--.
  --?

The tests should test the four combinations of distance and closest listed above with the same query data, both with terminals_only=True and with terminals_only=False (eight tests in total). Two further tests should test distance=np.inf, closest=True, terminals_only=True for a case where only a single element should be returned and a case where multiple elements in the tree should be returned.

# write tests here

Remember that we talked in class about what it would mean to take the distance between a string and a collection of strings: basically, the minimum of the edit distances between the string and each string in that set.

We can use this concept in two ways here. The first is to view the tree as a container for some data and to compute the minimum distance between a query and any data contained in the tree. Alternatively, we can think of the query itself as determining a set and compute the minimum distance of each piece of data in the tree to that set. Task 2 will implement the former and Task 3 the latter.

I’ve copied the corpus reader we developed for the English Web Treebank in class below. We’ll make use of this for Task 2. (You’ll need to grab LDC2012T13.tgz from the course Google drive.)

!wget --no-check-certificate 'https://drive.google.com/uc?export=download&id=1ygMIl1w6wz6A24oxkLwirunSKXb9EW12' -O 'LDC2012T13.tgz'
import tarfile
from collections import defaultdict

class EnglishWebTreebankOld:
    """A reader for the trees stored in the English Web Treebank tarball
    
    Parameters
    ----------
    root
        the path to the corpus archive
    """
    
    def __init__(self, root='LDC2012T13.tgz'):
        # only the path is kept; the archive is opened when the trees are
        # actually iterated, so constructing a reader does no I/O
        self._root = root
        
    @property
    def _trees(self):
        # kept so that `for fn, tree in self._trees` still works; every access
        # now starts a fresh pass over the archive instead of resuming a
        # single generator that is empty once it has been consumed
        return self._read_trees()
    
    def _read_trees(self):
        """Yield a (file name, tree) pair for every tree file in the archive"""
        with tarfile.open(self._root) as corpus:
            for fname in corpus.getnames():
                if '.xml.tree' not in fname:
                    continue
                
                treefile = corpus.extractfile(fname)
                
                # extractfile returns None for members that are not regular
                # files or links (e.g. directories); there is nothing to read
                if treefile is None:
                    continue
                
                with treefile:
                    # NOTE(review): only the first line of each file is read;
                    # confirm that no file holds more than one tree
                    treestr = treefile.readline().decode()
                    
                    yield fname, Tree.from_string(treestr)
                        
    def items(self):
        """Iterate over (file name, tree) pairs; may be called repeatedly"""
        yield from self._read_trees()

Task 2

Lines: 3

Define an instance method fuzzy_find for the corpus reader class that computes the minimum distance between a query and a tree for all trees in the corpus. It should return a list of tuples with the first element a tree ID, the second an index in that tree, the third the data at that index and the fourth the distance between the query and that index. A tuple should be included in the returned list only if the distance is equal to the minimum across trees in the corpus.

Hint: this should be very straightforward using a particular parameterization for Tree.fuzzy_find. Which one?

class EnglishWebTreebank(EnglishWebTreebankOld):
    '''A corpus reader that supports fuzzy search across all of its trees'''
    
    def fuzzy_find(self, data: str | list[str]) -> list[tuple[str, tuple, str | list[str], float]]:
        '''Find the trees in the corpus closest to the query data
        
        Parameters
        ----------
        data
            the query data to compare against the data in every tree
        
        Returns
        -------
        list
            (tree ID, index, data, distance) tuples: the tree's ID, an index
            in that tree, the data at that index, and the distance between
            the query and that data; only tuples whose distance equals the
            minimum across all trees in the corpus are to be included
        
        Raises
        ------
        NotImplementedError
            always; the body is left to be written for Task 2
        '''
        raise NotImplementedError

Now, load this corpus.

ewt = EnglishWebTreebank()

Write a single test for a piece of data you know exists in some tree in the corpus. (Determiners or auxiliary verbs are good candidates.) Thus, the minimum distance will be zero and your method should return only trees that contain that element. Note that this test should use some already existing method to produce the correct set of trees.

Hint: such a method already exists in the TreeOld class.

# write test here

The next task will look at computing distance between the elements of a tree and a query set defined by a regular expression. Here is a regular expression class based on the formal definition of regular expressions I gave you in class.

from itertools import product

class Regex:
    """A regular expression that can be iterated to generate strings
    
    Parameters
    ----------
    regex_parsed
        the nested list produced by parsing a regular expression with
        `Regex.PARSER`
    maxlength
        the length beyond which no string is generated; this keeps the
        iteration finite for expressions that use `*` or `+`
    """

    CHAR = pyparsing.Word(pyparsing.alphas, exact=1).setName("character") # <- use 'exact', not 'max'

    LPAR = pyparsing.Suppress('(')
    RPAR = pyparsing.Suppress(')')

    PARSER = pyparsing.Forward()
    GROUP = pyparsing.Group(LPAR + PARSER + RPAR)
    QUANT = pyparsing.oneOf("* ? +")
    DSJ = '|'

    ITEM = pyparsing.Group((CHAR | GROUP) + QUANT) | pyparsing.Group(CHAR + DSJ + CHAR) | CHAR | GROUP
    ITEMSEQ = pyparsing.OneOrMore(ITEM)

    PARSER <<= pyparsing.delimitedList(ITEMSEQ, pyparsing.Empty())
    
    # builtin `list` is used in the annotation because `typing.List` is not
    # imported in this file and the annotation is evaluated at definition time
    def __init__(self, regex_parsed: list[Union[str, list]], maxlength: int):
        self._regex_parsed = regex_parsed
        self._maxlength = maxlength
        
        # calling the generator function runs nothing yet; having it in place
        # lets `next(regex)` work even before `iter(regex)` has been called
        self._gen = self._construct_regex_generator()
    
    @classmethod
    def from_string(cls, regexstr: str, maxlength: int = 30):
        """Parse a regular expression string
        
        Parameters
        ----------
        regexstr
            the regular expression, e.g. 'co+lou?r'
        maxlength
            the length beyond which no string is generated
        
        Raises
        ------
        pyparsing.ParseException
            if the string cannot be parsed in full
        """
        # always wrap: adding a parenthesis only when the first/last character
        # is not one already unbalances inputs such as 'a(b)' and cuts
        # '(a)(b)' down to its first group
        wrapped = '(' + regexstr + ')'
        
        # parseAll turns unparsed trailing input into an error instead of
        # silently dropping it
        regex_parsed = cls.PARSER.parseString(wrapped, parseAll=True)[0]
        
        return cls(regex_parsed, maxlength)
    
    def __iter__(self):
        # restarts the generation; the position is stored on the instance, so
        # two simultaneous loops over the same Regex share it
        self._gen = self._construct_regex_generator()
        return self
    
    def __next__(self):
        return next(self._gen)
        
    def _construct_regex_generator(self, regex=None):
        """Generate the strings, no longer than maxlength, of a parsed (sub)expression
        
        Parameters
        ----------
        regex
            the parsed (sub)expression; defaults to the whole expression
        """
        if regex is None:
            regex = self._regex_parsed
        
        if isinstance(regex, str):
            # ending with `return` rather than `raise StopIteration`: raising
            # it inside a generator is converted to a RuntimeError (PEP 479)
            if len(regex) <= self._maxlength:
                yield regex
                
            return
        
        # a quantified or disjoined group carries its operator as a string in
        # second position; one-element groups (e.g. the wrapper added around
        # 'a*') and plain sequences have none
        if len(regex) > 1 and isinstance(regex[1], str):
            operator = regex[1]
        else:
            operator = None
        
        if operator in ('*', '+'):
            # NOTE(review): each string of the sub-expression is repeated with
            # itself only, so '(ab|c)*' gives 'abab' and 'cc' but not 'abc';
            # exact for single characters -- confirm whether a full Kleene
            # closure is wanted before changing it
            base = list(self._construct_regex_generator(regex[0]))
            seen = set()
            
            if operator == '*':
                seen.add('')
                yield ''
            
            for count in range(1, self._maxlength + 1):
                for s in base:
                    repeated = s * count
                    
                    # respect maxlength and do not emit the same string twice
                    if len(repeated) <= self._maxlength and repeated not in seen:
                        seen.add(repeated)
                        yield repeated
                    
        elif operator == '?':
            yield ''
            
            # generate the strings of the operand; yielding `regex[0]` itself
            # would hand back a parsed group when the operand is not a character
            yield from self._construct_regex_generator(regex[0])

        elif operator == '|':
            branches = [self._construct_regex_generator(regex[0]),
                        self._construct_regex_generator(regex[2])]
            
            # alternate between the two sides until both are exhausted; an
            # unguarded next() on an exhausted side would surface as a
            # RuntimeError
            while branches:
                for branch in list(branches):
                    try:
                        s = next(branch)
                    except StopIteration:
                        branches.remove(branch)
                    else:
                        if len(s) <= self._maxlength:
                            yield s
        
        else:
            # concatenation: every way of picking one string per element
            evaluated = [self._construct_regex_generator(r) for r in regex]
            
            for p in product(*evaluated):
                c = ''.join(p)
                
                if len(c) <= self._maxlength:
                    yield c

The way to use this class to generate the set of strings associated with a regular expression is to treat an instance of the Regex class as a generator.

Importantly, I’ve included a way of only generating strings no longer than some length threshold maxlength in the case that your regular expression evaluates to an infinite set.

for s in Regex.from_string('co+lou?r', 20):
    print(s)

Task 3

Lines: 15

Define a new version of fuzzy_find that behaves exactly the same as your version from Task 1 except that it allows the query data to be a regular expression parsable by Regex.from_string. Make sure that you correctly handle infinite sets.

Hint: your new fuzzy_find will be nearly identical to the old one. My implementation only has a single additional line.

class Tree(TreeOld):
    '''A tree that supports fuzzy search against the set of strings defined by a regex'''
    
    # insertion, deletion and substitution all cost 1
    DIST = EditDistance(1., 1., 1.)
    
    def fuzzy_find(self, data: str | list[str], 
                   closest: bool = True, 
                   distance: float = np.inf,
                   case_fold: bool = True,
                   terminals_only: bool = True,
                   index_path: tuple[int] = tuple()) -> list[FuzzyFindResult]:
        
        '''Find the (closest) strings within a distance of the set defined by a regex
        
        Defaults to computing the closest strings among the terminals and ignoring case
        
        Parameters
        ----------
        data
            the regex to match against
        closest
            whether to return only the closest strings or all strings within distance
        distance
            the distance within which a string must be
        case_fold
            whether to lower-case the data
        terminals_only
            whether to only search the terminals
        index_path
            presumably the path of child positions from the root of the search
            to this node, accumulated during recursion in the same way as in
            `TreeOld.index` (to be confirmed by the implementation)
        
        Returns
        -------
        list[FuzzyFindResult]
            one (index, data, distance) tuple per matching node
        
        Raises
        ------
        NotImplementedError
            always; the body is left to be written for Task 3
        '''
        raise NotImplementedError

Write tests analogous to the ones you wrote for Task 1.

# write tests here