Source code for gemini_interface.blueprint.app_esp.routes

"""
ESP Application Routes.

======================

This module handles ESP (Electric Submersible Pump) analysis and monitoring functionality including
pump head curve calculations and ESP performance analysis.
"""

import json
import os

from celery import Celery
from flask import Blueprint, current_app, jsonify, request

from gemini_application.esp.esp import ESPApp
from gemini_framework.modules.esp.unit import ESPUnit
from gemini_interface.blueprint.celerytasks import (
    esp_app_calibrate_pumphead_data,
    esp_app_get_pumphead_data,
    esp_app_predict_failure,
)

# Create the ESP application blueprint
app_esp = Blueprint("app_esp", __name__)

# Initialize Celery for background task processing
celery = Celery(
    "gemini-celery-app",
    backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379"),
    broker=os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379"),
)

# Global application instance for ESP operations
app_instance = None


[docs] @app_esp.route("/app/esp/load_plant", methods=["POST"]) def load_plant(): """Load a plant into the ESP application instance.""" global app_instance app_instance = ESPApp() project_name = request.json["field_name"] project_folder_path = current_app.config["GEMINI_PROJECT_FOLDER"] app_instance.load_plant(project_folder_path, project_name) return "OK"
[docs] @app_esp.route("/app/esp/get_esp_list", methods=["POST"]) def get_esp_list(): """Get list of ESP units in the loaded plant.""" esp_unit_list = [] for unit in app_instance.plant.units: if isinstance(unit, ESPUnit): esp_unit_list.append(unit.name) return jsonify(esp_unit_list)
[docs] @app_esp.route("/app/esp/calculate_pump_curve", methods=["POST"]) def calculate_pump_curve(): """Calculate pump head curves for ESP analysis.""" inputs = {"parameters": request.json["parameters"], "boundary": request.json["boundary"]} project_folder_path = app_instance.plant.project_path project_name = app_instance.plant.name esp_name = app_instance.unit.name task = esp_app_get_pumphead_data.delay(project_folder_path, project_name, esp_name, inputs) task_id = str(task.id) return task_id
[docs] @app_esp.route("/app/esp/get_results_pump_curve", methods=["POST"]) def get_results_pump_curve(): """Get results from the pump curve calculation task.""" task_id = request.json["task_id"] task_result = celery.AsyncResult(task_id) try: result_data = task_result.result if isinstance(result_data, Exception): result_data = str(result_data) except Exception as e: result_data = f"Error reading result: {e}" result = {"task_id": task_id, "task_status": task_result.status, "task_result": result_data} return jsonify(result)
[docs] @app_esp.route("/app/esp/get_esp_parameters", methods=["POST"]) def get_esp_parameters(): """Get parameters for the selected ESP unit.""" esp_name = request.json["esp_name"] app_instance.select_unit(esp_name) esp_par = app_instance.unit.parameters return {"esp_par": esp_par}
[docs] @app_esp.route("/app/esp/calibrate_esp_factor", methods=["POST"]) def calibrate_esp_factor(): """Calibrate ESP correction factors.""" inputs = {"parameters": request.json["parameters"], "boundary": request.json["boundary"]} project_folder_path = app_instance.plant.project_path project_name = app_instance.plant.name esp_name = app_instance.unit.name task = esp_app_calibrate_pumphead_data.delay( project_folder_path, project_name, esp_name, inputs ) return str(task.id)
[docs] @app_esp.route("/app/esp/get_results_calibration_factor", methods=["POST"]) def get_results_calibration_factor(): """Get results from the ESP calibration factor task.""" task_id = request.json["task_id"] task_result = celery.AsyncResult(task_id) try: result_data = task_result.result if isinstance(result_data, Exception): result_data = str(result_data) except Exception as e: result_data = f"Error reading result: {e}" result = {"task_id": task_id, "task_status": task_result.status, "task_result": result_data} return jsonify(result)
[docs] @app_esp.route("/esp/update_correction_factors", methods=["POST"]) def update_correction_factors(): """Update ESP correction factors in the project parameters.""" try: field_name = request.json["field_name"] component_name = request.json["esp_name"] esp_correction_factor = request.json["esp_correction_factor"] if not isinstance(esp_correction_factor, str) or ";" not in esp_correction_factor: return jsonify({"message": "Invalid input"}), 400 project_folder_path = os.path.join(current_app.config["GEMINI_PROJECT_FOLDER"], field_name) param_path = os.path.join(project_folder_path, f"{component_name}.param") with open(param_path, "r") as jsonfile: component_content = json.load(jsonfile) if "parameters" in component_content and "property" in component_content["parameters"]: component_content["parameters"]["property"]["esp_correction_factor"] = [ esp_correction_factor ] else: return jsonify({"message": "Invalid input"}), 500 with open(param_path, "w") as jsonfile: json.dump(component_content, jsonfile, indent=4, sort_keys=True) return jsonify({"message": "Correction factor updated successfully."}) except Exception as e: return jsonify({"message": f"Internal server error: {str(e)}"}), 500
[docs] @app_esp.route("/app/esp/predict_failure", methods=["POST"]) def predict_failure(): """Predict ESP failures using ML model.""" inputs = {"parameters": request.json["parameters"], "boundary": request.json["boundary"]} project_folder_path = app_instance.plant.project_path project_name = app_instance.plant.name esp_name = app_instance.unit.name task = esp_app_predict_failure.delay(project_folder_path, project_name, esp_name, inputs) return str(task.id)
[docs] @app_esp.route("/app/esp/get_results_failure_prediction", methods=["POST"]) def get_results_failure_prediction(): """Get results from the failure prediction task.""" try: task_id = request.json["task_id"] task_result = celery.AsyncResult(task_id) try: result_data = task_result.result if isinstance(result_data, Exception): result_data = str(result_data) except Exception as e: result_data = f"Error reading result: {e}" result = {"task_id": task_id, "task_status": task_result.status, "task_result": result_data} return jsonify(result) except Exception as e: return jsonify({"error": f"Error processing request: {str(e)}"}), 500