"""CO2 corrosion analysis application with caliper log processing and model optimization."""
import glob
import os
import re
from datetime import datetime, timezone
import lasio as ls
import numpy as np
import pandas as pd
import pytz
import ruptures as rpt
from gemini_application.application_abstract import ApplicationAbstract
from gemini_application.wims.model_optimization import OptCO2Corrosion
from gemini_model.corrosion.co2_corrosion_opt import CO2CorrosionOpt
from gemini_model.corrosion.correlation.co2_partial_pressure_model import CO2PartialPressureModel
from gemini_model.fluid.pvt_water_stp import PVTConstantSTP
from gemini_model.well.pressure_drop import DPDT
tzobject = pytz.timezone("Europe/Amsterdam")
[docs]
class CO2CorrosionApplication(ApplicationAbstract):
"""Class for CO2 Corrosion application."""
def __init__(self):
"""Initialize CO2 corrosion application."""
super().__init__()
self.VLP = DPDT()
self.corrosion_models = []
self.co2_models = []
def _get_well_type(self):
"""Return 'productionwell' or 'injectionwell' from unit type or name."""
try:
ut = self.unit.parameters.get("type")
if ut == "production_well":
return "productionwell"
if ut == "injection_well":
return "injectionwell"
except (KeyError, TypeError):
pass
if "production" in self.unit.name.lower():
return "productionwell"
if "injection" in self.unit.name.lower():
return "injectionwell"
return "productionwell"
def _get_tally_from_well_parameters(self):
"""Get well tally from unit parameters (well parameter). Returns list of dicts or None."""
well_type = self._get_well_type()
key = f"{well_type}_tally_table"
prop = self.unit.parameters.get("property") or {}
table = prop.get(key)
if table is not None and len(table) > 0:
# table can be list per timestamp [[row,...], ...] or single list [row,...]
first = table[0]
if isinstance(first, list):
if first:
return list(first)
else:
return list(table)
if key in self.unit.parameters and self.unit.parameters[key]:
tbl = self.unit.parameters[key]
if isinstance(tbl, list) and tbl:
first = tbl[0]
if isinstance(first, list) and first:
return list(first)
return list(tbl)
return None
[docs]
def init_parameters(self, corrosion_model_name="DLD"):
"""Initialize models parameters. Tally from well parameter first, then tally folder."""
well_unit = self.unit
well_tally = self._get_tally_from_well_parameters()
if not well_tally:
project_data_folder = os.path.join(
well_unit.plant.project_path, well_unit.plant.name + "/wims_data"
)
well_data_folder = os.path.join(project_data_folder, well_unit.name, "tally")
well_tally_files = glob.glob(os.path.join(well_data_folder, "*.csv"))
if well_tally_files:
well_tally = pd.read_csv(well_tally_files[0]).to_dict("records")
else:
raise ValueError(
"No well tally found: set tally in Well Parameters (app builder) or upload a tally file to wims_data/<well>/tally/"
)
self.inputs["well_tally"] = well_tally
length = []
diameter = []
angle = []
roughness = []
for entry in well_tally:
# Get values directly from the dictionary
top_md = entry["TopMD"]
bot_md = entry["BottomMD"]
top_tvd = entry["TopTVD"]
bot_tvd = entry["BottomTVD"]
# Calculate MD and TVD
MD = bot_md - top_md
TVD = bot_tvd - top_tvd
# Convert nominal inner diameter from inches to meters
nominal_id_m = entry["ID"] * 0.0254
# Append computed MD and diameter
length.append(MD)
diameter.append(nominal_id_m)
# Calculate the inclination in radians if MD is positive
if MD > 0:
# Calculate the inclination in degrees first, round it, then convert to radians.
incl_deg = np.round(90 - np.arccos(TVD / MD) * 180 / np.pi, 2)
incl_rad = incl_deg * np.pi / 180
else:
incl_rad = 0.0
angle.append(incl_rad)
# Append roughness value
roughness.append(entry["Roughness"])
# Init PVT model;
self.VLP.PVT = PVTConstantSTP()
# Build well_param and update the VLP
well_param = {
"friction_correlation": "darcy_weisbach",
"friction_correlation_2p": "darcy_weisbach",
"diameter": np.array(diameter),
"length": np.array(length),
"angle": np.array(angle),
"roughness": roughness,
}
self.VLP.update_parameters(well_param)
# Create CO2PartialPressureModel for each "section"
for _ in well_tally:
self.co2_models.append(CO2PartialPressureModel())
# Initialize corrosion models for each joint
# TODO: Add all parameters, so all models can be used! For now only, DLD is used.
for joint in well_tally:
joint_param = {
"roughness": joint["Roughness"],
"corrosion_model": corrosion_model_name,
"diameter": self.inches_to_meters(joint["ID"]),
}
# Create a new corrosion model instance for this joint
corrosion_model = CO2CorrosionOpt()
# Hard-coded optimization parameters initial guess
opt_param = {"A": 4.93, "B": 1119, "C": 0.58, "D": 2.45}
# Update the corrosion model with both joint and optimization parameters
corrosion_model.update_parameters(joint_param)
corrosion_model.update_parameters(opt_param)
# Add the updated model to the list of corrosion models
self.corrosion_models.append(corrosion_model)
[docs]
def get_caliper_logs(self):
"""Get caliper logs data."""
# Will hold caliper logs if any exist
logs = {"logName": [], "logDate": [], "logData": []}
if len(self.inputs["selectedLogs"]) == 0:
print("No caliper logs provided. Skipping caliper read.")
project_folder = os.path.join(self.plant.project_path, self.plant.name + "/wims_data")
unit_data_folder = os.path.join(project_folder, self.unit.name)
selected_well_data_folder = os.path.join(unit_data_folder, "calipers")
for log in self.inputs["selectedLogs"]:
logs["logName"].append(log)
log_path = os.path.join(selected_well_data_folder, log)
data = ls.read(log_path, ignore_data=False)
logs["logData"].append(data.df().sort_index())
# Check if log date is present
for headeritem in data.well:
if headeritem["mnemonic"] == "PID" or headeritem["mnemonic"] == "DATE":
logs["logDate"].append(headeritem["value"])
print(f"Caliper {log} loaded successfully!")
self.inputs["uploadedLogs"] = logs
[docs]
def get_water_analysis_data(self):
"""Get water analysis data."""
# TODO: load water chemistry from UI
project_folder = os.path.join(self.plant.project_path, self.plant.name)
# Get water chemistry analysis data
water_chemistry_file_name = "data/water_chemistry.csv"
try:
water_chemistry = pd.read_csv(os.path.join(project_folder, water_chemistry_file_name))
self.inputs["water_chemistry"] = water_chemistry
# print("Water analysis data loaded successfully!")
except (FileNotFoundError, pd.errors.EmptyDataError):
pass
[docs]
def get_production_data(self):
"""Get production data."""
# Extract production data based on the well type
if self.unit.parameters["type"] == "production_well":
# TODO: For production well, chenge to ESP
well_type = "productionwell"
elif self.unit.parameters["type"] == "injection_well":
well_type = "injectionwell"
start_time = datetime.strptime(self.inputs["start_time"], "%Y-%m-%d %H:%M:%S")
start_time = tzobject.localize(start_time)
start_time = start_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
end_time = datetime.strptime(self.inputs["end_time"], "%Y-%m-%d %H:%M:%S")
end_time = tzobject.localize(end_time)
end_time = end_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
timestep = 3600 # hardcoded 1 hour since flowrate is in m3/h
database = self.plant.databases[0]
try:
result, time = database.read_internal_database(
self.unit.plant.name,
self.unit.name,
f"{well_type}_flow.measured",
start_time,
end_time,
str(timestep) + "s",
)
self.inputs["flow"] = np.array(result) # m3/hr
self.inputs["time"] = np.array(time)
result, time = database.read_internal_database(
self.unit.plant.name,
self.unit.name,
f"{well_type}_wellhead_pressure.measured",
start_time,
end_time,
str(timestep) + "s",
)
self.inputs["pressure"] = np.array(result)
result, time = database.read_internal_database(
self.unit.plant.name,
self.unit.name,
f"{well_type}_wellhead_temperature.measured",
start_time,
end_time,
str(timestep) + "s",
)
self.inputs["temperature"] = np.array(result)
# print("Production data loaded successfully!")
except Exception:
pass
[docs]
def get_prod_data_validation(self):
"""Read monthly production data from Excel file for validation purposes."""
try:
# Get the project folder path with correct structure
project_folder = os.path.join(self.plant.project_path, self.plant.name + "/data")
unit_data_folder = os.path.join(project_folder, self.unit.parameters["type"])
selected_well_data_folder = os.path.join(unit_data_folder, self.unit.parameters["name"])
# Construct path to the Excel file in the selected_well_data_folder
excel_file_path = os.path.join(
selected_well_data_folder, "monthly_production_data.xlsx"
)
# Read the Excel file
monthly_data = pd.read_excel(excel_file_path)
# Convert date column to datetime if it's not already
if "Date" in monthly_data.columns:
monthly_data["Date"] = pd.to_datetime(monthly_data["Date"])
# Store the data in inputs dictionary
self.inputs["monthly_flow"] = (
monthly_data["Flow [m3/h]"].values
if "Flow [m3/h]" in monthly_data.columns
else None
)
self.inputs["monthly_pressure"] = (
monthly_data["Pressure [bar]"].values
if "Pressure [bar]" in monthly_data.columns
else None
)
self.inputs["monthly_temperature"] = (
monthly_data["Temperature [C]"].values
if "Temperature [C]" in monthly_data.columns
else None
)
self.inputs["monthly_dates"] = (
monthly_data["Date"].values if "Date" in monthly_data.columns else None
)
print("Monthly production validation data loaded successfully!")
except Exception as e:
print(f"Error loading monthly production validation data: {str(e)}")
# Initialize empty arrays if loading fails
self.inputs["monthly_flow"] = None
self.inputs["monthly_pressure"] = None
self.inputs["monthly_temperature"] = None
self.inputs["monthly_dates"] = None
[docs]
def get_gas_analysis_data(self):
"""Get gas analysis data."""
pass
[docs]
def get_data(self):
"""Load data if available."""
self.get_caliper_logs()
# TODO: to be updated
# self.get_water_analysis_data()
# self.get_production_data()
[docs]
def calculate(self):
"""Execute main calculation pipeline."""
# print('Corrosion rate from logs started')
self.get_corrosion_rate_from_logs()
# print('Corrosion rate from models started')
self.get_corrosion_rate_from_models_segmented()
# print('Corrosion rate prediction started')
self.predict_corrosion_rate()
[docs]
def add_corrosion_columns(self, df):
"""Add corrosion columns to dataframe."""
df["Max. Pen. [%]"] = None
df["Max. Loss [%]"] = None
df["Max. Pen. Depth [m]"] = None
df["Min. Pen. Depth [m]"] = None
df["Max. ID [inch]"] = None
df["Min. ID [inch]"] = None
df["Mean. ID [inch]"] = None
df["Remaining wall thickness [inch]"] = None
return df
[docs]
def predict_corrosion_rate(self):
"""Predict corrosion rate."""
self.outputs["predictedCorrosionRate"] = pd.DataFrame(
range(1, len(self.corrosion_models) + 1),
index=range(len(self.corrosion_models)),
columns=["Joint No."],
)
self.outputs["predictedThickness"] = pd.DataFrame(
range(1, len(self.corrosion_models) + 1),
index=range(len(self.corrosion_models)),
columns=["Joint No."],
)
if len(self.outputs["processedLogs"]) > 0:
# Sort logs by date
unsorted_logs = list(
zip(
self.inputs["uploadedLogs"]["logName"],
self.inputs["uploadedLogs"]["logDate"],
self.inputs["uploadedLogs"]["logData"],
self.outputs["processedLogs"],
)
)
sorted_logs = sorted(
unsorted_logs, key=lambda x: datetime.strptime(x[1], "%H-%M-%S %d-%m-%Y")
)
# Get baseline ID (87.5 % of original thickness or based on the latest log mean ID)
latest_log_dates = datetime.strptime(sorted_logs[-1][1], "%H-%M-%S %d-%m-%Y")
# Get Corrosion rate for the period between basaline and the End date
fmt = "%Y-%m-%d %H:%M:%S"
end_date = datetime.strptime(self.inputs["end_time"], fmt)
col_label = f"Corroded [mm] between ({latest_log_dates} -> {end_date})"
self._compute_corrosion_for_interval(
latest_log_dates, end_date, col_label, output_switch="partial"
)
OD_nominal = [i.get("OD") for i in self.inputs["well_tally"]]
col_label2 = f"Remaining wall thickenss [inch] ({end_date})"
max_id_values = sorted_logs[-1][3]["Max. ID [inch]"].values.astype(float)
predicted_values = self.outputs["predictedCorrosionRate"][col_label].values / 25.4
self.outputs["predictedThickness"][col_label2] = np.round(
OD_nominal - (max_id_values + predicted_values), 3
)
else:
fmt = "%Y-%m-%d %H:%M:%S"
start_date = datetime.strptime(self.inputs["start_time"], fmt)
end_date = datetime.strptime(self.inputs["end_time"], fmt)
col_label = (
f"Corroded [mm] between "
f"({start_date.strftime('%Y-%m-%d')} -> {end_date.strftime('%Y-%m-%d')})"
)
self._compute_corrosion_for_interval(
start_date, end_date, col_label, output_switch="partial"
)
OD_nominal = [i.get("OD") for i in self.inputs["well_tally"]]
ID_nominal = [i.get("ID") for i in self.inputs["well_tally"]]
col_label2 = f"Predicted ID [inch] ({end_date})"
predicted_corrosion_values = self.outputs["predictedCorrosionRate"][col_label].values
self.outputs["predictedThickness"][col_label2] = np.round(
OD_nominal - (ID_nominal + predicted_corrosion_values / 25.4), 3
)
[docs]
def get_corrosion_rate_from_logs(self):
"""Compute measured corrosion based on log scenarios.
Scenarios:
- No logs => 'calculation not possible'
- 1 log => compare that log to well tally ID at baseline date
- 2+ logs => compare each log to the *previous* one (chronologically)
"""
# Check well type
well_type = self._get_well_type()
# Use well_tally length so DataFrame size matches when switching wells
n_tally = len(self.inputs["well_tally"])
n_logs = len(self.outputs["processedLogs"])
# Create a DataFrame with Joint No. (n_tally rows to match column value lengths)
corrosion_rate = pd.DataFrame(
range(1, n_tally + 1),
index=range(n_tally),
columns=["Joint No."],
)
# If zero logs: no measurement
if n_logs == 0:
print("No logs present. Corrosion rate cannot be calculated.")
corrosion_rate["Measured Corrosion [mm/year]"] = np.nan
self.outputs["measuredCorrosionRate"] = corrosion_rate
return
# If we have at least one "processed_logs" entry
# Note: processedLogs is available through self.outputs['processedLogs']
# Sort logs by date
unsorted_logs = list(
zip(
self.inputs["uploadedLogs"]["logName"],
self.inputs["uploadedLogs"]["logDate"],
self.inputs["uploadedLogs"]["logData"],
self.outputs["processedLogs"],
)
)
sorted_logs = sorted(
unsorted_logs, key=lambda x: datetime.strptime(x[1], "%H-%M-%S %d-%m-%Y")
)
# ------------------------------------------------------------------
# Baseline date -> first log (log can have fewer joints than tally)
# ------------------------------------------------------------------
fmt_log = "%H-%M-%S %d-%m-%Y"
first_date = datetime.strptime(sorted_logs[0][1], fmt_log)
first_data = sorted_logs[0][3]
fmt = "%Y-%m-%d %H:%M:%S" # The format of your date string
baseline_date = datetime.strptime(self.inputs["start_time"], fmt)
n_first = min(n_tally, len(first_data))
# ID at baseline (from well tally nominal ID at baseline date)
baseline_id_mm = np.array([row["ID"] * 25.4 for row in self.inputs["well_tally"]])
first_id_mm = np.array(first_data["Min. ID [inch]"].iloc[:n_first].astype(float) * 25.4)
delta_time = first_date - baseline_date
rate_baseline_to_first = (
abs(baseline_id_mm[:n_first] - first_id_mm)
* 365
/ abs(delta_time.total_seconds() / 86400)
).round(5)
col_baseline = (
f"Corrosion rate [mm/year] "
f"({baseline_date.strftime('%Y-%m-%d')} -> {first_date.strftime('%Y-%m-%d')})"
)
col_values = np.full(n_tally, np.nan, dtype=float)
col_values[:n_first] = np.asarray(rate_baseline_to_first).flat[:n_first]
corrosion_rate[col_baseline] = col_values
# ------------------------------------------------------------------
# If only 1 log in total, we're done
# ------------------------------------------------------------------
if n_logs == 1:
self.outputs["measuredCorrosionRate"] = corrosion_rate
self.get_remaining_thickness_at_log_dates()
return
# 3) If we have 2+ logs, do each consecutive pair (logs can have fewer joints than tally)
baseline_date = datetime.strptime(sorted_logs[0][1], "%H-%M-%S %d-%m-%Y")
baseline_data = sorted_logs[0][3]
for i in range(1, n_logs):
current_date = datetime.strptime(sorted_logs[i][1], "%H-%M-%S %d-%m-%Y")
current_data = sorted_logs[i][3]
duration_days = (current_date - baseline_date).total_seconds() / 86400
if duration_days <= 0:
duration_days = 1.0
n_use = min(n_tally, len(baseline_data), len(current_data))
baseline_mm = (
baseline_data["Mean. ID [inch]"].iloc[:n_use].astype(float) * 25.4
).values
current_mm = (current_data["Mean. ID [inch]"].iloc[:n_use].astype(float) * 25.4).values
measured_rate = (abs(baseline_mm - current_mm) * 365 / abs(duration_days)).round(5)
col_name = (
f"Corrosion rate [mm/year] "
f"({baseline_date.strftime('%Y-%m-%d')} -> "
f"{current_date.strftime('%Y-%m-%d')})"
)
col_values = np.full(n_tally, np.nan, dtype=float)
col_values[:n_use] = measured_rate
corrosion_rate[col_name] = col_values
baseline_date = current_date
baseline_data = current_data
self.outputs["measuredCorrosionRate"] = corrosion_rate
self.get_remaining_thickness_at_log_dates()
[docs]
def get_remaining_thickness_at_log_dates(self):
"""Compute remaining wall thickness [mm] at each log date.
For each processed log (sorted by date), remaining thickness at that date
is (OD - Max. ID) from well tally and log, converted to mm.
Output: self.outputs["remainingThicknessAtLogDate"] DataFrame with columns
Joint No. and one column per log date: "Remaining thickness [mm] (YYYY-MM-DD)".
"""
n_logs = len(self.outputs["processedLogs"])
if n_logs == 0:
self.outputs["remainingThicknessAtLogDate"] = None
return
unsorted_logs = list(
zip(
self.inputs["uploadedLogs"]["logName"],
self.inputs["uploadedLogs"]["logDate"],
self.inputs["uploadedLogs"]["logData"],
self.outputs["processedLogs"],
)
)
sorted_logs = sorted(
unsorted_logs, key=lambda x: datetime.strptime(x[1], "%H-%M-%S %d-%m-%Y")
)
n_tally = len(self.inputs["well_tally"])
OD_nominal = np.array([row.get("OD") for row in self.inputs["well_tally"]], dtype=float)
remaining = pd.DataFrame(
range(1, n_tally + 1),
index=range(n_tally),
columns=["Joint No."],
)
INCH_TO_MM = 25.4
fmt_log = "%H-%M-%S %d-%m-%Y"
for log_name, log_date_str, _log_data, log_df in sorted_logs:
log_date = datetime.strptime(log_date_str, fmt_log)
n_use = min(n_tally, len(log_df))
max_id = log_df["Min. ID [inch]"].iloc[:n_use].astype(float).values
# Remaining thickness in inches, then convert to mm
remaining_thickness_inch = OD_nominal[:n_use] - max_id
remaining_thickness_mm = np.full(n_tally, np.nan, dtype=float)
remaining_thickness_mm[:n_use] = (remaining_thickness_inch * INCH_TO_MM).flat[:n_use]
col_name = f"Remaining thickness [mm] ({log_date.strftime('%Y-%m-%d')})"
remaining[col_name] = np.round(remaining_thickness_mm, 3)
self.outputs["remainingThicknessAtLogDate"] = remaining
[docs]
def get_remaining_days_to_min_thickness(self, min_remaining_thickness_mm):
"""Compute remaining days until remaining thickness reaches the minimum.
Uses the latest (by date) corrosion rate and remaining thickness:
- Latest corrosion rate: last interval in measuredCorrosionRate (chronologically).
- Latest remaining thickness: last log date column in remainingThicknessAtLogDate.
Formula: days = (remaining_thickness_mm - min_remaining_thickness_mm) / (corrosion_rate_mm_per_year) * 365.25
Output: self.outputs["remainingDaysToMinThickness"] DataFrame with columns
Joint No. and "Remaining days to min. thickness [days]".
"""
measured = self.outputs.get("measuredCorrosionRate")
remaining = self.outputs.get("remainingThicknessAtLogDate")
if measured is None or remaining is None or remaining.empty:
self.outputs["remainingDaysToMinThickness"] = None
return
try:
min_mm = float(min_remaining_thickness_mm)
except (TypeError, ValueError):
self.outputs["remainingDaysToMinThickness"] = None
return
n_tally = len(self.inputs["well_tally"])
# Find latest corrosion rate column: "Corrosion rate [mm/year] (YYYY-MM-DD -> YYYY-MM-DD)"
rate_cols = [
c
for c in measured.columns
if c != "Joint No." and c.startswith("Corrosion rate [mm/year]")
]
date_end_re = re.compile(r"->\s*(\d{4}-\d{2}-\d{2})\s*\)?$")
latest_rate_col = None
latest_rate_date = None
for c in rate_cols:
m = date_end_re.search(c)
if m:
try:
dt = datetime.strptime(m.group(1).strip(), "%Y-%m-%d")
if latest_rate_date is None or dt >= latest_rate_date:
latest_rate_date = dt
latest_rate_col = c
except ValueError:
pass
if latest_rate_col is None and rate_cols:
latest_rate_col = rate_cols[-1]
# Find latest remaining thickness column: "Remaining thickness [mm] (YYYY-MM-DD)"
thick_cols = [
c
for c in remaining.columns
if c != "Joint No." and c.startswith("Remaining thickness [mm]")
]
date_re = re.compile(r"\((\d{4}-\d{2}-\d{2})\)\s*$")
latest_thick_col = None
latest_thick_date = None
for c in thick_cols:
m = date_re.search(c)
if m:
try:
dt = datetime.strptime(m.group(1), "%Y-%m-%d")
if latest_thick_date is None or dt >= latest_thick_date:
latest_thick_date = dt
latest_thick_col = c
except ValueError:
pass
if latest_thick_col is None and thick_cols:
latest_thick_col = thick_cols[-1]
if latest_rate_col is None or latest_thick_col is None:
self.outputs["remainingDaysToMinThickness"] = None
return
rate_mm_per_year = np.asarray(measured[latest_rate_col], dtype=float)
remaining_mm = np.asarray(remaining[latest_thick_col], dtype=float)
n_use = min(n_tally, len(rate_mm_per_year), len(remaining_mm))
# days = (T_current - T_min) / R * 365.25 [R in mm/year]
days_per_year = 365.25
days_arr = np.full(n_tally, np.nan, dtype=float)
for i in range(n_use):
t_cur = remaining_mm[i]
r = rate_mm_per_year[i]
if np.isnan(t_cur) or np.isnan(r):
continue
if t_cur <= min_mm:
days_arr[i] = 0.0
continue
if r <= 0 or not np.isfinite(r):
days_arr[i] = np.inf
continue
thickness_to_lose = t_cur - min_mm
days_arr[i] = (thickness_to_lose / r) * days_per_year
result = pd.DataFrame(
{
"Joint No.": range(1, n_tally + 1),
"Remaining days to min. thickness [days]": days_arr,
},
index=range(n_tally),
)
self.outputs["remainingDaysToMinThickness"] = result
[docs]
def process_caliper_logs(self):
"""Build processed caliper logs for each log.
If no logs, or only one, we'll handle in get_corrosion_rate_from_logs.
"""
# Check well type
if "production" in self.unit.name:
pass
elif "injection" in self.unit.name:
pass
processedLogs = []
if len(self.inputs["uploadedLogs"]) == 0:
# No logs => nothing to process. We'll handle fallback in get_corrosion_rate_from_logs.
self.outputs["processed_caliper_logs"] = processedLogs
return
# Process each log
for log_nr, log in enumerate(self.inputs["uploadedLogs"]["logData"]):
# Create a fresh DataFrame for each log to avoid duplication
df = pd.DataFrame(
range(1, len(self.inputs["well_tally"]) + 1),
index=range(len(self.inputs["well_tally"])),
columns=["Joint No."],
)
processed_log = self.add_corrosion_columns(df)
for joint_nr, joint in enumerate(self.inputs["well_tally"]):
# Extract the slice from the log data using the 'Top [m MD]' and
# 'Bottom [m MD]' values from the joint dictionary
# TODO: Make more robust regex filter
caliper_subset = log.loc[joint["TopMD"] : joint["BottomMD"]].filter(
regex=r"^D\d{2}$"
)
try:
# Calculate statistics from the caliper subset
mean_caliper = caliper_subset.mean().mean()
max_caliper = caliper_subset.max().max()
min_caliper = caliper_subset.min().min()
# Determine the location indices of the max and min caliper values
max_cal_loc = caliper_subset.stack().idxmax()[0]
min_cal_loc = caliper_subset.stack().idxmin()[0]
# Get the base inner and outer diameters from the joint dictionary
base_id = joint["ID"]
base_od = joint["OD"]
# Calculate maximum penetration percentage
max_penetration = 100 * (max_caliper - base_id) / (base_od - base_id)
# Calculate maximum circumferential wall loss
max_circ_wall_loss = (
100
/ 60
* ((caliper_subset**2 - base_id**2) / (base_od**2 - base_id**2)).sum(axis=1)
).mean()
# Remaining wall thickness
remaining_wall_thickness = base_od - max_caliper
# Build a dictionary with the computed result values
result_values = {
"Max. Pen. Depth [m]": max_cal_loc,
"Min. Pen. Depth [m]": min_cal_loc,
"Max. Pen. [%]": np.round(max_penetration, 1),
"Max. Loss [%]": np.round(max_circ_wall_loss, 1),
"Max. ID [inch]": max_caliper,
"Min. ID [inch]": min_caliper,
"Mean. ID [inch]": np.round(mean_caliper, 3),
"Remaining wall thickness [inch]": np.round(remaining_wall_thickness, 3),
}
# Update the processed_log DataFrame at the row corresponding to joint_nr
# Convert the keys and values to lists for assignment
processed_log.loc[joint_nr, list(result_values.keys())] = list(
result_values.values()
)
except (KeyError, IndexError, ValueError):
# If any issue occurs (e.g., missing keys or empty subsets), skip this joint.
continue
# Append a copy of the processed log to avoid reference issues
processedLogs.append(processed_log.copy())
self.outputs["processedLogs"] = processedLogs
# print("Log Processed successfully!")
[docs]
def get_corrosion_rate_from_models_segmented(self):
"""Compute modelled corrosion in multiple intervals.
- Baseline date -> 1st log date
- 1st log date -> 2nd log date
- 2nd log date -> 3rd log date
- etc.
For each interval, we:
1. Filter the flow/pressure/temp data to [start_date, end_date)
2. Compute partial corrosion with a pairwise approach
3. Convert the sum of partial corrosion to [mm/year] over that interval
4. Store in a new column in ``self.outputs['modelledCorrosionRate']``
"""
# Check well type
if self.unit.parameters["type"] == "production_well":
pass
elif self.unit.parameters["type"] == "injection_well":
pass
# We'll store multiple columns, one per interval
# Start with the "base" DataFrame
self.outputs["modelledCorrosionRate"] = pd.DataFrame(
range(1, len(self.corrosion_models) + 1),
index=range(len(self.corrosion_models)),
columns=["Joint No."],
)
unsorted_logs = list(
zip(
self.inputs["uploadedLogs"]["logName"],
self.inputs["uploadedLogs"]["logDate"],
self.inputs["uploadedLogs"]["logData"],
self.outputs["processedLogs"],
)
)
sorted_logs = sorted(
unsorted_logs, key=lambda x: datetime.strptime(x[1], "%H-%M-%S %d-%m-%Y")
)
# 2) Build a sorted list of log dates plus baseline date from start_time
fmt = "%Y-%m-%d %H:%M:%S" # The format of your date string
baseline_date = datetime.strptime(self.inputs["start_time"], fmt)
sorted_log_dates = []
for log in sorted_logs:
sorted_log_dates.append(datetime.strptime(log[1], "%H-%M-%S %d-%m-%Y"))
# If no logs at all => just 1 interval from baseline_date -> now (or skip)
if len(sorted_log_dates) == 0:
print("No logs => single interval from baseline to 'end of data' assumed.")
interval_label = (
f"Modelled Corrosion "
f"({baseline_date.strftime('%Y-%m-%d')}->{datetime.now().strftime('%Y-%m-%d')})"
)
self._compute_corrosion_for_interval(baseline_date, datetime.now(), interval_label)
return
# If we have logs, compute each interval:
# 3) Compute first interval: baseline_date -> first_log_date
first_log_date = sorted_log_dates[0]
if first_log_date > baseline_date:
start_date = baseline_date
end_date = first_log_date
col_label = (
f"Corrosion rate [mm/year] "
f"({start_date.strftime('%Y-%m-%d')} -> "
f"{end_date.strftime('%Y-%m-%d')})"
)
# Calculate the corrosion rate for the interval
self._compute_corrosion_for_interval(start_date, end_date, col_label)
# 4) For each pair of consecutive logs:
for i in range(1, len(sorted_log_dates)):
start_date = sorted_log_dates[i - 1]
end_date = sorted_log_dates[i]
col_label = (
f"Corrosion rate [mm/year] "
f"({start_date.strftime('%Y-%m-%d')} -> "
f"{end_date.strftime('%Y-%m-%d')})"
)
# Calculate the corrosion rate for the interval
self._compute_corrosion_for_interval(start_date, end_date, col_label)
def _compute_corrosion_for_interval(
self, start_date, end_date, column_label, output_switch="rate"
):
"""Compute the corrosion rate for a specific interval."""
# Check well type
try:
well_type = self.unit.parameters["type"]
if well_type == "production_well":
# TODO: For production well, change to ESP
well_type = "productionwell"
elif well_type == "injection_well":
well_type = "injectionwell"
except KeyError:
pass
flow_df = pd.DataFrame(
{"datetime": self.inputs["time"], "value": self.inputs["flow"]}
).copy()
if self.inputs["interval_type"] == "logs":
# Get the subset of the data for the interval
flow_subset = flow_df[
(flow_df["datetime"] >= start_date) & (flow_df["datetime"] < end_date)
].sort_values("datetime")
else:
# Get the subset of the data for the interval
flow_subset = flow_df[
(flow_df["datetime"] >= start_date) & (flow_df["datetime"] < end_date)
].sort_values("datetime")
pressure_df = pd.DataFrame(
{"datetime": self.inputs["time"], "value": self.inputs["pressure"]}
).copy()
pressure_subset = pressure_df[
(pressure_df["datetime"] >= start_date) & (pressure_df["datetime"] < end_date)
].sort_values("datetime")
temperature_df = pd.DataFrame(
{"datetime": self.inputs["time"], "value": self.inputs["temperature"]}
).copy()
temperature_subset = temperature_df[
(temperature_df["datetime"] >= start_date) & (temperature_df["datetime"] < end_date)
].sort_values("datetime")
# Check if we have data for the interval
if flow_subset.empty or pressure_subset.empty or temperature_subset.empty:
print(f"No data for interval {start_date} to {end_date}")
return
# Get the number of joints
n_joints = len(self.inputs["well_tally"])
# Initialize the corrosion rate arrays
corrosion_rates = np.zeros(n_joints)
partial_corrosion = np.zeros(n_joints)
# Try to coarsen the time series
try:
flow_subset, pressure_subset, temperature_subset = (
self.coarsen_timeseries_by_change_point(
flow_subset, pressure_subset, temperature_subset
)
)
except Exception:
pass
# Get the flow, pressure, and temperature values for the interval
flow_values = flow_subset["value"].values
pressure_values = pressure_subset["value"].values
temperature_values = temperature_subset["value"].values
# Check if we have data for the interval
if len(flow_values) == 0 or len(pressure_values) == 0 or len(temperature_values) == 0:
print(f"No data for interval {start_date} to {end_date}")
return
# Here for simplicity, we say i-th row in flow_subset aligns with
# i-th row in pressure_subset, etc.
n_time_points = len(flow_values)
# Initialize the well hydraulic model
well_hydraulic_param = {
"flow": flow_values,
"pressure": pressure_values,
"temperature": temperature_values,
"n_time_points": n_time_points,
"n_joints": n_joints,
"water_chemistry_data": self.inputs.get("water_chemistry"),
"monthly_production_data": self.inputs.get("monthly_production_data"),
"monthly_dates": self.inputs.get("monthly_dates"),
"direction": "down",
}
self.VLP.update_parameters(well_hydraulic_param)
# Loop through each joint
for joint_idx in range(n_joints):
# Calculate the corrosion rate for this joint
for sec_idx, (sec_temp_c, sec_pres_bar) in enumerate(
zip(temperature_values, pressure_values)
):
# Get the CO2 partial pressure
co2_partial_pressure = self.co2_models[joint_idx].get_co2_partial_pressure(
sec_temp_c, sec_pres_bar, self.inputs.get("water_chemistry")
)
# Get the corrosion rate
corrosion_rate = self.corrosion_models[joint_idx].get_corrosion_rate(
sec_temp_c, sec_pres_bar, co2_partial_pressure
)
# Add the corrosion rate to the total
corrosion_rates[joint_idx] += corrosion_rate
# Calculate the average corrosion rate for this joint
corrosion_rates[joint_idx] /= n_time_points
# Calculate the partial corrosion for this joint
delta_time = end_date - start_date
partial_corrosion[joint_idx] = (
corrosion_rates[joint_idx] * delta_time.total_seconds() / 31536000
)
# Store the results in the outputs
try:
# Create a DataFrame with the corrosion rates
corrosion_df = pd.DataFrame(
{"Joint": range(1, n_joints + 1), column_label: corrosion_rates}
)
# Set the index to the joint number
corrosion_df.set_index("Joint", inplace=True)
# Store the corrosion rates in the outputs
self.outputs["modelledCorrosionRate"][column_label] = corrosion_df[column_label]
# Store the partial corrosion in the outputs
if output_switch == "partial":
if "modelledCorrosionRateCalibrated" not in self.outputs:
self.outputs["modelledCorrosionRateCalibrated"] = pd.DataFrame()
self.outputs["modelledCorrosionRateCalibrated"][column_label] = partial_corrosion
except Exception:
pass
[docs]
def optimize_models(self):
"""Optimize the corrosion models to fit the observed data."""
OptCO2Corrosion(self.inputs, self.outputs, self.corrosion_models, self.co2_models, self.VLP)
[docs]
def inches_to_meters(self, inches):
"""Convert inches to meters."""
return inches * 0.0254
[docs]
def coarsen_timeseries_by_change_point(
self, df1, df2, df3, value_col="value", pen=3, plot=False
):
"""Coarsen a time series by detecting segments where the values do not change much.
Parameters:
df (pd.DataFrame): DataFrame with a DateTime index.
value_col (str): The column name containing the values to analyze.
pen (float or int): Penalty parameter for the PELT change-point
detection algorithm.
plot (bool): If True, plot the original time series with detected
change points.
Returns:
pd.DataFrame: A DataFrame with columns ['start_date', 'end_date',
'mean_value'] for each segment.
"""
# Fit the change-point detection algorithm (PELT) on the time series values
algo = rpt.Pelt(model="l2").fit(df1[value_col].values)
change_points = algo.predict(pen=pen)
dfs = [df1, df2, df3]
dfs_coarse = []
for df in dfs:
segments = []
start = 0
segments.append({"datetime": df.datetime.iloc[0], "value": df.value.iloc[0]})
for cp in change_points:
# Extract the segment from start index to the current change point
segment = df.iloc[start:cp]
mean_value = segment[value_col].mean()
segments.append(
{"datetime": segment.datetime[(segment.index[-1])], "value": mean_value}
)
start = cp
dfs_coarse.append(pd.DataFrame(segments))
return dfs_coarse