"""Abstract module interface for unit components.
Defines the contract for unit modules that link inputs/outputs to the
framework database and update model parameters over time windows.
"""
import logging
from abc import ABC, abstractmethod
from datetime import datetime
import numpy as np
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)
[docs]
class UnitModuleAbstract(ABC):
"""Abstract base class for unit modules."""
logger = logger
unit = None
loop = None
tags = {
"input": {"measured": {}, "filtered": {}, "calculated": {}},
"output": {"measured": {}, "filtered": {}, "calculated": {}},
}
def __init__(self, unit):
"""Initialize unit module."""
self.unit = unit
[docs]
def link(self):
"""Link module inputs and outputs."""
self.logger.error(
print("Module " + self.__class__.__name__ + " did not implement a link method")
)
[docs]
def init(self, loop):
"""Initialize module with loop."""
self.loop = loop
[docs]
def link_output(self, unit, category, tagname):
"""Link output tag to module."""
reference = unit.tags[category][tagname]
self.tags["output"][category][tagname] = {
"external_name": reference,
"internal_name": tagname + "." + category,
"unit_name": unit.name,
}
[docs]
def get_output_last_data_time(self, tagname):
"""Get last data time for output tag."""
for category in list(self.tags["output"].keys()):
if tagname in list(self.tags["output"][category].keys()):
break
time_str = self.unit.plant.database.get_internal_database_last_time_str(
self.unit.plant.name,
self.tags["output"][category][tagname]["unit_name"],
self.tags["output"][category][tagname]["internal_name"],
)
return time_str
[docs]
def write_output_data(self, tagname, time, result):
"""Write output data for tag."""
for category in list(self.tags["output"].keys()):
if tagname in list(self.tags["output"][category].keys()):
break
self.unit.plant.database.write_internal_database(
self.unit.plant.name,
self.tags["output"][category][tagname]["unit_name"],
self.tags["output"][category][tagname]["internal_name"],
time,
result,
)
[docs]
def get_parameter_index(self, unit, timestamps):
"""Get parameter index for given timestamp."""
timestamps_unix = datetime.fromisoformat(timestamps).timestamp()
timestamps_parameters_unix = []
for timestamp_parameter in unit.parameters["timestamps"]:
timestamps_parameters_unix.append(
datetime.strptime(timestamp_parameter, "%Y-%m-%d %H:%M:%S").timestamp()
)
index = np.argwhere(np.array(timestamps_parameters_unix) <= timestamps_unix).max()
return index
[docs]
@abstractmethod
def update_model_parameter(self, timestamp):
"""Update model parameters for given timestamp."""
pass