Source code for gemini_framework.database.connector.avevadb_driver

"""AVEVA Data Hub connector implementation."""

import logging
import math
from datetime import datetime, timezone

import adh_sample_library_preview as dbconn
import pandas as pd

from gemini_framework.abstract.database_driver_abstract import DatabaseDriverAbstract


_LOGGER = logging.getLogger(__name__)

# strftime pattern of the UTC timestamps sent to and returned from AVEVA Data Hub.
_UTC_FORMAT = "%Y-%m-%dT%H:%M:%SZ"


class AvevaDriver(DatabaseDriverAbstract):
    """Database connector based on AVEVA Data Hub APIs.

    Settings are read from ``self.parameters`` (presumably populated via
    ``DatabaseDriverAbstract`` -- verify): ``api_version``, ``url``, ``tenant``,
    ``client_id``, ``client_secret`` and ``namespace_id``.
    """

    # Maximum number of interpolated points requested per API call; longer
    # windows are split into several requests.
    # NOTE(review): presumably chosen to stay under a service page limit -- confirm.
    _CHUNK_SIZE = 500

    def __init__(self):
        """Initialise the driver; the connection is opened later by ``connect``."""
        super().__init__()

    def connect(self):
        """Create the ADH client from the configured credentials as ``self.conn``."""
        self.conn = dbconn.ADHClient(
            api_version=self.parameters["api_version"],
            url=self.parameters["url"],
            tenant=self.parameters["tenant"],
            client_id=self.parameters["client_id"],
            client_secret=self.parameters["client_secret"],
        )

    def read_data(self, stream_id, start_time, end_time, interval):
        """Read interpolated values of one stream from AVEVA Data Hub.

        The window is split into requests of at most ``_CHUNK_SIZE`` points,
        each fetched with ``Streams.getRangeValuesInterpolatedUrl``, and the
        chunks are concatenated in order.

        Args:
            stream_id: Id of the stream to read.
            start_time: Window start, an ISO 8601 string (a trailing ``Z`` is
                accepted) or a ``datetime``. Values without a UTC offset are
                interpreted as UTC, matching the UTC timestamps returned.
            end_time: Window end, same rules as ``start_time``.
            interval: Spacing between points, in seconds; must be positive.

        Returns:
            Tuple ``(results, timestamps)``. ``results`` holds the values as
            floats (``None`` for chunks whose response had no value column);
            ``timestamps`` holds matching ``"%Y-%m-%dT%H:%M:%SZ"`` strings,
            rounded to the minute. Both are empty if the window is empty.

        Raises:
            ValueError: If ``interval`` is not positive or a time string is
                not valid ISO 8601.
        """
        if interval <= 0:
            raise ValueError(f"interval must be positive, got {interval!r}")

        url = self._streams_url()
        start_time_unix = self._to_unix_seconds(start_time)
        end_time_unix = self._to_unix_seconds(end_time)

        # Number of grid points in [start, end] (inclusive of both ends).
        length_intervals = round((end_time_unix - start_time_unix) / interval) + 1
        if length_intervals <= 0:
            # end_time lies before start_time: nothing to fetch.
            return [], []

        n_chunks = math.ceil(length_intervals / self._CHUNK_SIZE)
        chunk_dfs = []
        for chunk_no in range(n_chunks):
            chunk_start = start_time_unix + chunk_no * self._CHUNK_SIZE * interval
            if chunk_no < n_chunks - 1:
                # Last grid point of this full-size chunk.
                chunk_end = start_time_unix + ((chunk_no + 1) * self._CHUNK_SIZE - 1) * interval
            else:
                # The final chunk always ends exactly at the requested end time.
                chunk_end = end_time_unix
            chunk_dfs.append(
                self._read_chunk(url, stream_id, chunk_start, chunk_end, interval)
            )

        # Concatenate once instead of growing a frame inside the loop.
        stream_data_dfs = pd.concat(chunk_dfs)
        results = stream_data_dfs[stream_id].to_list()
        timestamps = stream_data_dfs.index.values.tolist()
        return results, timestamps

    def _streams_url(self):
        """Return the Streams collection URL of the configured namespace (trailing slash)."""
        return "/".join(
            [
                self.parameters["url"],
                "api",
                self.parameters["api_version"],
                "Tenants",
                self.parameters["tenant"],
                "Namespaces",
                self.parameters["namespace_id"],
                "Streams",
                "",
            ]
        )

    @staticmethod
    def _to_unix_seconds(value):
        """Convert an ISO 8601 string or ``datetime`` to integer Unix seconds, naive = UTC."""
        if isinstance(value, datetime):
            parsed = value
        else:
            # datetime.fromisoformat only accepts a "Z" suffix from Python 3.11 on.
            text = value[:-1] + "+00:00" if value.endswith("Z") else value
            parsed = datetime.fromisoformat(text)
        if parsed.tzinfo is None:
            # Without this, .timestamp() would use the host's local time zone.
            parsed = parsed.replace(tzinfo=timezone.utc)
        return int(parsed.timestamp())

    @staticmethod
    def _format_utc(unix_seconds):
        """Format Unix seconds as a ``YYYY-MM-DDTHH:MM:SSZ`` UTC string."""
        return datetime.fromtimestamp(unix_seconds, tz=timezone.utc).strftime(_UTC_FORMAT)

    def _read_chunk(self, url, stream_id, start_unix, end_unix, interval):
        """Fetch one chunk as a single-column frame (``stream_id``) indexed by timestamp string."""
        start_index = self._format_utc(start_unix)
        end_index = self._format_utc(end_unix)
        _LOGGER.debug("Reading %s from %s to %s", stream_id, start_index, end_index)

        stream_data = self.conn.Streams.getRangeValuesInterpolatedUrl(
            url=url + stream_id,
            start=start_index,
            end=end_index,
            count=round((end_unix - start_unix) / interval) + 1,
            value_class=None,
        )
        stream_data_df = pd.DataFrame(stream_data)
        if stream_data_df.empty:
            # No rows (and hence no time column) came back for this chunk.
            return pd.DataFrame(columns=[stream_id], dtype=float)

        time_col = "Timestamp" if "Timestamp" in stream_data_df.columns else "Time"
        # NOTE(review): rounding to the minute will merge points if interval < 60 s.
        stream_data_df["Timestamp"] = (
            pd.to_datetime(stream_data_df[time_col], utc=True, format="mixed")
            .round("min")
            .dt.strftime(_UTC_FORMAT)
        )
        stream_data_df = stream_data_df.set_index("Timestamp")

        value_col = "Value" if "Value" in stream_data_df.columns else "Double64"
        if value_col in stream_data_df.columns:
            values_df = stream_data_df.loc[:, [value_col]].astype(float)
            values_df.columns = [stream_id]
            return values_df
        # No recognised value column: keep the timestamps, fill values with None.
        return pd.DataFrame({stream_id: None}, index=stream_data_df.index)

    def get_tagnames(self, tagname_keyword):
        """Return ids and descriptions of streams whose id/name matches ``keyword*``.

        At most 10000 streams are requested from the configured namespace.

        Returns:
            Tuple ``(tagnames, tag_desc)`` of equal-length lists. A description
            is ``None`` when the stream record carries no ``Description`` key.
        """
        streams = self.conn.Streams.getStreams(
            self.parameters["namespace_id"], query=tagname_keyword + "*", count=10000
        )
        records = [stream.toDictionary() for stream in streams]
        tagnames = [record["Id"] for record in records]
        tag_desc = [record.get("Description") for record in records]
        return tagnames, tag_desc

    def write_data(self):
        """Write data to AVEVA database (not implemented; intentionally a no-op)."""
        return