Source code for gemini_interface.blueprint.app_well_schematics.routes

"""
Well Schematics Application Routes.

==================================

This module handles well schematic generation and CO2 corrosion analysis including caliper log
processing and corrosion pattern analysis.
"""

import glob
import json
import os

import requests
from celery import Celery
from flask import Blueprint, current_app, jsonify, request

# TODO: Change to well integrity application
from gemini_application.wims.co2_corrosion import CO2CorrosionApplication
from gemini_framework.modules.injectionwell.unit import InjectionWellUnit
from gemini_framework.modules.productionwell.unit import ProductionWellUnit

# Blueprint that groups the well schematics routes; the route handlers in this
# module register themselves on it through ``@app_well_schematics.route``.
app_well_schematics = Blueprint("app_well_schematics", __name__)

# Celery application handle for background task processing. The result backend
# and broker URLs are read from the CELERY_RESULT_BACKEND and CELERY_BROKER_URL
# environment variables, each falling back to a local Redis instance.
# NOTE(review): no route in this section dispatches a task through it - confirm
# it is used elsewhere before removing.
celery = Celery(
    "gemini-celery-app",
    backend=os.environ.get("CELERY_RESULT_BACKEND", "redis://localhost:6379"),
    broker=os.environ.get("CELERY_BROKER_URL", "redis://localhost:6379"),
)


@app_well_schematics.route("/app/well_schematics/load_plant", methods=["POST"])
def load_plant():
    """Load a plant/project into the well schematics application instance.

    Expects a JSON body with a ``field_name`` entry naming the project to load
    from the folder configured as ``GEMINI_PROJECT_FOLDER``.

    Returns:
        The string ``"OK"`` on success, or a JSON error with status 400 when
        ``field_name`` is missing from the request.
    """
    global app_instance

    # Validate the request before touching the shared instance, so a malformed
    # request cannot discard a plant that was already loaded.
    payload = request.get_json(silent=True)
    project_name = payload.get("field_name") if isinstance(payload, dict) else None
    if not project_name:
        return jsonify({"error": "Missing field_name"}), 400

    project_folder_path = current_app.config["GEMINI_PROJECT_FOLDER"]

    # Build and load a fresh application first; only publish it once loading
    # succeeded, so a failed load keeps the previous instance usable.
    new_instance = CO2CorrosionApplication()
    new_instance.load_plant(project_folder_path, project_name)
    app_instance = new_instance
    return "OK"
@app_well_schematics.route("/app/well_schematics/get_well_list", methods=["POST"])
def get_well_list():
    """Return the names of the injection and production wells of the loaded plant.

    Returns:
        A JSON array with the ``name`` of every plant unit that is an
        ``InjectionWellUnit`` or a ``ProductionWellUnit``, in plant unit order.
    """
    well_types = (InjectionWellUnit, ProductionWellUnit)
    well_names = [
        plant_unit.name
        for plant_unit in app_instance.plant.units
        if isinstance(plant_unit, well_types)
    ]
    return jsonify(well_names)
@app_well_schematics.route("/app/well_schematics/get_log_list", methods=["POST"])
def get_log_list():
    """Return a list of uploaded LAS log files for the selected well.

    Expects a JSON body with a ``selected_well`` entry. The well's data folder
    (``<project_path>/<plant name>/wims_data/<well name>``) is created when it
    does not exist yet.

    Returns:
        A JSON array with the base names of the ``*.las`` files found in the
        data folder of the unit whose name equals ``selected_well``; empty when
        no unit matches.
    """
    selected_well = request.json.get("selected_well")
    project_data_folder = os.path.join(
        app_instance.plant.project_path, app_instance.plant.name + "/wims_data"
    )

    log_list = []
    for unit in app_instance.plant.units:
        if unit.name != selected_well:
            continue

        # Route diagnostics through the application logger instead of printing
        # unit parameters to stdout on every request.
        current_app.logger.debug("Unit parameters for %s: %s", unit.name, unit.parameters)

        unit_data_folder = os.path.join(project_data_folder, unit.name)
        if not os.path.exists(unit_data_folder):
            # exist_ok guards against another request creating the folder
            # between the check above and this call.
            os.makedirs(unit_data_folder, exist_ok=True)

        log_list.extend(
            os.path.basename(log_path)
            for log_path in glob.glob(os.path.join(unit_data_folder, "*.las"))
        )
    return jsonify(log_list)
