# Source code for thermosteam.indexer

# -*- coding: utf-8 -*-
# BioSTEAM: The Biorefinery Simulation and Techno-Economic Analysis Modules
# Copyright (C) 2020, Yoel Cortes-Pena <yoelcortes@gmail.com>
# 
# This module is under the UIUC open-source license. See 
# github.com/BioSTEAMDevelopmentGroup/biosteam/blob/master/LICENSE.txt
# for license details.
"""
"""
import thermosteam as tmo
from .units_of_measure import AbsoluteUnitsOfMeasure
from . import utils
from .exceptions import UndefinedChemical
from ._phase import Phase, LockedPhase, NoPhase, PhaseIndexer, phase_tuple, check_phase
from free_properties import PropertyFactory, property_array
import numpy as np

# Names exported by `from thermosteam.indexer import *`: the two generic
# indexer classes, their molar/mass/volumetric flow variants, and the two
# flow property classes defined at the bottom of this module.
__all__ = ('ChemicalIndexer',
           'MaterialIndexer',
           'ChemicalMolarFlowIndexer', 
           'MolarFlowIndexer',
           'ChemicalMassFlowIndexer', 
           'MassFlowIndexer',
           'ChemicalVolumetricFlowIndexer',
           'VolumetricFlowIndexer',
           'MassFlowProperty',
           'VolumetricFlowProperty')

# %% Utilities

# Short module-level aliases: `isa` is used for type checks in the indexers
# below, and `_new` creates instances in the alternate constructors
# (`blank`, `from_data`, `_copy_without_data`) without going through `__new__`.
isa = isinstance
_new = object.__new__

def find_main_phase(indexers, default):
    """
    Return the phase shared by all indexers, or `default` if there is none.

    Parameters
    ----------
    indexers : iterable
        Objects to compare. Each is expected to expose a `phase` attribute.
    default :
        Value returned when `indexers` is empty, when the phases differ, or
        when any object does not have a `phase` attribute.

    Returns
    -------
    The common `phase` of all indexers, or `default`.

    """
    indexers = list(indexers)
    if not indexers:
        # Nothing to compare; keep whatever the caller already had.
        return default
    main_indexer, *others = indexers
    try:
        phase = main_indexer.phase
        for other in others:
            if phase != other.phase:
                return default
    except AttributeError:
        # Objects without a single `phase` cannot define a main phase.
        return default
    return phase

def nonzeros(IDs, data):
    """
    Return the IDs and values of all nonzero entries in `data`.

    Parameters
    ----------
    IDs : sequence
        Labels, indexable by position, aligned with `data`.
    data : 1d array
        Values to filter.

    Returns
    -------
    tuple
        A list with the IDs of nonzero entries and an array with the
        corresponding values.

    """
    (positions,) = np.where(data != 0)
    kept_IDs = [IDs[k] for k in positions]
    kept_values = data[positions]
    return kept_IDs, kept_values

class ChemicalIndex:
    """
    Thin wrapper around an index value.

    `MaterialIndexer.get_index` wraps indices that were resolved from
    chemical IDs alone in this type, so that they can be told apart from
    indices that also select a phase.

    Parameters
    ----------
    value :
        The wrapped index.

    """
    # A one-element tuple; a bare string only works by accident.
    __slots__ = ('value',)

    def __init__(self, value):
        self.value = value

    def __repr__(self): # pragma: no cover
        return f"{type(self).__name__}({self.value})"


# %% Abstract indexer
    
