Building a corpus reader

Definition of Tree up to this point
from typing import TypeVar
from rdflib import Graph, URIRef

DataType = TypeVar("DataType")

class Tree:
    """A tree whose nodes carry data and an ordered list of subtrees

    Parameters
    ----------
    data
        The data contained in this tree
    children
        The subtrees of this tree; omitting it (or passing ``None``)
        creates a tree with no children

    Raises
    ------
    TypeError
        If any element of ``children`` is not a ``Tree``
    """
    def __init__(self, data: DataType, children: 'list[Tree] | None' = None):
        self._data = data
        # Build a fresh list per instance: a literal ``[]`` default would be
        # one list object shared by every tree created without children.
        self._children = [] if children is None else children

        self._validate()

    def _validate(self) -> None:
        """Check that every child is itself a ``Tree``

        Raises
        ------
        TypeError
            If some child is not a ``Tree``
        """
        # Explicit check rather than ``assert``, which is stripped under -O
        if not all(isinstance(c, Tree) for c in self._children):
            msg = 'all children must be trees'
            raise TypeError(msg)

    @property
    def data(self) -> DataType:
        """The data stored at the root of this tree"""
        return self._data

    @property
    def children(self) -> list['Tree']:
        """The immediate subtrees of this tree"""
        return self._children

    def __str__(self):
        """Return the data of the leaves, left to right, joined by spaces"""
        if self._children:
            return ' '.join(str(c) for c in self._children)

        return str(self._data)

    def __repr__(self):
        """Return the indented multi-line rendering produced by ``to_string``"""
        return self.to_string(0)

    def to_string(self, depth: int = 0) -> str:
        """Render this tree as indented text, one node per line

        Parameters
        ----------
        depth
            The depth of this node below the node being rendered; the
            node at depth 0 is printed without a prefix, deeper nodes are
            prefixed with ``'--'`` after two spaces per level above 1

        Returns
        -------
        str
            The rendering, with a trailing newline after every node
        """
        prefix = '  ' * (depth - 1) + '--' if depth > 0 else ''
        # str() so that non-string data can be rendered too
        line = prefix + str(self._data) + '\n'

        return line + ''.join(c.to_string(depth + 1)
                              for c in self._children)

    def __contains__(self, data: DataType) -> bool:
        """Return whether any node of this tree holds data equal to ``data``"""
        # pre-order depth-first search
        if self._data == data:
            return True

        return any(data in child for child in self._children)

    def __getitem__(self, idx: 'int | tuple[int, ...]') -> 'Tree':
        """Return the subtree reached by following ``idx`` from this node

        Parameters
        ----------
        idx
            A child position, or a tuple of child positions giving the
            path from this node; the empty tuple selects this node

        Raises
        ------
        IndexError
            If ``idx`` contains a non-int or a negative int, or if a
            position is out of range for the node it is applied to
        """
        idx = (idx,) if isinstance(idx, int) else idx

        # Explicit check rather than ``assert``, which is stripped under -O
        if not all(isinstance(i, int) and i >= 0 for i in idx):
            errmsg = 'index must be a positive int or tuple of positive ints'
            raise IndexError(errmsg)

        subtree = self

        for i in idx:
            subtree = subtree._children[i]

        return subtree

    # Cache of data value -> URIRef, filled in by ``to_rdf`` and shared by
    # all trees so that equal labels map to the same RDF node
    RDF_TYPES = {}
    RDF_EDGES = {'is': URIRef('is-a'),
                 'parent': URIRef('is-the-parent-of'),
                 'child': URIRef('is-a-child-of'),
                 'sister': URIRef('is-a-sister-of')}

    def to_rdf(self, graph=None, nodes=None, idx=tuple()) -> Graph:
        """Add this tree to an RDF graph and return the graph

        Each node is identified by its path from the root, written as the
        child positions joined by ``'_'`` (the root is the empty string).
        Every node gets an ``is-a`` edge to its data, and parent, child
        and sister edges are added between nodes.

        Parameters
        ----------
        graph
            The graph to add triples to; a new one is created if omitted
        nodes
            Mapping from path tuples to RDF nodes, filled in during the
            traversal; a new one is created if omitted
        idx
            The path of this node from the root of the traversal

        Returns
        -------
        Graph
            The graph containing the triples for this tree
        """
        graph = Graph() if graph is None else graph
        # A fresh mapping per top-level call: a literal ``{}`` default would
        # be shared by every tree ever converted.
        nodes = {} if nodes is None else nodes

        nodes[idx] = URIRef('_'.join(str(i) for i in idx))

        if self._data not in self.RDF_TYPES:
            self.RDF_TYPES[self._data] = URIRef(self._data)

        graph.add((nodes[idx],
                   self.RDF_EDGES['is'],
                   self.RDF_TYPES[self._data]))

        child_idxs = [idx + (i,) for i in range(len(self._children))]

        for child, childidx in zip(self._children, child_idxs):
            # Recursing first registers nodes[childidx] for the edges below
            child.to_rdf(graph, nodes, childidx)

            graph.add((nodes[idx],
                       self.RDF_EDGES['parent'],
                       nodes[childidx]))
            graph.add((nodes[childidx],
                       self.RDF_EDGES['child'],
                       nodes[idx]))

        # NOTE(review): every ordered pair is added, including a child
        # paired with itself, so each child is recorded as its own sister.
        # Kept as-is because existing queries may rely on it -- confirm.
        for first in child_idxs:
            for second in child_idxs:
                graph.add((nodes[first],
                           self.RDF_EDGES['sister'],
                           nodes[second]))

        self._rdf_nodes = nodes

        return graph

    @property
    def rdf(self) -> Graph:
        """The RDF graph for this tree, built on first access and then reused"""
        if not hasattr(self, "_rdf"):
            self._rdf = self.to_rdf()

        return self._rdf

    def find(self, query: str) -> list[tuple[int, ...]]:
        """Run a SPARQL query on this tree's graph and return matching paths

        Parameters
        ----------
        query
            A SPARQL query whose first selected variable is a tree node

        Returns
        -------
        list[tuple[int, ...]]
            One path per result row, usable as an index into this tree;
            the root is returned as the empty tuple
        """
        # The root's identifier is the empty string, which splits to ['']:
        # skip empty pieces so the root decodes to () instead of int('')
        # raising ValueError.
        return [tuple(int(i)
                      for i in str(res[0]).split('_')
                      if i)
                for res in self.rdf.query(query)]