@app_well_schematics.route("/app/well_schematics/get_saved_schematics", methods=["POST"])
def get_saved_schematics():
    """Get list of saved schematics for the selected well.

    Expects a JSON body with a ``selected_well`` entry.

    Returns:
        A JSON array with one ``{"name", "filename", "path"}`` object per
        ``*.json`` file in the well's ``schematics`` folder; an empty array when
        no well is given or the folder does not exist. A JSON error with status
        400 is returned when the well name points outside the project data
        folder or when any other exception occurs.
    """
    try:
        selected_well = request.json.get("selected_well")
        if not selected_well:
            return jsonify([])

        # Create schematics folder path
        project_data_folder = os.path.join(
            app_instance.plant.project_path, app_instance.plant.name + "/wims_data"
        )
        well_data_folder = os.path.join(project_data_folder, selected_well, "schematics")

        # The well name comes from the client: refuse values such as "../.." or
        # absolute paths that would escape the project data folder.
        data_root = os.path.abspath(project_data_folder)
        if os.path.commonpath([data_root, os.path.abspath(well_data_folder)]) != data_root:
            return jsonify({"error": "Invalid well name"}), 400

        if not os.path.exists(well_data_folder):
            return jsonify([])

        # Get all JSON files in the schematics folder
        schematic_list = []
        for file_path in glob.glob(os.path.join(well_data_folder, "*.json")):
            filename = os.path.basename(file_path)
            # Remove .json extension for display
            schematic_name = filename[:-5] if filename.endswith(".json") else filename
            schematic_list.append(
                {"name": schematic_name, "filename": filename, "path": file_path}
            )

        return jsonify(schematic_list)
    except Exception as e:
        return jsonify({"error": str(e)}), 400
@app_well_schematics.route("/app/well_schematics/load_schematic", methods=["POST"])
def load_schematic():
    """Load a specific schematic file for the selected well.

    Expects a JSON body with ``selected_well`` and ``schematic_filename``.

    Returns:
        The parsed JSON content of the schematic file. A JSON error is returned
        with status 400 when a field is missing, a name points outside the
        well's schematics folder, or any other exception occurs, and with
        status 404 when the file does not exist.
    """
    try:
        selected_well = request.json.get("selected_well")
        schematic_filename = request.json.get("schematic_filename")

        if not selected_well or not schematic_filename:
            return jsonify({"error": "Missing well name or schematic filename"}), 400

        # Create path to schematic file
        project_data_folder = os.path.join(
            app_instance.plant.project_path, app_instance.plant.name + "/wims_data"
        )
        well_data_folder = os.path.join(project_data_folder, selected_well, "schematics")
        schematic_path = os.path.join(well_data_folder, schematic_filename)

        # Both names come from the client: make sure the well folder stays
        # inside the project data folder and the file stays inside the well's
        # schematics folder ("..", absolute paths, ...).
        data_root = os.path.abspath(project_data_folder)
        well_root = os.path.abspath(well_data_folder)
        if (
            os.path.commonpath([data_root, well_root]) != data_root
            or os.path.commonpath([well_root, os.path.abspath(schematic_path)]) != well_root
        ):
            return jsonify({"error": "Invalid well name or schematic filename"}), 400

        if not os.path.exists(schematic_path):
            return jsonify({"error": "Schematic file not found"}), 404

        # Load and return the JSON data
        with open(schematic_path, "r", encoding="utf-8") as f:
            schematic_data = json.load(f)

        return jsonify(schematic_data)
    except Exception as e:
        return jsonify({"error": str(e)}), 400