class Indexer:
    """Abstract class for fast indexing."""
    __slots__ = ('_data',)
    units = None

    def copy(self):
        """Return a new indexer holding a copy of this indexer's data."""
        duplicate = self._copy_without_data()
        duplicate._data = self._data.copy()
        return duplicate
    __copy__ = copy

    def get_conversion_factor(self, units):
        """
        Return the factor that converts this indexer's units to `units`.

        Raises
        ------
        TypeError
            If this indexer has no units of measure.

        """
        own_units = self.units
        if not own_units:
            raise TypeError(f"{type(self).__name__} object is unitless; "
                            f"cannot get conversion factor for {units}")
        return own_units.conversion_factor(units)

    def get_data(self, units, *index):
        """
        Return data converted to `units`.

        With no `index`, all data is returned; with one key, that key is
        used to index this object; with several, the keys are used together
        as a tuple.

        """
        n_keys = len(index)
        factor = self.get_conversion_factor(units)
        if n_keys == 0:
            selected = self._data
        elif n_keys == 1:
            selected = self[index[0]]
        else:
            selected = self[index]
        return factor * selected

    def set_data(self, data, units, *index):
        """
        Set data given in `units`.

        The values are converted to floats and divided by the conversion
        factor before being stored. `index` is interpreted as in `get_data`.

        """
        n_keys = len(index)
        values = np.asarray(data, dtype=float)
        factor = self.get_conversion_factor(units)
        scaled = values / factor
        if n_keys == 0:
            self._data[:] = scaled
        elif n_keys == 1:
            self[index[0]] = scaled
        else:
            self[index] = scaled

    @property
    def data(self):
        """The underlying data array (not a copy)."""
        return self._data
