# Author: Cameron F. Abrams, <cfa22@drexel.edu>
"""
A class for handling answer sets in pygacity
"""
import yaml
import os
from collections import UserList, UserDict
import pandas as pd
import logging
logger = logging.getLogger(__name__)
[docs]
class AnswerSet(UserDict):
    """
    A dictionary of answer entries belonging to one serial-numbered document.

    Keys are question indices; each value is a list of answer-entry
    dictionaries with the keys ``label``, ``value``, ``units``, ``formatter``
    and ``group`` (as built by :meth:`register`).

    Parameters
    ----------
    data : dict, optional
        initial mapping of question index to list of answer entries
    serial : int, optional
        serial number of this answer set (default is 0)
    serialstr : str, optional
        string form of the serial number; defaults to ``str(serial)``.
        It determines the name of the YAML dump file.
    """
    _keys = ['label', 'value', 'units', 'formatter']

    def __init__(self, data: dict = None, serial: int = 0, serialstr: str = None):
        self.serial = serial
        self.serialstr = serialstr if serialstr is not None else str(serial)
        self.dumpname = f'answers-{self.serialstr}.yaml'
        self.first_index = None
        self.sources: dict = {}
        # UserDict copies the entries of ``data`` (None means "start empty")
        super().__init__(data)

    def set_source(self, index: object, source: str):
        """Record the source filename for a question index."""
        self.sources[index] = source

    def __len__(self):
        """Return the number of question indices held."""
        return len(self.data)

    @classmethod
    def from_yaml(cls, filename: str, delete: bool = False):
        """
        Create an AnswerSet instance by loading from a YAML file of the same format as that
        generated by the to_yaml() method.

        Parameters
        ----------
        filename : str
            the YAML filename to load; its base name must be of the form
            ``answers-<serial#>.yaml`` (or ``.yml``)
        delete : bool, optional
            whether to delete the YAML file after loading (default is False)

        Returns
        -------
        AnswerSet
            An AnswerSet instance populated with data from the YAML file.

        Raises
        ------
        AssertionError
            if the file name does not have the expected form
        """
        # Only the base name carries the serial number; directory components
        # (which may themselves contain hyphens) must not take part in the split.
        root, ext = os.path.splitext(os.path.basename(filename))
        assert ext in ['.yaml', '.yml'], f'{filename} does not end in .yaml or .yml'
        tokens = root.split('-')
        assert len(tokens) == 2, f'{filename} should be of the format "answers-<serial#>.yaml"'
        serial = int(tokens[1])
        R = cls(serial=serial)
        # to_yaml() writes utf-8, so read with the same encoding
        with open(filename, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f)
        # an empty YAML document loads as None; keep ``data`` a dictionary
        R.data = loaded if loaded is not None else {}
        if delete:
            os.remove(filename)
        return R

    def register(self, index: object, label: str = None, value: object = None,
                 units: str = None, formatter: str = None, group: int = None):
        """
        Register an answer entry for a particular question index.

        Parameters
        ----------
        index : any
            the question index
        label : str, optional
            the label for the answer entry
        value : any, optional
            the value of the answer entry
        units : str, optional
            the units of the answer entry
        formatter : str, optional
            a format string for displaying the value
        group : int, optional
            a group identifier for the answer entry

        Returns
        -------
        tuple
            ``(index, element)`` where ``element`` is the position of the new
            entry within the list for ``index``; the tuple can be passed
            directly to :meth:`display`.
        """
        # compare against None so that a falsy first index (e.g. 0) is kept
        if self.first_index is None:
            self.first_index = index
        entries = self.data.setdefault(index, [])
        # if value is a pint.Quantity, extract magnitude and units
        if hasattr(value, 'magnitude') and hasattr(value, 'units'):
            if units is None:
                units = f'{value.units:~P}'
            value = value.magnitude
        # if value is a numpy data type, convert to native python type
        if hasattr(value, 'item'):
            value = value.item()
        element = len(entries)
        entries.append(dict(label=label,
                            value=value,
                            units=units,
                            formatter=formatter,
                            group=group))
        logger.debug(f'AnswerSet.register index={index} label={label} value={value} units={units} formatter={formatter} group={group}')
        return (index, element)

    def display(self, index: object, element: int = 0):
        """
        Returns a formatted string for a particular answer entry.

        Parameters
        ----------
        index : any
            the question index, or an ``(index, element)`` tuple as returned
            by :meth:`register`
        element : int, optional
            the element number within the index (default is 0)

        Returns
        -------
        str
            formatted string for the answer entry; an empty string if the
            element does not exist, the entry is empty, or it has no label
        """
        if isinstance(index, tuple):
            index, element = index
        entries = self.data[index]
        if element >= len(entries):
            return ''
        D = entries[element]
        if not D:
            return ''
        fmt = D.get('formatter', None)
        val = D.get('value', None)
        label = D.get('label', None)
        units = D.get('units', None)
        vstr = ''
        # test against None, not truthiness, so that 0 / 0.0 / False are shown
        if val is not None:
            vstr = fmt.format(val) if fmt else str(val)
        if units:
            vstr += f' {units}'
        if not label:
            return ''
        return f'{label} = {vstr}' if vstr else label

    def to_yaml(self):
        """
        Dumps the AnswerSet to a YAML file.

        The file is named ``self.dumpname`` and is written to the current
        working directory. Any prefix common to the string forms of all
        question indices is stripped first; when that happens ``self.data``
        itself is replaced by the stripped mapping.
        """
        str_indices = [str(x) for x in self.data]
        common_prefix = os.path.commonprefix(str_indices)
        logger.debug(f'AnswerSet.to_yaml common prefix: "{common_prefix}"')
        # Never strip when it would reduce an index to the empty string, which
        # happens when an index equals the prefix (always so for a single index).
        if common_prefix and common_prefix not in str_indices:
            self.data = {str(index)[len(common_prefix):]: AL
                         for index, AL in self.data.items()}
        with open(self.dumpname, 'w', encoding='utf-8') as f:
            yaml.safe_dump(self.data, f)
