Assignments 7 and 8

Assignment 7 will consist of Tasks 1-3 and Assignment 8 will consist of Tasks 4-5.

In these assignments, you will be implementing finite state automata for generating/recognizing/parsing English syllables and words (Assignment 7) as well as finite state transducers for recognizing/parsing/transducing English sentences (Assignment 8). You will implement these using the FiniteStateAutomaton class that we developed in class as well as a FiniteStateTransducer class that builds on it.

Finite State Automata

The first set of utilities is used for determinization and epsilon closure.

from functools import reduce
from itertools import chain, combinations

from typing import Set, Tuple

def powerset(iterable):
    """https://docs.python.org/3/library/itertools.html#recipes"""
    s = list(iterable)
    return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))

def transitive_closure(edges: set[Tuple[str]]) -> set[Tuple[str]]:
    """
    the transitive closure of a graph

    Parameters
    ----------
    edges
        the graph to compute the closure of

    Returns
    ----------
    set(tuple)
    """
    while True:
        new_edges = {(x, w)
                     for x, y in edges
                     for q, w in edges
                     if q == y}

        all_edges = edges | new_edges

        if all_edges == edges:
            return edges

        edges = all_edges

transitive_closure in particular is useful in computing the epsilon closure (though epsilon closure requires quite a bit more code, as you can see below).

transitive_closure({(0, 1), (1, 2), (2, 3), (3, 4)})
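The call above chains four edges, so the closure should contain every pair (i, j) with i < j. The sketch below repeats a compact copy of transitive_closure so that it runs on its own, then uses the closure to read off the states reachable from a given state, which is the step the epsilon closure builds on. The helper reachable_from is a hypothetical name introduced only for illustration.

```python
def transitive_closure(edges):
    """Compact copy of the function above, repeated so the sketch is self-contained."""
    while True:
        new_edges = {(x, w) for x, y in edges for q, w in edges if q == y}
        all_edges = edges | new_edges
        if all_edges == edges:
            return edges
        edges = all_edges


def reachable_from(state, edges):
    """Hypothetical helper: states reachable from `state` in one or more steps."""
    return {y for x, y in transitive_closure(edges) if x == state}


chain_edges = {(0, 1), (1, 2), (2, 3), (3, 4)}
closure = transitive_closure(chain_edges)

print(sorted(closure))
print(sorted(reachable_from(1, chain_edges)))  # [2, 3, 4]
```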

And here’s the base FiniteStateAutomaton class, which relies on a TransitionFunction class to hide some of the operations on transition functions that can get nasty.

from logging import warning as warn  # logging.warn is a deprecated alias
from copy import copy, deepcopy
from functools import lru_cache
from typing import Literal

TransitionGraph = dict[tuple[str, str], str | set[str]]
FSAParse = tuple[tuple[str, str]]
FSAMode = Literal["recognize", "parse"]

class FiniteStateAutomaton:
    """A finite state automaton

    Parameters
    ----------
    alphabet
    states
    initial_state
    final_states
    transition_graph
    """

    def __init__(self, alphabet: set[str], states: set[str], 
                 initial_state: str, final_states: set[str], 
                 transition_graph: TransitionGraph):
        self._alphabet = alphabet | {''}
        self._states = states
        self._initial_state = initial_state
        self._final_states = final_states
        self._transition_function = TransitionFunction(
            self._alphabet, 
            states,
            transition_graph
        )

        self._validate_initial_state()
        self._validate_final_states()

        self._generator = self._build_generator()

    def __contains__(self, string):
        return self._recognize(string)
        
    def __add__(self, other):
        return self.concatenate(other)

    def __or__(self, other):
        return self.union(other)

    def __and__(self, other):
        return self.intersect(other)

    def __pow__(self, k):
        return self.exponentiate(k)

    def __neg__(self):
        return self.complement()

    def __iter__(self):
        return self

    def __next__(self):
        return next(self._generator)

    def _build_generator(self):
        string_buffer = [(self._initial_state, '')]
        
        if self._initial_state in self._final_states:
            stack = ['']
        else:
            stack = []

        while string_buffer:
            if stack:
                yield stack.pop()

            else:
                # this is very inefficient when we have total
                # transition functions with many transitions to the
                # sink state; could be optimized by removing a string
                # if it's looping in a sink state, but we don't know
                # in general what the sink state is
                new_buffer = []
                for symb in self._alphabet:
                    for old_state, string in string_buffer:
                        new_states = self._transition_function(old_state, symb)
                        for st in new_states:
                            new_elem = (st, string+symb)
                            new_buffer.append(new_elem)

                stack += [string
                          for state, string in new_buffer
                          if state in self._final_states]

                string_buffer = new_buffer
    
    def __call__(self, string: str | list[str], mode: FSAMode = "recognize") -> bool | set[FSAParse]:
        """
        whether/how a string is accepted/parsed by the FSA

        Parameters
        ----------
        string
            the string to recognize or parse
        mode
            whether to run in "recognize" or "parse" mode
        """

        if mode == 'recognize':
            return self._recognize(string)
        elif mode == 'parse':
            return self._parse(string)
        else:
            msg = 'mode must be "recognize" or "parse"'
            raise ValueError(msg)

    def _add_epsilon_extensions(self, paths: set[tuple[str]]) -> set[tuple[str]]:
        paths_extended = {
            s1 + (s2,) 
            for s1 in paths 
            for s2 in self._transition_function(s1[-1], '')
        }
        
        break_counter = 0

        while not paths >= paths_extended:
            paths |= paths_extended
            paths_extended = {
                s1 + (s2,) 
                for s1 in paths
                for s2 in self._transition_function(s1[-1], '')
            }

            break_counter += 1

            if break_counter > 10:
                warn("epsilon path limit reached")
                break
            
        return paths

    @lru_cache(512)
    def _recognize(self, string: str | list[str], prev_state: str | None = None) -> bool:
        """Whether a string is accepted by the FSA

        Parameters
        ----------
        string
            the string to recognize or parse
        """
        paths = {(self._initial_state,)} if prev_state is None else {(prev_state,)}
        paths = self._add_epsilon_extensions(paths)

        if string:
            return any(
                self._recognize(string[1:], state)
                for p in paths
                for state in self._transition_function(p[-1], string[0])
            )

        else:
            return any(
                s[-1] in self._final_states for s in paths
            )

    @lru_cache(512)
    def _parse(self, string: str | list[str], 
               prev_state: str | None = None) -> set[FSAParse]:
        """How a string is parsed by the FSA
        
        This should return the list of transitions that 
        the machine could go through to parse a string

        Parameters
        ----------
        string
            the string to recognize or parse
        """
        paths = {(self._initial_state,)} if prev_state is None else {(prev_state,)}
        paths = self._add_epsilon_extensions(paths)

        if string:
            return {
                tuple((s,'') for s in p[1:]) + ((state, string[0]),) + parse
                for p in paths
                for state in self._transition_function(p[-1], string[0])
                for parse in self._parse(string[1:], state)
            }

        else:
            return {
                tuple((s,'') for s in p[1:])
                for p in paths
                if p[-1] in self._final_states
            }

    def _validate_initial_state(self):
        try:
            assert self._initial_state in self._states

        except AssertionError:
            raise ValueError('initial state must be in set of states')

    def _validate_final_states(self):
        try:
            assert all([s in self._states for s in self._final_states])

        except AssertionError:
            raise ValueError('final states must all be in set of states')

    def _deepcopy(self):
        fsa = copy(self)
        del fsa._generator

        return deepcopy(fsa)
        
    def _relabel_fsas(self, other):
        """
        append tag to the input/output states throughout two FSAs

        Parameters
        ----------
        other : FiniteStateAutomaton
        """

        fsa1 = self._deepcopy()._relabel_states(str(id(self)))
        fsa2 = other._deepcopy()._relabel_states(str(id(other)))

        return fsa1, fsa2

    def _relabel_states(self, tag: str) -> 'FiniteStateAutomaton':
        """
        append tag to the input/output states throughout the FSA

        Parameters
        ----------
        tag : str
        """

        state_map = {s: s+'_'+tag for s in self._states}
        
        self._states = {state_map[s] for s in self._states}    
        self._initial_state += '_'+tag
        self._final_states = {state_map[s] for s in self._final_states}

        self._transition_function.relabel_states(state_map)
        
        return self

    def concatenate(self, other: 'FiniteStateAutomaton'):
        """
        concatenate this FSA with another

        Parameters
        ----------
        other : FiniteStateAutomaton

        Returns
        -------
        FiniteStateAutomaton
        """
        msg = "you still need to implement FiniteStateAutomaton.concatenate"
        raise NotImplementedError(msg)

    def kleene_star(self) -> 'FiniteStateAutomaton':
        """
        construct the kleene closure machine

        Returns
        -------
        FiniteStateAutomaton
        """
        fsa = self._deepcopy()
        
        new_transition = fsa._transition_function
        new_transition.add_transitions({(s, ''): fsa._initial_state
                                        for s in fsa._final_states})

        return FiniteStateAutomaton(fsa._alphabet, fsa._states,
                                    fsa._initial_state, fsa._final_states,
                                    new_transition.transition_graph)

    def exponentiate(self, k: int) -> 'FiniteStateAutomaton':
        """
        concatenate this FSA k times

        Parameters
        ----------
        k : int
            the number of times to repeat; must be >1

        Returns
        -------
        FiniteStateAutomaton
        """
        if k <= 1:
            raise ValueError("must be >1")

        new = self

        for i in range(1,k):
            new += self

        return new

    def union(self, other: 'FiniteStateAutomaton') -> 'FiniteStateAutomaton':
        """
        union this FSA with another

        Parameters
        ----------
        other : FiniteStateAutomaton

        Returns
        -------
        FiniteStateAutomaton
        """
        msg = "you still need to implement FiniteStateAutomaton.union"
        raise NotImplementedError(msg)

    def complement(self) -> 'FiniteStateAutomaton':
        """
        complement this FSA

        Returns
        -------
        FiniteStateAutomaton
        """
        fsa = self._deepcopy()
        fsa = fsa.determinize()
        fsa._transition_function.totalize()
        fsa._final_states = fsa._states - fsa._final_states

        return fsa

    def intersect(self, other: 'FiniteStateAutomaton') -> 'FiniteStateAutomaton':
        """
        intersect this FSA with another

        Parameters
        ----------
        other : FiniteStateAutomaton

        Returns
        -------
        FiniteStateAutomaton
        """
        fsa1 = self.complement()
        fsa2 = other.complement()

        return fsa1.union(fsa2).complement()
        
    def determinize(self) -> 'FiniteStateAutomaton':
        """
        if nondeterministic, determinize the FSA

        Returns
        -------
        FiniteStateAutomaton
        """
        new_states, new_initial_state, new_transition_graph = self._transition_function.determinize(self._initial_state)

        new_states = {'_'.join(sorted(s)) for s in new_states}
        new_initial_state = "_".join(sorted(new_initial_state))
        new_final_states = {
            '_'.join(sorted(s)) for s in powerset(self._states)
            if any([t in s for t in self._final_states])
            if s
        }
        new_transition_graph = {
            ("_".join(sorted(instates)), symb): {"_".join(sorted(o)) for o in outstates}
            for (instates, symb), outstates in new_transition_graph.items()
        }

        return FiniteStateAutomaton(
            self._alphabet-{''}, 
            new_states,
            new_initial_state, 
            new_final_states,
            new_transition_graph
        )

    @property
    def isdeterministic(self) -> bool:
        return self._transition_function.isdeterministic

class TransitionFunction(object):
    """
    A finite state machine transition function

    Parameters
    ----------
    transition_graph

    Attributes
    ----------
    isdeterministic : bool
    istotalfunction : bool
    transition_graph : dict

    Methods
    -------
    validate(alphabet, states)
    relabel_states(state_map)
    totalize()
    """

    def __init__(self, alphabet: set[str], states: set[str], transition_graph: TransitionGraph):
        self._alphabet = alphabet
        self._states = states
        self._transition_graph = transition_graph

        self._validate()

    def __call__(self, state: str, symbol: str) -> set[str]:
        try:
            return self._transition_graph[(state, symbol)]
        except KeyError:
            return set()

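    def __or__(self, other: 'TransitionFunction') -> 'TransitionFunction':
        # the constructor needs the combined alphabet and states; on a
        # shared (state, symbol) key, other's outputs replace self's
        return TransitionFunction(
            self._alphabet | other._alphabet,
            self._states | other._states,
            {**self._transition_graph, **other._transition_graph}
        )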

    def _add_epsilon_transitive_closure(self):
        # get the state graph of all epsilon transitions
        transitions = {
            (instate, outstate)
            for (instate, insymb), outs in self._transition_graph.items()
            for outstate in outs if not insymb
        }
        
        # compute the transitive closure of the epsilon transition
        # state graph; requires homogenization beforehand
        for instate, outstate in transitive_closure(transitions):
            self._transition_graph[(instate, '')] |= {outstate}

        new_graph = dict(self._transition_graph)

        # add alphabet transitions from all states q_i that exit
        # with an alphabet transition (q_i, a) and enter states q_j
        # with epsilon transitions to states q_k
        for (instate1, insymb1), outs1 in self._transition_graph.items():
            for (instate2, insymb2), outs2 in self._transition_graph.items():
                if instate2 in outs1 and not insymb2:
                    # vacuously adds the already added epsilon
                    # transitions as well
                    try:
                        new_graph[(instate1,insymb1)] |= outs2
                    except KeyError:
                        new_graph[(instate1,insymb1)] = outs2

        return new_graph

    def determinize(self, initial_state: str) -> tuple[set[frozenset[str]], frozenset[str], dict[tuple[frozenset[str], str], set[frozenset[str]]]]:
        # define the new states as the elements of the power set of the old 
        # states
        new_states = {frozenset(s) for s in powerset(self._states) if s}

        # add epsilon transitive closure
        epsilon_closed_graph = self._add_epsilon_transitive_closure()

        # the new initial state is the set containing the old initial state
        # along with any state that could be reached by an epsilon transition
        # from the original initial state; because we have formed the epsilon
        # transitive closure, we only need to take one hop from the initial
        # state to get all such states; the initial state may have no
        # epsilon transitions at all, in which case the closure adds nothing
        new_initial_state = frozenset(
            {initial_state} | epsilon_closed_graph.get((initial_state, ''), set())
        )

        # filter epsilon transitions
        filtered_transition_graph = {
            (instate, insymb): outstates
            for (instate, insymb), outstates in epsilon_closed_graph.items()
            if insymb
        }

        # construct the new transition graph: reading a from the set of
        # states s leads to exactly the set of all states reachable by a
        # from some member of s
        new_transition_graph = {}

        for s in new_states:
            for a in self._alphabet - {''}:
                t = frozenset().union(*(
                    filtered_transition_graph.get((sprime, a), set())
                    for sprime in s
                ))

                if t:
                    new_transition_graph[s, a] = {t}

        return new_states, new_initial_state, new_transition_graph
        
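    def add_transitions(self, transition_graph):
        # merge into existing outputs instead of overwriting them, so that a
        # state keeps the transitions it already has on the same symbol
        for inp, out in transition_graph.items():
            outs = {out} if isinstance(out, str) else set(out)
            self._transition_graph.setdefault(inp, set()).update(outs)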

    def _validate(self):
        self._validate_input_values()
        self._validate_output_types()
        self._homogenize_output_types()
        self._validate_output_values()

    def _validate_input_values(self):
        for state, symbol in self._transition_graph.keys():
            if symbol not in self._alphabet:
                msg = 'all input symbols in transition function ' +\
                      'must be in alphabet'
                raise ValueError(msg)

            if state not in self._states:
                msg = 'all input states in transition function ' +\
                      'must be in set of states'
                raise ValueError(msg)

    def _validate_output_types(self):
        for states in self._transition_graph.values():
            if type(states) is not str and type(states) is not set:
                msg = 'all outputs in transition function ' +\
                      'must be specified via str or set'
                raise ValueError(msg)            

    def _homogenize_output_types(self):
        for inp, out in self._transition_graph.items():
            if type(out) is str:
                self._transition_graph[inp] = {out}

    def _validate_output_values(self):
        for out in self._transition_graph.values():
            try:
                assert all([state in self._states for state in out])
            except AssertionError:
                msg = 'all output states in transition function ' +\
                      'must be in states'
                raise ValueError(msg)

    @property
    def isdeterministic(self):
        return all(
            len(v) < 2 for v in self._transition_graph.values()
        )

    @property
    def istotalfunction(self):
        return all(
            (s, a) in self._transition_graph
            for s in self._states
            for a in self._alphabet
        )

    def relabel_states(self, state_map: dict[str, str]):
        """Append tag to the input/output states in the transition function

        Parameters
        ----------
        state_map
            A mapping from old states to new states
        """

        new_transition_graph = {}

        for (instate, insymb), outs in self._transition_graph.items():
            new_inp = (state_map[instate], insymb)
            new_outs = {state_map[o] for o in outs}
            new_transition_graph[new_inp] = new_outs

        self._transition_graph = new_transition_graph

    def totalize(self):
        if not self.istotalfunction:
            domain = {
                (s, a) 
                for s in self._states 
                for a in self._alphabet
            }

            sink_state = 'qsink'

            while sink_state in self._states:
                sink_state += 'sink'

            for inp in domain:
                if inp not in self._transition_graph:
                    self._transition_graph[inp] = {sink_state}

    @property
    def transition_graph(self):
        return self._transition_graph

An FSA can then be defined in a way that pretty closely matches the formal definition.

fsa1 = FiniteStateAutomaton(alphabet={"a", "b", ""},
                            states={"q0", "q1", "q2"},
                            initial_state="q0",
                            final_states={"q0", "q1"},
                            transition_graph={("q0", "a"): {"q0", "q1"},
                                              ("q0", ""): {"q1"},
                                              ("q1", ""): {"q2"},
                                              ("q1", "a"): {"q0"},
                                              ("q1", "b"): {"q0"}})

You won’t ever need to access the transition function directly, but note that it won’t necessarily be equivalent to the dictionary you passed: string outputs are homogenized to sets when the TransitionFunction is initialized, and the epsilon transitive closure is added to it in place when the machine is determinized. (This means that the machine may only be weakly equivalent to the one defined, since computing the epsilon transitive closure may add edges that allow states along an epsilon-only path to be skipped.)

fsa1._transition_function._transition_graph

You also won’t need to determinize the FSA directly, but just to show you that this is also implemented:

fsa1_det = fsa1.determinize()

fsa1_det._initial_state
fsa1_det._transition_function._transition_graph

And as expected, our original FSA is nondeterministic, while the determinized version is deterministic.

fsa1.isdeterministic, fsa1_det.isdeterministic

You also won’t need to compute the complement directly—it is used as part of computing intersection—but I have implemented that operation as well.

fsa1_comp = fsa1.complement()

print(fsa1_det._states)
print(fsa1_comp._states)
print(fsa1_det._final_states)
print(fsa1_comp._final_states)

Notice that, because taking the complement requires a totalized transition function, we have exactly the deterministic machine from above, except with explicit sink states.

fsa1_comp._transition_function._transition_graph

We don’t need explicit determinization with the power set construction to have a deterministic machine. It’s also possible to define a DFA directly. This machine gets implicitly converted to the strongly equivalent NFA.

fsa2 = FiniteStateAutomaton(alphabet={"c", "d"},
                            states={"q0"},
                            initial_state="q0",
                            final_states={"q0"},
                            transition_graph={("q0", "c"): "q0",
                                              ("q0", "d"): "q0"})

fsa2.isdeterministic
fsa2._transition_function._transition_graph

Important for our purposes, we can compute strings that are in the language generated by the FSA by simply iterating through the FiniteStateAutomaton object. This behavior is implemented in FiniteStateAutomaton.__iter__ and FiniteStateAutomaton.__next__, which rely heavily on FiniteStateAutomaton._build_generator.

for i, string in enumerate(fsa1):
    if i < 50:
        print(string)
    else:
        break

Note that we end up with some repeated elements. This is because different paths through an FSA can result in the same string—which, of course, is why there can be multiple parses associated with a string. We could make it so that elements are not repeated, but I have not done this for the sake of clarity (for Task 1).
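If you do want each string only once, one option is to filter the stream as it is consumed instead of changing the generator. The sketch below is a minimal illustration under that assumption: unique is a hypothetical helper, and repeated_strings is a stand-in for iterating over an FSA, not part of the class.

```python
from itertools import islice


def unique(iterable):
    """Yield each item the first time it appears, skipping later repeats."""
    seen = set()
    for item in iterable:
        if item not in seen:
            seen.add(item)
            yield item


def repeated_strings():
    """Hypothetical stand-in for an FSA iterator: an endless stream with repeats."""
    n = 0
    while True:
        yield "a" * (n // 2)  # every string is produced twice in a row
        n += 1


first_five = list(islice(unique(repeated_strings()), 5))
print(first_five)  # ['', 'a', 'aa', 'aaa', 'aaaa']
```

With a real machine, islice(unique(fsa1), 50) would play the role of the enumerate loop above. Note that unique keeps searching, possibly forever, when the language has fewer distinct strings than requested.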

Compare the determinized machine.

for i, string in enumerate(fsa1_det):
    if i < 50:
        print(string)
    else:
        break

Also for the sake of clarity, I have implemented iteration in a simplistic way that will often result in the iteration hanging for long periods of time (or infinitely) when there are many sink states and/or when the machine computes the empty set. This could be fixed, but it would require a bit of extra code that would be harder to understand, and so I have retained the simpler implementation.

# this will hang
for i, string in enumerate(fsa1_comp):
    if i < 50:
        print(string)
    else:
        break
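One way to sidestep the hang when you only want to inspect short strings is to bound the search by string length instead of by the number of strings found. The following is a rough sketch, not the class's own iteration: it works on a plain dictionary in the same (state, symbol) format as the transition graphs above, assumes the machine has no epsilon transitions, and the function name strings_up_to is hypothetical.

```python
def strings_up_to(transitions, initial_state, final_states, alphabet, max_len):
    """Collect every accepted string of length <= max_len.

    Assumes there are no epsilon transitions, so each step adds one symbol.
    """
    accepted = set()
    frontier = {(initial_state, "")}

    for _ in range(max_len + 1):
        # keep the strings whose current state is final
        accepted |= {string for state, string in frontier if state in final_states}

        # extend every (state, string) pair by one symbol
        frontier = {
            (next_state, string + symbol)
            for state, string in frontier
            for symbol in alphabet
            for next_state in transitions.get((state, symbol), set())
        }

    return accepted


loop_graph = {("q0", "c"): {"q0"}, ("q0", "d"): {"q0"}}

short = strings_up_to(loop_graph, "q0", {"q0"}, {"c", "d"}, 2)
print(sorted(short, key=lambda s: (len(s), s)))
# ['', 'c', 'd', 'cc', 'cd', 'dc', 'dd']

# with no final states the language is empty, and the search still terminates
print(strings_up_to(loop_graph, "q0", set(), {"c", "d"}, 2))  # set()
```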

And just to show that our DFA works as expected:

for i, string in enumerate(fsa2):
    if i < 50:
        print(string)
    else:
        break

Finally (and probably most importantly), I have implemented the recognition and parsing algorithms for you. The FiniteStateAutomaton.__call__ method implements an interface to both the recognizer and the parser. To use the recognizer, set mode="recognize". As you would expect, the output will be a boolean.

fsa1("ab", mode="recognize"), fsa1("ac", mode="recognize")

To use the parser, set mode="parse". If the string is recognized, a set of parses will be output.

fsa1("ab", mode="parse")

If the string is not recognized, an empty set will be returned.

fsa1("ac", mode="parse")
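Each parse is a tuple of (state, symbol) pairs giving the state entered and the symbol consumed at each step, with '' as the symbol for an epsilon transition; the initial state itself is not included. A small formatter can make these easier to read. The sketch below uses a made-up parse (not actual output from fsa1) and a hypothetical helper name, format_parse.

```python
def format_parse(parse, initial_state):
    """Render a parse as a chain of labeled arrows starting from the initial state."""
    steps = [initial_state]

    for state, symbol in parse:
        label = symbol if symbol else "ε"  # show epsilon transitions explicitly
        steps.append(f"-{label}-> {state}")

    return " ".join(steps)


# hypothetical parse, for illustration only
example_parse = (("q1", "a"), ("q2", ""), ("q0", "b"))

print(format_parse(example_parse, "q0"))
# q0 -a-> q1 -ε-> q2 -b-> q0
```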

Even though I have already implemented these algorithms for you, you should study their implementation in detail: in Task 4, you will be implementing the analogous algorithms for FSTs, and a good chunk of the logic in my implementation can be reused there.

Task 1

Implement FiniteStateAutomaton.union and FiniteStateAutomaton.concatenate.

Test your implementation.
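One possible way to test is to compare your machines against brute-force reference checks that follow the definitions of union and concatenation directly. The sketch below assumes each language is given as a membership predicate; in_union, in_concatenation, and the two stand-in predicates are hypothetical names, not part of the provided classes.

```python
def in_union(in_a, in_b, string):
    """A string is in the union iff it is in either language."""
    return in_a(string) or in_b(string)


def in_concatenation(in_a, in_b, string):
    """A string is in the concatenation iff some split puts the prefix in A and the suffix in B."""
    return any(
        in_a(string[:i]) and in_b(string[i:])
        for i in range(len(string) + 1)
    )


# hypothetical stand-ins for two recognizers
def in_a(string):
    return string != "" and set(string) <= {"a"}  # one or more a's


def in_b(string):
    return set(string) <= {"c", "d"}  # any string over {c, d}, including ''


print(in_concatenation(in_a, in_b, "aacd"))  # True
print(in_concatenation(in_a, in_b, "cd"))    # False
print(in_union(in_a, in_b, "cd"))            # True
print(in_union(in_a, in_b, "ac"))            # False
```

With real machines, the predicates could be lambda s: s in fsa1 and lambda s: s in fsa2, and for each test string you would check that s in (fsa1 + fsa2) agrees with in_concatenation and that s in (fsa1 | fsa2) agrees with in_union.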

Task 2

Define an FSA that can generate English noun phrases built from the following vocabulary.

\[\Sigma = \{\text{the}, \text{that}, \text{those}, \text{lazy}, \text{greyhound}, \text{greyhounds}, \text{human}, \text{humans}, \text{love}, \text{loves}\}\]

You may find it useful to define a few different machines and then union or concatenate them.

Your FSA should recognize not only simple noun phrases (such as greyhounds, the greyhound, that human, those lazy greyhounds, and the humans) but also noun phrases with relative clauses (such as the greyhound that the lazy human loves and the humans that love the lazy greyhound). Make sure, however, that your machine cannot generate any string that is not a noun phrase of English. For instance, your machine should not generate noun phrases with agreement errors in the relative clause, such as the humans that loves the greyhound.

To make the parses produced by a parser for your machine (i.e. the sequences of states the FSA goes through to recognize the string) interpretable, use state labels that track at least part-of-speech information: e.g. that the is a determiner, and that in some contexts that is a determiner, while in others it is a complementizer. You may want to track additional information in the states to handle agreement correctly.
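To illustrate what informative state labels might look like, here is a deliberately tiny fragment covering only determiner-noun phrases. It is a sketch on a plain dictionary and does not use FiniteStateAutomaton; the state names (Det, Det_sg, Det_pl, N_sg, N_pl) and the helper state_paths are assumptions chosen for illustration, not a required naming scheme.

```python
# Hypothetical fragment: each state name records the part of speech
# (and, where it matters, the number) of the word just read
transitions = {
    ("start", "the"): {"Det"},
    ("start", "that"): {"Det_sg"},
    ("start", "those"): {"Det_pl"},
    ("Det", "greyhound"): {"N_sg"},
    ("Det", "greyhounds"): {"N_pl"},
    ("Det_sg", "greyhound"): {"N_sg"},
    ("Det_pl", "greyhounds"): {"N_pl"},
}
final_states = {"N_sg", "N_pl"}


def state_paths(words, state="start"):
    """Return every sequence of states that reads `words` and ends in a final state."""
    if not words:
        return [[]] if state in final_states else []

    return [
        [next_state] + rest
        for next_state in sorted(transitions.get((state, words[0]), set()))
        for rest in state_paths(words[1:], next_state)
    ]


print(state_paths(["those", "greyhounds"]))  # [['Det_pl', 'N_pl']]
print(state_paths(["those", "greyhound"]))   # []
```

Because the number is recorded in the state name, the agreement error in the second phrase shows up as the absence of any path.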

You will not be able to build an FSA that can generate the infinite possible English noun phrases that can be built using this vocabulary. We will discuss the reasons for this when we cover context free grammars. But you should strive to capture as many noun phrases that can be built from this vocabulary as possible—including, insofar as it is possible, noun phrases of the form the humans that love the lazy greyhound that loves the humans that… with an unbounded number of relative clauses (making sure that you get the agreement correct).

Test this FSA by attempting to parse and recognize five possible noun phrases, which should be recognizable and have at least one parse, and five possible noun phrases, which should not be recognizable and should have no parses.

What kind of noun phrase cannot be generated by an FSA at all? Why?

Task 3

Define an FSA that can generate English sentences built from the vocabulary from Task 2. You are encouraged to use the noun phrase machine you developed in that task to generate the noun phrases within these sentences. As in that task, you may find it useful to define a few different machines (in addition to the noun phrase machine) and then union and/or concatenate them.

Your FSA should recognize not only simple transitive sentences (such as the greyhound loves the humans) but also sentences that involve clause-embedding (such as the greyhound loves that the humans love the greyhound). As before, make sure you are correctly handling agreement; and insofar as it is possible, make sure that you can recognize sentences of unbounded size, such as the humans love that the greyhound loves that the humans love that….

Test this FSA by attempting to parse and recognize five possible sentences, which should be recognizable and have at least one parse, and five possible sentences, which should not be recognizable and should have no parses.
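When the symbols are whole words, a sentence is naturally passed as a sequence of words and not as a single string. Because _recognize and _parse are wrapped in lru_cache, their arguments must be hashable, so a tuple of words is a safer input than a list. The sketch below shows the underlying behavior with a hypothetical cached function, first_token, standing in for the cached methods.

```python
from functools import lru_cache


@lru_cache(512)
def first_token(tokens):
    """Hypothetical cached function standing in for a cached recognizer."""
    return tokens[0] if tokens else None


sentence = "the greyhound loves the humans".split()

try:
    first_token(sentence)  # a list cannot be used as a cache key
except TypeError as err:
    print("list input failed:", err)

print(first_token(tuple(sentence)))  # the
```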

Finite State Transducers

FSTMode = Literal["recognize", "parse", "transduce"]
FSTParse = tuple[tuple[str, str, str]]

TransductionGraph = dict[tuple[str, str, str], str]

class FiniteStateTransducer(FiniteStateAutomaton):

    """A finite state transducer

    Parameters
    ----------
    alphabet1
    alphabet2
    states
    initial_state
    final_states
    transition_graph
    transduction_graph
    """

    def __init__(self, alphabet1: set[str], alphabet2: set[str], states: set[str], 
                 initial_state: str, final_states: set[str],
                 transition_graph: TransitionGraph, 
                 transduction_graph: TransductionGraph):
        self._transduction_function = TransductionFunction(transduction_graph)
        self._alphabet2 = alphabet2        
        self._transduction_function.validate(alphabet1, alphabet2, states)

        super().__init__(alphabet1,
                         states,
                         initial_state,
                         final_states,
                         transition_graph)
        
    def __next__(self):
        msg = "we do not need this for the current assignment"
        raise NotImplementedError(msg)
        
    def __call__(self, string1: str | list[str], 
                 string2: str | list[str] | None = None, 
                 mode: FSTMode = "recognize") -> bool | set[FSTParse] | set[str] | set[list[str]]:
        """
        whether/how/what a string is accepted/parsed/transduced to by the FST

        Parameters
        ----------
        string1
        string2
            only necessary if mode is "recognize" or "parse"
        mode
            whether to run in "recognize", "parse", or "transduce" mode
        """

        if mode == 'recognize':
            return self._recognize(string1, string2)
        elif mode == 'parse':
            return self._parse(string1, string2)
        elif mode == "transduce":
            return self._transduce(string1)
        else:
            msg = 'mode must be "recognize", "parse", or "transduce"'
            raise ValueError(msg)

    @lru_cache(65536)
    def _recognize(self, string1: str | list[str], string2: str | list[str], 
                   state: str | None = None) -> bool:
        """
        whether a pair of strings is accepted by the FST

        Parameters
        ----------
        string1 : str
        string2 : str
        state : str | NoneType

        Returns
        -------
        bool
        """
        msg = "you still need to implement FiniteStateTransducer._recognize"
        raise NotImplementedError(msg)

    @lru_cache(65536)
    def _parse(self, string1: str | list[str], string2: str | list[str], 
               prev_state: str | None = None) -> set[FSTParse]:
        """
        how a pair of strings is parsed by the FST

        Parameters
        ----------
        string1 : str | list(str)
        string2 : str | list(str)
        prev_state : str | NoneType

        Returns
        -------
        set(tuple(tuple(str)))
        """
        msg = "you still need to implement FiniteStateTransducer._parse"
        raise NotImplementedError(msg)

    @lru_cache(65536)
    def _transduce(self, string: str | list[str], state: str | None = None, 
                   new_string: str='') -> set[str] | set[list[str]]:
        """
        the strings transduced from the input by the FST

        Parameters
        ----------
        string
        state
        new_string
        """
        msg = "you still need to implement FiniteStateTransducer._transduce"
        raise NotImplementedError(msg)

    def _relabel_states(self, tag: str):
        """
        append tag to the input/output states throughout the FST

        Parameters
        ----------
        tag
        """

        # build the map before the parent call, which rewrites self._states
        state_map = {s: s+'_'+tag for s in self._states}

        super()._relabel_states(tag)
        self._transduction_function.relabel_states(state_map)

        return self

    def concatenate(self, other):
        msg = "we do not need this for the current assignment"
        raise NotImplementedError(msg)
    
    def exponentiate(self, k):        
        msg = "we do not need this for the current assignment"
        raise NotImplementedError(msg)
    
    def intersect(self, other):
        msg = "we do not need this for the current assignment"
        raise NotImplementedError(msg)
    
    def complement(self):
        msg = "we do not need this for the current assignment"
        raise NotImplementedError(msg)
    
    def union(self, other):
        msg = "we do not need this for the current assignment"
        raise NotImplementedError(msg)

    def determinize(self):
        msg = "we do not need this for the current assignment"
        raise NotImplementedError(msg)

class TransductionFunction(object):
    """
    A finite state transduction function

    Parameters
    ----------
    transduction_graph
    """

    def __init__(self, transduction_graph: TransductionGraph):
        self._transduction_graph = transduction_graph

    def __call__(self, state1, state2, symbol):
        try:
            return self._transduction_graph[(state1, state2, symbol)]
        except KeyError:
            return set()

    def add_transductions(self, transduction_graph: TransductionGraph):
        self._transduction_graph.update(transduction_graph)

    def validate(self, alphabet1, alphabet2, states):
        self._validate_input_values(alphabet1, states)
        self._validate_output_values(alphabet2)

    def _validate_input_values(self, alphabet, states):
        for state1, state2, symbol in self._transduction_graph.keys():
            try:
                assert symbol in alphabet

            except AssertionError:
                msg = 'all input symbols in transduction function ' +\
                      'must be in alphabet1'
                raise ValueError(msg)

            try:
                assert state1 in states and state2 in states

            except AssertionError:
                msg = 'all input states in transduction function ' +\
                      'must be in set of states'
                raise ValueError(msg)

            try:
                assert symbol != ''

            except AssertionError:
                msg = 'epsilon transduction not currently supported'
                raise ValueError(msg)

                
        self._istotalfunction = all([(s1, s2, a) in self._transduction_graph
                                     for s1 in states
                                     for s2 in states
                                     for a in alphabet])

    def _validate_output_values(self, alphabet):
        for symb in self._transduction_graph.values():
            try:
                assert symb in alphabet
            except AssertionError:
                msg = 'all output symbols in transduction function ' +\
                      'must be in alphabet2'
                raise ValueError(msg)

    def relabel_states(self, state_map):
        """
        relabel the input/output states in the transduction function

        Parameters
        ----------
        state_map : dict(str, str)
            maps each old state name to its new name
        """

        new_transduction_graph = {}

        for (instate, outstate, insymb), outs in self._transduction_graph.items():
            new_inp = (state_map[instate], state_map[outstate], insymb)
            new_transduction_graph[new_inp] = outs

        self._transduction_graph = new_transduction_graph

    @property
    def transduction_graph(self):
        return self._transduction_graph

fst = FiniteStateTransducer(alphabet1={"a", "b"},
                            alphabet2={"c", "d"},
                            states={"q0", "q1"},
                            initial_state="q0",
                            final_states={"q0", "q1"},
                            transition_graph={("q0", "a"): {"q0", "q1"},
                                              ("q0", "b"): {"q0"},
                                              ("q1", "a"): {"q0"},
                                              ("q1", "b"): {"q0"}},
                            transduction_graph={("q0", "q0", "a"): "d",
                                                ("q0", "q0", "b"): "d",
                                                ("q0", "q1", "a"): "c",
                                                ("q0", "q1", "b"): "d",
                                                ("q1", "q0", "a"): "c",
                                                ("q1", "q0", "b"): "d"})
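When writing the two graphs by hand, it is easy for them to drift apart. The sketch below is one way to compare them: it works on plain dictionaries copied from the example above rather than on the FiniteStateTransducer object, and the helper name compare_graphs is hypothetical (it is not part of the class).

```python
# Plain-dictionary copies of the graphs from the example above.
transition_graph = {("q0", "a"): {"q0", "q1"},
                    ("q0", "b"): {"q0"},
                    ("q1", "a"): {"q0"},
                    ("q1", "b"): {"q0"}}

transduction_graph = {("q0", "q0", "a"): "d",
                      ("q0", "q0", "b"): "d",
                      ("q0", "q1", "a"): "c",
                      ("q0", "q1", "b"): "d",
                      ("q1", "q0", "a"): "c",
                      ("q1", "q0", "b"): "d"}


def compare_graphs(transition_graph, transduction_graph):
    """Hypothetical helper: find mismatches between the two graphs."""
    # expand (source, symbol) -> targets into (source, target, symbol) edges
    edges = {(src, tgt, symb)
             for (src, symb), tgts in transition_graph.items()
             for tgt in ({tgts} if isinstance(tgts, str) else tgts)}

    labeled = set(transduction_graph)

    # edges with no output symbol, and output symbols on nonexistent edges
    return edges - labeled, labeled - edges


missing, unused = compare_graphs(transition_graph, transduction_graph)

print(missing)  # set()
print(unused)   # {('q0', 'q1', 'b')}
```

In this example every transition has an output symbol, but the entry for ("q0", "q1", "b") labels an edge that the transition graph never takes, so it has no effect on the transducer's behavior.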

Task 4

Implement FiniteStateTransducer._recognize, FiniteStateTransducer._parse, and FiniteStateTransducer._transduce. If you wish, you may alter the typing of the transduction_graph so that it outputs sets of characters in the output alphabet rather than a single character.
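To make the intended behavior concrete, here is a sketch of transduction on the small example FST. It is not the required implementation: it is iterative rather than recursive, it works on plain dictionaries rather than on the class, and it assumes a single output symbol per transduction. The function name transduce_sketch is hypothetical.

```python
def transduce_sketch(string, transition_graph, transduction_graph,
                     initial_state, final_states):
    """Return the set of output strings the graphs assign to an input string."""
    # each configuration pairs a current state with the output emitted so far
    configs = {(initial_state, "")}

    for symb in string:
        next_configs = set()

        for src, out in configs:
            targets = transition_graph.get((src, symb), set())

            # a deterministic graph may store a bare state instead of a set
            if isinstance(targets, str):
                targets = {targets}

            for tgt in targets:
                key = (src, tgt, symb)

                # skip transitions that have no output symbol assigned
                if key in transduction_graph:
                    next_configs.add((tgt, out + transduction_graph[key]))

        configs = next_configs

    # keep only the outputs of paths that end in a final state
    return {out for state, out in configs if state in final_states}


# the graphs from the example FST, written as plain dictionaries
transitions = {("q0", "a"): {"q0", "q1"}, ("q0", "b"): {"q0"},
               ("q1", "a"): {"q0"}, ("q1", "b"): {"q0"}}
transductions = {("q0", "q0", "a"): "d", ("q0", "q0", "b"): "d",
                 ("q0", "q1", "a"): "c", ("q0", "q1", "b"): "d",
                 ("q1", "q0", "a"): "c", ("q1", "q0", "b"): "d"}

outputs = transduce_sketch("ab", transitions, transductions, "q0", {"q0", "q1"})
print(sorted(outputs))  # ['cd', 'dd']
```

Under this view, recognizing a pair of strings amounts to asking whether the second string is among the outputs for the first, though a direct implementation can stop early as soon as the output so far diverges from the second string.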

Task 5

For this task, we’ll be working with the MegaAcceptability dataset, which you can read about in this paper.

!pip install pandas
import pandas as pd

mega_acceptability = pd.read_csv('http://megaattitude.io/projects/mega-acceptability/mega-acceptability-v1/mega-acceptability-v1-normalized.tsv', sep='\t')

mega_acceptability

We will specifically be interested in the frames…

frames = set(mega_acceptability.frame.unique())

frames

…and the sentences.

sentences = {frame: {s.lower() for s in mega_acceptability.query(f'frame=="{frame}"').sentence.unique()} 
             for frame in frames}

sentences['NP Ved NP']

For this task, you will also find it useful to have access to the verb forms.

verbs = {'root': set(mega_acceptability.verb.unique()),
         'past': set(mega_acceptability.query('frame=="NP Ved"').verbform.unique()),
         'past_participle': set(mega_acceptability.query('frame=="NP was Ved"').verbform.unique())}

verbs['past']

Construct a finite state transducer whose input language is the set of frames and whose output language is the set of sentences. The transducer should map a frame—e.g. NP Ved NP—to all sentences that instantiate that frame—e.g. someone liked something, someone thought something, etc.

Test whether your FST recognizes all of the sentences in MegaAcceptability.

Test whether your transduction works correctly by checking that it outputs all of the sentences corresponding to a particular frame.
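One way to organize the transduction test is to compare, frame by frame, the set your FST produces against the expected set. The sketch below assumes a callable that maps a frame to a set of sentences; check_transduction and toy_transduce are hypothetical names, and the toy data stands in for the sentences dictionary and your own FST.

```python
def check_transduction(transduce, sentences):
    """Report, per frame, the sentences that are missing from or extra in the output."""
    report = {}

    for frame, expected in sentences.items():
        produced = set(transduce(frame))

        missing = expected - produced   # expected but not generated
        extra = produced - expected     # generated but not expected

        # only record frames where something went wrong
        if missing or extra:
            report[frame] = {"missing": missing, "extra": extra}

    return report


# toy stand-ins for the real data and the real transducer
toy_sentences = {"NP Ved": {"someone knew", "someone thought"},
                 "NP Ved NP": {"someone liked something"}}


def toy_transduce(frame):
    lookup = {"NP Ved": {"someone knew"},
              "NP Ved NP": {"someone liked something", "someone liked someone"}}
    return lookup.get(frame, set())


report = check_transduction(toy_transduce, toy_sentences)
print(report["NP Ved"]["missing"])   # {'someone thought'}
print(report["NP Ved NP"]["extra"])  # {'someone liked someone'}
```

An empty report means that every frame produced exactly the expected sentences. Separating missing from extra sentences makes it easier to tell undergeneration from overgeneration.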

MegaAcceptability contains many items that are unacceptable, indicated by a more negative responsenorm.

mega_acceptability.sort_values('responsenorm')

Suppose that, given some frame in the input language, we wanted our FST to output only sentences that are instances of that frame whose responsenorm is above a particular threshold—i.e. generate only acceptable instances of a frame. You do not need to do it, but describe how you might go about it.
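As one possible starting point, and only as a sketch, the data could be filtered by responsenorm before the transducer is built, so that low-scoring sentences never enter the output language. The example below uses plain Python records with invented scores rather than the real dataset, assumes one normalized score per sentence, and picks an arbitrary threshold; acceptable_sentences is a hypothetical name.

```python
def acceptable_sentences(records, threshold):
    """Group sentences by frame, keeping those scoring above the threshold."""
    result = {}

    for rec in records:
        if rec["responsenorm"] > threshold:
            # lowercase to match how the sentences dictionary is built
            result.setdefault(rec["frame"], set()).add(rec["sentence"].lower())

    return result


# toy records; the scores are invented for illustration
toy_records = [
    {"frame": "NP Ved NP", "sentence": "Someone liked something", "responsenorm": 1.2},
    {"frame": "NP Ved NP", "sentence": "Someone thought something", "responsenorm": -0.4},
    {"frame": "NP Ved", "sentence": "Someone knew", "responsenorm": 0.3},
]

filtered = acceptable_sentences(toy_records, threshold=0.0)
print(filtered["NP Ved NP"])  # {'someone liked something'}
```

Filtering the data is only part of the answer: because acceptability depends on the combination of verb and frame, the transducer's states would also need to keep track of which verbs are allowed in which frames.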