# %% Phase data
@utils.chemicals_user
class ChemicalIndexer(Indexer):
    """
    Create a ChemicalIndexer that can index a single-phase, 1d-array given
    chemical IDs.

    Parameters
    ----------
    phase : [str or PhaseContainer] {'s', 'l', 'g', 'S', 'L', 'G'}
        Phase of data.
    units : str
        Units of measure of input data.
    chemicals : Chemicals
        Required to define the chemicals that are present.
    **ID_data : float
        ID-value pairs

    Notes
    -----
    A ChemicalIndexer does not have any units defined. To use units of
    measure, use the `ChemicalMolarFlowIndexer`, `ChemicalMassFlowIndexer`,
    or `ChemicalVolumetricFlowIndexer`.

    """
    # NOTE: the text above, up to the "Notes" heading, is reused at import
    # time to build the docstrings of the unit-specific subclasses.
    __slots__ = ('_chemicals', '_phase', '_data_cache')

    def __new__(cls, phase=NoPhase, units=None, chemicals=None, **ID_data):
        if not ID_data:
            return cls.blank(phase, chemicals)
        chemicals = tmo.settings.get_default_chemicals(chemicals)
        self = cls.from_data(chemicals.kwarray(ID_data), phase, chemicals)
        if units:
            # Reinterpret the values just stored as being given in `units`.
            self.set_data(self._data, units)
        return self

    def __reduce__(self):
        state = (self._data, self._phase, self._chemicals, False)
        return self.from_data, state

    def __getitem__(self, key):
        index = self.get_index(key)
        return self._data[index]

    def __setitem__(self, key, data):
        index = self.get_index(key)
        self._data[index] = data

    def sum_across_phases(self):
        """Return the data array itself; there is only one phase."""
        return self._data

    @property
    def get_index(self):
        """Index lookup function of the associated chemicals."""
        return self._chemicals.get_index

    def empty(self):
        """Set all data to zero."""
        self._data[:] = 0.

    def mix_from(self, others):
        """Set this indexer's data to the sum of the data in `others`."""
        self.phase = find_main_phase(others, self.phase)
        own_chemicals = self.chemicals
        total = self._data
        # Take all values before zeroing, since `self` may be among `others`.
        snapshots = []
        for source in others:
            source_chemicals = source.chemicals
            if source is self:
                values = source._data.copy()
            else:
                values = source.sum_across_phases()
            snapshots.append((source_chemicals, values))
        total[:] = 0.
        for source_chemicals, values in snapshots:
            if own_chemicals is source_chemicals:
                total[:] += values
                continue
            # Different chemicals: map nonzero entries over by chemical ID.
            nonzero_index, = np.where(values)
            source_IDs = source_chemicals.IDs
            own_index = own_chemicals.get_index(
                tuple([source_IDs[k] for k in nonzero_index])
            )
            total[own_index] += values[nonzero_index]

    def to_material_indexer(self, phases):
        """Return a multi-phase indexer with this data under this phase."""
        material_array = self._MaterialIndexer.blank(phases, self._chemicals)
        material_array[self.phase] = self._data
        return material_array

    def copy_like(self, other):
        """Copy the data and phase of `other` into this indexer."""
        if self.chemicals is other.chemicals:
            self._data[:] = other._data
        else:
            self.empty()
            source_data = other._data
            nonzero_index, = np.where(source_data)
            source_IDs = other.chemicals.IDs
            own_index = self.chemicals.get_index(
                tuple([source_IDs[k] for k in nonzero_index])
            )
            self._data[own_index] = source_data[nonzero_index]
        self.phase = other.phase

    def _copy_without_data(self):
        duplicate = _new(self.__class__)
        duplicate._chemicals = self._chemicals
        duplicate._phase = self._phase.copy()
        duplicate._data_cache = {}
        return duplicate

    @classmethod
    def blank(cls, phase, chemicals=None):
        """Return a new indexer with all data set to zero."""
        self = _new(cls)
        self._load_chemicals(chemicals)
        self._data = np.zeros(self._chemicals.size, float)
        self._phase = Phase.convert(phase)
        self._data_cache = {}
        return self

    @classmethod
    def from_data(cls, data, phase=NoPhase, chemicals=None, check_data=True):
        """
        Return a new indexer that uses `data` directly (no copy is made).

        If `check_data` is True, `data` must be 1d with one entry per
        chemical.

        """
        self = _new(cls)
        self._load_chemicals(chemicals)
        self._phase = Phase.convert(phase)
        if check_data:
            assert data.ndim == 1, 'material data must be a 1d numpy array'
            assert data.size == self._chemicals.size, (
                'size of material data must be equal to '
                'size of chemicals'
            )
        self._data = data
        self._data_cache = {}
        return self

    @property
    def phase(self):
        return self._phase.phase

    @phase.setter
    def phase(self, phase):
        self._phase.phase = phase

    def __format__(self, tabs=""):
        tabs = int(tabs) if tabs else 1
        tab = tabs * 4 * " "
        phase = f"phase={self.phase!r}"
        if tab:
            # Multi-line layout: one indented entry per line.
            dlim = ",\n" + tab
            phase = '\n' + tab + phase
        else:
            dlim = ", "
        ID_data = utils.repr_IDs_data(self._chemicals.IDs, self._data, dlim)
        return f"{type(self).__name__}({phase}{ID_data})"

    def __repr__(self):
        return self.__format__()

    def _info(self, N):
        """Return string with all specifications."""
        IDs, data = nonzeros(self.chemicals.IDs, self.data)
        N_IDs = len(IDs)
        if N_IDs == 0:
            return f"{type(self).__name__}: (empty)"
        if self.units:
            basic_info = f"{type(self).__name__} ({self.units}):\n"
        else:
            basic_info = f"{type(self).__name__}:\n"
        beginning = f' ({self.phase}) ' if self.phase else " "
        new_line = '\n' + len(beginning) * ' '
        lengths = [len(i) for i in IDs]
        maxlen = max(lengths) + 1
        N_max = N or tmo.Stream.display_units.N
        too_many_chemicals = N_IDs > N_max
        N_shown = N_max if too_many_chemicals else N_IDs
        rows = [
            IDs[i] + ' ' * (maxlen - lengths[i]) + f' {data[i]:.3g}'
            for i in range(N_shown)
        ]
        data_info = new_line.join(rows)
        if too_many_chemicals:
            data_info += new_line + '...'
        return basic_info + beginning + data_info

    def show(self, N=None):
        """Print all specifications.

        Parameters
        ----------
        N: int, optional
            Number of compounds to display.

        """
        print(self._info(N))
    _ipython_display_ = show
