import pandas as pd
from sqlalchemy import create_engine, text
import logging
from math import ceil

# === Parameters ============================================================
DB_USER   = "root"
DB_PASS   = "_z7b6CJc"  # NOTE(review): credential hard-coded in source — move to env var / config
DB_HOST   = "localhost"
DB_NAME   = "immo_new"
CSV_FILE  = "buildingref-france-demande-de-valeurs-foncieres-geolocalisee-millesime_2024.csv"
YEAR      = 2024                 # data_year written to the prices table

CHUNKSIZE = 100_000              # CSV rows read per pandas chunk
BATCHSIZE = 1_000                # rows per DB executemany batch

MIN_SURFACE       = 10       # minimum built surface (m²)
MIN_PRICE_PER_SQM = 200      # realistic minimum €/m²
MAX_PRICE_PER_SQM = 50_000   # realistic maximum €/m²
# ---------------------------------------------------------------------------

# Logging to console + file (log file is overwritten on each run: mode="w")
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("populate_prices_2024_plafond.log", mode="w", encoding="utf-8"),
        logging.StreamHandler()
    ]
)

# SQLAlchemy engine for the target MySQL database.
# NOTE(review): the password ends up in the DSN string — prefer engine URL built
# from environment variables so credentials never live in source control.
engine = create_engine(
    f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME}?charset=utf8mb4"
)

# Only the DVF columns the pipeline actually needs (keeps memory usage low).
usecols = [
    "Nature de la mutation", "Valeur foncière",
    "Code INSEE de la commune", "Type de local",
    "Surface réelle du bâti"
]

# ---------------------------------------------------------------------------
# 1. Read the CSV and keep only valid sale transactions
# ---------------------------------------------------------------------------
logging.info("Lecture et filtrage des données…")
rows = []

for chunk in pd.read_csv(CSV_FILE, sep=";", usecols=usecols, dtype=str,
                         chunksize=CHUNKSIZE, low_memory=False):
    # Keep only actual sales; .copy() so the numeric conversions below write
    # into an independent frame (avoids SettingWithCopyWarning on a view).
    chunk = chunk[chunk["Nature de la mutation"] == "Vente"].copy()

    # French CSV uses a decimal comma -> normalise before numeric conversion.
    chunk["Valeur foncière"] = pd.to_numeric(
        chunk["Valeur foncière"].str.replace(",", "."), errors="coerce")
    chunk["Surface réelle du bâti"] = pd.to_numeric(
        chunk["Surface réelle du bâti"].str.replace(",", "."), errors="coerce")

    # Houses/apartments only, with a usable price and a plausible surface.
    chunk = chunk[
        chunk["Type de local"].isin(["Maison", "Appartement"]) &
        chunk["Valeur foncière"].notnull() &
        chunk["Surface réelle du bâti"].notnull() &
        (chunk["Surface réelle du bâti"] >= MIN_SURFACE)
    ].copy()

    # Price per m², bounded to a realistic range.
    chunk["prix_m2"] = chunk["Valeur foncière"] / chunk["Surface réelle du bâti"]
    chunk = chunk[
        (chunk["prix_m2"] >= MIN_PRICE_PER_SQM) &
        (chunk["prix_m2"] <= MAX_PRICE_PER_SQM)
    ]

    rows.append(chunk[["Code INSEE de la commune", "Type de local", "prix_m2"]])

# Fail fast with a clear message instead of pd.concat's cryptic
# "No objects to concatenate" when the CSV produced no chunks at all.
if not rows:
    raise SystemExit("Aucune transaction valide trouvée dans le CSV.")

df = pd.concat(rows, ignore_index=True)
logging.info("Transactions valides : %d", len(df))

# ---------------------------------------------------------------------------
# 2. IQR (Tukey-fence) outlier filtering per commune + local type
# ---------------------------------------------------------------------------
logging.info("Application du filtre IQR…")
def filter_iqr(group):
    """Keep only the rows of *group* whose prix_m2 lies inside the Tukey
    fences [Q1 - 1.5*IQR, Q3 + 1.5*IQR] (bounds inclusive)."""
    q_low, q_high = group["prix_m2"].quantile([0.25, 0.75])
    spread = q_high - q_low
    lo, hi = q_low - 1.5 * spread, q_high + 1.5 * spread
    return group[group["prix_m2"].between(lo, hi)]

# NOTE(review): the original comment claimed include_groups=False is passed to
# silence the FutureWarning, but it is not — and it should NOT be added here:
# the grouping columns must remain in df_filtered for the aggregation in
# step 3. The FutureWarning on pandas >= 2.2 is therefore expected for now.
df_filtered = df.groupby(["Code INSEE de la commune", "Type de local"], group_keys=False, as_index=False).apply(filter_iqr)
logging.info("Transactions après filtre IQR : %d", len(df_filtered))

# ---------------------------------------------------------------------------
# 3. Aggregate per commune + local type => median price and transaction count
# ---------------------------------------------------------------------------
stats = (
    df_filtered.groupby(["Code INSEE de la commune", "Type de local"])["prix_m2"]
               .agg(med_price="median", transactions="count")
)

