import logging
from math import asin, ceil, cos, radians, sin, sqrt

import pandas as pd
from sqlalchemy import create_engine, text

# ---------------- CONFIG ----------------
DB_USER = "root"
DB_PASS = "_z7b6CJc"
DB_HOST = "localhost"
DB_NAME = "immo_new"

CSV_FILE  = "buildingref-france-demande-de-valeurs-foncieres-geolocalisee-millesime.csv"
YEAR      = 2021
CHUNKSIZE = 50_000        # CSV rows read per pandas chunk
BATCHSIZE = 1_000         # rows per INSERT/UPDATE batch
# ----------------------------------------
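# Optional: override the hardcoded credentials from the environment, e.g.
#   DB_PASS = os.getenv("DB_PASS", DB_PASS)   # requires `import os`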

# ---- Logging ----
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler("populate_prices_districts_2021.log", mode="w", encoding="utf-8"),
        logging.StreamHandler()
    ]
)

engine = create_engine(f"mysql+pymysql://{DB_USER}:{DB_PASS}@{DB_HOST}/{DB_NAME}?charset=utf8mb4")

def haversine(lon1, lat1, lon2, lat2):
    """Distance en km entre deux points."""
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    dlon, dlat = lon2 - lon1, lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1)*cos(lat2)*sin(dlon/2)**2
    return 6371 * 2 * asin(sqrt(a))
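
# Quick sanity check: haversine(2.3522, 48.8566, 4.8357, 45.7640) ≈ 392 km (Paris → Lyon).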

# District cache: {city_id: [(district_id, lat, lon), ...]}
districts_cache = {}

def get_nearest_district(city_id, lon, lat, conn):
    if city_id not in districts_cache:
        rows = conn.execute(
            text("SELECT district_id, latitude, longitude FROM districts WHERE city_id = :cid"),
            {"cid": city_id}
        ).fetchall()
        districts_cache[city_id] = [(r[0], float(r[1]), float(r[2])) for r in rows]
    best_id, best_dist = None, float("inf")
    for did, dlat, dlon in districts_cache[city_id]:
        dist = haversine(lon, lat, dlon, dlat)
        if dist < best_dist:
            best_id, best_dist = did, dist
    return best_id
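
# City lookup cache: {insee_code: city_id or None}. Avoids issuing one
# "SELECT id FROM cities" per CSV row in the loops below (same query as the
# inline lookup it stands in for).
cities_cache = {}

def get_city_id(insee, conn):
    """Resolve a commune's INSEE code to its cities.id, memoised across rows."""
    if insee not in cities_cache:
        cities_cache[insee] = conn.execute(
            text("SELECT id FROM cities WHERE insee_code=:i LIMIT 1"), {"i": insee}
        ).scalar()
    return cities_cache[insee]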

usecols = [
    "Nature de la mutation","Valeur foncière","Code INSEE de la commune",
    "Type de local","Surface réelle du bâti","Longitude","Latitude"
]

logging.info("Début de l'agrégation par quartier…")
agg = {}  # key = (insee, district_id)

with engine.begin() as conn:
    for chunk in pd.read_csv(CSV_FILE, sep=";", usecols=usecols, dtype=str,
                             chunksize=CHUNKSIZE, low_memory=False):
        chunk = chunk[chunk["Nature de la mutation"]=="Vente"]
        chunk["Valeur foncière"] = pd.to_numeric(chunk["Valeur foncière"].str.replace(",", "."), errors="coerce")
        chunk["Surface réelle du bâti"] = pd.to_numeric(chunk["Surface réelle du bâti"].str.replace(",", "."), errors="coerce")
        chunk["Longitude"] = pd.to_numeric(chunk["Longitude"].str.replace(",", "."), errors="coerce")
        chunk["Latitude"]  = pd.to_numeric(chunk["Latitude"].str.replace(",", "."), errors="coerce")

        chunk = chunk[
            chunk["Type de local"].isin(["Maison","Appartement"]) &
            chunk["Valeur foncière"].notnull() &
            chunk["Surface réelle du bâti"].notnull() &
            chunk["Longitude"].notnull() &
            chunk["Latitude"].notnull() &
            (chunk["Surface réelle du bâti"] > 0)
        ]

        for _, row in chunk.iterrows():
            insee = row["Code INSEE de la commune"]
            city_id = get_city_id(insee, conn)
            if not city_id: continue

            dist_id = get_nearest_district(city_id,
                                           float(row["Longitude"]),
                                           float(row["Latitude"]), conn)
            if not dist_id: continue

            key = (insee, dist_id)
            d = agg.setdefault(key, {"house_sum":0,"house_sqm":0,
                                     "apt_sum":0,"apt_sqm":0,"transactions":0})

            val = float(row["Valeur foncière"])
            sqm = float(row["Surface réelle du bâti"])
            if row["Type de local"] == "Maison":
                d["house_sum"] += val
                d["house_sqm"] += sqm
            else:
                d["apt_sum"] += val
                d["apt_sqm"] += sqm
            d["transactions"] += 1

logging.info("Agrégation terminée : %d couples ville/quartier.", len(agg))

# ---- Batched insert/update ----
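# Assumes `prices` has a UNIQUE key covering (district_id, data_year) or similar,
# so ON DUPLICATE KEY UPDATE refreshes existing rows instead of adding duplicates.
# (The VALUES() form is deprecated since MySQL 8.0.20 but still accepted.)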
insert_sql = text("""
    INSERT INTO prices (
        insee_code, city_id, district_id, data_year,
        avg_price_per_sqm_apartment,
        avg_price_per_sqm_house,
        number_of_transactions,
        source
    ) VALUES (
        :insee, :city_id, :dist_id, :year,
        :apt, :house, :nb, 'DVF2021'
    )
    ON DUPLICATE KEY UPDATE
        avg_price_per_sqm_apartment = VALUES(avg_price_per_sqm_apartment),
        avg_price_per_sqm_house     = VALUES(avg_price_per_sqm_house),
        number_of_transactions      = VALUES(number_of_transactions),
        source                      = VALUES(source)
""")

keys = list(agg.keys())
batches = ceil(len(keys)/BATCHSIZE)
logging.info("Insertion/maj en %d lots…", batches)

with engine.begin() as conn:
    for i in range(batches):
        batch = keys[i*BATCHSIZE:(i+1)*BATCHSIZE]
        payload = []
        for insee, dist_id in batch:
            d = agg[(insee, dist_id)]
            city_id = get_city_id(insee, conn)
            if not city_id: continue
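            # Price per sqm = total sale value / total floor area for the district,
            # i.e. a floor-area-weighted average rather than a mean of per-sale prices.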
            payload.append({
                "insee": insee,
                "city_id": city_id,
                "dist_id": dist_id,
                "year": YEAR,
                "apt": round(d["apt_sum"]/d["apt_sqm"],2) if d["apt_sqm"]>0 else None,
                "house": round(d["house_sum"]/d["house_sqm"],2) if d["house_sqm"]>0 else None,
                "nb": d["transactions"]
            })
        if payload:
            conn.execute(insert_sql, payload)
        logging.info("Lot %d/%d inséré (%d quartiers).", i+1, batches, len(payload))

logging.info("Terminé : %d enregistrements ville/quartier traités.", len(keys))