@utils.chemicals_user
class MaterialIndexer(Indexer):
    """
    Create a MaterialIndexer that can index a multi-phase, 2d-array given
    the phase and chemical IDs.

    Parameters
    ----------
    phases : tuple['s', 'l', 'g', 'S', 'L', 'G']
        Phases of data rows.
    units : str
        Units of measure of input data.
    chemicals : Chemicals
        Required to define the chemicals that are present.
    **phase_data : tuple[str, float]
        phase-(ID, value) pairs

    Notes
    -----
    A MaterialIndexer does not have any units defined. To use units of
    measure, use the `MolarFlowIndexer`, `MassFlowIndexer`, or
    `VolumetricFlowIndexer`.

    """
    # NOTE: the text above, up to the "Notes" heading, is reused at import
    # time to build the docstrings of the unit-specific subclasses.
    __slots__ = ('_chemicals', '_phases', '_phase_indexer', '_index_cache', '_data_cache')

    # Index caches shared between instances, keyed by (phases, chemicals).
    _index_caches = {}
    _ChemicalIndexer = ChemicalIndexer

    def __new__(cls, phases=None, units=None, chemicals=None, **phase_data):
        self = cls.blank(phases or phase_data, chemicals)
        if phase_data:
            data = self._data
            get_index = self._chemicals.get_index
            get_phase_index = self.get_phase_index
            for phase, ID_data in phase_data.items():
                check_phase(phase)
                IDs, row = zip(*ID_data)
                data[get_phase_index(phase), get_index(IDs)] = row
            if units:
                # Reinterpret the values just stored as being given in `units`.
                self.set_data(data, units)
        return self

    def __reduce__(self):
        return self.from_data, (self._data, self._phases, self._chemicals, False)

    def sum_across_phases(self):
        """Return the data summed over the phase axis (one value per chemical)."""
        return self._data.sum(0)

    def empty(self):
        """Set all data to zero."""
        self._data[:] = 0

    def copy_like(self, other):
        """Copy the data of `other` (single- or multi-phase) into this indexer."""
        if isa(other, ChemicalIndexer):
            self.empty()
            other_data = other._data
            phase_index = self.get_phase_index(other.phase)
            if self.chemicals is other.chemicals:
                self._data[phase_index, :] = other_data
            else:
                # Different chemicals: map nonzero entries over by chemical ID.
                other_index, = np.where(other_data)
                IDs = other.chemicals.IDs
                self_index = self.chemicals.get_index(tuple([IDs[j] for j in other_index]))
                self._data[phase_index, self_index] += other._data[other_index]
        else:
            if self.chemicals is other.chemicals:
                self._data[:] = other._data
            else:
                self.empty()
                other_data = other._data
                other_index, = np.where(other_data.any(0))
                IDs = other.chemicals.IDs
                self_index = self.chemicals.get_index(tuple([IDs[j] for j in other_index]))
                self._data[:, self_index] = other_data[:, other_index]

    def mix_from(self, others):
        """
        Set this indexer's data to the sum of the data in `others`.

        Raises
        ------
        ValueError
            If an item is neither a MaterialIndexer nor a ChemicalIndexer.

        """
        data = self._data
        get_phase_index = self.get_phase_index
        chemicals = self._chemicals
        phases = self._phases
        # Take all values before zeroing, since `self` may be among `others`.
        indexer_data = [(i, i._data.copy() if i is self else i._data) for i in others]
        data[:] = 0.
        for other, other_data in indexer_data:
            if isa(other, MaterialIndexer):
                if phases == other.phases:
                    if chemicals is other.chemicals:
                        data[:] += other_data
                    else:
                        other_index, = np.where(other_data.any(0))
                        IDs = other.chemicals.IDs
                        self_index = chemicals.get_index(tuple([IDs[j] for j in other_index]))
                        data[:, self_index] += other_data[:, other_index]
                else:
                    # Phases differ: add row by row, skipping empty phases.
                    if chemicals is other.chemicals:
                        for phase, row in zip(other.phases, other_data):
                            if not row.any(): continue
                            data[get_phase_index(phase), :] += row
                    else:
                        for phase, row in zip(other.phases, other_data):
                            if not row.any(): continue
                            other_index, = np.where(row)
                            IDs = other.chemicals.IDs
                            self_index = chemicals.get_index(tuple([IDs[j] for j in other_index]))
                            data[get_phase_index(phase), self_index] += row[other_index]
            elif isa(other, ChemicalIndexer):
                if chemicals is other.chemicals:
                    data[get_phase_index(other.phase), :] += other_data
                else:
                    other_index, = np.where(other_data != 0.)
                    IDs = other.chemicals.IDs
                    self_index = chemicals.get_index(tuple([IDs[j] for j in other_index]))
                    data[get_phase_index(other.phase), self_index] += other_data[other_index]
            else:
                raise ValueError("can only mix from chemical or material indexers")

    def _set_phases(self, phases):
        self._phases = phases = phase_tuple(phases)
        self._phase_indexer = PhaseIndexer(phases)

    def _set_cache(self):
        caches = self._index_caches
        key = self._phases, self._chemicals
        try:
            self._index_cache = caches[key]
        except KeyError:
            self._index_cache = caches[key] = {}

    def _copy_without_data(self):
        new = _new(self.__class__)
        new._phases = self._phases
        new._chemicals = self._chemicals
        new._phase_indexer = self._phase_indexer
        new._index_cache = self._index_cache
        new._data_cache = {}
        return new

    @classmethod
    def blank(cls, phases, chemicals=None):
        """Return a new indexer with all data set to zero."""
        self = _new(cls)
        self._load_chemicals(chemicals)
        self._set_phases(phases)
        self._set_cache()
        shape = (len(self._phases), self._chemicals.size)
        self._data = np.zeros(shape, float)
        self._data_cache = {}
        return self

    @classmethod
    def from_data(cls, data, phases, chemicals=None, check_data=True):
        """
        Return a new indexer that uses `data` directly (no copy is made).

        If `check_data` is True, `data` must be 2d with one row per phase
        and one column per chemical.

        """
        self = _new(cls)
        self._load_chemicals(chemicals)
        self._set_phases(phases)
        self._set_cache()
        if check_data:
            assert data.ndim == 2, ('material data must be an 2d numpy array')
            M_phases = len(self._phases)
            N_chemicals = self._chemicals.size
            M, N = data.shape
            assert M == M_phases, ('number of phases must be equal to '
                                   'the number of material data rows')
            assert N == N_chemicals, ('size of chemicals '
                                      'must be equal to '
                                      'number of material data columns')
        self._data = data
        self._data_cache = {}
        return self

    @property
    def phases(self):
        return self._phases

    @property
    def get_phase_index(self):
        """Object that maps a phase to its row index."""
        return self._phase_indexer

    def to_chemical_indexer(self, phase=NoPhase):
        """Return a single-phase indexer with the data summed across phases."""
        return self._ChemicalIndexer.from_data(self._data.sum(0), phase, self._chemicals, False)

    def to_material_indexer(self, phases):
        """Return a new multi-phase indexer with the given phases and this data."""
        material_indexer = self.__class__.blank(phases, self._chemicals)
        for phase, data in self:
            if data.any():
                material_indexer[phase] = data
        return material_indexer

    def get_phase(self, phase):
        """Return a single-phase indexer that views the data row of `phase`."""
        return self._ChemicalIndexer.from_data(self._data[self.get_phase_index(phase)],
                                               LockedPhase(phase), self._chemicals, False)

    def __getitem__(self, key):
        index = self.get_index(key)
        if isa(index, ChemicalIndex):
            # No phase given: sum the selected chemicals across phases.
            values = self._data[:, index.value].sum(0)
        else:
            values = self._data[index]
        return values

    def __setitem__(self, key, data):
        index = self.get_index(key)
        if isa(index, ChemicalIndex):
            raise IndexError("multiple phases present; must include phase key "
                             "to set chemical data")
        self._data[index] = data

    def get_index(self, key):
        """
        Return the array index for `key`, caching the result.

        Keys that resolve as chemical IDs alone are returned wrapped in a
        ChemicalIndex; all other keys are interpreted by `_get_index`.

        Raises
        ------
        TypeError
            If the cache lookup itself raises TypeError.

        """
        cache = self._index_cache
        try:
            index = cache[key]
        except KeyError:
            try:
                index = self._chemicals.get_index(key)
            except UndefinedChemical:
                index = self._get_index(key)
            else:
                index = ChemicalIndex(index)
            cache[key] = index
            utils.trim_cache(cache)
        except TypeError:
            raise TypeError("only strings, tuples, and ellipsis are valid index keys")
        return index

    def _get_index(self, phase_IDs):
        """Return the array index for a phase, an ellipsis, or a (phase, IDs) pair."""
        if isa(phase_IDs, str):
            index = self.get_phase_index(phase_IDs)
        elif phase_IDs is ...:
            index = slice(None)
        else:
            try:
                phase, IDs = phase_IDs
            except (TypeError, ValueError):
                # Not iterable, or not exactly two items.
                raise IndexError("index by [phase, IDs] where phase is a "
                                 "(str, ellipsis, or missing), and IDs is a "
                                 "(str, tuple(str), ellipisis, or missing)")
            if isa(IDs, (str, tuple)):
                IDs_index = self._chemicals.get_index(IDs)
            elif IDs is ...:
                IDs_index = IDs
            else:
                raise TypeError("only strings, tuples, and ellipsis are valid index keys")
            if isa(phase, str):
                index = (self.get_phase_index(phase), IDs_index)
            elif phase is ...:
                index = (phase, IDs_index)
            else:
                raise IndexError("index by [phase, IDs] where phase is a "
                                 "(str, ellipsis, or missing), and IDs is a "
                                 "(str, tuple(str), ellipisis, or missing)")
        return index

    def __iter__(self):
        """Iterate over phase-data pairs."""
        return zip(self._phases, self._data)

    def iter_composition(self):
        """Iterate over phase-composition pairs."""
        array = self._data
        # Avoid dividing by zero when there is no material at all.
        total = array.sum() or 1.
        return zip(self._phases, array/total)

    def __format__(self, tabs="1"):
        IDs = self._chemicals.IDs
        phase_data = []
        for phase, data in self:
            ID_data = utils.repr_couples(", ", IDs, data)
            if ID_data:
                phase_data.append(f"{phase}=[{ID_data}]")
        tabs = int(tabs) if tabs else 1
        if tabs:
            tab = tabs*4*" "
            dlim = ",\n" + tab
            leader = "\n" + tab
        else:
            # Single-line layout; previously `tab` was left undefined here,
            # which raised when every phase had data.
            dlim = ", "
            leader = ""
        phase_data = dlim.join(phase_data)
        if self.data.sum(1).all():
            # Every phase has data, so the phases are implied by the entries.
            phases = ""
            if phase_data:
                phase_data = leader + phase_data
        else:
            phases = f'phases={self._phases}'
            if phase_data:
                phase_data = dlim + phase_data
        return f"{type(self).__name__}({phases}{phase_data})"

    def __repr__(self):
        return self.__format__("1")

    def _info(self, N):
        """Return string with all specifications."""
        from thermosteam import Stream
        N_max = N or Stream.display_units.N
        IDs = self.chemicals.IDs
        index, = np.where(self.data.sum(0) != 0)
        len_ = len(index)
        if len_ == 0:
            return f"{type(self).__name__}: (empty)"
        elif self.units:
            basic_info = f"{type(self).__name__} ({self.units}):\n"
        else:
            basic_info = f"{type(self).__name__}:\n"
        all_IDs = tuple([IDs[i] for i in index])

        # Length of chemical column
        all_lengths = [len(i) for i in IDs]
        maxlen = max(all_lengths + [8])

        # Set up chemical data for all phases
        phases_data_info = ''
        for phase in self._phases:
            phase_data = self[phase, all_IDs]
            IDs, data = nonzeros(all_IDs, phase_data)
            if not IDs: continue

            # Get basic structure for phase data
            beginning = f' ({phase}) '
            new_line = '\n' + len(beginning) * ' '

            # Set chemical data
            data_info = ''
            N_IDs = len(data)
            too_many_chemicals = N_IDs > N_max
            N = N_max if too_many_chemicals else N_IDs
            lengths = [len(i) for i in IDs]
            for i in range(N):
                spaces = ' ' * (maxlen - lengths[i])
                if i: data_info += new_line
                data_info += f'{IDs[i]} ' + spaces + f' {data[i]:.3g}'
            # The ellipsis belongs to the text, not to the numeric array.
            if too_many_chemicals: data_info += new_line + '...'

            # Put it together
            phases_data_info += beginning + data_info + '\n'

        return basic_info + phases_data_info.rstrip('\n')

    show = ChemicalIndexer.show
    _ipython_display_ = show
