Source code for pygacity.util.collectors
import logging
import os
import shutil
import stat
import sys
import tarfile
from collections import UserList
from dataclasses import dataclass
from pathlib import Path
from zipfile import ZipFile, ZIP_DEFLATED
from .stringthings import my_logger
logger = logging.getLogger(__name__)
[docs]
def on_rm_error(func, path, exc):
os.chmod(path, stat.S_IWRITE)
func(path)
[docs]
@dataclass
class ByteCollector:
"""
A simple string manager
The main object in a ByteCollector instance is a string of bytes (byte_collector).
The string can be appended to by anoter string or the contents of a file.
The string can have "comments" written to it.
"""
line_length: int = 80
""" line length for comment wrapping """
comment_char: str = '#'
""" comment character """
byte_collector: str = ''
""" the collected string """
[docs]
def reset(self):
"""
Resets the string
"""
self.byte_collector = ''
[docs]
def write(self, msg: str):
"""
Appends msg to the string
Parameters
----------
msg : str
the message
"""
self.byte_collector += msg
[docs]
def addline(self, msg: str, end: str = '\n'):
"""
Appends msg to the string as a line
Parameters
----------
msg : str
the message
end : str, optional
end-of-line byte
"""
self.byte_collector += f'{msg}{end}'
[docs]
def lastline(self, end: str = '\n', exclude: str = '#'):
"""
Returns last line in the string
Parameters
----------
end: str, optional
end-of-line byte
exclude: str, optional
comment byte
"""
lines=[x for x in self.byte_collector.split(end) if (len(x)>0 and not x.startswith(exclude))]
if len(lines) > 0:
return lines[-1]
else:
return None
[docs]
def has_statement(self, statement: str, end: str = '\n', exclude: str = '#'):
"""
Determines if a particular statement is on at least one non-comment line
Parameters
----------
statement : str
the statement; e.g., 'exit'
end : str, optional
end-of-line byte
exclude : str, optional
comment byte
"""
lines=[x for x in self.byte_collector.split(end) if (len(x)>0 and not x.startswith(exclude))]
if len(lines) > 0:
for l in lines:
if statement in l:
return True
return False
[docs]
def injest_file(self, filename: str):
"""
Appends contents of file 'filename' to the string
Parameters
----------
filename : str
the name of the file
"""
with open(filename,'r') as f:
self.byte_collector += f.read()
[docs]
def log(self, msg: str):
"""
Logs msg using my_logger
Parameters
----------
msg : str
the message
"""
my_logger(msg, self.addline)
[docs]
def banner(self, msg: str):
"""
Logs msg as a banner using my_logger
Parameters
----------
msg : str
the message
"""
my_logger(msg, self.addline, fill='#', width=80, just='^')
def __str__(self):
return self.byte_collector
[docs]
class FileCollector(UserList):
"""
A class for handling collections of files to be managed together
as Paths
"""
def __init__(self, initial: list[str | Path] = None):
data: list[Path] = [Path(x) for x in initial] if initial is not None else []
super().__init__(data)
[docs]
def append(self, item: str | Path):
"""
Appends a file path to the collection
Parameters
----------
item : str | Path
the file path to append
"""
p = Path(item)
if p not in self.data:
self.data.append(p)
[docs]
def flush(self):
"""
Deletes all files in the collection from disk
"""
logger.debug(f'Flushing file collector: {len(self.data)} entries.')
for f in self.data:
if f.is_file():
# logger.debug(f'Deleting file {f.as_posix()} exists? {f.exists()}')
f.unlink()
# logger.debug(f' -> exists? {f.exists()}')
elif f.is_dir():
# logger.debug(f'Deleting directory {f.as_posix()} exists? {f.exists()}')
shutil.rmtree(f, onerror=on_rm_error)
# logger.debug(f' -> exists? {f.exists()}')
else:
logger.debug(f'FileCollector.flush: path {f.as_posix()} does not exist.')
self.clear()
[docs]
def get_filenames(self) -> list[str]:
"""
Returns list of filenames in the collection as strings
"""
return [x.as_posix() for x in self.data]
def __str__(self):
cwd = Path.cwd()
return ' '.join([x.relative_to(cwd).as_posix() for x in self.data])
[docs]
def archive(self, basepath: Path, delete: bool = False):
"""
Archives the files in the collection into a single compressed file. If OS is Windows, makes a zipfile; if Linux, makes a tarball of the files in the collection.
Parameters
----------
basepath : Path
basename of the resulting tarball or zipfile
delete : bool, optional
if True, deletes the original files after archiving (default is False)
"""
# check the OS type first
arcname = ''
if sys.platform.startswith('win'):
# Windows: make a zipfile
zippath = basepath.with_suffix('.zip')
with ZipFile(zippath, 'w', ZIP_DEFLATED) as zf:
for src in self.data:
if src.is_file():
zf.write(src, arcname=src.name)
else:
for p in src.rglob("*"):
logger.debug(f'adding {p} to zipfile')
if p.is_file():
zf.write(p, arcname=p.relative_to(basepath.parent))
logger.debug(f'generated zipfile {zippath}')
arcname = zippath
else:
tgzpath = basepath.with_suffix('.tgz')
with tarfile.open(tgzpath, 'w:gz') as tf:
for f in self.data:
tf.add(f, arcname=f.name)
logger.debug(f'generated tarball {tgzpath}')
arcname = tgzpath
if delete:
self.flush()
return arcname