# One column per local type ("Maison" / "Appartement"), NaN where a commune
# has no transactions of that type. Counts cast to float64 to mirror the
# NaN-capable dtype pivot_table would have produced.
pivot_price = stats["med_price"].unstack("Type de local")
pivot_tx = stats["transactions"].unstack("Type de local").astype("float64")

def _suffix(local_type):
    """Map the French local type to the English column suffix."""
    return "house" if local_type == "Maison" else "apartment"

pivot_price.columns = [f"avg_price_per_sqm_{_suffix(c)}" for c in pivot_price.columns]
pivot_tx.columns = [f"number_of_transactions_{_suffix(c)}" for c in pivot_tx.columns]

final_df = pivot_price.join(pivot_tx).reset_index()
final_df.rename(columns={"Code INSEE de la commune": "insee_code"}, inplace=True)

# ---------------------------------------------------------------------------
# 4. Dynamic price ceiling, based on each type's own transaction count
# ---------------------------------------------------------------------------
def dynamic_plafond(price, tx):
    """Cap a median price when few transactions back it up.

    The fewer the transactions, the more conservative the ceiling; beyond
    50 transactions the median is trusted as-is. A NaN price maps to None.
    """
    if pd.isna(price):
        return None
    if tx < 5:
        ceiling = 4000
    elif tx <= 10:
        ceiling = 6000
    elif tx <= 50:
        ceiling = 10000
    else:
        return price
    return min(price, ceiling)

# Apply the ceiling to each price column, using that type's own tx count.
# Column names are bound as lambda defaults to avoid late-binding surprises.
for _ptype in ("house", "apartment"):
    _price_col = f"avg_price_per_sqm_{_ptype}"
    _tx_col = f"number_of_transactions_{_ptype}"
    final_df[_price_col] = final_df.apply(
        lambda r, p=_price_col, t=_tx_col: dynamic_plafond(r.get(p), r.get(t, 0)),
        axis=1,
    )

# ---------------------------------------------------------------------------
# 5. Insert / update rows in the database
# ---------------------------------------------------------------------------
# Upsert: relies on a unique key on the prices table (presumably
# (insee_code, data_year) or (city_id, data_year) — confirm against the
# schema). 'DVF2024' tags where the figures came from.
insert_sql = text("""
    INSERT INTO prices (
        insee_code, city_id, data_year,
        avg_price_per_sqm_apartment,
        avg_price_per_sqm_house,
        number_of_transactions,
        source
    ) VALUES (
        :insee, :city_id, :year,
        :apt, :house, :nb, 'DVF2024'
    )
    ON DUPLICATE KEY UPDATE
        avg_price_per_sqm_apartment = VALUES(avg_price_per_sqm_apartment),
        avg_price_per_sqm_house     = VALUES(avg_price_per_sqm_house),
        number_of_transactions      = VALUES(number_of_transactions),
        source                      = VALUES(source)
""")

with engine.begin() as conn:
    # One round-trip instead of one SELECT per commune (classic N+1 pattern).
    # setdefault keeps the FIRST id seen per insee_code, matching the
    # original "LIMIT 1" lookup semantics.
    city_by_insee = {}
    for insee, cid in conn.execute(text("SELECT insee_code, id FROM cities")):
        city_by_insee.setdefault(insee, cid)

    total = len(final_df)
    batches = ceil(total / BATCHSIZE)
    inserted = 0
    for i in range(batches):
        part = final_df.iloc[i * BATCHSIZE : (i + 1) * BATCHSIZE]
        data_to_insert = []
        for _, row in part.iterrows():
            # Unknown commune -> skip. "is None" (not truthiness) so a
            # legitimate id of 0 would not be dropped by accident.
            city_id = city_by_insee.get(row["insee_code"])
            if city_id is None:
                continue

            # NaN-safe conversions (a commune may lack one of the two types).
            nb_house = row.get("number_of_transactions_house")
            nb_house = 0 if pd.isna(nb_house) else int(nb_house)

            nb_apt = row.get("number_of_transactions_apartment")
            nb_apt = 0 if pd.isna(nb_apt) else int(nb_apt)

            apt_price = row.get("avg_price_per_sqm_apartment")
            house_price = row.get("avg_price_per_sqm_house")
            data_to_insert.append({
                "insee": row["insee_code"],
                "city_id": city_id,
                "year": YEAR,
                "apt":   None if pd.isna(apt_price) else round(apt_price, 2),
                "house": None if pd.isna(house_price) else round(house_price, 2),
                "nb":    nb_house + nb_apt
            })

        if data_to_insert:
            conn.execute(insert_sql, data_to_insert)
            inserted += len(data_to_insert)

        logging.info("Lot %d/%d inséré (villes: %d)", i + 1, batches, len(data_to_insert))

# Report the rows actually written, not len(final_df): communes missing from
# the cities table are skipped above and must not be counted.
logging.info("Terminé : %d communes insérées/mises à jour.", inserted)