def _replace_indexer_doc(Indexer, Parent):
    """
    Give `Indexer` the docstring of `Parent`, cut off before the "Notes"
    section and with the parent's class name replaced by the new one.

    Nothing is done when `Parent` has no docstring (e.g. when docstrings
    are stripped), and the whole docstring is used when it has no "Notes"
    section.
    """
    doc = Parent.__doc__
    if not doc:
        return
    doc = doc.partition("Notes")[0]
    Indexer.__doc__ = doc.replace(Parent.__name__, Indexer.__name__)

def _new_Indexer(name, units):
    """
    Return a (single-phase, multi-phase) pair of indexer subclasses that
    carry the given units of measure and reference each other.

    Parameters
    ----------
    name : str
        Used to build the class names 'Chemical<name>Indexer' and
        '<name>Indexer'.
    units : str
        Units of measure, passed to `AbsoluteUnitsOfMeasure`.
    """
    ChemicalIndexerSubclass = type('Chemical' + name + 'Indexer', (ChemicalIndexer,), {})
    MaterialIndexerSubclass = type(name + 'Indexer', (MaterialIndexer,), {})

    # NOTE(review): assigning `__slots__` after a class has been created does
    # not change its instance layout; confirm whether it was meant to be
    # passed in the `type(...)` namespace instead.
    ChemicalIndexerSubclass.__slots__ = \
    MaterialIndexerSubclass.__slots__ = ()

    ChemicalIndexerSubclass.units = \
    MaterialIndexerSubclass.units = AbsoluteUnitsOfMeasure(units)

    MaterialIndexerSubclass._ChemicalIndexer = ChemicalIndexerSubclass
    ChemicalIndexerSubclass._MaterialIndexer = MaterialIndexerSubclass

    _replace_indexer_doc(ChemicalIndexerSubclass, ChemicalIndexer)
    _replace_indexer_doc(MaterialIndexerSubclass, MaterialIndexer)

    return ChemicalIndexerSubclass, MaterialIndexerSubclass