Now that we can search over individual trees, let’s see how to automatically load all trees from a corpus. We’ll use the constituency-parsed English Web Treebank for this purpose. This corpus is separated into different genres, sources, and documents, with each .tree file containing one or more parse trees (one per line).

!tar -xzf LDC2012T13.tgz --to-command=cat 'eng_web_tbk/data/newsgroup/penntree/groups.google.com_8TRACKGROUPFORCOOLPEOPLE_3b43577fb9121c9f_ENG_20050320_090500.xml.tree'
( (S (S-IMP (NP-SBJ (-NONE- *PRO*)) (VP (VB Play) (NP (PRP$ your) (NML (NML (NNS CD's)) (, ,) (NML (CD 8) (HYPH -) (NNS tracks)) (, ,) (NML (NML (NN reel)) (PP (IN to) (NP (NNS reels)))) (, ,) (NML (NNS cassettes)) (, ,) (NML (NN vinyl) (CD 33) (SYM /) (NNS 45's)) (, ,) (CC and) (NML (NN shellac) (NNS 78's)))) (PP-MNR (IN through) (NP (DT this) (JJ little) (JJ integrated) (NN amp))))) (, ,) (S (NP-SBJ (PRP you)) (VP (MD 'll) (VP (VB get) (NP (DT a) (JJ big) (NN eye) (NN opener))))) (. !)) )
( (FRAG (ADJP (JJ complete) (PP (IN with) (NP (JJ original) (NNP Magnavox) (NNS tubes)))) (, -) (S (S (NP-SBJ-1 (DT all) (NNS tubes)) (VP (VBP have) (VP (VBN been) (VP (VBN tested) (NP-1 (-NONE- *)))))) (S (NP-SBJ (PRP they)) (VP (VBP are) (RB all) (ADJP-PRD (JJ good))))) (, -) (NP (NN stereo) (NN amp))) )

We will talk about how to actually parse these sorts of strings against a grammar later in the class, but for current purposes, we’ll use pyparsing to define a grammar and parse these strings into a list of lists.

import pyparsing

# Parentheses delimit a constituent but carry no content of their own,
# so they are matched and then dropped from the parse results
LPAR = pyparsing.Suppress('(')
RPAR = pyparsing.Suppress(')')
# A label or word: any run of characters other than parentheses and whitespace
data = pyparsing.Regex(r'[^\(\)\s]+')

# Forward declaration so the grammar can refer to itself recursively
exp = pyparsing.Forward()
# A bracketed constituent: a label followed by zero or more expressions,
# grouped so that each constituent becomes its own nested list
expList = pyparsing.Group(LPAR + data + exp[...] + RPAR)
# An expression is either a bare token or a bracketed constituent
exp <<= data | expList
import tarfile
from pprint import pprint

# Path, inside the archive, of the document to inspect
fname = "eng_web_tbk/data/newsgroup/penntree/groups.google.com_8TRACKGROUPFORCOOLPEOPLE_3b43577fb9121c9f_ENG_20050320_090500.xml.tree"

# Read the first line (one bracketed tree) of that document from the archive
with tarfile.open("LDC2012T13.tgz") as corpus, \
        corpus.extractfile(fname) as treefile:
    first_line = treefile.readline().decode()

# Cut two characters off each end (the outer "( " and its closing
# counterpart) so that what remains starts at the labelled bracket
treestr = first_line[2:-2]
treelist = exp.parseString(treestr)[0]

treelist
ParseResults(['S', ParseResults(['S-IMP', ParseResults(['NP-SBJ', ParseResults(['-NONE-', '*PRO*'], {})], {}), ParseResults(['VP', ParseResults(['VB', 'Play'], {}), ParseResults(['NP', ParseResults(['PRP$', 'your'], {}), ParseResults(['NML', ParseResults(['NML', ParseResults(['NNS', "CD's"], {})], {}), ParseResults([',', ','], {}), ParseResults(['NML', ParseResults(['CD', '8'], {}), ParseResults(['HYPH', '-'], {}), ParseResults(['NNS', 'tracks'], {})], {}), ParseResults([',', ','], {}), ParseResults(['NML', ParseResults(['NML', ParseResults(['NN', 'reel'], {})], {}), ParseResults(['PP', ParseResults(['IN', 'to'], {}), ParseResults(['NP', ParseResults(['NNS', 'reels'], {})], {})], {})], {}), ParseResults([',', ','], {}), ParseResults(['NML', ParseResults(['NNS', 'cassettes'], {})], {}), ParseResults([',', ','], {}), ParseResults(['NML', ParseResults(['NN', 'vinyl'], {}), ParseResults(['CD', '33'], {}), ParseResults(['SYM', '/'], {}), ParseResults(['NNS', "45's"], {})], {}), ParseResults([',', ','], {}), ParseResults(['CC', 'and'], {}), ParseResults(['NML', ParseResults(['NN', 'shellac'], {}), ParseResults(['NNS', "78's"], {})], {})], {})], {}), ParseResults(['PP-MNR', ParseResults(['IN', 'through'], {}), ParseResults(['NP', ParseResults(['DT', 'this'], {}), ParseResults(['JJ', 'little'], {}), ParseResults(['JJ', 'integrated'], {}), ParseResults(['NN', 'amp'], {})], {})], {})], {})], {}), ParseResults([',', ','], {}), ParseResults(['S', ParseResults(['NP-SBJ', ParseResults(['PRP', 'you'], {})], {}), ParseResults(['VP', ParseResults(['MD', "'ll"], {}), ParseResults(['VP', ParseResults(['VB', 'get'], {}), ParseResults(['NP', ParseResults(['DT', 'a'], {}), ParseResults(['JJ', 'big'], {}), ParseResults(['NN', 'eye'], {}), ParseResults(['NN', 'opener'], {})], {})], {})], {})], {}), ParseResults(['.', '!'], {})], {})

First, we’ll define a method for building a Tree from this ParseResults object, which can be viewed as a list of list of lists…

class Tree(Tree):
    """Tree extended with constructors for bracketed treebank strings"""

    # Grammar used by ``from_string``. This is the module-level pyparsing
    # expression, which turns "(LABEL child ...)" into nested lists whose
    # first element is the label.
    PARSER = exp

    @classmethod
    def from_string(cls, treestr):
        """Build a tree from one line of a ``.tree`` file

        Parameters
        ----------
        treestr
            A bracketed tree wrapped in an outer unlabelled pair of
            parentheses, e.g. ``"( (S (NP ...) (VP ...)) )"``; two
            characters are cut from each end before parsing

        Returns
        -------
        Tree
            The tree described by the string
        """
        treelist = cls.PARSER.parseString(treestr[2:-2])[0]
        return cls.from_list(treelist)

    @classmethod
    def from_list(cls, treelist):
        """Build a tree from a nested list (or a bare string)

        Parameters
        ----------
        treelist
            Either a string, which becomes a childless node, or a
            sequence whose first element is the node's data and whose
            remaining elements are strings or nested sequences describing
            the children

        Returns
        -------
        Tree
            The tree described by ``treelist``
        """
        if isinstance(treelist, str):
            # The whole string is the node's data (not just its first character)
            return cls(treelist)

        # Recursing on every remaining element keeps all children, whether
        # they are bare words or nested constituents, and also handles a
        # label with no children at all.
        return cls(treelist[0], [cls.from_list(sub) for sub in treelist[1:]])

We can now build a lightweight container for our trees.

import tarfile
from collections import defaultdict

class EnglishWebTreebank:
    """Lightweight reader for the parse trees in the treebank archive

    Parameters
    ----------
    root
        Path to the tar archive containing the corpus
    """

    def __init__(self, root='LDC2012T13.tgz'):
        # Only remember the path: the archive is opened by ``items`` so
        # that the corpus can be iterated more than once.
        self._root = root

    def items(self):
        """Iterate over the trees of every ``.xml.tree`` file in the archive

        Each call starts a fresh pass over the archive.

        Yields
        ------
        tuple
            ``(filename, tree)`` for each non-blank line of each tree
            file, in archive order
        """
        with tarfile.open(self._root) as corpus:
            for fname in corpus.getnames():
                if '.xml.tree' not in fname:
                    continue

                treefile = corpus.extractfile(fname)

                # extractfile returns None for members that are not
                # regular files (e.g. directories)
                if treefile is None:
                    continue

                with treefile:
                    # A tree file holds one tree per line, so read them
                    # all rather than only the first
                    for line in treefile:
                        treestr = line.decode()

                        if treestr.strip():
                            yield fname, Tree.from_string(treestr)
        
# Create a reader over the default archive path
ewt = EnglishWebTreebank()

# Fetch the first (filename, tree) pair from the corpus
next(ewt.items())
('eng_web_tbk/data/answers/penntree/20070404104007AAY1Chs_ans.xml.tree',
 S
 --SBARQ
   --WHADVP-9
     --WRB
       --where
   --SQ
     --MD
       --can
     --NP-SBJ
       --PRP
         --I
     --VP
       --VB
         --get
       --NP
         --NNS
           --morcillas
       --PP-LOC
         --IN
           --in
         --NP
           --NNP
             --tampa
           --NNP
             --bay
       --ADVP-LOC-9
         ---NONE-
           --*T*
 --,
   --,
 --S
   --S
     --NP-SBJ
       --PRP
         --I
     --VP
       --MD
         --will
       --VP
         --VB
           --like
         --NP
           --DT
             --the
           --JJ
             --argentinian
           --NN
             --type
   --,
     --,
   --CC
     --but
   --S
     --NP-SBJ-1
       --PRP
         --I
     --VP
       --MD
         --will
       --S
         --NP-SBJ-1
           ---NONE-
             --*PRO*
         --VP
           --TO
             --to
           --VP
             --VB
               --try
             --NP
               --NNS
                 --anothers
             --INTJ
               --UH
                 --please
 --.
   --?)

Now, we can run arbitrary queries across trees.

ewt = EnglishWebTreebank()

n_subj = 0
n_subj_prp = 0
n_obj_prp = 0
n_obj = 0 

for _, tree in ewt.items():
    idx_subj_prp = tree.find('''SELECT ?node
                                WHERE { ?node <is-a> <NP-SBJ>;
                                              <is-the-parent-of> ?child.
                                        ?child <is-a> <PRP>.
                                      }''')
    idx_subj = tree.find('''SELECT ?node
                                WHERE { ?node <is-a> <NP-SBJ>. }''')
    idx_obj_prp = tree.find('''SELECT ?node
                                WHERE { ?parent <is-the-parent-of> ?node.
                                        { ?parent <is-a> <VP> } UNION { ?parent <is-a> <PP> }
                                        ?node <is-the-parent-of> ?child;
                                              <is-a> <NP>.
                                        ?child <is-a> <PRP>.
                                      }''')
    idx_obj = tree.find('''SELECT ?node
                                WHERE { ?parent <is-the-parent-of> ?node.
                                        { ?parent <is-a> <VP> } UNION { ?parent <is-a> <PP> }
                                        ?node <is-a> <NP>.
                                      }''')