Source code for gemini_framework.abstract.database_reader_abstract

"""Abstract database reader organizing internal/external data flows.

Coordinates reading from an external source and writing into the framework's
internal time-series database (InfluxDB by default), including tag
registration and time handling.
"""

import logging
import os
import traceback
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone

import pytz

from gemini_framework.database.connector.influxdb_driver import InfluxdbDriver

tz_ams = pytz.timezone("Europe/Amsterdam")

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.INFO)


[docs] class DatabaseReaderAbstract(ABC): """Abstract base class for database readers.""" def __init__(self): """Initialize database reader.""" self.category = None self.internal_db_driver = InfluxdbDriver() self.external_db_driver = None self.tags = [] self.parameters = dict() self.delta_t = 900 self.logger = logger
[docs] def update_parameters(self, parameters): """Update reader parameters.""" for key, value in parameters.items(): self.parameters[key] = value self.delta_t = self.parameters[self.category]["interval"] self.set_internal_db_parameters() self.set_external_db_parameters()
[docs] def set_internal_db_parameters(self): """Set internal database parameters.""" influxdb_param = { "url": os.getenv("INFLUXDB_URL"), "org": os.getenv("INFLUXDB_ORG"), "username": os.getenv("INFLUXDB_USERNAME"), "password": os.getenv("INFLUXDB_PASSWORD"), "bucket": os.getenv("INFLUXDB_BUCKET"), } self.internal_db_driver.update_parameters(influxdb_param)
[docs] @abstractmethod def set_external_db_parameters(self): """Set external database parameters.""" pass
[docs] def connect(self): """Connect to databases.""" self.internal_db_driver.connect() self.external_db_driver.connect()
[docs] def disconnect(self): """Disconnect from databases.""" self.internal_db_driver.disconnect() self.external_db_driver.disconnect()
[docs] def register_tags(self, units): """Register tags for units.""" for unit in units: for key, value in unit.tags[self.category].items(): tag = { "plant_name": unit.plant.name, "asset_name": unit.name, "internal_tagname": key + "." + self.category, "external_tagname": value[-1], } self.tags.append(tag)
[docs] def get_current_time_str(self): """Get current time as string.""" current_time_datetime = self.round_minutes(datetime.utcnow(), "down", self.delta_t / 60) current_time_str = current_time_datetime.strftime("%Y-%m-%dT%H:%M:%SZ") return current_time_str
[docs] def write_internal_database(self, plant_name, asset_name, internal_tagname, time, result): """Write data to internal database.""" self.internal_db_driver.write_data(plant_name, asset_name, internal_tagname, time, result)
[docs] def read_internal_database( self, plant_name, asset_name, internal_tagname, starttime_str, endtime_str, timestep=None ): """Read data from internal database.""" if timestep is None: timestep = self.delta_t result, time = self.internal_db_driver.read_data( plant_name, asset_name, internal_tagname, starttime_str, endtime_str, timestep ) return result, time
[docs] def read_external_database(self, external_tagname, starttime_str, endtime_str, timestep=None): """Read data from external database.""" if timestep is None: timestep = self.delta_t result, time = self.external_db_driver.read_data( external_tagname, starttime_str, endtime_str, timestep ) return result, time
[docs] def get_internal_database_last_time_str(self, plant_name, asset_name, tagname): """Get last time from internal database.""" _, timestamps = self.internal_db_driver.get_last_data(plant_name, asset_name, tagname) if timestamps: lasttime_str = timestamps[0] else: lasttime_datetime = datetime.strptime( self.parameters["start_time"], "%Y-%m-%d %H:%M:%S" ) lasttime_datetime = tz_ams.localize(lasttime_datetime) lasttime_str = lasttime_datetime.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") return lasttime_str
[docs] def delete(self, plant_name): """Delete plant data from database.""" self.internal_db_driver.delete_database_all(plant_name)
[docs] def import_raw_data(self): """Import raw data from external to internal database.""" endtime_str = self.get_current_time_str() for tag in self.tags: if tag["external_tagname"] == "": continue starttime_str = self.get_internal_database_last_time_str( tag["plant_name"], tag["asset_name"], tag["internal_tagname"] ) if not (starttime_str == endtime_str): try: self.logger.info( "Reading " + tag["external_tagname"] + " from " + starttime_str + " to " + endtime_str ) result, time = self.read_external_database( tag["external_tagname"], starttime_str, endtime_str ) self.logger.info( "Writing : " + tag["internal_tagname"] + " of " + tag["asset_name"] + " from " + starttime_str + " to " + endtime_str ) self.write_internal_database( tag["plant_name"], tag["asset_name"], tag["internal_tagname"], time, result ) except Exception: self.logger.error( "ERROR in module " + self.__class__.__name__ + " : " + traceback.format_exc() )
[docs] @staticmethod def round_minutes(dt, direction, resolution): """Round datetime to specified resolution.""" new_minute = (dt.minute // resolution + (1 if direction == "up" else 0)) * resolution return dt + timedelta( minutes=new_minute - dt.minute, seconds=-dt.second, microseconds=-dt.microsecond )