ChemicalIndexer._MaterialIndexer = MaterialIndexer
ChemicalMolarFlowIndexer, MolarFlowIndexer = _new_Indexer('MolarFlow', 'kmol/hr')
ChemicalMassFlowIndexer, MassFlowIndexer = _new_Indexer('MassFlow', 'kg/hr')
ChemicalVolumetricFlowIndexer, VolumetricFlowIndexer = _new_Indexer('VolumetricFlow', 'm^3/hr')

# %% Mass flow properties

@PropertyFactory(slots=('name', 'mol', 'index', 'MW'))
def MassFlowProperty(self):
    """Mass flow (kg/hr)."""
    return self.mol[self.index] * self.MW

@MassFlowProperty.setter
def MassFlowProperty(self, value):
    self.mol[self.index] = value/self.MW

def by_mass(self):
    """Return a ChemicalMassFlowIndexer that references this object's molar data."""
    try:
        mass = self._data_cache['mass']
    except KeyError:
        # First request: build the mass view once and cache it.
        chemicals = self.chemicals
        mol = self.data
        mass = np.zeros_like(mol, dtype=object)
        for i, chem in enumerate(chemicals):
            mass[i] = MassFlowProperty(chem.ID, mol, i, chem.MW)
        self._data_cache['mass'] = mass = ChemicalMassFlowIndexer.from_data(
            property_array(mass), self._phase, chemicals, False)
    return mass
