# Author: Cameron F. Abrams, <cfa22@drexel.edu>
"""
A class for handling answer sets in pygacity
"""
import yaml
import os
from collections import UserList, UserDict
import pandas as pd
import logging
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class AnswerSet(UserDict):
    """
    A dictionary of answer entries belonging to one serial-numbered set.

    Keys are question indices. Each value is a list of answer entries, and
    each entry is a dict holding ``label``, ``value``, ``units``,
    ``formatter`` and ``group``.

    Parameters
    ----------
    data : dict, optional
        initial mapping of question index to list of answer entries
    serial : int, optional
        integer serial number of this answer set (default is 0)
    serialstr : str, optional
        string form of the serial number (default is ``str(serial)``)
    """
    _keys = ['label', 'value', 'units', 'formatter']

    def __init__(self, data: dict = None, serial: int = 0, serialstr: str = None):
        self.serial = serial
        self.serialstr = serialstr if serialstr is not None else str(serial)
        # name of the file written by to_yaml(); from_yaml() parses the
        # serial number back out of a name of this form
        self.dumpname = f'answers-{serial:08d}.yaml'
        # index passed to the first register() call, or None if none yet
        self.first_index = None
        super().__init__(data)

    def __len__(self):
        """Return the number of question indices in the set."""
        return len(self.data)

    @classmethod
    def from_yaml(cls, filename: str, delete: bool = False):
        """
        Create an AnswerSet instance by loading from a YAML file of the same format as that
        generated by the to_yaml() method.

        Parameters
        ----------
        filename : str
            the YAML filename to load; may include a directory path
        delete : bool, optional
            whether to delete the YAML file after loading (default is False)

        Returns
        -------
        AnswerSet
            An AnswerSet instance populated with data from the YAML file.

        Raises
        ------
        AssertionError
            if the file name does not end in .yaml/.yml or is not of the
            form ``<name>-<serial#>``
        ValueError
            if the serial part of the file name is not an integer
        """
        # Only the file's own name carries the serial number; parsing the
        # basename keeps hyphens in directory names from breaking the split.
        root, ext = os.path.splitext(os.path.basename(filename))
        # AssertionError is raised explicitly (rather than via ``assert``) so
        # the check survives ``python -O`` while keeping the exception type.
        if ext not in ('.yaml', '.yml'):
            raise AssertionError(f'{filename} does not end in .yaml or .yml')
        tokens = root.split('-')
        if len(tokens) != 2:
            raise AssertionError(f'{filename} should be of the format "answers-<serial#>.yaml"')
        serial = int(tokens[1])
        R = cls(serial=serial)
        # to_yaml() writes UTF-8, so read with the same encoding
        with open(filename, 'r', encoding='utf-8') as f:
            loaded = yaml.safe_load(f)
        # an empty YAML document loads as None; keep data a dict regardless
        R.data = loaded if loaded is not None else {}
        if delete:
            os.remove(filename)
        return R

    def register(self, index: object, label: str = None, value: object = None,
                 units: str = None, formatter: str = None, group: int = None):
        """
        Register an answer entry for a particular question index.

        Parameters
        ----------
        index : any
            the question index
        label : str, optional
            the label for the answer entry
        value : any, optional
            the value of the answer entry
        units : str, optional
            the units of the answer entry
        formatter : str, optional
            a format string for displaying the value
        group : int, optional
            a group identifier for the answer entry
        """
        # compare against None so that a falsy first index (0, '') is kept
        if self.first_index is None:
            self.first_index = index
        entries = self.data.setdefault(index, [])
        # if value is a pint.Quantity, extract magnitude and units
        if hasattr(value, 'magnitude') and hasattr(value, 'units'):
            if units is None:
                units = f'{value.units:~P}'
            value = value.magnitude
        # if value is a numpy data type, convert to native python type
        if hasattr(value, 'item'):
            value = value.item()
        entries.append(dict(label=label,
                            value=value,
                            units=units,
                            formatter=formatter,
                            group=group))
        logger.debug(f'AnswerSet.register index={index} label={label} value={value} units={units} formatter={formatter} group={group}')

    def display(self, index: object, element: int = 0):
        """
        Returns a formatted string for a particular answer entry.

        Parameters
        ----------
        index : any
            the question index
        element : int, optional
            the element number within the index (default is 0)

        Returns
        -------
        str
            ``'<label> = <value> <units>'`` when the entry has a label and a
            value, the bare label when it has no value, and an empty string
            when the element does not exist or the entry has no label

        Raises
        ------
        KeyError
            if ``index`` has not been registered
        """
        entries = self.data[index]
        if element >= len(entries):
            return ''
        D = entries[element]
        if not D:
            return ''
        fmt = D.get('formatter', None)
        val = D.get('value', None)
        label = D.get('label', None)
        units = D.get('units', None)
        vstr = ''
        # test against None: 0, 0.0 and False are legitimate answer values
        if val is not None:
            vstr = fmt.format(val) if fmt else str(val)
            # units only make sense next to a value
            if units:
                vstr += f' {units}'
        if label:
            return f'{label} = {vstr}' if vstr else label
        return ''

    def to_yaml(self):
        """
        Dumps the AnswerSet to a YAML file.

        The file is named by the ``dumpname`` attribute. If the string forms
        of all indices share a common prefix, that prefix is stripped from
        the keys of ``self.data`` (in place) before dumping.
        """
        raw_indices = [str(x) for x in self.data]
        common_prefix = os.path.commonprefix(raw_indices)
        logger.debug(f'AnswerSet.to_yaml common prefix: "{common_prefix}"')
        # Do not strip when the prefix is itself a complete key (always true
        # for a single key): that would leave an empty-string index.
        if common_prefix and common_prefix not in raw_indices:
            self.data = {str(index)[len(common_prefix):]: AL
                         for index, AL in self.data.items()}
        with open(self.dumpname, 'w', encoding='utf-8') as f:
            yaml.safe_dump(self.data, f)