[docs]
class AnswerSuperSet(UserList[AnswerSet]):
    """
    A collection of AnswerSet instances with methods to convert to pandas DataFrames
    and LaTeX tables.

    Parameters
    ----------
    initial : list[AnswerSet], optional
        the AnswerSet instances to collect (default is an empty collection)

    Attributes
    ----------
    questions : dict
        maps each question index to a dict with keys ``formatters``,
        ``values`` and ``df`` (the pandas DataFrame for that question)
    question_sources : dict
        maps each question index to its source filename, taken from the
        first AnswerSet after sorting by serial
    """

    def __init__(self, initial: list[AnswerSet] = None):
        super().__init__(initial)
        if not self._check_congruency():
            logger.error('There is a lack of congruency among answer sets')
        self._make_dfs()

    @classmethod
    def from_dumpfiles(cls, files: list[str] = None, delete: bool = False):
        """
        Create an **AnswerSuperSet** instance by loading multiple AnswerSet instances from YAML files.

        Parameters
        ----------
        files : list[str]
            list of YAML filenames to load
        delete : bool, optional
            whether to delete the YAML files after loading (default is False)

        Returns
        -------
        AnswerSuperSet
            An AnswerSuperSet instance populated with AnswerSet instances from the YAML files.
        """
        loaded = [AnswerSet.from_yaml(f, delete=delete) for f in (files or [])]
        return cls(initial=loaded)

    def to_latex(self):
        """
        Converts the **AnswerSuperSet** to a LaTeX formatted string.

        Returns
        -------
        str
            LaTeX formatted string representing the **AnswerSuperSet**; one
            header line plus one longtable per question
        """
        pieces = []
        for index, qdata in self.questions.items():
            df = qdata['df']
            stored = qdata.get('formatters', None)
            logger.debug(f'AnswerSuperSet.to_latex question "{index}" with formatters: {stored}')
            formatters = None
            if stored is not None:
                # Hand pandas a fresh dict of callables: format strings are
                # bound to str.format so they work wherever a one-argument
                # function is expected, and the stored dict is left untouched.
                formatters = {col: (fmt.format if isinstance(fmt, str) else fmt)
                              for col, fmt in stored.items()}
            source = self.question_sources.get(index, '')
            if source:
                escaped = source.replace('_', r'\_')
                source_label = f' (\\texttt{{{escaped}}})'
            else:
                source_label = ''
            pieces.append(f'\\noindent\\textbf{{Question {index}:{source_label}}}\\\\\n')
            pieces.append(df.to_latex(formatters=formatters, index=False, longtable=True))
        return ''.join(pieces)

    def _check_congruency(self):
        """
        Checks that all **AnswerSet** instances in the collection have the same indices
        (in the same order) and the same number of entries for each index.

        Returns
        -------
        bool
            True if all **AnswerSet** instances are congruent (an empty
            collection is trivially congruent), False otherwise
        """
        if not self.data:
            return True
        reference = self.data[0].data
        ref_indices = list(reference.keys())
        for other in self.data[1:]:
            # compare the full key lists so that a missing or extra index is detected
            if list(other.data.keys()) != ref_indices:
                return False
            for i in ref_indices:
                if len(other.data[i]) != len(reference[i]):
                    return False
        return True

    @staticmethod
    def _column_key(entry: dict) -> str:
        """Return the DataFrame column name for an answer entry: its label plus units, if any."""
        label = entry.get('label', None)
        units = entry.get('units', None)
        key = str(label) if label is not None else ''
        if units:
            key += f' ({units})'
        return key

    def _make_dfs(self):
        """
        Constructs one pandas DataFrame per question index.

        The ``group`` field on answer entries is ignored; each question gets its
        own table regardless of grouping.

        Side effects: the collection is sorted in place by serial, and the
        ``data`` of every member AnswerSet is re-keyed by the string form of
        its indices with any common prefix removed.
        """
        self.questions = {}  # index -> {formatters, values, df}
        self.question_sources = {}
        if not self.data:
            # nothing to tabulate for an empty collection
            return
        self.data.sort(key=lambda x: x.serial)
        serialstrs = [x.serialstr for x in self.data]
        pattern = self.data[0]
        # strip any common prefix from question indices, unless doing so would
        # reduce an index to the empty string (e.g. a single question)
        str_indices = [str(x) for x in pattern.data.keys()]
        common_prefix = os.path.commonprefix(str_indices)
        if common_prefix in str_indices:
            common_prefix = ''
        cut = len(common_prefix)
        # collect source filenames, applying same prefix stripping as question indices
        self.question_sources = {str(k)[cut:]: v for k, v in pattern.sources.items()}
        logger.debug(f'Overall common prefix: "{common_prefix}"')
        for dataset in self.data:
            dataset.data = {str(index)[cut:]: AL for index, AL in dataset.data.items()}
        # build column structure from the pattern AnswerSet
        for index, AL in pattern.data.items():
            values = {'serials': serialstrs}
            formatters = {}
            for a in AL:
                key = self._column_key(a)
                values[key] = []
                fmt = a.get('formatter', None)
                if fmt:
                    formatters[key] = fmt
            self.questions[index] = dict(formatters=formatters, values=values, df=None)
        # fill values from all AnswerSet instances (one per serial)
        for inst in self.data:
            for index, AL in inst.data.items():
                columns = self.questions[index]['values']
                for a in AL:
                    columns[self._column_key(a)].append(a['value'])
        # build DataFrames
        for index, qdata in self.questions.items():
            logger.debug(f'Building DataFrame for question {index} with values: {qdata["values"]}')
            qdata['df'] = pd.DataFrame(qdata['values'])