280 lines
12 KiB
Python
280 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
# Copyright (c) 2021-2023 Salvador E. Tropea
|
|
# Copyright (c) 2021-2023 Instituto Nacional de Tecnología Industrial
|
|
# License: GPL-3.0
|
|
# Project: KiBot (formerly KiPlot)
|
|
"""
|
|
Dependencies:
|
|
- name: RAR
|
|
url: https://www.rarlab.com/
|
|
url_down: https://www.rarlab.com/download.htm
|
|
help_option: -?
|
|
downloader: rar
|
|
role: Compress in RAR format
|
|
debian: rar
|
|
arch: rar(AUR)
|
|
"""
|
|
import re
|
|
import os
|
|
import glob
|
|
import sys
|
|
from zipfile import ZipFile, ZIP_STORED, ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA
|
|
from tarfile import open as tar_open
|
|
from collections import OrderedDict
|
|
from .gs import GS
|
|
from .kiplot import config_output, run_output, get_output_targets, run_command
|
|
from .misc import WRONG_INSTALL, W_EMPTYZIP, INTERNAL_ERROR
|
|
from .optionable import Optionable, BaseOptions
|
|
from .registrable import RegOutput
|
|
from .macros import macros, document, output_class # noqa: F401
|
|
from . import log
|
|
|
|
logger = log.get_logger()
|
|
|
|
|
|
class FilesList(Optionable):
|
|
def __init__(self):
|
|
super().__init__()
|
|
with document:
|
|
self.source = '*'
|
|
""" *File names to add, wildcards allowed. Use ** for recursive match.
|
|
By default this pattern is applied to the output dir specified with `-d` command line option.
|
|
See the `from_cwd` and `from_output_dir` options """
|
|
self.from_cwd = False
|
|
""" Use the current working directory instead of the dir specified by `-d` """
|
|
self.from_output_dir = False
|
|
""" Use the current directory specified by the output instead of the dir specified by `-d`.
|
|
Note that it only applies when using `from_output` and no `dest` is specified.
|
|
It has more prescedence than `from_cwd` """
|
|
self.from_output = ''
|
|
""" *Collect files from the selected output.
|
|
When used the `source` option is ignored """
|
|
self.filter = '.*'
|
|
""" A regular expression that source files must match """
|
|
self.dest = ''
|
|
""" Destination directory inside the archive, empty means the same of the file """
|
|
|
|
|
|
class CompressOptions(BaseOptions):
|
|
ZIP_ALGORITHMS = {'auto': ZIP_DEFLATED,
|
|
'stored': ZIP_STORED,
|
|
'deflated': ZIP_DEFLATED,
|
|
'bzip2': ZIP_BZIP2,
|
|
'lzma': ZIP_LZMA}
|
|
TAR_MODE = {'auto': 'bz2',
|
|
'stored': '',
|
|
'deflated': 'gz',
|
|
'bzip2': 'bz2',
|
|
'lzma': 'xz'}
|
|
|
|
def __init__(self):
|
|
with document:
|
|
self.output = GS.def_global_output
|
|
""" *Name for the generated archive (%i=name of the output %x=according to format) """
|
|
self.format = 'ZIP'
|
|
""" *[ZIP,TAR,RAR] Output file format """
|
|
self.compression = 'auto'
|
|
""" [auto,stored,deflated,bzip2,lzma] Compression algorithm. Use auto to let KiBot select a suitable one """
|
|
self.files = FilesList
|
|
""" *[list(dict)] Which files will be included """
|
|
self.move_files = False
|
|
""" Move the files to the archive. In other words: remove the files after adding them to the archive """
|
|
self.remove_files = None
|
|
""" {move_files} """
|
|
self.follow_links = True
|
|
""" Store the file pointed by symlinks, not the symlink """
|
|
self.skip_not_run = False
|
|
""" Skip outputs with `run_by_default: false` """
|
|
super().__init__()
|
|
|
|
def config(self, parent):
|
|
super().config(parent)
|
|
if isinstance(self.files, type):
|
|
self.files = []
|
|
logger.warning(W_EMPTYZIP+'No files provided, creating an empty archive')
|
|
self._expand_id = parent.name
|
|
self._expand_ext = self.solve_extension()
|
|
|
|
def create_zip(self, output, files):
|
|
extra = {}
|
|
extra['compression'] = self.ZIP_ALGORITHMS[self.compression]
|
|
if sys.version_info >= (3, 7):
|
|
extra['compresslevel'] = 9
|
|
with ZipFile(output, 'w', **extra) as zip:
|
|
for fname, dest in files.items():
|
|
logger.debug('Adding '+fname+' as '+dest)
|
|
zip.write(fname, dest)
|
|
|
|
def create_tar(self, output, files):
|
|
with tar_open(output, 'w:'+self.TAR_MODE[self.compression]) as tar:
|
|
for fname, dest in files.items():
|
|
logger.debug('Adding '+fname+' as '+dest)
|
|
tar.add(fname, dest)
|
|
|
|
def create_rar(self, output, files):
|
|
if os.path.isfile(output):
|
|
os.remove(output)
|
|
command = self.ensure_tool('RAR')
|
|
if command is None:
|
|
return
|
|
for fname, dest in files.items():
|
|
logger.debugl(2, 'Adding '+fname+' as '+dest)
|
|
cmd = [command, 'a', '-m5', '-ep', '-ap'+os.path.dirname(dest), output, fname]
|
|
run_command(cmd, err_msg='Failed to invoke rar command, error {ret}', err_lvl=WRONG_INSTALL)
|
|
|
|
def solve_extension(self):
|
|
if self.format == 'ZIP':
|
|
return 'zip'
|
|
if self.format == 'RAR':
|
|
return 'rar'
|
|
# TAR
|
|
ext = 'tar'
|
|
sub_ext = self.TAR_MODE[self.compression]
|
|
if sub_ext:
|
|
ext += '.'+sub_ext
|
|
return ext
|
|
|
|
def get_files(self, output, no_out_run=False):
|
|
output_real = os.path.realpath(output)
|
|
files = OrderedDict()
|
|
out_dir_cwd = os.getcwd()
|
|
out_dir_default = self.expand_filename_sch(GS.out_dir)
|
|
dirs_list = []
|
|
for f in self.files:
|
|
# Get the list of candidates
|
|
files_list = None
|
|
output_out_dir = None
|
|
if f.from_output:
|
|
logger.debugl(2, '- From output `{}`'.format(f.from_output))
|
|
files_list, out_dir, out = get_output_targets(f.from_output, self._parent)
|
|
if not out.run_by_default and self.skip_not_run:
|
|
continue
|
|
output_out_dir = out_dir
|
|
logger.debugl(2, '- List of files: {}'.format(files_list))
|
|
if out_dir not in dirs_list:
|
|
dirs_list.append(out_dir)
|
|
if not no_out_run:
|
|
extra_files = []
|
|
for file in files_list:
|
|
if not os.path.exists(file):
|
|
# The target doesn't exist
|
|
if not out._done:
|
|
# The output wasn't created in this run, try running it
|
|
run_output(out)
|
|
if not os.path.exists(file):
|
|
# Still missing, something is wrong
|
|
GS.exit_with_error(f'Unable to generate `{file}` from {out}', INTERNAL_ERROR)
|
|
if os.path.isdir(file):
|
|
# Popultate output adds the image dirs
|
|
# Computing its content is complex:
|
|
# - We must parse the input markdown
|
|
# - We must coinfigure and use the renderer output to do the file name expansion
|
|
# This is almost as complex as generating the whole output, so it adds the dir
|
|
extra_files += glob.glob(os.path.join(file, '**'), recursive=True)
|
|
if extra_files:
|
|
files_list += extra_files
|
|
else:
|
|
out_dir = out_dir_cwd if f.from_cwd else out_dir_default
|
|
source = f.expand_filename_both(f.source, make_safe=False)
|
|
files_list = glob.iglob(os.path.join(out_dir, source), recursive=True)
|
|
if GS.debug_level > 1:
|
|
files_list = list(files_list)
|
|
logger.debug('- Pattern {} list of files: {}'.format(source, files_list))
|
|
# Compute the reference dir when no f.dest
|
|
out_dir = out_dir_cwd if f.from_cwd else out_dir_default
|
|
if f.from_output_dir:
|
|
out_dir = output_out_dir
|
|
# Filter and adapt them
|
|
for fname in filter(re.compile(f.filter).match, files_list):
|
|
fname_real = os.path.realpath(fname) if self.follow_links else os.path.abspath(fname)
|
|
# Avoid including the output
|
|
if fname_real == output_real:
|
|
continue
|
|
# Compute the destination directory inside the archive
|
|
dest = fname
|
|
if f.dest:
|
|
dest = os.path.join(f.dest, os.path.basename(fname))
|
|
else:
|
|
dest = os.path.relpath(dest, out_dir)
|
|
files[fname_real] = dest
|
|
return files, dirs_list
|
|
|
|
def get_targets(self, out_dir):
|
|
return [self._parent.expand_filename(out_dir, self.output)]
|
|
|
|
def get_dependencies(self):
|
|
output = self.get_targets(self.expand_filename_sch(GS.out_dir))[0]
|
|
files, _ = self.get_files(output, no_out_run=True)
|
|
return files.keys()
|
|
|
|
def get_categories(self):
|
|
cats = set()
|
|
for f in self.files:
|
|
if f.from_output:
|
|
out = RegOutput.get_output(f.from_output)
|
|
if out is not None:
|
|
config_output(out)
|
|
if out.category:
|
|
if isinstance(out.category, str):
|
|
cats.add(out.category)
|
|
else:
|
|
cats.update(out.category)
|
|
else:
|
|
cats.add('Compress')
|
|
return list(cats)
|
|
|
|
def run(self, output):
|
|
# Output file name
|
|
logger.debug('Collecting files')
|
|
# Collect the files
|
|
files, dirs_outs = self.get_files(output)
|
|
logger.debug('Generating `{}` archive'.format(output))
|
|
if self.format == 'ZIP':
|
|
self.create_zip(output, files)
|
|
elif self.format == 'TAR':
|
|
self.create_tar(output, files)
|
|
elif self.format == 'RAR':
|
|
self.create_rar(output, files)
|
|
if self.move_files:
|
|
dirs = dirs_outs
|
|
for fname in files.keys():
|
|
if os.path.isfile(fname):
|
|
os.remove(fname)
|
|
logger.debug('Removing '+fname)
|
|
elif os.path.isdir(fname):
|
|
dirs.append(fname)
|
|
for d in sorted(dirs, key=lambda x: len(x.split(os.sep)), reverse=True):
|
|
logger.debug('Removing '+d)
|
|
try:
|
|
os.rmdir(d)
|
|
except OSError as e:
|
|
if e.errno == 39:
|
|
logger.debug(' Not empty')
|
|
else:
|
|
raise
|
|
|
|
|
|
@output_class
|
|
class Compress(BaseOutput): # noqa: F821
|
|
""" Archiver (files compressor)
|
|
Generates a compressed file containing output files.
|
|
This is used to generate groups of files in compressed file format. """
|
|
def __init__(self):
|
|
super().__init__()
|
|
# Make it low priority so it gets created after all the other outputs
|
|
self.priority = 10
|
|
with document:
|
|
self.options = CompressOptions
|
|
""" *[dict] Options for the `compress` output """
|
|
self._none_related = True
|
|
# The help is inherited and already mentions the default priority
|
|
self.fix_priority_help()
|
|
|
|
def config(self, parent):
|
|
super().config(parent)
|
|
if self.category is None and not isinstance(self.options, type):
|
|
self.category = self.options.get_categories()
|
|
|
|
def get_dependencies(self):
|
|
return self.options.get_dependencies()
|