ChemicalMolarFlowIndexer.by_mass = by_mass

def by_mass(self):
    """Return a MassFlowIndexer that references this object's molar data."""
    try:
        mass = self._data_cache['mass']
    except KeyError:
        # First request: build the mass view once and cache it.
        phases = self.phases
        chemicals = self.chemicals
        mol = self.data
        mass = np.zeros_like(mol, dtype=object)
        for i in range(len(phases)):
            for j, chem in enumerate(chemicals):
                index = (i, j)
                mass[index] = MassFlowProperty(chem.ID, mol, index, chem.MW)
        self._data_cache['mass'] = mass = MassFlowIndexer.from_data(
            property_array(mass), phases, chemicals, False)
    return mass
MolarFlowIndexer.by_mass = by_mass; del by_mass

# %% Volumetric flow properties

@PropertyFactory(slots=('name', 'mol', 'index', 'V', 'TP', 'phase', 'phase_container'))
def VolumetricFlowProperty(self):
    """Volumetric flow (m^3/hr)."""
    f_mol = self.mol[self.index]
    phase = self.phase or self.phase_container.phase
    # Use the phase-specific model when `V` has one under that phase name.
    V = getattr(self.V, phase) if hasattr(self.V, phase) else self.V
    return 1000. * f_mol * V(*self.TP) if f_mol else 0.

