Source code for gemini_framework.database.connector.influxdb_driver

"""InfluxDB connector used as the framework's internal time-series store."""

from datetime import datetime

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS

from gemini_framework.abstract.database_driver_abstract import DatabaseDriverAbstract


[docs] class InfluxdbDriver(DatabaseDriverAbstract): """Time-series database connector for InfluxDB.""" def __init__(self): """Establish connection to InfluxDB internal database.""" super().__init__()
[docs] def connect(self): """Connect to InfluxDB.""" self.conn = InfluxDBClient( url=self.parameters["url"], org=self.parameters["org"], username=self.parameters["username"], password=self.parameters["password"], )
[docs] def read_data(self, plant_name, asset_name, tag_name, start_time, end_time, aggregate=None): """Read data from internal database.""" query_api = self.conn.query_api() start_time_unix = int(datetime.fromisoformat(start_time).timestamp() - 1) end_time_unix = int(datetime.fromisoformat(end_time).timestamp() + 1) if aggregate is None: query = ( 'from(bucket: "' + self.parameters["bucket"] + '")\ |> range(start:' + str(start_time_unix) + ",stop:" + str(end_time_unix) + ')\ |> filter(fn: (r) => r._measurement == "' + plant_name + '")\ |> filter(fn: (r) => r._field == "' + tag_name + '")\ |> filter(fn: (r) => r.asset_name == "' + asset_name + '")' ) else: query = ( 'from(bucket: "' + self.parameters["bucket"] + '")\ |> range(start:' + str(start_time_unix) + ",stop:" + str(end_time_unix) + ')\ |> filter(fn: (r) => r._measurement == "' + plant_name + '")\ |> filter(fn: (r) => r._field == "' + tag_name + '")\ |> filter(fn: (r) => r.asset_name == "' + asset_name + '")\ |> aggregateWindow(every:' + str(aggregate) + "s, fn: mean)" ) query_result = query_api.query(org=self.parameters["org"], query=query) results = [] timestamps = [] for table in query_result: for record in table.records: results.append(record.get_value()) timestamps.append(record.get_time().strftime("%Y-%m-%dT%H:%M:%SZ")) if aggregate: # fixing aggregation timestamps and values timestamps = timestamps[:-1] results = results[1:] return results, timestamps
[docs] def write_data(self, plant_name, asset_name, tag_name, time, value, write_option=SYNCHRONOUS): """Write data to internal database.""" data = [] if not isinstance(time, list): time = [time] if not isinstance(value, list): value = [value] for ii in range(0, len(value)): data.append( { "measurement": plant_name, "tags": {"asset_name": asset_name}, "fields": {tag_name: value[ii]}, "time": int(datetime.fromisoformat(time[ii]).timestamp()), } ) write_api = self.conn.write_api(write_option) write_api.write( self.parameters["bucket"], self.parameters["org"], data, write_precision="s" )
[docs] def get_last_data(self, plant_name, asset_name, tag_name): """Get last data from database.""" query_api = self.conn.query_api() query = ( 'from(bucket: "' + self.parameters["bucket"] + '")\ |> range(start: 0)\ |> filter(fn: (r) => r._measurement == "' + plant_name + '")\ |> filter(fn: (r) => r._field == "' + tag_name + '")\ |> filter(fn: (r) => r.asset_name == "' + asset_name + '")\ |> last()' ) query_result = query_api.query(org=self.parameters["org"], query=query) results = [] timestamps = [] for table in query_result: for record in table.records: results.append(record.get_value()) timestamps.append(record.get_time().strftime("%Y-%m-%dT%H:%M:%SZ")) return results, timestamps
[docs] def get_first_data(self, plant_name, asset_name, tag_name): """Get first data from database.""" query_api = self.conn.query_api() query = ( 'from(bucket: "' + self.parameters["bucket"] + '")\ |> range(start: 0)\ |> filter(fn: (r) => r._measurement == "' + plant_name + '")\ |> filter(fn: (r) => r._field == "' + tag_name + '")\ |> filter(fn: (r) => r.asset_name == "' + asset_name + '")\ |> first()' ) query_result = query_api.query(org=self.parameters["org"], query=query) results = [] timestamps = [] for table in query_result: for record in table.records: results.append(record.get_value()) timestamps.append(record.get_time().strftime("%Y-%m-%dT%H:%M:%SZ")) return results, timestamps
[docs] def delete_database(self, plant_name, start, stop): """Delete measurement from internal database.""" delete_api = self.conn.delete_api() delete_api.delete( start, stop, "_measurement=" + plant_name, bucket=self.parameters["bucket"], org=self.parameters["org"], )
[docs] def delete_database_all(self, plant_name): """Delete measurement from internal database.""" start = "1970-01-01T00:00:00Z" stop = "2100-01-01T00:00:00Z" delete_api = self.conn.delete_api() delete_api.delete( start, stop, "_measurement=" + plant_name, bucket=self.parameters["bucket"], org=self.parameters["org"], )
[docs] def get_tagnames(self, plant_name): """Get unitnames and tagnames from database.""" query_api = self.conn.query_api() query = ( 'import "influxdata/influxdb/schema"\n\n\ schema.measurementTagValues(\ bucket:"' + self.parameters["bucket"] + '", \ measurement:"' + plant_name + '",\ tag: "_field", )' ) query_result = query_api.query(org=self.parameters["org"], query=query) tagnames = [] tag_desc = [] for table in query_result: for record in table.records: tagnames.append(record.get_value()) return tagnames, tag_desc