class AnswerSuperSet(UserList[AnswerSet]):
    """
    A collection of AnswerSet instances with methods to convert to pandas DataFrames
    and LaTeX tables.

    Parameters
    ----------
    initial : list[AnswerSet], optional
        the AnswerSet instances to collect (default is an empty collection)
    """

    def __init__(self, initial: list[AnswerSet] = None):
        super().__init__(initial)
        if not self._check_congruency():
            logger.error('Error: There is a lack of congruency among answer sets')
        self._make_dfs()

    @classmethod
    def from_dumpfiles(cls, files: list[str] = None, delete: bool = False):
        """
        Create an **AnswerSuperSet** instance by loading multiple AnswerSet instances from YAML files.

        Parameters
        ----------
        files : list[str]
            list of YAML filenames to load
        delete : bool, optional
            whether to delete the YAML files after loading (default is False)

        Returns
        -------
        AnswerSuperSet
            An AnswerSuperSet instance populated with AnswerSet instances from the YAML files.
        """
        data = [AnswerSet.from_yaml(f, delete=delete) for f in (files or [])]
        return cls(initial=data)

    def to_latex(self):
        """
        Converts the **AnswerSuperSet** to a LaTeX formatted string.

        Returns
        -------
        str
            LaTeX formatted string representing the **AnswerSuperSet**: the
            longtables of all groups concatenated in group order (an empty
            string if the collection is empty)
        """
        tables = []
        for group_name, group_data in self.groups.items():
            df = group_data['df']
            formatters = group_data.get('formatters', None)
            logger.debug(f'AnswerSuperSet.to_latex group "{group_name}" with formatters: {formatters}')
            tables.append(df.to_latex(formatters=formatters, index=False, longtable=True))
        return ''.join(tables)

    def _check_congruency(self):
        """
        Checks that all **AnswerSet** instances in the collection have the same indices
        (in the same order) and the same number of answer entries per index.

        Returns
        -------
        bool
            True if all **AnswerSet** instances are congruent (or the
            collection is empty), False otherwise
        """
        if not self.data:
            return True
        reference = self.data[0].data
        ref_indices = list(reference.keys())
        for other in self.data[1:]:
            # whole-list comparison also catches missing or extra indices,
            # which an element-wise zip() would silently truncate away
            if list(other.data.keys()) != ref_indices:
                return False
            if any(len(other.data[i]) != len(reference[i]) for i in ref_indices):
                return False
        return True

    @staticmethod
    def _column_key(index, answer, single):
        """Build the DataFrame column name for one answer entry."""
        # the index prefix is omitted when the collection holds a single set
        prefix = '' if single else f'{index}-'
        key = f'{prefix}{answer["label"]}'
        if answer.get('units'):
            key += f' ({answer["units"]})'
        return key

    def _make_dfs(self):
        """
        Constructs pandas DataFrames for the answer data in the collection.

        Populates ``self.formatters`` and ``self.groups``. Each entry of
        ``self.groups`` is a dict with a ``df`` DataFrame (one row per
        AnswerSet, ordered by serial) and its column ``formatters``. When no
        answer entry names a group, a single group called ``'base'`` is built.
        """
        self.formatters = {}
        self.groups = {}
        if not self.data:
            # nothing to tabulate; to_latex() then returns an empty string
            return
        # sort by integer serial so row order is always numerically ascending
        self.data.sort(key=lambda x: x.serial)
        serialstrs = [x.serialstr for x in self.data]
        single = len(self.data) == 1
        values = {'serials': serialstrs}
        pattern = self.data[0]  # keys in first AnswerSet form the pattern all sets follow
        # keys may be prepended with a common prefix; remove it, unless the
        # prefix is itself a complete key (always true for a single key),
        # which would leave an empty index
        pattern_keys = [str(x) for x in pattern.data]
        common_prefix = os.path.commonprefix(pattern_keys)
        logger.debug(f'Overall common prefix: "{common_prefix}"')
        strip = 0 if common_prefix in pattern_keys else len(common_prefix)
        for dataset in self.data:
            dataset.data = {str(index)[strip:]: AL
                            for index, AL in dataset.data.items()}
        # first pass: create the (empty) columns from the pattern set
        for index, AL in pattern.data.items():
            for a in AL:
                key = self._column_key(index, a, single)
                group = a.get('group', None)
                if group:
                    if group not in self.groups:
                        self.groups[group] = dict(formatters={}, df=None,
                                                  values={'serials': serialstrs})
                    self.groups[group]['values'][key] = []
                    if 'formatter' in a:
                        self.groups[group]['formatters'][key] = a['formatter']
                else:
                    values[key] = []
                    if 'formatter' in a:
                        self.formatters[key] = a['formatter']
        # second pass: append each set's values to the matching columns
        for inst in self.data:
            for index, AL in inst.data.items():
                for a in AL:
                    key = self._column_key(index, a, single)
                    group = a.get('group', None)
                    if group:
                        self.groups[group]['values'][key].append(a['value'])
                    else:
                        values[key].append(a['value'])
        if not self.groups:
            DF = pd.DataFrame(values)
            self.groups['base'] = dict(formatters=self.formatters, df=DF)
        else:
            # NOTE(review): when any group is defined, ungrouped columns
            # collected in `values` are not turned into a DataFrame.
            for gname, gdata in self.groups.items():
                logger.debug(f'Building DataFrame for group {gname} with values: {gdata["values"]}')
                gdata['df'] = pd.DataFrame(gdata['values'])