KiBot/kibot/fil_spec_to_field.py

249 lines
11 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (c) 2023 Salvador E. Tropea
# Copyright (c) 2023 Instituto Nacional de Tecnología Industrial
# License: GPL-3.0
# Project: KiBot (formerly KiPlot)
# Description: Extracts information from the distributor spec and fills fields
import re
from .bom.units import comp_match, get_prefix, ParsedValue
from .bom.xlsx_writer import get_spec
from .error import KiPlotConfigurationError
from .kiplot import look_for_output, run_output
from .misc import W_FLDCOLLISION
# from .gs import GS
from .optionable import Optionable
from .macros import macros, document, filter_class # noqa: F401
from . import log
logger = log.get_logger()
UNITS = {'voltage': 'V', 'power': 'W', 'current': 'A'}
EI_TYPES = {'value': 'value', 'tolerance': 'percent', 'footprint': 'string', 'power': 'power', 'current': 'current',
'voltage': 'voltage', 'frequency': 'string', 'temp_coeff': 'string', 'manf': 'string', 'size': 'string'}
DEF_CHECK = ['_value', '_tolerance', '_power', '_current', '_voltage', '_temp_coeff']
class SpecOptions(Optionable):
""" A spec to copy """
def __init__(self):
super().__init__()
self._unknown_is_error = True
with document:
self.spec = Optionable
""" *[string|list(string)=''] Name/s of the source spec/s.
The following names are uniform across distributors: '_desc', '_value', '_tolerance', '_footprint',
'_power', '_current', '_voltage', '_frequency', '_temp_coeff', '_manf' and '_size' """
self.field = ''
""" *Name of the destination field """
self.policy = 'overwrite'
""" [overwrite,update,new] Controls the behavior of the copy mechanism.
`overwrite` always copy the spec value,
`update` copy only if the field already exist,
`new` copy only if the field doesn't exist. """
self.collision = 'warning'
""" [warning,error,ignore] How to report a collision between the current value and the new value """
self.type = 'string'
""" [percent,voltage,power,current,value,string] How we compare the current value to determine a collision.
`value` is the component value i.e. resistance for R* """
self._field_example = 'RoHS'
self._spec_example = 'rohs_status'
def config(self, parent):
super().config(parent)
if not self.field:
raise KiPlotConfigurationError("Missing or empty `field` in spec_to_field filter ({})".format(str(self._tree)))
if not self.spec:
raise KiPlotConfigurationError("Missing or empty `spec` in spec_to_field filter ({})".format(str(self._tree)))
self.spec = self.force_list(self.spec)
@filter_class
class Spec_to_Field(BaseFilter): # noqa: F821
""" Spec to Field
This filter extracts information from the specs obtained from component distributors
and fills fields.
I.e. create a field with the RoHS status of a component.
In order to make it work you must be able to get prices using the KiCost options of
the `bom` output. Make sure you can do this before trying to use this filter.
Usage [example](https://inti-cmnb.github.io/kibot-examples-1/spec_to_field/) """
def __init__(self):
super().__init__()
self._is_transform = True
with document:
self.from_output = ''
""" *Name of the output used to collect the specs.
Currently this must be a `bom` output with KiCost enabled and a distributor that returns specs """
self.specs = SpecOptions
""" *[list(dict)|dict] One or more specs to be copied """
self.check_dist_coherence = True
""" Check that the data we got from different distributors is equivalent """
self.check_dist_fields = Optionable
""" [string|list(string)=''] List of fields to include in the check.
For a full list of fields consult the `specs` option """
self._from = None
self._check_dist_fields_example = DEF_CHECK
def config(self, parent):
super().config(parent)
if not self.from_output:
raise KiPlotConfigurationError("You must specify an output that collected the specs")
if isinstance(self.specs, type):
raise KiPlotConfigurationError("At least one spec must be provided ({})".format(str(self._tree)))
if isinstance(self.specs, SpecOptions):
self.specs = [self.specs]
if isinstance(self.check_dist_fields, type):
self.check_dist_fields = DEF_CHECK
else:
self.check_dist_fields = self.force_list(self.check_dist_fields)
def _normalize(self, val, kind, comp):
val = val.strip()
if kind == 'string':
return val
if kind == 'percent':
# TODO: What about +20%, -10%?
res = re.match(r"(?:\+/-|±|\+-)?\s*(\d+)\s*%?", val)
if not res:
return val
return res.group(1)+'%'
if kind == 'value':
if not comp.ref_prefix or comp.ref_prefix[0] not in "RLC":
return val
res = comp_match(val, comp.ref_prefix, comp.ref, relax_severity=True, stronger=True)
if not res:
return val
return str(res)
# voltage,power,current
new_val = re.sub(r"\s*(Volts?|V|Watts?|W|Amperes?|Ampers|Amp|A)", '', val)
# Change things like 0.125 to 125 m
res = comp_match(new_val, ' ', comp.ref, relax_severity=True, stronger=True)
if res is None:
if ',' not in new_val:
return val
# Some distributor APIs can return multiple values separated by comma
res_many = []
for v in new_val.split(','):
res = comp_match(v.strip(), ' ', comp.ref, relax_severity=True, stronger=True)
if res is not None:
res.unit = UNITS[kind]
res_many.append(res)
if not res_many:
return val
res = res_many[0]
reference = str(res)
if len(res_many) > 1:
for r in res_many[1:]:
if str(r) != reference:
logger.warning(W_FLDCOLLISION+'Inconsistencies in multiple values {}: `{}` ({} vs {})'.
format(comp.ref, val, r, reference))
res.unit = UNITS[kind]
return str(res)
def normalize(self, old_val, kind, comp):
val = self._normalize(old_val, kind, comp)
logger.debugl(3, "- Normalize {} -> {}".format(old_val, val))
return val
def compare(self, cur_val, spec_val):
cur_val = cur_val.lower().strip()
spec_val = spec_val.lower().strip()
return cur_val == spec_val
def solve_from(self):
if self._from is not None:
return
# Check the renderer output is valid
out = look_for_output(self.from_output, 'from_output', self._parent, {'bom'})
if not out._done:
run_output(out)
self._from = out
def update_extra_info(self, res, attr, ei, dattr, pattern="{}", units=""):
if dattr in ei:
# Don't overwrite collected data
return
value = res.get_extra(attr)
if not value:
# No result for this
return
if isinstance(value, float):
if int(value) == value:
value = int(value)
if units:
# Change things like 0.125 to 125 m
v, pow = get_prefix(value, '')
parsed = ParsedValue(v, pow, units)
value = str(parsed)
ei[dattr] = pattern.format(value)
def check_coherent(self, c):
if not self.check_dist_coherence or not hasattr(c, 'kicost_part'):
return
extra_info = {}
for d, dd in c.kicost_part.dd.items():
ei = dd.extra_info
if not ei:
# We got nothing for this distributor
continue
if 'desc' in ei and 'value' not in ei:
# Very incomplete extra info, try to fill it
desc = ei['desc']
logger.debugl(3, '- Parsing {}'.format(desc))
res = comp_match(desc, c.ref_prefix, c.ref, relax_severity=True, stronger=True)
if res:
ei['value'] = str(res)
self.update_extra_info(res, 'tolerance', ei, 'tolerance', "{}%")
self.update_extra_info(res, 'voltage_rating', ei, 'voltage', units="V")
self.update_extra_info(res, 'size', ei, 'footprint')
self.update_extra_info(res, 'characteristic', ei, 'temp_coeff')
self.update_extra_info(res, 'power_rating', ei, 'power', units="W")
logger.debugl(3, '- New extra_info: {}'.format(ei))
for n, v in ei.items():
if '_'+n not in self.check_dist_fields:
continue
if n not in extra_info:
# First time we see it
extra_info[n] = (self.normalize(v, EI_TYPES[n], c), d)
else:
cur_val, cur_dist = extra_info[n]
v = self.normalize(v, EI_TYPES[n], c)
if not self.compare(cur_val, v):
desc = "`{}` vs `{}` collision, `{}` != `{}`".format(d, cur_dist, v, cur_val)
logger.non_critical_error(desc)
def filter(self, comp):
self.solve_from()
self.check_coherent(comp)
for s in self.specs:
field = s.field.lower()
spec_name = []
spec_val = set()
for sp in s.spec:
name, val = get_spec(comp.kicost_part, sp)
if name:
spec_name.append(name)
if val:
val = self.normalize(val, s.type, comp)
spec_val.add(val)
spec_name = ','.join(spec_name)
spec_val = ' '.join(spec_val)
if not spec_name or not spec_val:
# No info
continue
has_field = comp.is_field(field)
cur_val = comp.get_field_value(field) if has_field else None
if cur_val:
cur_val = self.normalize(cur_val, s.type, comp)
if cur_val == spec_val:
# Already there
continue
if not self.compare(cur_val, spec_val):
# Collision
desc = "{} field `{}` collision, has `{}`, found `{}`".format(comp.ref, s.field, cur_val, spec_val)
if s.collision == 'warning':
logger.warning(W_FLDCOLLISION+desc)
elif s.collision == 'error':
raise KiPlotConfigurationError(desc)
if s.policy == 'overwrite' or (self.p == 'update' and has_field) or (s.policy == 'new' and not has_field):
comp.set_field(s.field, spec_val)
logger.debugl(2, "- {} {}: {} ({})".format(comp.ref, s.field, spec_val, spec_name))