Source code for gemini_application.esp.esp

"""ESP application for pump performance analysis and pump curve generation."""

from datetime import datetime, timezone
from pathlib import Path

import joblib
import matplotlib.dates as mdates
import numpy as np
import pandas as pd
import pytz
from matplotlib import pyplot as plt
from sklearn.preprocessing import StandardScaler

from gemini_application.application_abstract import ApplicationAbstract
from gemini_model.pump.esp import ESP

tzobject = pytz.timezone("Europe/Amsterdam")


[docs] class ESPApp(ApplicationAbstract): """Class for application ESP calculation.""" def __init__(self): """Initialize ESP application.""" super().__init__() self.ESP = ESP()
[docs] def init_parameters(self): """Initialize model parameters.""" esp_unit = self.unit esp_param = dict() esp_param["no_stages"] = esp_unit.parameters["property"]["esp_no_stage"][0] esp_param["pump_name"] = esp_unit.parameters["property"]["esp_type"][0] esp_param["head_coeff"] = np.asarray( esp_unit.parameters["property"]["esp_head_coeff"][0].split(";"), dtype=np.float32 ) esp_param["power_coeff"] = np.asarray( esp_unit.parameters["property"]["esp_power_coeff"][0].split(";"), dtype=np.float32 ) esp_param["min_flow"] = esp_unit.parameters["property"]["esp_min_flow"][0] esp_param["max_flow"] = esp_unit.parameters["property"]["esp_max_flow"][0] esp_param["bep_flow"] = esp_unit.parameters["property"]["esp_bep_flow"][0] self.ESP.update_parameters(esp_param) self.outputs["esp_correction_factor"] = esp_unit.parameters["property"][ "esp_correction_factor" ][0]
[docs] def get_data(self): """Get ESP data.""" start_time = datetime.strptime(self.inputs["start_time"], "%Y-%m-%d %H:%M:%S") start_time = tzobject.localize(start_time) start_time = start_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") end_time = datetime.strptime(self.inputs["end_time"], "%Y-%m-%d %H:%M:%S") end_time = tzobject.localize(end_time) end_time = end_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") timestep = 3600 # hardcoded 1 hour since flowrate is in m3/h database = self.plant.database result, time = database.read_internal_database( self.unit.plant.name, self.unit.name, "esp_flow.measured", start_time, end_time, timestep, ) self.outputs["flow_measured"] = np.array(result) # m3/hr self.outputs["time"] = np.array(time) result, time = database.read_internal_database( self.unit.plant.name, self.unit.name, "esp_frequency.measured", start_time, end_time, timestep, ) self.outputs["frequency_measured"] = np.array(result) # Hz result, time = database.read_internal_database( self.unit.plant.name, self.unit.name, "esp_inlet_pressure.measured", start_time, end_time, timestep, ) self.outputs["inlet_pressure_measured"] = np.array(result) # bar result, time = database.read_internal_database( self.unit.plant.name, self.unit.name, "esp_vlp_head.calculated", start_time, end_time, timestep, ) self.outputs["esp_vlp_head_calculated"] = np.array(result) result, time = database.read_internal_database( self.unit.plant.name, self.unit.name, "esp_theoretical_head.calculated", start_time, end_time, timestep, ) self.outputs["esp_theoretical_head_calculated"] = np.array(result) # bar result, time = database.read_internal_database( self.unit.plant.name, self.unit.name, "esp_vlp_outlet_pressure.calculated", start_time, end_time, timestep, ) self.outputs["esp_vlp_outlet_pressure_calculated"] = np.array(result) # bar result, time = database.read_internal_database( self.unit.plant.name, self.unit.name, "esp_vlp_ipr_inlet_pressure.calculated", start_time, end_time, timestep, ) self.outputs["esp_vlp_ipr_inlet_pressure_calculated"] = np.array(result) # bar result, time = database.read_internal_database( self.unit.plant.name, self.unit.name, "esp_theoretical_outlet_pressure.calculated", start_time, end_time, timestep, ) self.outputs["esp_theoretical_outlet_pressure_calculated"] = np.array(result) # bar
[docs] def calibrate_esp_head_simple(self): """Calibrate ESP head using simple method.""" start_time = datetime.strptime(self.inputs["calibration_start_time"], "%Y-%m-%d %H:%M:%S") start_time = tzobject.localize(start_time) start_time = start_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") end_time = datetime.strptime(self.inputs["calibration_end_time"], "%Y-%m-%d %H:%M:%S") end_time = tzobject.localize(end_time) end_time = end_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") timestep = 3600 # hardcoded 1 hour since flowrate is in m3/h database = self.plant.database esp_vlp_head, _ = database.read_internal_database( self.unit.plant.name, self.unit.name, "esp_vlp_head.calculated", start_time, end_time, timestep, ) esp_theoretical_head, _ = database.read_internal_database( self.unit.plant.name, self.unit.name, "esp_theoretical_head.calculated", start_time, end_time, timestep, ) x = [] y = [] for xi, yi in zip(esp_vlp_head, esp_theoretical_head): if xi is not None and yi is not None: x.append(xi) y.append(yi) x = np.array(x) y = np.array(y) if len(x) < 2: raise ValueError("Not enough valid data points for calibration.") # Linear regression: y = ax + b x_mean = np.mean(x) y_mean = np.mean(y) a = np.sum((x - x_mean) * (y - y_mean)) / np.sum((x - x_mean) ** 2) b = y_mean - a * x_mean self.outputs["esp_correction_factor"] = f"{a};{b}"
[docs] def calculate(self): """Calculate IPR, VLP, ESP.""" xValues_arrays = [] min_flow_array = [] max_flow_array = [] frequency_array = np.arange( self.inputs["min_frequency"], self.inputs["max_frequency"] + 10, 10 ) min_flow = self.ESP.parameters["min_flow"] max_flow = self.ESP.parameters["max_flow"] for freq in frequency_array: Xmin = 0 Xmax = max_flow * 2 min_flow_freq = min_flow * freq / 60 min_flow_array.append(min_flow_freq) max_flow_freq = max_flow * freq / 60 max_flow_array.append(max_flow_freq) # Flow rates scaled by frequency x_array = np.arange(Xmin * freq / 60, Xmax * freq / 60, 10 * freq / 60) xValues_arrays.append(x_array) self.outputs["xValues"] = xValues_arrays self.outputs["frequency"] = frequency_array self.outputs["min_flow_array"] = min_flow_array self.outputs["max_flow_array"] = max_flow_array self.get_data() self.calculate_pump_head_curve()
[docs] def calculate_pump_head_curve(self): """Calculate ESP output.""" try: pump_head_all = [] pump_power_all = [] pump_eff_all = [] head_min_flow_freq = [] head_max_flow_freq = [] min_flow = self.ESP.parameters["min_flow"] max_flow = self.ESP.parameters["max_flow"] for freq_idx, freq in enumerate(self.outputs["frequency"]): u = dict() pump_head_freq = [] pump_power_freq = [] pump_eff_freq = [] xValues_array = self.outputs["xValues"][freq_idx] for flow in xValues_array: u["pump_freq"] = freq u["pump_flow"] = flow * 60 / freq / 3600 # Convert flow to m^3/s x = [] self.ESP.calculate_output(u, x) # ASSERT y = self.ESP.get_output() pump_head_freq.append(y["pump_head"] / 1e5) # Convert to bar pump_power_freq.append(y["pump_power"]) pump_eff_freq.append(y["pump_eff"]) pump_head_all.append(pump_head_freq) pump_power_all.append(pump_power_freq) pump_eff_all.append(pump_eff_freq) # Min flow u = dict() u["pump_freq"] = freq u["pump_flow"] = min_flow / 3600 # Convert flow to m^3/s x = [] self.ESP.calculate_output(u, x) # ASSERT y = self.ESP.get_output() head_min_flow = y["pump_head"] / 1e5 head_min_flow_freq.append(head_min_flow) # Max flow u = dict() u["pump_freq"] = freq u["pump_flow"] = max_flow / 3600 # Convert flow to m^3/s x = [] self.ESP.calculate_output(u, x) # ASSERT y = self.ESP.get_output() head_max_flow = y["pump_head"] / 1e5 head_max_flow_freq.append(head_max_flow) except Exception as e: print("ERROR:" + repr(e)) pump_head_all = None pump_power_all = None pump_eff_all = None self.outputs["pump_head"] = pump_head_all self.outputs["pump_power"] = pump_power_all self.outputs["pump_eff"] = pump_eff_all self.outputs["pump_head_min_flow"] = head_min_flow_freq self.outputs["pump_head_max_flow"] = head_max_flow_freq
[docs] def get_sensor_data_failure_prediction(self): """Get all sensor data required for failure prediction from database.""" start_time = datetime.strptime(self.inputs["start_time"], "%Y-%m-%d %H:%M:%S") start_time = tzobject.localize(start_time) start_time = start_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") end_time = datetime.strptime(self.inputs["end_time"], "%Y-%m-%d %H:%M:%S") end_time = tzobject.localize(end_time) end_time = end_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") timestep = 3600 # 1 hour database = self.plant.database unit_name = self.unit.name plant_name = self.unit.plant.name well_unit = None if self.unit.from_units: well_unit = self.unit.from_units[0] sensor_data = {} time_array = None try: result, time_array = database.read_internal_database( plant_name, unit_name, "esp_flow.measured", start_time, end_time, timestep ) sensor_data["Brine flow rate"] = result except Exception as e: print(f"Warning: Could not read esp_flow.measured: {e}") start_dt = datetime.strptime(start_time, "%Y-%m-%dT%H:%M:%SZ") end_dt = datetime.strptime(end_time, "%Y-%m-%dT%H:%M:%SZ") time_array = pd.date_range(start_dt, end_dt, freq=f"{timestep}S") data_length = len(time_array) tags_mapping = { "intake pressure ESP": ("esp_inlet_pressure", "measured"), "discharge pressure ESP": ("esp_outlet_pressure", "measured"), "ESP Motor temperature": ("esp_motor_temperature", "measured"), "Vibration (Vx)": ("esp_vibration_x", "measured"), "Vibration (Vy, Vz)": ("esp_vibration_y", "measured"), } for col_name, (tag_base, tag_type) in tags_mapping.items(): tag = f"{tag_base}.{tag_type}" try: result, _ = database.read_internal_database( plant_name, unit_name, tag, start_time, end_time, timestep ) sensor_data[col_name] = result except Exception as e: print(f"Warning: Could not read {tag}: {e}") sensor_data[col_name] = [None] * data_length if well_unit: try: result, _ = database.read_internal_database( plant_name, well_unit.name, "productionwell_wellhead_pressure.measured", start_time, end_time, timestep, ) sensor_data["wellhead pressure"] = result except Exception as e: print(f"Warning: Could not read wellhead pressure: {e}") sensor_data["wellhead pressure"] = [None] * data_length try: result, _ = database.read_internal_database( plant_name, well_unit.name, "productionwell_wellhead_temperature.measured", start_time, end_time, timestep, ) sensor_data["wellhead temperature"] = result except Exception as e: print(f"Warning: Could not read wellhead temperature: {e}") sensor_data["wellhead temperature"] = [None] * data_length else: sensor_data["wellhead pressure"] = [None] * data_length sensor_data["wellhead temperature"] = [None] * data_length df = pd.DataFrame(sensor_data, index=pd.to_datetime(time_array)) return df
[docs] def filter_sensor_data(self, df): """Filter sensor data to remove unphysical values. This follows the same thresholds as used in the training pipeline. """ df.loc[df["Brine flow rate"] > 450, "Brine flow rate"] = np.nan df.loc[df["Brine flow rate"] < 0, "Brine flow rate"] = np.nan df.loc[df["wellhead pressure"] > 40, "wellhead pressure"] = np.nan df.loc[df["wellhead pressure"] < 0, "wellhead pressure"] = np.nan df.loc[df["wellhead temperature"] > 120, "wellhead temperature"] = np.nan df.loc[df["wellhead temperature"] < -5, "wellhead temperature"] = np.nan df.loc[df["intake pressure ESP"] > 150, "intake pressure ESP"] = np.nan df.loc[df["intake pressure ESP"] < 0, "intake pressure ESP"] = np.nan df.loc[df["discharge pressure ESP"] > 150, "discharge pressure ESP"] = np.nan df.loc[df["discharge pressure ESP"] < 0, "discharge pressure ESP"] = np.nan df.loc[df["ESP Motor temperature"] > 200, "ESP Motor temperature"] = np.nan df.loc[df["ESP Motor temperature"] < 20, "ESP Motor temperature"] = np.nan df.loc[df["Vibration (Vx)"] > 3, "Vibration (Vx)"] = np.nan df.loc[df["Vibration (Vx)"] < 0, "Vibration (Vx)"] = np.nan df.loc[df["Vibration (Vy, Vz)"] > 3, "Vibration (Vy, Vz)"] = np.nan df.loc[df["Vibration (Vy, Vz)"] < 0, "Vibration (Vy, Vz)"] = np.nan
[docs] def predict_failures(self, model_path=None, prediction_mode="forward"): """Predict failures using ML model based on sensor data. This function follows the exact preprocessing pipeline from training the model: 1. Filter unphysical values 2. Calculate Vibration from components 3. Apply rolling window average (96 points = 4 days for 1-hour data) 4. Extract 7 relevant features 5. Scale features using StandardScaler 6. Make predictions using HistGradientBoostingClassifier Args: model_path: Path to the ML model pickle file. prediction_mode: Mode of prediction: - "forward": Show 60-day window (30 days before + 30 days after selected time) - "historical": Analyze entire historical period """ if model_path is None: current_dir = Path(__file__).resolve().parent model_path = current_dir / "ml_model" / "SavedModel_TestTrainVal_30.pkl" original_start_time = None original_end_time = None if prediction_mode == "forward": selected_time_str = self.inputs.get("selected_time", None) if selected_time_str: try: selected_time = datetime.strptime(selected_time_str, "%Y-%m-%d %H:%M:%S") selected_time = tzobject.localize(selected_time) selected_time = pd.to_datetime(selected_time) window_start = selected_time - pd.Timedelta(days=30) window_end = selected_time + pd.Timedelta(days=30) original_start_time = self.inputs.get("start_time", None) original_end_time = self.inputs.get("end_time", None) self.inputs["start_time"] = window_start.strftime("%Y-%m-%d %H:%M:%S") self.inputs["end_time"] = window_end.strftime("%Y-%m-%d %H:%M:%S") msg = ( f"Forward mode: Data window set to {self.inputs['start_time']} -> " f"{self.inputs['end_time']} (60-day window around " f"selected_time {selected_time_str})" ) print(msg) except Exception as e: print(f"Warning: Could not apply selected_time window for forward mode: {e}") # Step 1: Get sensor data from database try: df = self.get_sensor_data_failure_prediction() if original_end_time is not None: self.inputs["end_time"] = original_end_time if original_start_time is not None: self.inputs["start_time"] = original_start_time except Exception as e: if original_end_time is not None: self.inputs["end_time"] = original_end_time if original_start_time is not None: self.inputs["start_time"] = original_start_time print(f"Error getting sensor data: {e}") self.outputs["failure_predictions"] = None self.outputs["failure_times"] = None return if df.empty: print("No sensor data available for failure prediction") self.outputs["failure_predictions"] = None self.outputs["failure_times"] = None return # Step 2: Filter unphysical values self.filter_sensor_data(df) # Step 3: Calculate Vibration (Pythagorean theorem) df["Vibration"] = np.sqrt(df["Vibration (Vx)"] ** 2 + df["Vibration (Vy, Vz)"] ** 2) # Step 4: Average noisy sensor data (rolling window = 96 points) # For 1-hour data: 96 points = 96 hours = 4 days rolling_window = 96 for col in df.columns: df[col] = df[col].rolling(window=rolling_window).mean() self._sensor_df_for_plot = df.copy() # Step 5: Specify relevant features (exact order as training) relevant = [ "Vibration", "ESP Motor temperature", "wellhead temperature", "wellhead pressure", "Brine flow rate", "intake pressure ESP", "discharge pressure ESP", ] missing = [f for f in relevant if f not in df.columns] if missing: print(f"Warning: Missing required features: {missing}") for f in missing: df[f] = np.nan features_df = df[relevant].copy() num_features = features_df.count(axis=1) # Step 6: Scale features using StandardScaler scaler = None model = None scaler_path = model_path.parent / f"{model_path.stem}_scaler.pkl" if scaler_path.exists(): try: scaler = joblib.load(scaler_path) print(f"Loaded scaler from training: {scaler_path}") except Exception as e: print(f"Warning: Could not load scaler from {scaler_path}: {e}") if scaler is None: try: model_data = joblib.load(model_path) if isinstance(model_data, dict) and "scaler" in model_data: scaler = model_data["scaler"] model = model_data.get("model", None) except Exception: pass if scaler is not None: features_scaled = pd.DataFrame( scaler.transform(features_df), index=features_df.index, columns=features_df.columns ) else: scaler = StandardScaler() features_scaled = pd.DataFrame( scaler.fit_transform(features_df), index=features_df.index, columns=features_df.columns, ) # Step 7: Load model and make predictions try: if model is None: model = joblib.load(model_path) if isinstance(model, dict) and "model" in model: model = model["model"] except Exception as e: print(f"Error loading ML model: {e}") self.outputs["failure_predictions"] = None self.outputs["failure_times"] = None return try: predictions = model.predict_proba(features_scaled)[:, 1] if len(features_scaled.index) > 1: idx_diff = features_scaled.index[1] - features_scaled.index[0] time_diff = idx_diff.total_seconds() / 3600 # hours if time_diff <= 0.5: # 15-minute or less data smoothing_window = 480 # 120 hours (5 days) for 15-minute data else: # 1-hour data smoothing_window = 120 # 120 hours (5 days) for 1-hour data else: smoothing_window = 120 # default (5 days) predictions_series = pd.Series(predictions, index=features_scaled.index) predictions_smoothed = predictions_series.rolling( window=smoothing_window, min_periods=1 ).mean() predictions = predictions_smoothed.values prediction_times = features_scaled.index if prediction_mode == "forward": selected_time = self.inputs.get("selected_time", None) if selected_time is None: selected_time = self.inputs.get("end_time", None) if selected_time: try: current_time = datetime.strptime(selected_time, "%Y-%m-%d %H:%M:%S") current_time = tzobject.localize(current_time) current_time = pd.to_datetime(current_time) self.outputs["selected_time"] = current_time except Exception as e: print(f"Warning: Could not parse selected_time for forward mode: {e}") num_features_values = num_features.values self.outputs["failure_predictions"] = predictions self.outputs["failure_times"] = prediction_times self.outputs["failure_probability"] = predictions self.outputs["num_features"] = num_features_values self.outputs["prediction_mode"] = prediction_mode except Exception as e: print(f"Error making predictions: {e}") self.outputs["failure_predictions"] = None self.outputs["failure_times"] = None
[docs] def plot(self): """Plot pump curves and the measured data.""" bar_to_meters = 10.1972 # 1 bar ≈ 10.1972 meters of water head def format_func(value, ti): date = pd.to_datetime(value) return date.strftime("%d %B %Y") time_measured = pd.to_datetime(self.outputs["time"]) flow_measured = self.outputs["flow_measured"] esp_vlp_head_calculated = self.outputs["esp_vlp_head_calculated"] time_x = flow_measured pump_head_all = self.outputs["pump_head"] xValues = self.outputs["xValues"] frequency_array = self.outputs["frequency"] # Head calibration a, b = [float(x) for x in self.outputs["esp_correction_factor"].split(";")] head_corrected = np.array( [a * x + b if x is not None else None for x in esp_vlp_head_calculated] ) head_corrected_m = head_corrected * bar_to_meters plt.figure() # Plot measured data with a color map representing time scatter = plt.scatter(time_x, head_corrected_m, c=time_measured.astype("int64"), cmap="jet") colorbar = plt.colorbar(scatter) colorbar.ax.yaxis.set_major_formatter(plt.FuncFormatter(format_func)) # Iterate over each frequency and corresponding flow array for i, freq in enumerate(frequency_array): pump_head_meters = np.array(pump_head_all[i]) * bar_to_meters plt.plot(xValues[i], pump_head_meters, label=f"Frequency: {freq} Hz") head_min_flow_freq = np.array(self.outputs["pump_head_min_flow"]) * bar_to_meters head_max_flow_freq = np.array(self.outputs["pump_head_max_flow"]) * bar_to_meters min_flow_array = self.outputs["min_flow_array"] max_flow_array = self.outputs["max_flow_array"] plt.plot(min_flow_array, head_min_flow_freq, label="Min optimum rate", linestyle="--") plt.plot(max_flow_array, head_max_flow_freq, label="Max optimum rate", linestyle="--") plt.xlabel("Flow Rate (m³/h)") plt.ylabel("Pump Head (m)") plt.title("Pump Head vs Flow Rate") plt.legend() plt.xlim(left=0) plt.ylim(bottom=0) plt.xticks(ticks=range(0, int(max(xValues[i])) + 20, 20)) plt.yticks(ticks=range(0, int(max(pump_head_meters)) + 100, 100)) plt.grid(True) plt.show()
[docs] def plot_features(self): """Plot key ESP sensor features over time.""" time_measured = pd.to_datetime(self.outputs["time"]) flow_measured = np.array(self.outputs["flow_measured"]) frequency_measured = np.array(self.outputs["frequency_measured"]) motor_temp_data = None motor_temp_times = None if hasattr(self, "_sensor_df_for_plot") and self._sensor_df_for_plot is not None: df_plot = self._sensor_df_for_plot if "ESP Motor temperature" in df_plot.columns: motor_temp_series = df_plot["ESP Motor temperature"] valid_mask = ~motor_temp_series.isna() if valid_mask.any(): motor_temp_data = motor_temp_series[valid_mask].values motor_temp_times = motor_temp_series[valid_mask].index flow_valid_mask = ( (flow_measured is not None) & ~np.isnan(flow_measured) & (flow_measured > 0) ) flow_valid = flow_measured[flow_valid_mask] flow_times_valid = time_measured[flow_valid_mask] freq_valid_mask = (frequency_measured is not None) & ~np.isnan(frequency_measured) freq_valid = frequency_measured[freq_valid_mask] freq_times_valid = time_measured[freq_valid_mask] end_time_str = self.inputs.get("end_time", None) if end_time_str: current_time = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S") current_time = tzobject.localize(current_time) current_time = pd.to_datetime(current_time) else: current_time = time_measured[-1] if len(time_measured) > 0 else None fig, ax1 = plt.subplots(figsize=(16, 6)) ax1.grid(True, which="major", linestyle=":", alpha=0.4) ax1.set_axisbelow(True) ax1.xaxis.set_major_locator(mdates.AutoDateLocator()) ax1.xaxis.set_major_formatter(mdates.ConciseDateFormatter(ax1.xaxis.get_major_locator())) if motor_temp_data is not None and len(motor_temp_data) > 0: ax1.scatter( motor_temp_times, motor_temp_data, color="#2E8B57", s=15, alpha=0.6, edgecolor="none", label="Motor Temperature", zorder=3, ) ylabel = "Motor Temperature (°C)" ax1.set_ylabel(ylabel, fontsize=12, fontweight="bold", color="#2E8B57") ax1.set_ylim([20, 200]) ax1.tick_params(axis="y", labelcolor="#2E8B57") if len(freq_valid) > 0: ax2 = ax1.twinx() ax2.scatter( freq_times_valid, freq_valid, color="#9370DB", s=15, alpha=0.6, edgecolor="none", label="Frequency", zorder=3, ) ax2.set_ylabel("Frequency (Hz)", fontsize=12, fontweight="bold", color="#9370DB") ax2.set_ylim([0, 70]) ax2.tick_params(axis="y", labelcolor="#9370DB") ax2.spines["right"].set_position(("outward", 60)) if len(flow_valid) > 0: ax3 = ax1.twinx() ax3.scatter( flow_times_valid, flow_valid, color="#FF4FA3", s=12, alpha=0.6, edgecolor="none", label="Flowrate", zorder=3, ) ax3.set_ylabel("Flowrate (m³/h)", fontsize=12, fontweight="bold", color="#FF4FA3") ax3.set_ylim([0, 450]) ax3.tick_params(axis="y", labelcolor="#FF4FA3") ax3.spines["right"].set_position(("outward", 120)) if current_time: ax1.axvline( x=current_time, color="black", linestyle="-", linewidth=2, alpha=0.7, zorder=4, label="Current Time", ) handles, labels = ax1.get_legend_handles_labels() if len(freq_valid) > 0: handles2, labels2 = ax2.get_legend_handles_labels() handles.extend(handles2) labels.extend(labels2) if len(flow_valid) > 0: handles3, labels3 = ax3.get_legend_handles_labels() handles.extend(handles3) labels.extend(labels3) ax1.set_xlabel("Time", fontsize=12) title = "ESP Sensor Features - Motor Temperature, Frequency, and Flow Rate" ax1.set_title(title, fontsize=13, fontweight="bold", pad=15) fig.tight_layout() plt.show()
[docs] def plot_failure_prediction(self): """Plot failure prediction probability over time. The plot shows: - Probability of failure occurring within the next 30 days from each timestamp - Horizontal bands for risk zones (High/Moderate/Low) - A dashed vertical line at the selected time in forward mode Modes: - "forward": x-axis is a 60-day window (30 days before and 30 days after the selected time), with a dashed line at the selected time. - "historical": shows the full available period. """ failure_preds = self.outputs.get("failure_predictions") if "failure_predictions" not in self.outputs or failure_preds is None: print("No failure predictions available. Run predict_failures() first.") return failure_predictions = self.outputs["failure_predictions"] failure_times = self.outputs["failure_times"] if isinstance(failure_times, np.ndarray): failure_times = pd.to_datetime(failure_times) elif not isinstance(failure_times, pd.DatetimeIndex): failure_times = pd.to_datetime(failure_times) prediction_mode = self.outputs.get("prediction_mode", "forward") current_time = None if prediction_mode == "forward": current_time = self.outputs.get("selected_time", None) if current_time is None: end_time_str = self.inputs.get("end_time", None) if end_time_str: try: current_time = datetime.strptime(end_time_str, "%Y-%m-%d %H:%M:%S") current_time = tzobject.localize(current_time) current_time = pd.to_datetime(current_time) except Exception: current_time = failure_times[-1] if len(failure_times) > 0 else None else: current_time = failure_times[-1] if len(failure_times) > 0 else None fig, ax = plt.subplots(figsize=(16, 6)) ax.grid(True, which="major", linestyle=":", alpha=0.4) ax.set_axisbelow(True) ax.xaxis.set_major_locator(mdates.AutoDateLocator()) ax.xaxis.set_major_formatter(mdates.ConciseDateFormatter(ax.xaxis.get_major_locator())) high_risk_threshold = 0.7 moderate_risk_threshold = 0.3 x_min = failure_times.min() if len(failure_times) > 0 else pd.Timestamp.now() x_max = failure_times.max() if len(failure_times) > 0 else pd.Timestamp.now() ax.axhspan( high_risk_threshold, 1.0, xmin=0, xmax=1, color="red", alpha=0.15, zorder=1, edgecolor="none", ) ax.axhspan( moderate_risk_threshold, high_risk_threshold, xmin=0, xmax=1, color="orange", alpha=0.1, zorder=1, edgecolor="none", ) ax.axhspan( 0.0, moderate_risk_threshold, xmin=0, xmax=1, color="yellow", alpha=0.08, zorder=1, edgecolor="none", ) ax.axhline( y=high_risk_threshold, color="darkred", linestyle="--", linewidth=1.5, alpha=0.5, zorder=2, label=None, ) ax.axhline( y=moderate_risk_threshold, color="darkorange", linestyle="--", linewidth=1.5, alpha=0.5, zorder=2, label=None, ) ax.plot( failure_times, failure_predictions, color="#0047AB", linewidth=1.5, alpha=0.7, label="Failure Probability Prediction", zorder=3, ) ax.scatter( failure_times, failure_predictions, color="#0047AB", s=15, alpha=0.8, edgecolors="none", zorder=4, ) if current_time and prediction_mode == "forward": ax.axvline( x=current_time, color="black", linestyle="--", linewidth=2, alpha=0.9, zorder=6 ) ylabel = "Failure Probability\n(within next 30 days from each timestamp)" ax.set_ylabel(ylabel, fontsize=12, fontweight="bold") ax.set_ylim([0, 1]) ax.set_xlabel("Time", fontsize=12) if prediction_mode == "forward" and current_time is not None and len(failure_times) > 0: x_min = current_time - pd.Timedelta(days=30) x_max = current_time + pd.Timedelta(days=30) if x_min < failure_times.min(): x_min = failure_times.min() if x_max > failure_times.max(): x_max = failure_times.max() ax.set_xlim([x_min, x_max]) if prediction_mode == "forward": mode_title = ( "Forward Prediction Mode (60-day window: " "30 days before + 30 days after selected time)" ) else: mode_title = "Historical Analysis Mode" risk_desc = "Risk zones: High >70%, Moderate 30-70%, Low <30%" ax.set_title( f"ESP Failure Prediction ({mode_title})\n{risk_desc}", fontsize=13, fontweight="bold", pad=15, ) fig.tight_layout() plt.show()
[docs] def plot_comparison(self): """Plot the measured and calculated tags.""" time_measured = pd.to_datetime(self.outputs["time"]) flow_measured = self.outputs["flow_measured"] frequency_measured = self.outputs["frequency_measured"] esp_vlp_head_calculated = self.outputs["esp_vlp_head_calculated"] esp_theoretical_head_calculated = self.outputs["esp_theoretical_head_calculated"] esp_vlp_outlet_pressure_calculated = self.outputs["esp_vlp_outlet_pressure_calculated"] esp_vlp_ipr_inlet_pressure_calculated = self.outputs[ "esp_vlp_ipr_inlet_pressure_calculated" ] intake_pressure_measured = self.outputs["inlet_pressure_measured"] esp_theoretical_outlet_pressure_calculated = self.outputs[ "esp_theoretical_outlet_pressure_calculated" ] # Head calibration a, b = [float(x) for x in self.outputs["esp_correction_factor"][0].split(";")] head_corrected = np.array( [a * x + b if x is not None else None for x in esp_vlp_head_calculated] ) fig, axes = plt.subplots(4, 2, figsize=(12, 12)) fig.tight_layout(pad=4.0) axes[0, 0].plot(time_measured, flow_measured, label="Flow") axes[0, 0].legend() axes[0, 0].grid(True) axes[0, 0].set_title("Flow vs Time") axes[0, 1].plot(time_measured, frequency_measured, label="Frequency") axes[0, 1].legend() axes[0, 1].grid(True) axes[0, 1].set_title("Frequency vs Time") axes[1, 0].plot(time_measured, esp_vlp_head_calculated, label="Calculated Head via VLP") axes[1, 0].plot(time_measured, esp_theoretical_head_calculated, label="Theoretical Head") axes[1, 0].plot( time_measured, head_corrected, label=f"Calibrated VLP Head ({a:.2f}*x + {b:.2f})", linestyle="--", ) axes[1, 0].legend() axes[1, 0].grid(True) axes[1, 0].set_title("Head vs Time") axes[1, 1].plot( time_measured, esp_vlp_head_calculated - esp_theoretical_head_calculated, label="differences", ) axes[1, 1].legend() axes[1, 1].grid(True) axes[1, 1].set_title("Calculated head via VLP - Theoretical head vs Time") axes[2, 0].plot( time_measured, esp_vlp_outlet_pressure_calculated, label="Calculated Outlet Pressure via VLP", ) axes[2, 0].plot( time_measured, esp_theoretical_outlet_pressure_calculated, label="Theoretical Outlet Pressure", ) axes[2, 0].legend() axes[2, 0].grid(True) axes[2, 0].set_title("Outlet Pressure vs Time") axes[2, 1].plot( time_measured, esp_vlp_outlet_pressure_calculated - esp_theoretical_outlet_pressure_calculated, label="differences", ) axes[2, 1].legend() axes[2, 1].grid(True) axes[2, 1].set_title( "Calculated Outlet Pressure via VLP - Theoretical Outlet Pressure vs Time" ) axes[3, 0].plot(time_measured, intake_pressure_measured, label="Measured Inlet Pressure") axes[3, 0].plot( time_measured, esp_vlp_ipr_inlet_pressure_calculated, label="Calculated Inlet Pressure via VLP-IPR", ) axes[3, 0].legend() axes[3, 0].grid(True) axes[3, 0].set_title("Inlet Pressure vs Time") axes[3, 1].plot( time_measured, intake_pressure_measured - esp_vlp_ipr_inlet_pressure_calculated, label="differences", ) axes[3, 1].legend() axes[3, 1].grid(True) axes[3, 1].set_title( "Measured Inlet Pressure - Calculated Inlet Pressure via VLP-IPR vs Time" ) plt.show()