"""OSIsoft PI connector implementation (AF interpolated values)."""
import json
from urllib.parse import urlparse
import requests
from requests.auth import HTTPBasicAuth
from gemini_framework.abstract.database_driver_abstract import DatabaseDriverAbstract
class OsisoftPIDriver(DatabaseDriverAbstract):
    """Database connector based on OSIsoft PI Web API.

    Entries of ``parameters`` read by this driver:

    * ``url``: base URL of the PI Web API; ``/attributes`` and ``/streams/...`` are appended.
    * ``username`` and ``password``: credentials used for HTTP basic authentication.
    * ``timeout`` (optional): passed to ``requests.get``; no timeout is applied when absent.
    """

    def __init__(self):
        """Create the driver with empty parameters; nothing is contacted yet."""
        self.parameters = dict()
        # Filled in by connect(). Starting at None lets a premature read fail with a clear
        # message instead of an AttributeError.
        self.security_auth = None

    def update_parameters(self, parameters):
        """Merge ``parameters`` into the driver parameters, overwriting existing keys."""
        self.parameters.update(parameters.items())

    def connect(self):
        """Prepare the basic-auth credentials used by later requests.

        No request is sent here; the ``username`` and ``password`` parameters are only
        wrapped in an ``HTTPBasicAuth`` object.
        """
        self.security_auth = HTTPBasicAuth(self.parameters["username"], self.parameters["password"])

    def disconnect(self):
        """Disconnect from OSIsoft PI database (nothing to release, so this is a no-op)."""
        return

    def read_data(self, pi_af_tagname, start_time, end_time, interval):
        """Read the values of an AF attribute between two points in time.

        Args:
            pi_af_tagname: AF attribute path without the two leading backslashes.
            start_time: start of the queried range, sent as ``startTime``.
            end_time: end of the queried range, sent as ``endTime``.
            interval: sampling interval; ``"s"`` is appended before it is sent.

        Returns:
            A ``(values, timestamps)`` tuple of two lists holding the ``"Value"`` and
            ``"Timestamp"`` entry of every returned record, in response order.

        Raises:
            RuntimeError: if ``connect()`` has not been called.
            requests.HTTPError: if the PI Web API answers with a status other than 200.
        """
        payload = self._get_AF_interpolated_values(pi_af_tagname, start_time, end_time, interval)
        # Stream responses are JSON objects holding the records under "Items"; iterating the
        # object itself would yield its keys. A bare list of records is still accepted.
        records = payload["Items"] if isinstance(payload, dict) else payload
        results = [record["Value"] for record in records]
        timestamps = [record["Timestamp"] for record in records]
        return results, timestamps

    def write_data(self):
        """Write data to OSIsoft PI database (not implemented; does nothing)."""
        return

    def _get_AF_interpolated_values(self, pi_af_tagname, start_time, end_time, interval):
        """Resolve an AF attribute path to its WebId and fetch the stream values.

        Returns the decoded JSON body of the stream request.

        Raises:
            RuntimeError: if ``connect()`` has not been called.
            requests.HTTPError: if either request answers with a status other than 200.
        """
        if self.security_auth is None:
            raise RuntimeError("connect() must be called before querying the PI Web API")

        # A trailing slash on the configured URL would otherwise produce "//" in the path.
        base_url = self.parameters["url"].rstrip("/")

        # Step 1: look the attribute up by its path, which starts with two backslashes.
        attribute = self._get_json(
            "{}/attributes".format(base_url), {"path": "\\\\" + pi_af_tagname}
        )

        # Step 2: read the values of the stream identified by the attribute's WebId.
        # NOTE(review): the method name and the ``interval`` argument suggest the
        # "interpolated" endpoint, but the original code queries "recorded"; that is kept
        # unchanged here -- confirm which endpoint is intended.
        return self._get_json(
            "{}/streams/{}/recorded".format(base_url, attribute["WebId"]),
            {
                "startTime": start_time,
                "endTime": end_time,
                "interval": "{}s".format(interval),
            },
        )

    def _get_json(self, url, params):
        """GET ``url`` with ``params`` as query string and return the decoded JSON body.

        Passing the query values through ``params`` lets ``requests`` URL-encode them, so
        characters such as ``&``, ``+`` or ``#`` in a tag path or timestamp stay intact.

        Raises:
            requests.HTTPError: if the response status is not 200.
        """
        response = requests.get(
            urlparse(url).geturl(),
            params=params,
            auth=self.security_auth,
            timeout=self.parameters.get("timeout"),
        )
        if response.status_code != 200:
            # Raise instead of returning the status code: callers expect decoded JSON.
            raise requests.HTTPError(
                "{} {}: {}".format(response.status_code, response.reason, response.text),
                response=response,
            )
        return json.loads(response.text)