Define SelectionData
Our aim in this section will be to evaluate different assumptions about constraints on the relationship between syntactic and semantic types.
from dataclasses import dataclass
from numpy import ndarray

@dataclass
class SelectionData:
    verb: ndarray
    frame: ndarray
    subj: ndarray
    resp: ndarray
@dataclass
class SelectionModelParametersABC:
    n_verb: int
    n_frame: int
    n_subj: int
    n_resp_levels: int

@dataclass
class SelectionModelParameters(SelectionModelParametersABC):
    n_component: int
import torch
from torch import Tensor
torch.set_default_tensor_type(torch.DoubleTensor)
def ordered_logistic_likelihood(value: Tensor, jumps: Tensor, center: bool = True) -> Tensor:
    """Compute the ordered logistic likelihood of a value

    Parameters
    ----------
    value
        The value to compute the likelihood for
        (shape: batch_size)
    jumps
        The distance between cutpoints
        (shape: batch_size x number of response levels - 1)

    Returns
    -------
    likelihood
        The ordered logistic likelihood of each response level
        (shape: batch_size x number of response levels)
    """
    cutpoints = torch.cumsum(jumps, axis=1)

    if center:
        cutpoints = cutpoints - cutpoints.mean(1)[:,None]

    cdfs = torch.sigmoid(cutpoints - value[:,None])

    low_prob = torch.cat(
        [torch.zeros([cdfs.shape[0], 1]), cdfs],
        axis=1
    )
    high_prob = torch.cat(
        [cdfs, torch.ones([cdfs.shape[0], 1])],
        axis=1
    )

    return high_prob - low_prob
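To make the bookkeeping concrete, here is a small usage sketch with toy numbers of our own (not from the dataset): two jumps per row imply three response levels, and each row of the output sums to 1.

# toy check: one observation, jumps implying 3 response levels
value = torch.tensor([0.5])
jumps = torch.tensor([[1.0, 1.0]])

probs = ordered_logistic_likelihood(value, jumps)

print(probs.shape)   # torch.Size([1, 3])
print(probs.sum(1))  # tensor([1.])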
from typing import Optional

from torch import manual_seed
from torch import randn
from torch.nn import Module

ZERO = 1e-5
ONE = 1. - ZERO
class SelectionModel(Module):
    parameter_class = SelectionModelParameters
    data_class = SelectionData

    def __init__(self, parameters: SelectionModelParameters):
        super().__init__()
        self.model_parameters = parameters

        # prior parameters
        self.verb_component_prior_mean_aux = torch.nn.Parameter(
            randn(parameters.n_component),
            requires_grad=True
        )
        self.frame_component_prior_mean_aux = torch.nn.Parameter(
            randn(parameters.n_component),
            requires_grad=True
        )
        self.verb_component_prior_precision_aux = torch.nn.Parameter(
            randn(parameters.n_component),
            requires_grad=True
        )
        self.frame_component_prior_precision_aux = torch.nn.Parameter(
            randn(parameters.n_component),
            requires_grad=True
        )

        # latent matrices
        self.verb_component_aux = torch.nn.Parameter(
            randn([
                parameters.n_verb, parameters.n_component
            ]),
            requires_grad=True
        )
        self.frame_component_aux = torch.nn.Parameter(
            randn([
                parameters.n_frame, parameters.n_component
            ]),
            requires_grad=True
        )

        # likelihood parameters
        self.log_jumps = torch.nn.Parameter(
            torch.ones([
                parameters.n_subj, parameters.n_resp_levels - 1
            ]),
            requires_grad=True
        )
    def forward(self, data: SelectionData):
        # compute the verb-frame probabilities
        verb_frame_prob = self.verb_frame_prob(
            data.verb, data.frame,
            clamp=True
        )

        # apply a logit to those probabilities
        verb_frame_logodds = torch.log(verb_frame_prob) -\
                             torch.log(1. - verb_frame_prob)

        # compute the jumps for each subject
        jumps = self.jumps[data.subj]

        # return the ordered logistic probabilities
        return ordered_logistic_likelihood(
            verb_frame_logodds, jumps
        )
    def verb_frame_prob(
        self,
        verb_idx: Optional[ndarray] = None,
        frame_idx: Optional[ndarray] = None,
        clamp: bool = False
    ) -> Tensor:
        # noisy-or over components, broadcasting across all verbs
        # and/or frames when the corresponding index is omitted
        if verb_idx is not None and frame_idx is not None:
            acc = 1. - torch.prod(
                1. - self.verb_component_prob[verb_idx,:] *
                self.frame_component_prob[frame_idx,:],
                axis=1
            )
        elif verb_idx is None and frame_idx is not None:
            acc = 1. - torch.prod(
                1. - self.verb_component_prob[:,None,:] *
                self.frame_component_prob[None,frame_idx,:],
                axis=2
            )
        elif verb_idx is not None and frame_idx is None:
            acc = 1. - torch.prod(
                1. - self.verb_component_prob[verb_idx,None,:] *
                self.frame_component_prob[None,:,:],
                axis=2
            )
        else:
            acc = 1. - torch.prod(
                1. - self.verb_component_prob[:,None,:] *
                self.frame_component_prob[None,:,:],
                axis=2
            )

        if clamp:
            return acc.clamp(ZERO, ONE)
        else:
            return acc
    @property
    def verb_component_prob(self) -> Tensor:
        return torch.sigmoid(self.verb_component_aux)

    @property
    def frame_component_prob(self) -> Tensor:
        return torch.sigmoid(self.frame_component_aux)

    @property
    def jumps(self):
        return torch.exp(self.log_jumps)

    @property
    def verb_component_prior_mean(self):
        return torch.sigmoid(self.verb_component_prior_mean_aux)

    @property
    def frame_component_prior_mean(self):
        return torch.sigmoid(self.frame_component_prior_mean_aux)

    @property
    def verb_component_prior_precision(self):
        return torch.exp(self.verb_component_prior_precision_aux)

    @property
    def frame_component_prior_precision(self):
        return torch.exp(self.frame_component_prior_precision_aux)
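To make the core quantity explicit: writing \(\Lambda_{vk}\) for the probability that verb \(v\) bears component \(k\) and \(\Pi_{fk}\) for the probability that frame \(f\) is associated with component \(k\) (the sigmoids of the two latent matrices above), verb_frame_prob computes a noisy-or over the \(K\) components:

\[p_{vf} = 1 - \prod_{k=1}^{K}\left(1 - \Lambda_{vk}\Pi_{fk}\right)\]

That is, verb \(v\) is predicted to be acceptable in frame \(f\) to the extent that at least one component is shared between them.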
from pandas import CategoricalDtype, Series, DataFrame

def hash_series(series: Series, categories: Optional[list[str]] = None, indexation: int = 1) -> tuple[ndarray, ndarray]:
    """Hash a series to numeric codes

    Parameters
    ----------
    series
        The series to hash
    categories
        The categories to use (inferred from the series if not provided)
    indexation
        The starting index (defaults to 1)
    """
    # enforce 0- or 1-indexation
    if indexation not in [0, 1]:
        raise ValueError("Must choose either 0- or 1-indexation.")

    # convert the series to a categorical
    if categories is None:
        category_series = series.astype("category")
    else:
        cat_type = CategoricalDtype(categories=categories)
        category_series = series.astype(cat_type)

    # get the hash map from codes to category values
    hash_map = category_series.cat.categories.values

    # map to codes with the requested indexation
    hashed_series = (category_series.cat.codes + indexation).values

    return hash_map, hashed_series
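For concreteness, here is hash_series applied to a toy series (the values are ours, purely illustrative):

# categories are sorted alphabetically, so the hash map is
# ['know' 'think' 'want'] and the 0-indexed codes follow it
s = Series(["know", "think", "know", "want"])

hash_map, hashed = hash_series(s, indexation=0)

print(hash_map)  # ['know' 'think' 'want']
print(hashed)    # [0 1 0 2]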
As mentioned in the last section, we’ll estimate the parameters contained in our SelectionModel, StructuredSelectionModel, and ConstrainedStructuredSelectionModel Modules using a trainer class. This class will do a few things for us.
First, it will format the model hyperparameters and data for consumption by the Module. Across all of our models, the format of the data (SelectionData) will remain the same, though the shape of the objects containing the hyperparameters will differ.
Second, it will compute the (unnormalized) posterior given the current setting of the model parameters contained in the Module. The posterior is unnormalized in the sense that we won’t bother computing the evidence \(p(\mathbf{x})\): it is a constant relative to \(\boldsymbol\theta\), and so irrelevant for computing \(\arg\max_{\boldsymbol\theta} p(\boldsymbol\theta \mid \mathbf{x})\):
\[\arg\max_{\boldsymbol\theta} p(\boldsymbol\theta \mid \mathbf{x}) = \arg\max_{\boldsymbol\theta} \frac{p(\mathbf{x} \mid \boldsymbol\theta)p(\boldsymbol\theta)}{p(\mathbf{x})} = \arg\max_{\boldsymbol\theta} p(\mathbf{x} \mid \boldsymbol\theta)p(\boldsymbol\theta)\]
Finally, and most importantly, it will actually conduct gradient-based optimization by computing the gradient of the log unnormalized posterior \(\nabla_{\boldsymbol\theta} \log p(\mathbf{x} \mid \boldsymbol\theta)p(\boldsymbol\theta)\) and taking a step in the direction of that gradient.
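Stepping in the direction of that gradient is the same as minimizing the negative log posterior, which is how the trainers below are written. Concretely, one step of the procedure looks like the following minimal sketch (a toy one-parameter example of our own, not the trainer itself; neg_log_posterior and the data are purely illustrative):

# toy MAP estimation: x ~ Normal(theta, 1) with a standard normal prior
theta = torch.nn.Parameter(torch.tensor(0.))
optimizer = torch.optim.Adam([theta], lr=0.1)

def neg_log_posterior(theta, x):
    # negative log-likelihood plus negative log-prior
    # (both up to additive constants)
    return 0.5 * ((x - theta) ** 2).sum() + 0.5 * theta ** 2

x = torch.tensor([1., 2., 3.])

for _ in range(500):
    optimizer.zero_grad()              # clear stale gradients
    loss = neg_log_posterior(theta, x)
    loss.backward()                    # compute the gradient
    optimizer.step()                   # step against the gradient

print(theta.item())  # ≈ 1.5, the MAP estimate under these toy choices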
All of the trainers for our Modules will use an abstract base class SelectionModelTrainerABC. Similar to our wrappers for CmdStanModels from Modules 1 and 2, the trainers will (partially) conform to the sklearn API by implementing a fit method that returns the trainer itself.
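Because fit returns the trainer itself, construction, fitting, and downstream use can be chained in the usual sklearn style (a usage sketch of the classes defined below):

trainer = SelectionModelTrainer(39).fit(data)
expected = trainer.expected_value(data)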
from abc import ABC, abstractmethod
from numpy import arange, round, sum, abs, mean, inf, random, corrcoef, concatenate
class SelectionModelTrainerABC(ABC):
    data_class = SelectionData

    @abstractmethod
    def construct_model_parameters(self, data: DataFrame) -> SelectionModelParameters:
        raise NotImplementedError

    def construct_model_data(self, data: DataFrame) -> SelectionData:
        frame_hashed, verb_hashed, subj_hashed = self._construct_hashes(data)

        model_data = {
            "verb": verb_hashed,
            "frame": frame_hashed,
            "subj": subj_hashed,
            "resp": data.response.astype(int).values - 1
        }

        return self.data_class(**model_data)

    def _construct_hashes(self, data: DataFrame):
        if hasattr(self, "frame_hash_map"):
            _, frame_hashed = hash_series(data.frame, self.frame_hash_map, indexation=0)
        else:
            self.frame_hash_map, frame_hashed = hash_series(data.frame, indexation=0)

        if hasattr(self, "verb_hash_map"):
            _, verb_hashed = hash_series(data.verb, self.verb_hash_map, indexation=0)
        else:
            self.verb_hash_map, verb_hashed = hash_series(data.verb, indexation=0)

        if hasattr(self, "subj_hash_map"):
            _, subj_hashed = hash_series(data.participant, self.subj_hash_map, indexation=0)
        else:
            self.subj_hash_map, subj_hashed = hash_series(data.participant, indexation=0)

        return frame_hashed, verb_hashed, subj_hashed
    def _initialize_model(self, data: DataFrame):
        model_parameters = self.construct_model_parameters(data)

        return self.model_class(model_parameters)

    def _construct_splits(self, data: DataFrame) -> tuple[DataFrame, DataFrame]:
        verbs = data.verb.unique()
        frames = data.frame.unique()

        verb_frame_pairs = [v + "_" + f for v in verbs for f in frames]

        # hold out roughly 10% of verb-frame pairs for the dev split
        n_dev = int(len(verb_frame_pairs) / 10)

        verb_frame_pairs_dev = random.choice(verb_frame_pairs, n_dev, replace=False)

        dev_indicator = (data.verb + "_" + data.frame).isin(verb_frame_pairs_dev)

        data_train = data[~dev_indicator]
        data_dev = data[dev_indicator]

        return data_train, data_dev
    def fit(
        self, data: DataFrame, batch_size: int = 1_000, max_epochs: int = 10_000,
        lr: float = 1e-5, patience: int = 100,
        tolerance_loss: float = 1e-5, tolerance_heldout: float = 0.05,
        window_size: int = 100, verbosity: int = 100, seed: int = 403928
    ) -> "SelectionModelTrainerABC":
        manual_seed(seed)
        random.seed(seed)

        # necessary for initializing hashes that contain all verbs, frames,
        # and subjects
        self._construct_hashes(data)

        data_train, data_dev = self._construct_splits(data)
        self.model = self._initialize_model(data_train)

        # wrap the dev split responses in a tensor
        # this tensor will be used to compute the correlation between
        # the model's expected value for a response and the actual
        # response
        target_dev = torch.tensor(data_dev.response.values)

        # initialize the optimizer
        optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)

        # initialize the loss trace
        self.loss_trace = [inf]

        # initialize the train and dev correlation traces
        self.corr_train_trace = []
        self.corr_dev_trace = []

        # initialize the loss differences trace
        loss_diffs_trace = []

        # initialize the dev-train correlation differences trace
        corr_diffs_trace = []

        for e in range(max_epochs):
            # shuffle the training data
            data_shuffled = data_train.sample(frac=1.)
            data_shuffled = data_shuffled.reset_index(drop=True)

            # compute the number of batches based on the batch size
            n_batches = int(data_shuffled.shape[0]/batch_size)

            # zero the total loss for the epoch
            epoch_total_loss = 0.

            # initialize the list of correlations
            correlations_train = []

            for i in range(n_batches):
                # construct the minibatch
                lower_bound = i*batch_size

                if i == (n_batches - 1):
                    upper_bound = data_shuffled.shape[0]
                else:
                    upper_bound = (i+1)*batch_size

                data_sub = self.construct_model_data(
                    data_shuffled.iloc[lower_bound:upper_bound]
                )

                # wrap the responses in a tensor
                target = torch.tensor(data_sub.resp)

                # zero out the gradients
                optimizer.zero_grad()

                # compute the (log-)probabilities for the minibatch
                probs = self.model(data_sub)
                logprobs = torch.log(probs)

                # compute the loss
                loss = self.loss_function(logprobs, target)
                loss += self._prior_loss()

                # compute gradients
                loss.backward()

                # take a step in the direction of the gradient
                optimizer.step()

                # compute correlation between expected value and target
                expected_value_train = sum(
                    arange(1, probs.shape[1]+1)[None,:] *
                    probs.detach().numpy(),
                    axis=1
                )
                corr_train = corrcoef(
                    concatenate([
                        expected_value_train[None,:],
                        data_sub.resp[None,:]
                    ], axis=0)
                )
                correlations_train.append(
                    corr_train[0,1]
                )

                # accumulate the total loss
                epoch_total_loss += loss.detach().numpy()

            # compute the difference between the current and previous loss
            loss_diff = self.loss_trace[-1] - epoch_total_loss
            loss_diff_scaled = loss_diff/abs(self.loss_trace[-1]) if e else inf

            loss_diffs_trace.append(
                loss_diff_scaled
            )

            # save the epoch total loss
            self.loss_trace.append(epoch_total_loss)

            # compute correlation between exp. value and ratings
            # in the dev set
            expected_value_dev = self.expected_value(data_dev)
            corr_dev = corrcoef(
                concatenate([
                    expected_value_dev[None,:],
                    data_dev.response.values[None,:]
                ], axis=0)
            )[0,1]
            self.corr_dev_trace.append(
                corr_dev
            )

            # compute and save the mean correlation in the train set
            correlations_train_mean = mean(correlations_train)
            self.corr_train_trace.append(correlations_train_mean)

            # compute the difference between the mean train correlation
            # and the dev correlation
            corr_diffs_trace.append(
                correlations_train_mean - corr_dev
            )

            if verbosity and not e % verbosity:
                print(f"Epoch: {e}")
                print(f"Loss: {round(epoch_total_loss)}")
                print(f"Loss improvement: {round(loss_diff_scaled, 7)}")
                print(f"Mean train corr.: {round(correlations_train_mean, 2)}")
                print(f"Dev corr.: {round(corr_dev, 2)}")
                print()

            max_window_size = min(len(corr_diffs_trace), window_size)
            mean_diff = torch.mean(
                torch.tensor(corr_diffs_trace[-max_window_size:])
            )

            if e > patience and (mean_diff > tolerance_heldout or loss_diff_scaled < tolerance_loss):
                if verbosity:
                    print(f"Epoch: {e}")
                    print(f"Loss: {round(epoch_total_loss)}")
                    print(f"Loss improvement: {round(loss_diff_scaled, 7)}")
                    print(f"Mean train corr.: {round(correlations_train_mean, 2)}")
                    print(f"Dev corr.: {round(corr_dev, 2)}")
                    print()

                break
            else:
                prev_corr_dev = corr_dev

        return self
    @abstractmethod
    def _prior_loss(self):
        raise NotImplementedError

    def expected_value(self, data: DataFrame):
        model_data = self.construct_model_data(data)
        probs = self.model(model_data).detach().numpy()

        expected_value = sum(
            arange(1, 8)[None,:] * probs,
            axis=1
        )

        return expected_value

    def likelihood(self, data: DataFrame):
        model_data = self.construct_model_data(data)
        probs = self.model(model_data)

        return probs[model_data.resp]

    def predict(self, data: DataFrame):
        model_data = self.construct_model_data(data)
        probs = self.model(model_data)

        return probs[model_data.resp]
SelectionModel
To implement a trainer (SelectionModelTrainer) for our basic SelectionModel, we need to specify three things: (i) how to initialize the SelectionModelTrainer; (ii) how to construct the model parameters; and (iii) how to compute the prior.
from torch.distributions.gamma import Gamma
from torch.distributions.exponential import Exponential

class SelectionModelTrainer(SelectionModelTrainerABC):
    parameter_class = SelectionModelParameters
    model_class = SelectionModel

    def __init__(self, n_component: int):
        self.n_component = n_component
        self.loss_function = torch.nn.CrossEntropyLoss(reduction="sum")

    def construct_model_parameters(self, data: DataFrame) -> SelectionModelParametersABC:
        model_parameters = {
            "n_verb": self.verb_hash_map.shape[0],
            "n_frame": self.frame_hash_map.shape[0],
            "n_subj": self.subj_hash_map.shape[0],
            "n_resp_levels": 7,
            "n_component": self.n_component,
        }

        return self.parameter_class(**model_parameters)

    def _prior_loss(self):
        jumps = self.model.jumps
        loss = (jumps / jumps.mean(0)[None,:]).mean()

        return loss
import os
from pandas import read_csv

def load_data(fname: str, verbose: bool = True) -> DataFrame:
    # read the raw data
    data = read_csv(fname, sep="\t")

    if verbose:
        n_datapoints = data.shape[0]
        print(f"The full dataset has {n_datapoints} datapoints.")

    # remove non-native speakers
    if "nativeenglish" in data.columns:
        data = data.query("nativeenglish")

    if verbose:
        n_datapoints_native = data.shape[0]
        print(f"Removing {n_datapoints - n_datapoints_native} "
              "responses from nonnative speakers.")

    # remove NaN judgments
    data = data.query("~response.isnull()")

    if verbose:
        n_datapoints_nonnull = data.shape[0]
        print(f"Removing {n_datapoints_native - n_datapoints_nonnull} NA responses.")

    return data

data_dir = "./data/mega-acceptability-v1/"
data = load_data(os.path.join(data_dir, "mega-acceptability-v1.tsv"))
The full dataset has 250000 datapoints.
Removing 600 responses from nonnative speakers.
Removing 10 NA responses.
selection_model = SelectionModelTrainer(39)
selection_model.fit(data)
To get a sense for how well this model fits the data, we can compare the expected response it assigns to each datapoint to the actual response.
from scipy.stats import spearmanr
from seaborn import boxplot

data["expected_value"] = selection_model.expected_value(data)

_ = boxplot(data, x="response", y="expected_value")

spearmanr(data[["response", "expected_value"]])
SignificanceResult(statistic=0.5189593508120506, pvalue=0.0)
We see that these expected values correlate with the raw responses at a level relatively near the one at which White and Rawlins (2020) report participants agreeing with each other (mean Spearman \(\rho=0.55\)).
frame_component_prob = DataFrame(
    selection_model.model.frame_component_prob.data,
    index=selection_model.frame_hash_map
)

from seaborn import clustermap, color_palette

clustermap(
    frame_component_prob,
    cmap=color_palette("light:b", as_cmap=True),
    yticklabels=True,
    col_cluster=False
)
verb_component_prob = DataFrame(
    selection_model.model.verb_component_prob.data,
    index=selection_model.verb_hash_map
)

_ = clustermap(
    verb_component_prob,
    cmap=color_palette("light:b", as_cmap=True),
    yticklabels=False,
    col_cluster=False,
)
To evaluate this model’s ability to capture temporal orientation, we’ll ask how well we can predict the judgments in MegaOrientation using the probabilities contained in \(\boldsymbol\Lambda\) and \(\boldsymbol\Pi\) as predictors in a mixed effects model. We’ll use the implementation we developed in Module 1.
import arviz
import cmdstanpy
import patsy
from numpy import array
from cmdstanpy import CmdStanModel
from arviz import InferenceData

@dataclass
class MixedEffectsData:
    N_resp: int                  # number of responses
    N_subj: int                  # number of subjects
    N_resp_levels: int           # number of possible likert scale acceptability judgment responses
    subj: ndarray                # subject who gave response n
    resp: ndarray                # likert scale acceptability judgment responses
    N_fixed: int                 # number of fixed predictors
    fixed_predictors: ndarray    # predictors (length and dependency type) including intercept
    N_item: int                  # number of items
    N_by_item: int               # number of random by-item predictors
    by_item_predictors: ndarray  # by-item predictors (length and dependency type) including intercept
    N_by_subj: int               # number of random by-subject predictors
    by_subj_predictors: ndarray  # by-subject predictors (length and dependency type) including intercept
    item: ndarray                # item corresponding to response n
class MixedEffectsModel:
    stan_file = "../island-effects/models/mixed-effects-model/mixed-effects-model.stan"
    data_class = MixedEffectsData

    def __init__(
        self, fixed_formula: str, by_subj_formula: str, by_item_formula: str,
    ):
        self.model = CmdStanModel(stan_file=self.stan_file)
        self.fixed_formula = fixed_formula
        self.by_subj_formula = by_subj_formula
        self.by_item_formula = by_item_formula

    def _validate_data(self):
        self.data_class(**self.model_data)

    def construct_model_data(self, data: DataFrame):
        self.fixed_predictors = patsy.dmatrix(
            self.fixed_formula, data, return_type="dataframe"
        )
        self.by_subj_predictors = patsy.dmatrix(
            self.by_subj_formula, data, return_type="dataframe"
        )
        self.by_item_predictors = patsy.dmatrix(
            self.by_item_formula, data, return_type="dataframe"
        )

        self.subj_hash_map, subj_hashed = hash_series(data.participant)
        self.item_hash_map, item_hashed = hash_series(data.sentence)

        model_data = {
            "N_resp": data.shape[0],
            "N_subj": self.subj_hash_map.shape[0],
            "N_resp_levels": 7,
            "subj": subj_hashed,
            "resp": data.response.astype(int).values,
            "N_fixed": self.fixed_predictors.shape[1],
            "fixed_predictors": self.fixed_predictors.values,
            "N_by_subj": self.by_subj_predictors.shape[1],
            "by_subj_predictors": self.by_subj_predictors.values,
            "N_item": self.item_hash_map.shape[0],
            "N_by_item": self.by_item_predictors.shape[1],
            "by_item_predictors": self.by_item_predictors.values,
            "item": item_hashed
        }

        return model_data
    def fit(
        self,
        data: DataFrame,
        save_dir: Optional[str] = None,
        verbose: bool = False,
        map_initialization: bool = True,
        seed: int = 50493,
        **kwargs
    ) -> "MixedEffectsModel":
        if verbose:
            print("Constructing model data...")

        self.model_data = self.construct_model_data(data)
        self._validate_data()

        if map_initialization:
            if verbose:
                print("Fitting model with MAP initialization...")

            map_estimate = self._compute_map_estimate(seed)

            if "inits" in kwargs:
                # inits passed to fit() should override MAP
                map_estimate.update(kwargs["inits"])

            kwargs["inits"] = map_estimate

        elif verbose:
            print("Fitting model...")

        # sample from the posterior starting at the MAP
        self.raw_model_fit = self.model.sample(
            data=self.model_data,
            **kwargs
        )

        if save_dir is not None:
            if verbose:
                print("Saving model...")

            self.save(save_dir)

        if verbose:
            print("Running MCMC diagnostics...")
            print()
            print(self.diagnose())

        return self

    def _compute_map_estimate(self, seed: int):
        # compute the MAP fit
        self.map_model_fit = self.model.optimize(
            data=self.model_data,
            seed=seed,
            algorithm="lbfgs",
            tol_obj=1.
        )

        map_estimate = self.map_model_fit.stan_variables()

        # cmdstanpy returns scalars for 1x1 matrices; wrap them back
        # into arrays so they can be used as initializations
        for vname, v in map_estimate.items():
            if "cov" in vname and not hasattr(v, "shape"):
                map_estimate[vname] = array([[v]])

        return map_estimate
    @property
    def model_fit(self):
        return arviz.from_cmdstanpy(self.raw_model_fit)

    def save(self, save_dir: str = "."):
        self.raw_model_fit.save_csvfiles(save_dir)

    @classmethod
    def from_csv(cls, path: str, **kwargs):
        model = cls(**kwargs)
        model.raw_model_fit = cmdstanpy.from_csv(path)

        return model

    def diagnose(self):
        return self.raw_model_fit.diagnose()
= "./data/mega-orientation-v1.1/"
data_dir
= load_data(os.path.join(data_dir, "mega-orientation-v1.1.tsv"))
data_orientation
"frame"] = data_orientation.frame.str.replace("_", "V") data_orientation[
The full dataset has 44160 datapoints.
Removing 0 responses from nonnative speakers.
Removing 0 NA responses.
from pandas import merge

data_orientation_with_comps = merge(
    merge(
        data_orientation,
        verb_component_prob.rename(columns=lambda x: f"verbcomp{x}"),
        left_on="verb", right_index=True
    ),
    frame_component_prob.rename(columns=lambda x: f"framecomp{x}"),
    left_on="frame", right_index=True
)

frame_comp_form = " + ".join(frame_component_prob.columns.map(lambda x: f"framecomp{x}"))
verb_comp_form = " + ".join(verb_component_prob.columns.map(lambda x: f"verbcomp{x}"))

selection_model_validation = MixedEffectsModel(
    fixed_formula=f"~ ({verb_comp_form} + {frame_comp_form}) * orientation",
    by_subj_formula="~ 1",
    by_item_formula="~ 1"
)

selection_model_validation.fit(data_orientation_with_comps)
StructuredSelectionModel
from collections import OrderedDict

frame_to_constituents = OrderedDict({
    'NP was Ved whichNP to VP': {
        ("NP_subj", "NP_obj", "whichNP to VP"),
        ("NP_obj", "whichNP to VP")
    },
    'NP Ved for NP to VP': {
        ("NP_subj", "for NP to VP"),
        ("NP_subj", "PP_for", "to VP")
    },
    'NP Ved NP to VP[+eventive]': {
        ("NP_subj", "NP to VP[+eventive]"),
        ("NP_subj", "NP_obj", "to VP[+eventive]")
    },
    'NP was Ved whether to VP': {
        ("NP_subj", "NP_obj", "whether to VP"),
        ("NP_obj", "whether to VP")
    },
    'NP Ved to VP[+eventive]': {
        ("NP_subj", "to VP[+eventive]")
    },
    'NP Ved NP to NP': {
        ("NP_subj", "NP_obj", "NP_iobj")
    },
    'NP Ved NP that S': {
        ("NP_subj", "NP_obj", "that S")
    },
    'NP was Ved about NP': {
        ("NP_subj", "NP_obj", "about NP"),
        ("NP_obj", "about NP")
    },
    'NP was Ved that S[-tense]': {
        ("NP_subj", "NP_obj", "S[-tense]"),
        ("NP_obj", "S[-tense]")
    },
    'NP Ved to NP that S[+future]': {
        ("NP_subj", "NP_iobj", "that S[+future]")
    },
    'NP Ved whether to VP': {
        ("NP_subj", "whether to VP")
    },
    'NP Ved whichNP to VP': {
        ("NP_subj", "whichNP to VP")
    },
    'NP Ved about whether S': {
        ("NP_subj", "about whether S")
    },
    'NP Ved whichNP S': {
        ("NP_subj", "whichNP S")
    },
    'NP Ved that S[-tense]': {
        ("NP_subj", "that S[-tense]")
    },
    'NP Ved whether S[+future]': {
        ("NP_subj", "whether S[+future]")
    },
    'NP was Ved that S[+future]': {
        ("NP_subj", "NP_obj", "that S[+future]"),
        ("NP_obj", "that S[+future]")
    },
    'NP Ved to NP whether S': {
        ("NP_subj", "NP_iobj", "whether S")
    },
    'NP Ved': {
        ("NP_subj",)
    },
    'NP Ved NP to VP[-eventive]': {
        ("NP_subj", "NP to VP[-eventive]"),
        ("NP_subj", "NP_obj", "to VP[-eventive]")
    },
    'NP was Ved so': {
        ("NP_obj", "so"),
        ("NP_subj", "NP_obj", "so")
    },
    'NP Ved so': {
        ("NP_subj", "so")
    },
    'NP Ved NP that S[+future]': {
        ("NP_subj", "NP_obj", "that S[+future]")
    },
    'NP Ved NP whether S[+future]': {
        ("NP_subj", "NP_obj", "whether S[+future]")
    },
    'NP Ved to NP whether S[+future]': {
        ("NP_subj", "NP_iobj", "whether S[+future]")
    },
    'NP was Ved that S': {
        ("NP_obj", "that S"),
        ("NP_subj", "NP_obj", "that S")
    },
    'NP Ved NP whether S': {
        ("NP_subj", "NP_obj", "whether S")
    },
    'NP was Ved whether S': {
        ("NP_obj", "whether S"),
        ("NP_subj", "NP_obj", "whether S")
    },
    'NP was Ved to VP[-eventive]': {
        ("NP_obj", "to VP[-eventive]"),
        ("NP_subj", "NP_obj", "to VP[-eventive]"),
        ("NP_subj", "NP to VP[-eventive]")
    },
    'NP Ved NP VP': {
        ("NP_subj", "NP_obj", "VP"),
        ("NP_subj", "NP VP")
    },
    'NP Ved VPing': {
        ("NP_subj", "VPing")
    },
    'NP was Ved to VP[+eventive]': {
        ("NP_obj", "to VP[+eventive]"),
        ("NP_subj", "NP_obj", "to VP[+eventive]"),
        ("NP_subj", "NP to VP[+eventive]")
    },
    'NP Ved NP that S[-tense]': {
        ("NP_subj", "NP_obj", "that S[-tense]")
    },
    'NP Ved that S': {
        ("NP_subj", "that S")
    },
    'NP was Ved': {
        ("NP_obj",),
        ("NP_subj", "NP_obj")
    },
    'NP Ved S': {
        ("NP_subj", "S")
    },
    'NP Ved that S[+future]': {
        ("NP_subj", "that S[+future]")
    },
    'NP was Ved about whether S': {
        ("NP_subj", "NP_obj", "about whether S"),
        ("NP_obj", "about whether S")
    },
    'NP Ved NP': {
        ("NP_subj", "NP_obj")
    },
    'NP Ved NP VPing': {
        ("NP_subj", "NP_obj", "VPing"),
        ("NP_subj", "NP VPing")
    },
    'NP Ved NP whichNP S': {
        ("NP_subj", "NP_obj", "whichNP S")
    },
    'NP Ved about NP': {
        ("NP_subj", "about NP")
    },
    'NP was Ved S': {
        ("NP_subj", "NP_obj", "S"),
        ("NP_obj", "S")
    },
    'NP Ved to NP that S': {
        ("NP_subj", "NP_iobj", "that S")
    },
    'NP was Ved whether S[+future]': {
        ("NP_subj", "NP_obj", "whether S[+future]"),
        ("NP_obj", "whether S[+future]")
    },
    'NP Ved whether S': {
        ("NP_subj", "whether S")
    },
    'NP was Ved whichNP S': {
        ("NP_subj", "NP_obj", "whichNP S"),
        ("NP_obj", "whichNP S")
    },
    'NP Ved to NP that S[-tense]': {
        ("NP_subj", "NP_iobj", "that S[-tense]")
    },
    'NP Ved to VP[-eventive]': {
        ("NP_subj", "to VP[-eventive]")
    }
})

data = data[data.frame.isin(frame_to_constituents)]
from numpy import array, zeros, where, isin
from xarray import DataArray

constituents = array(sorted({
    c for tups in frame_to_constituents.values()
    for t in tups
    for c in t
}))

max_parses = max(
    len(t)
    for tups in frame_to_constituents.values()
    for t in tups
)

frame_to_parse_constituent_indicators = zeros([
    len(frame_to_constituents), max_parses, len(constituents)
])

for i, (f, parses) in enumerate(frame_to_constituents.items()):
    for j, parse in enumerate(parses):
        for k, const in enumerate(parse):
            const_idx = where(constituents == const)[0][0]
            frame_to_parse_constituent_indicators[i,j,const_idx] = 1.

frame_to_parse_constituent_indicators = DataArray(
    frame_to_parse_constituent_indicators,
    dims=["frame", "parse", "constituent"],
    coords={
        "frame": list(frame_to_constituents),
        "parse": list(range(max_parses)),
        "constituent": constituents,
    }
)
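As a quick sanity check on this encoding, we can pull out the indicator matrix for a single frame (a usage sketch; the expected count follows from the dictionary above):

# 'NP Ved that S' has a single parse, ("NP_subj", "that S"), so its
# parse-by-constituent matrix contains exactly two nonzero entries
ind = frame_to_parse_constituent_indicators.sel(frame="NP Ved that S")

print(ind.values.sum())  # 2.0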
from numpy import prod
from torch import tensor

@dataclass
class StructuredSelectionModelParameters(SelectionModelParametersABC):
    frame_to_parse_constituent_indicators: ndarray
    n_primitive_semantic_types: int
    n_primitive_syntactic_types: int
    max_complex_type_size: int
class StructuredSelectionModel(Module):
    parameter_class = StructuredSelectionModelParameters
    data_class = SelectionData

    def __init__(self, parameters: StructuredSelectionModelParameters):
        super().__init__()
        self.model_parameters = parameters

        # initialize the verb-complex semantic type probabilities
        self._initialize_verb_complex_semantic_type()

        # initialize the frame-complex syntactic type probabilities
        self._initialize_frame_complex_syntactic_type()

        # initialize the semantic-syntactic primitive type map
        self._initialize_primitive_type_map()

        # initialize the syntactic primitive type-constituent map
        syntactic_primitive_type_constituent_map_aux = torch.zeros([
            parameters.n_primitive_syntactic_types,
            parameters.frame_to_parse_constituent_indicators.shape[2]
        ])
        self.syntactic_primitive_type_constituent_map_aux = torch.nn.Parameter(
            syntactic_primitive_type_constituent_map_aux,
            requires_grad=True
        )

        # initialize the cutpoint distances
        self.log_jumps = torch.nn.Parameter(
            torch.ones([
                parameters.n_subj, parameters.n_resp_levels - 1
            ]),
            requires_grad=True
        )
    def _initialize_verb_complex_semantic_type(self):
        verb_shape = (self.model_parameters.n_verb,) +\
                     self.complex_semantic_type_shape

        self.verb_complex_semantic_type_aux = torch.nn.Parameter(
            torch.randn(verb_shape),
            requires_grad=True
        )

    def _initialize_frame_complex_syntactic_type(self):
        frame_shape = (self.model_parameters.n_frame,) +\
                      self.complex_syntactic_type_shape

        self.frame_complex_syntactic_type_aux = torch.nn.Parameter(
            torch.randn(frame_shape),
            requires_grad=True
        )

    def _initialize_primitive_type_map(self):
        primitive_type_map_aux = randn([
            self.model_parameters.n_primitive_semantic_types,
            self.model_parameters.n_primitive_syntactic_types + 1
        ])
        self.primitive_type_map_aux = torch.nn.Parameter(
            primitive_type_map_aux,
            requires_grad=True
        )

    @property
    def complex_semantic_type_shape(self):
        return (self.model_parameters.n_primitive_semantic_types+1,) *\
               self.model_parameters.max_complex_type_size

    @property
    def complex_syntactic_type_shape(self):
        return (self.model_parameters.n_primitive_syntactic_types+1,) *\
               self.model_parameters.max_complex_type_size
    def forward(self, data: SelectionData):
        verb_frame_prob = self.verb_frame_prob(data.verb, data.frame)
        verb_frame_logodds = torch.log(verb_frame_prob) - torch.log(1. - verb_frame_prob)

        jumps = self.jumps[data.subj]

        return ordered_logistic_likelihood(
            verb_frame_logodds, jumps
        )

    def verb_frame_prob(self, verb_idx: ndarray, frame_idx: ndarray) -> Tensor:
        n_verb = self.model_parameters.n_verb
        n_frame = self.model_parameters.n_frame

        verb_shape = self.complex_semantic_type_shape +\
                     (1,) * len(self.complex_syntactic_type_shape)
        frame_shape = (1,) * len(self.complex_semantic_type_shape) +\
                      self.complex_syntactic_type_shape

        if verb_idx.shape != frame_idx.shape:
            raise ValueError
        else:
            resp_shape = verb_idx.shape[0]

        # shape for verbs and frames to complex type signatures
        verb_shape = (resp_shape,) + verb_shape
        frame_shape = (resp_shape,) + frame_shape

        # shape with response dimension inserted into map
        synsem_map_shape = (1,) +\
                           self.complex_semantic_type_shape +\
                           self.complex_syntactic_type_shape

        # shape for broadcasting syn map
        syn_map_shape = (resp_shape,) +\
                        (1,) * self.model_parameters.max_complex_type_size +\
                        self.complex_syntactic_type_shape

        # shape for computing prod on all but the response dimension
        flat_shape = (resp_shape, prod(synsem_map_shape))

        p = self.verb_complex_semantic_type[verb_idx].view(verb_shape) *\
            self.frame_complex_syntactic_type[frame_idx].view(frame_shape) *\
            self.complex_synsem_type_map.view(synsem_map_shape) *\
            self.complex_syn_type_map[frame_idx].view(syn_map_shape)

        acc = 1. - (1. - p.view(flat_shape)).prod(axis=1)
        acc = acc.clamp(min=ZERO, max=ONE)

        return acc
    @property
    def verb_complex_semantic_type(self) -> Tensor:
        return torch.sigmoid(self.verb_complex_semantic_type_aux)

    @property
    def frame_complex_syntactic_type(self) -> Tensor:
        return torch.sigmoid(self.frame_complex_syntactic_type_aux)

    @property
    def primitive_type_map(self) -> Tensor:
        prob = torch.sigmoid(
            self.primitive_type_map_aux
        )

        # the 0th primitive type is the null primitive type and null
        # primitive types should only map onto each other
        null_map = torch.zeros([
            1,
            self.model_parameters.n_primitive_syntactic_types + 1,
        ])
        null_map[0,0] = 1.0

        return torch.cat([null_map, prob], axis=0)
    @property
    def complex_synsem_type_map(self):
        # only want to compute the transformation implicit in the
        # property once
        primitive_type_map = self.primitive_type_map

        # the final shape of the complex type map
        final_shape = primitive_type_map.shape *\
                      self.model_parameters.max_complex_type_size

        shape = (primitive_type_map.shape[0],) +\
                (1,) * (self.model_parameters.max_complex_type_size - 1) +\
                (primitive_type_map.shape[1],) +\
                (1,) * (self.model_parameters.max_complex_type_size - 1)

        m = primitive_type_map.view(shape)

        for i in range(1, self.model_parameters.max_complex_type_size):
            shape = (1,) * i +\
                    (primitive_type_map.shape[0],) +\
                    (1,) * (self.model_parameters.max_complex_type_size - (i + 1)) +\
                    (1,) * i +\
                    (primitive_type_map.shape[1],) +\
                    (1,) * (self.model_parameters.max_complex_type_size - (i + 1))

            m = m * primitive_type_map.view(shape)

        return m
    @property
    def complex_syn_type_map(self):
        parse_constituent_indicators = tensor(
            self.model_parameters.frame_to_parse_constituent_indicators
        )
        primitive_type_map = self.syntactic_primitive_type_constituent_map.transpose(1,0)

        # frame x parse x constituent type x primitive type^max_size
        indicators_shape = parse_constituent_indicators.shape +\
                           (1,) * self.model_parameters.max_complex_type_size

        map_shape = (1,1,) +\
                    primitive_type_map.shape +\
                    (1,) * (self.model_parameters.max_complex_type_size - 1)

        m = 1. - primitive_type_map.view(map_shape) *\
            parse_constituent_indicators.view(indicators_shape)

        for i in range(1, self.model_parameters.max_complex_type_size):
            map_shape = (1,1,primitive_type_map.shape[0]) +\
                        (1,) * i +\
                        primitive_type_map.shape[1:] +\
                        (1,) * (self.model_parameters.max_complex_type_size - (i+1))

            m = m * (1. - primitive_type_map.view(map_shape) *\
                     parse_constituent_indicators.view(indicators_shape))

        prob = 1. - torch.prod(
            1. - torch.prod(
                (1.-parse_constituent_indicators.view(indicators_shape)) +
                1. - m,
                axis=2
            ), axis=1
        )

        return prob
    @property
    def syntactic_primitive_type_constituent_map(self):
        prob = torch.sigmoid(
            self.syntactic_primitive_type_constituent_map_aux
        )

        # the 0th primitive type is the null primitive type and null
        # primitive types should not map onto any constituent; we
        # enforce this by prepending a row of zeros as a special mask
        null_map = torch.zeros([
            1,
            self.model_parameters.frame_to_parse_constituent_indicators.shape[2]
        ])

        return torch.cat([null_map, prob], axis=0)

    @property
    def jumps(self):
        return torch.exp(self.log_jumps)
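As a quick shape check, we can instantiate the model with toy hyperparameters (n_verb and n_subj here are arbitrary choices of ours; the primitive type counts match the trainer call below):

params = StructuredSelectionModelParameters(
    n_verb=10,
    n_frame=len(frame_to_constituents),
    n_subj=5,
    n_resp_levels=7,
    frame_to_parse_constituent_indicators=frame_to_parse_constituent_indicators.values,
    n_primitive_semantic_types=3,
    n_primitive_syntactic_types=3,
    max_complex_type_size=3
)
model = StructuredSelectionModel(params)

# each complex type is a length-3 tuple over 3 primitive types plus a null type
print(model.complex_semantic_type_shape)    # (4, 4, 4)
print(model.complex_synsem_type_map.shape)  # torch.Size([4, 4, 4, 4, 4, 4])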
class ConstrainedStructuredSelectionModel(StructuredSelectionModel):
    def _initialize_primitive_type_map(self):
        primitive_type_rank_sem_aux = randn([
            self.model_parameters.n_primitive_semantic_types
        ])
        self.primitive_type_rank_sem_aux = torch.nn.Parameter(
            primitive_type_rank_sem_aux,
            requires_grad=True
        )

        primitive_type_rank_syn_aux = randn([
            self.model_parameters.n_primitive_syntactic_types
        ])
        self.primitive_type_rank_syn_aux = torch.nn.Parameter(
            primitive_type_rank_syn_aux,
            requires_grad=True
        )

        self.nonnull_sem_to_null_syn_aux = torch.nn.Parameter(
            randn(self.model_parameters.n_primitive_semantic_types),
            requires_grad=True
        )

        # one rank cutpoint fewer than the smaller primitive type inventory
        self.type_rank_log_jumps = torch.nn.Parameter(
            randn(
                min(
                    self.model_parameters.n_primitive_semantic_types - 1,
                    self.model_parameters.n_primitive_syntactic_types - 1
                )
            ),
            requires_grad=True
        )

    @property
    def primitive_type_map(self) -> Tensor:
        rank_jumps = torch.exp(self.type_rank_log_jumps)
        rank_cuts = torch.cumsum(rank_jumps, axis=0) - rank_jumps[0]

        prob_sem = ordered_logistic_likelihood(
            self.primitive_type_rank_sem_aux,
            rank_cuts[None,:]
        )
        prob_syn = ordered_logistic_likelihood(
            self.primitive_type_rank_syn_aux,
            rank_cuts[None,:]
        )

        prob_prim = (prob_sem[:,None,:] * prob_syn[None,:,:]).sum(2)

        null_sem_to_nonnull_syn = torch.zeros([
            1,
            self.model_parameters.n_primitive_syntactic_types + 1,
        ])
        null_sem_to_nonnull_syn[0,0] = 1.0

        nonnull_sem_to_null_syn = torch.sigmoid(
            self.nonnull_sem_to_null_syn_aux
        )

        # the 0th primitive type is the null primitive type and null
        # primitive types should only map onto each other; we enforce
        # this by prepending a masked row and column
        prob = torch.cat([
            nonnull_sem_to_null_syn[:,None],
            prob_prim
        ], axis=1)

        return torch.cat([null_sem_to_nonnull_syn, prob], axis=0)
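One way to read the constrained map: semantic and syntactic primitives each receive a distribution over a shared, ordered set of ranks, and a (non-null) semantic primitive \(i\) maps onto a (non-null) syntactic primitive \(j\) to the extent that the two occupy the same rank:

\[\mathrm{P}(i \rightarrow j) = \sum_{r} \mathrm{P}(\mathrm{rank}^{\mathrm{sem}}_i = r)\,\mathrm{P}(\mathrm{rank}^{\mathrm{syn}}_j = r)\]

This is exactly what prob_prim computes above, with the rank distributions given by the ordered logistic likelihood over the shared cutpoints rank_cuts.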
class StructuredSelectionModelTrainer(SelectionModelTrainer):
    parameter_class = StructuredSelectionModelParameters
    model_class = StructuredSelectionModel

    def __init__(
        self,
        n_primitive_semantic_types: int,
        n_primitive_syntactic_types: int,
        max_complex_type_size: int,
        frame_to_parse_constituent_indicators: DataArray
    ):
        self.n_primitive_semantic_types = n_primitive_semantic_types
        self.n_primitive_syntactic_types = n_primitive_syntactic_types
        self.max_complex_type_size = max_complex_type_size
        self.frame_to_parse_constituent_indicators = frame_to_parse_constituent_indicators

        self.nonparametric = False
        self.alpha = 1.
        self.beta = 1.

        self.loss_function = torch.nn.CrossEntropyLoss()

    def construct_model_parameters(self, data: DataFrame) -> StructuredSelectionModelParameters:
        model_parameters = {
            "n_verb": self.verb_hash_map.shape[0],
            "n_frame": self.frame_hash_map.shape[0],
            "n_subj": self.subj_hash_map.shape[0],
            "n_resp_levels": 7,
            "n_primitive_semantic_types": self.n_primitive_semantic_types,
            "n_primitive_syntactic_types": self.n_primitive_syntactic_types,
            "max_complex_type_size": self.max_complex_type_size,
            "frame_to_parse_constituent_indicators": self.frame_to_parse_constituent_indicators.loc[self.frame_hash_map].values
        }

        return self.parameter_class(**model_parameters)
structured_selection_model_trainer = StructuredSelectionModelTrainer(3, 3, 3, frame_to_parse_constituent_indicators)
structured_selection_model_trainer.fit(data, verbosity=1, tolerance_loss=1e-6)
ConstrainedStructuredSelectionModel
class ConstrainedStructuredSelectionModelTrainer(StructuredSelectionModelTrainer):
    model_class = ConstrainedStructuredSelectionModel
constrained_structured_selection_model_trainer = ConstrainedStructuredSelectionModelTrainer(3, 3, 3, frame_to_parse_constituent_indicators)
constrained_structured_selection_model_trainer.fit(data, verbosity=1, tolerance_loss=1e-6)