@VolumetricFlowProperty.setter
def VolumetricFlowProperty(self, value):
    if value:
        phase = self.phase or self.phase_container.phase
        V = getattr(self.V, phase) if hasattr(self.V, phase) else self.V
        self.mol[self.index] = value / V(*self.TP) / 1000.
    else:
        self.mol[self.index] = 0.

def by_volume(self, TP):
    """Return a ChemicalVolumetricFlowIndexer that references this object's molar data.

    Parameters
    ----------
    TP : ThermalCondition

    """
    try:
        vol = self._data_cache[TP]
    except KeyError:
        # First request for this `TP`: build the volumetric view and cache it.
        chemicals = self.chemicals
        mol = self.data
        vol = np.zeros_like(mol, dtype=object)
        for i, chem in enumerate(chemicals):
            vol[i] = VolumetricFlowProperty(chem.ID, mol, i, chem.V, TP, None, self._phase)
        self._data_cache[TP] = \
        vol = ChemicalVolumetricFlowIndexer.from_data(property_array(vol), self._phase, chemicals, False)
    return vol
ChemicalMolarFlowIndexer.by_volume = by_volume

def by_volume(self, TP):
    """Return a VolumetricFlowIndexer that references this object's molar data.

    Parameters
    ----------
    TP : ThermalCondition

    """
    try:
        vol = self._data_cache[TP]
    except KeyError:
        # First request for this `TP`: build the volumetric view and cache it.
        phases = self.phases
        chemicals = self.chemicals
        mol = self.data
        vol = np.zeros_like(mol, dtype=object)
        for i, phase in enumerate(phases):
            for j, chem in enumerate(chemicals):
                index = i, j
                phase_name = tmo.settings._phase_names[phase]
                vol[index] = VolumetricFlowProperty(f"{phase_name}{chem.ID}", mol, index, chem.V, TP, phase)
        self._data_cache[TP] = \
        vol = VolumetricFlowIndexer.from_data(property_array(vol), phases, chemicals, False)
    return vol
MolarFlowIndexer.by_volume = by_volume; del by_volume
del PropertyFactory