@app_well_schematics.route("/app/well_schematics/save_schematic", methods=["POST"])
def save_schematic():
    """Save a schematic for the selected well.

    Expects a JSON body with ``selected_well``, ``schematic_name`` and
    ``schematic_data``. The data is written as indented JSON to
    ``<schematic_name>.json`` in the well's ``schematics`` folder, which is
    created when missing.

    Returns:
        A JSON object with a success ``message`` and the written ``filename``.
        A JSON error with status 400 is returned when a field is missing, a
        name points outside the well's schematics folder, or any other
        exception occurs.
    """
    try:
        data = request.get_json()
        selected_well = data.get("selected_well")
        schematic_name = data.get("schematic_name")
        schematic_data = data.get("schematic_data")

        if not selected_well or not schematic_name or not schematic_data:
            return jsonify({"error": "Missing required data"}), 400

        # Create schematics folder path
        project_data_folder = os.path.join(
            app_instance.plant.project_path, app_instance.plant.name + "/wims_data"
        )
        well_data_folder = os.path.join(project_data_folder, selected_well, "schematics")
        filename = f"{schematic_name}.json"
        schematic_path = os.path.join(well_data_folder, filename)

        # Both names come from the client: validate before creating anything on
        # disk so "..", absolute paths and similar values cannot create folders
        # or write files outside the well's schematics folder.
        data_root = os.path.abspath(project_data_folder)
        well_root = os.path.abspath(well_data_folder)
        if (
            os.path.commonpath([data_root, well_root]) != data_root
            or os.path.commonpath([well_root, os.path.abspath(schematic_path)]) != well_root
        ):
            return jsonify({"error": "Invalid well name or schematic name"}), 400

        # Create directory if it doesn't exist
        os.makedirs(well_data_folder, exist_ok=True)

        # Save schematic file
        with open(schematic_path, "w", encoding="utf-8") as f:
            json.dump(schematic_data, f, indent=2)

        return jsonify({"message": "Schematic saved successfully", "filename": filename})
    except Exception as e:
        return jsonify({"error": str(e)}), 400
@app_well_schematics.route("/app/well_schematics/generate_schematic", methods=["POST"])
def generate_schematic():
    """Proxy route to generate a well schematic image from external backend API.

    The request's JSON body is forwarded unchanged to the URL stored in the
    plant parameter ``wims_backend_url``.

    Returns:
        The backend's JSON response on success. On failure a JSON error is
        returned with status 503 (connection failure), 504 (timeout), the
        backend's own status code (HTTP error), or 400 (anything else).
    """
    try:
        schematic_data = request.get_json()

        # Forward request to external backend server
        backend_url = app_instance.plant.parameters["wims_backend_url"]
        response = requests.post(
            backend_url,
            json=schematic_data,
            headers={"Content-Type": "application/json"},
            timeout=30,
        )

        # Check if request was successful
        response.raise_for_status()

        # Return the response from the backend
        return jsonify(response.json())
    except requests.exceptions.ConnectionError:
        return jsonify({"error": "Cannot connect to schematic generation server"}), 503
    except requests.exceptions.Timeout:
        return jsonify({"error": "Request to schematic generation server timed out"}), 504
    except requests.exceptions.HTTPError as e:
        error_msg = f"Error from backend server: {e.response.status_code}"
        # Prefer the backend's own error message when its body is a JSON object
        # carrying one; a non-JSON body keeps the generic message.
        try:
            error_data = e.response.json()
        except ValueError:
            error_data = None
        if isinstance(error_data, dict) and "error" in error_data:
            error_msg = error_data["error"]
        return jsonify({"error": error_msg}), e.response.status_code
    except Exception as e:
        return jsonify({"error": str(e)}), 400