"""Flask application that assigns people to the days of a project by random draw.

The database is seeded with an Advent-calendar project. Each project has a
number of days (``total_days``) and a list of people; every day one person who
has not yet reached their ``max_draws`` quota is picked at random.
"""
import csv
import math
import os
import random
import sqlite3
from datetime import date
from functools import wraps
from io import StringIO

from flask import (
    Flask,
    flash,
    g,
    redirect,
    render_template,
    request,
    session,
    url_for,
)
from werkzeug.security import check_password_hash, generate_password_hash
from werkzeug.utils import secure_filename

DATABASE = os.path.join(os.path.dirname(__file__), "avent.db")
UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), "static", "uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif"}

app = Flask(__name__)
# The environment can override the key; the literal is only a fallback and
# should be replaced in any real deployment.
app.secret_key = os.environ.get("SECRET_KEY", "change-me-super-secret-key-2025")
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
app.config["MAX_CONTENT_LENGTH"] = 5 * 1024 * 1024  # Max 5 MB upload


# ---------------------------------------------------------------------------
# Generic helpers
# ---------------------------------------------------------------------------

def allowed_file(filename):
    """Return True if *filename* has an extension listed in ALLOWED_EXTENSIONS."""
    return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS


def get_db():
    """Return the sqlite3 connection stored on ``g``, opening it on first use.

    Rows are returned as ``sqlite3.Row`` so columns can be read by name.
    """
    if 'db' not in g:
        g.db = sqlite3.connect(DATABASE)
        g.db.row_factory = sqlite3.Row
    return g.db


@app.teardown_appcontext
def close_db(error=None):
    """Close the connection stored on ``g`` (if any) when the app context ends."""
    db = g.pop('db', None)
    if db is not None:
        db.close()


def init_db():
    """Create the schema from ``schema.sql`` and seed default data.

    Seeds an ``admin`` user when the user table is empty, and a default
    project with eight people when the project table is empty.
    """
    db = get_db()
    schema_path = os.path.join(os.path.dirname(__file__), "schema.sql")
    with open(schema_path, encoding="utf-8") as f:
        db.executescript(f.read())

    cur = db.execute("SELECT COUNT(*) AS c FROM user")
    if cur.fetchone()['c'] == 0:
        hashed_admin = generate_password_hash("admin")
        db.execute(
            "INSERT INTO user (username, password) VALUES (?, ?)",
            ("admin", hashed_admin),
        )

    cur = db.execute("SELECT COUNT(*) AS c FROM project")
    if cur.fetchone()['c'] == 0:
        project_id = db.execute(
            "INSERT INTO project (name, description, image_url, total_days) VALUES (?, ?, ?, ?)",
            (
                "Calendrier de l'Avent 2025",
                "Le calendrier de l'Avent officiel avec tirage aléatoire et gestion des utilisateurs.",
                "https://cdn-icons-png.flaticon.com/512/616/616408.png",
                24,
            ),
        ).lastrowid
        people_list = [
            "Valentin", "Nicolas", "Victor", "Julie",
            "Louis", "Alexandre", "David", "Raphaël",
        ]
        for name in people_list:
            db.execute(
                "INSERT INTO people (project_id, name, draws, max_draws) VALUES (?, ?, 0, 0)",
                (project_id, name),
            )
        recalc_max_draws_for_project(project_id)
    db.commit()


def get_user_by_username(username):
    """Return the user row matching *username*, or None."""
    db = get_db()
    cur = db.execute("SELECT * FROM user WHERE username = ?", (username,))
    return cur.fetchone()


def check_login(username, password):
    """Return the user row when *password* matches the stored hash, else None."""
    user = get_user_by_username(username)
    if user and check_password_hash(user["password"], password):
        return user
    return None


def login_required(fn):
    """Decorator redirecting to the login page when no user is in the session."""
    @wraps(fn)
    def wrapped(*args, **kwargs):
        if 'user_id' not in session:
            return redirect(url_for('login'))
        return fn(*args, **kwargs)
    return wrapped


def _form_int(field, default=None):
    """Read an integer form field without raising.

    Returns *default* when the field is missing or blank, and None when it is
    present but not a valid integer, so callers can report the problem instead
    of answering with a 500 error.
    """
    raw = request.form.get(field)
    if raw is None or not raw.strip():
        return default
    try:
        return int(raw)
    except ValueError:
        return None


def _calendar_day():
    """Return today's day of the month in December, or 1 in any other month."""
    today = date.today()
    return today.day if today.month == 12 else 1


def _project_day(project):
    """Return the current calendar day capped at the project's ``total_days``."""
    return min(_calendar_day(), project["total_days"])


# ---------------------------------------------------------------------------
# Data access
# ---------------------------------------------------------------------------

def recalc_max_draws_for_project(project_id):
    """Set every person's ``max_draws`` to ceil(total_days / number of people).

    Does nothing when the project does not exist or has no people. Commits.
    """
    db = get_db()
    cur = db.execute("SELECT total_days FROM project WHERE id = ?", (project_id,))
    project = cur.fetchone()
    if not project:
        return
    total_days = project["total_days"]
    cur = db.execute(
        "SELECT COUNT(*) AS c FROM people WHERE project_id = ?", (project_id,)
    )
    count = cur.fetchone()["c"]
    if count == 0:
        return
    max_draws = math.ceil(total_days / count)
    db.execute(
        "UPDATE people SET max_draws = ? WHERE project_id = ?",
        (max_draws, project_id),
    )
    db.commit()


def get_project(project_id=None):
    """Return one project row, or every project when *project_id* is None.

    Returns:
        A single row (or None if not found) when *project_id* is given,
        otherwise a list of all project rows ordered by id.
    """
    db = get_db()
    # ``is not None`` so that an id of 0 is looked up rather than being
    # treated as "no id given".
    if project_id is not None:
        cur = db.execute("SELECT * FROM project WHERE id = ?", (project_id,))
        return cur.fetchone()
    cur = db.execute("SELECT * FROM project ORDER BY id ASC")
    return cur.fetchall()


def get_people_stats(project_id):
    """Return all people rows of a project, ordered by name."""
    db = get_db()
    cur = db.execute(
        "SELECT * FROM people WHERE project_id = ? ORDER BY name ASC",
        (project_id,),
    )
    return cur.fetchall()


def get_draw_for_project_day(project_id, day):
    """Return the draw (day, person name, draw time) for *day*, or None."""
    db = get_db()
    cur = db.execute("""
        SELECT draws.day, people.name, draws.draw_time
        FROM draws
        JOIN people ON people.id = draws.person_id
        WHERE draws.project_id = ? AND draws.day = ?
    """, (project_id, day))
    return cur.fetchone()


def get_all_draws_for_project(project_id):
    """Return every draw of a project (day, person name, draw time) by day."""
    db = get_db()
    cur = db.execute("""
        SELECT draws.day, people.name, draws.draw_time
        FROM draws
        JOIN people ON people.id = draws.person_id
        WHERE draws.project_id = ?
        ORDER BY draws.day ASC
    """, (project_id,))
    return cur.fetchall()


def draw_for_project_day(project_id, day):
    """Draw a random person for *day* unless that day already has a draw.

    Returns:
        ``(name, is_new)``: the drawn person's name and True when a new draw
        was recorded; the existing name and False when the day was already
        drawn; ``(None, False)`` when nobody is below their ``max_draws``.
    """
    db = get_db()
    existing = get_draw_for_project_day(project_id, day)
    if existing:
        return existing["name"], False

    cur = db.execute("""
        SELECT id, name, draws, max_draws
        FROM people
        WHERE project_id = ? AND draws < max_draws
    """, (project_id,))
    candidates = cur.fetchall()
    if not candidates:
        return None, False

    chosen = random.choice(candidates)
    db.execute("UPDATE people SET draws = draws + 1 WHERE id = ?", (chosen["id"],))
    db.execute(
        "INSERT INTO draws (day, person_id, project_id) VALUES (?, ?, ?)",
        (day, chosen["id"], project_id),
    )
    db.commit()
    return chosen["name"], True


def catchup_draws(project_id):
    """Draw every day from 1 up to the current project day that has no draw yet.

    Returns:
        The number of new draws recorded (0 if the project does not exist).
    """
    project = get_project(project_id)
    if not project:
        return 0
    new_draws = 0
    for day in range(1, _project_day(project) + 1):
        # draw_for_project_day already skips days that have a draw and commits
        # each new one, so no separate existence check or commit is needed.
        _, is_new = draw_for_project_day(project_id, day)
        if is_new:
            new_draws += 1
    return new_draws


# ---------------------------------------------------------------------------
# Public views
# ---------------------------------------------------------------------------

@app.route("/public")
def public_dashboard():
    """List all projects without requiring a login."""
    projects = get_project()
    return render_template("public_projects.html", projects=projects)


@app.route("/public/project/<int:project_id>")
def public_project_view(project_id):
    """Show today's draw, all draws and per-person stats for one project."""
    project = get_project(project_id)
    if not project:
        flash("Projet non trouvé.")
        return redirect(url_for('public_dashboard'))
    today_day = _project_day(project)
    today_draw = get_draw_for_project_day(project_id, today_day)
    all_draws = get_all_draws_for_project(project_id)
    stats = get_people_stats(project_id)
    return render_template(
        "public_project_view.html",
        project=project,
        today_day=today_day,
        today_draw=today_draw,
        all_draws=all_draws,
        stats=stats,
    )


# ---------------------------------------------------------------------------
# Authenticated views
# ---------------------------------------------------------------------------

@app.route("/")
@login_required
def dashboard():
    """List all projects for a logged-in user."""
    projects = get_project()
    return render_template("projects.html", projects=projects)


@app.route("/project/<int:project_id>")
@login_required
def project_view(project_id):
    """Show today's draw, all draws and per-person stats for one project."""
    project = get_project(project_id)
    if not project:
        flash("Projet non trouvé.")
        return redirect(url_for('dashboard'))
    today_day = _project_day(project)
    today_draw = get_draw_for_project_day(project_id, today_day)
    all_draws = get_all_draws_for_project(project_id)
    stats = get_people_stats(project_id)
    return render_template(
        "project_view.html",
        project=project,
        today_day=today_day,
        today_draw=today_draw,
        all_draws=all_draws,
        stats=stats,
    )


@app.route("/project/<int:project_id>/draw-today")
@login_required
def draw_today(project_id):
    """Perform (or report) today's draw, then return to the project page."""
    # Deliberately not capped: a day beyond the project's length is rejected
    # here instead of being folded onto the last day.
    today_day = _calendar_day()
    project = get_project(project_id)
    if not project or today_day > project["total_days"]:
        flash("Projet ou jour invalide.")
        return redirect(url_for('dashboard'))

    name, is_new = draw_for_project_day(project_id, today_day)
    if not name:
        flash("Plus aucune personne disponible pour ce projet.")
    elif is_new:
        flash(f"Tirage du jour {today_day} pour '{project['name']}': {name}")
    else:
        flash(f"Le jour {today_day} a déjà été tiré: {name}")
    return redirect(url_for("project_view", project_id=project_id))


@app.route("/project/<int:project_id>/catchup")
@login_required
def catchup_draws_route(project_id):
    """Run the catch-up draws for a project and report how many were made."""
    project = get_project(project_id)
    if not project:
        flash("Projet non trouvé.")
        return redirect(url_for('dashboard'))
    new_draws = catchup_draws(project_id)
    if new_draws > 0:
        flash(f"✅ {new_draws} tirages effectués pour rattraper les jours manquants !")
    else:
        flash("Tous les jours ont déjà été tirés.")
    return redirect(url_for("project_view", project_id=project_id))


# ---------------------------------------------------------------------------
# Administration: projects
# ---------------------------------------------------------------------------

def _store_uploaded_image():
    """Save the uploaded ``image_file`` and return its URL, or None if unusable."""
    file = request.files.get("image_file")
    if not file or not file.filename:
        return None
    # Validate the sanitised name: secure_filename can reduce a name to an
    # empty string or strip it down to something without a valid extension.
    filename = secure_filename(file.filename)
    if not filename or not allowed_file(filename):
        return None
    file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
    return f"/static/uploads/{filename}"


def _save_project_from_form(db, action):
    """Handle the ``add`` and ``update`` actions of the project admin form."""
    name = request.form.get("name", "").strip()
    description = request.form.get("description", "").strip()
    total_days = _form_int("total_days", 24)
    if not name:
        flash("Le nom du projet est requis.")
        return
    if total_days is None or total_days < 1:
        flash("Nombre de jours invalide.")
        return

    existing = None
    project_id = None
    if action == "update":
        project_id = _form_int("project_id")
        if project_id is None:
            flash("Projet invalide.")
            return
        existing = db.execute(
            "SELECT image_url FROM project WHERE id = ?", (project_id,)
        ).fetchone()
        if existing is None:
            flash("Projet non trouvé.")
            return

    # Only touch the filesystem once the rest of the form is known to be valid.
    image_url = _store_uploaded_image()

    if action == "add":
        db.execute(
            "INSERT INTO project (name, description, image_url, total_days) VALUES (?, ?, ?, ?)",
            (name, description, image_url, total_days),
        )
        db.commit()
        flash("Projet ajouté avec succès.")
        return

    if image_url is None:
        # No new upload: keep the image already stored for the project.
        image_url = existing["image_url"]
    db.execute(
        "UPDATE project SET name = ?, description = ?, image_url = ?, total_days = ? WHERE id = ?",
        (name, description, image_url, total_days, project_id),
    )
    recalc_max_draws_for_project(project_id)
    db.commit()
    flash("Projet mis à jour avec succès.")


def _delete_project_from_form(db):
    """Handle the ``delete`` action: remove a project and its dependent rows."""
    project_id = _form_int("project_id")
    if project_id is None:
        flash("Projet invalide.")
        return
    # get_db() never enables SQLite foreign-key enforcement, so dependent rows
    # must be removed explicitly or they would be left orphaned.
    db.execute("DELETE FROM draws WHERE project_id = ?", (project_id,))
    db.execute("DELETE FROM people WHERE project_id = ?", (project_id,))
    db.execute("DELETE FROM project WHERE id = ?", (project_id,))
    db.commit()
    flash("Projet supprimé.")


@app.route("/admin/projects", methods=["GET", "POST"])
@login_required
def admin_projects():
    """Project administration page: add, update or delete projects."""
    db = get_db()
    if request.method == "POST":
        action = request.form.get("action")
        if action in ("add", "update"):
            _save_project_from_form(db, action)
        elif action == "delete":
            _delete_project_from_form(db)
    projects = get_project()
    return render_template("admin_projects.html", projects=projects)


# ---------------------------------------------------------------------------
# Administration: people
# ---------------------------------------------------------------------------

def _import_people_csv(db, project_id, csv_file):
    """Insert the people named in the ``name`` column of an uploaded CSV.

    Names already present in the project are skipped.

    Returns:
        The number of people inserted, or None when the file is not UTF-8.
    """
    try:
        content = csv_file.read().decode('utf-8-sig')
    except UnicodeDecodeError:
        return None
    reader = csv.DictReader(StringIO(content))
    count = 0
    for row in reader:
        # DictReader fills missing cells of short rows with None, so the
        # ``.get`` default alone is not enough to guarantee a string.
        name = (row.get("name") or "").strip()
        if not name:
            continue
        cur = db.execute(
            "SELECT id FROM people WHERE name = ? AND project_id = ?",
            (name, project_id),
        )
        if cur.fetchone():
            continue
        db.execute(
            "INSERT INTO people (project_id, name, draws, max_draws) VALUES (?, ?, 0, 0)",
            (project_id, name),
        )
        count += 1
    recalc_max_draws_for_project(project_id)
    db.commit()
    return count


@app.route("/admin/project/<int:project_id>/people", methods=["GET", "POST"])
@login_required
def admin_project_people(project_id):
    """People administration page: add, import, delete people or reset draws."""
    db = get_db()
    project = get_project(project_id)
    if not project:
        flash("Projet non trouvé.")
        return redirect(url_for('admin_projects'))

    if request.method == "POST":
        action = request.form.get("action")
        if action == "add":
            name = request.form.get("name", "").strip()
            if name:
                db.execute(
                    "INSERT INTO people (project_id, name, draws, max_draws) VALUES (?, ?, 0, 0)",
                    (project_id, name),
                )
                recalc_max_draws_for_project(project_id)
                db.commit()
                flash("Personne ajoutée.")
        elif action == "import_csv":
            csv_file = request.files.get("csv_file")
            if csv_file and csv_file.filename:
                count = _import_people_csv(db, project_id, csv_file)
                if count is None:
                    flash("Le fichier CSV doit être encodé en UTF-8.")
                else:
                    flash(f"✅ {count} personnes importées (espaces + accents préservés)")
        elif action == "delete":
            person_id = _form_int("person_id")
            if person_id is None:
                flash("Personne invalide.")
            else:
                # Remove the person's draws as well: the draw queries join on
                # people, so leftover rows would be invisible but still stored.
                db.execute(
                    "DELETE FROM draws WHERE person_id = ? AND project_id = ?",
                    (person_id, project_id),
                )
                db.execute(
                    "DELETE FROM people WHERE id = ? AND project_id = ?",
                    (person_id, project_id),
                )
                recalc_max_draws_for_project(project_id)
                db.commit()
                flash("Personne supprimée.")
        elif action == "reset_draws":
            db.execute("UPDATE people SET draws = 0 WHERE project_id = ?", (project_id,))
            db.execute("DELETE FROM draws WHERE project_id = ?", (project_id,))
            db.commit()
            flash("Tirages remis à zéro.")

    people = get_people_stats(project_id)
    return render_template("admin_project_people.html", project=project, people=people)


# ---------------------------------------------------------------------------
# Account management
# ---------------------------------------------------------------------------

@app.route("/admin/change-password", methods=["GET", "POST"])
@login_required
def admin_change_password():
    """Let the logged-in user change their password after re-entering it."""
    db = get_db()
    if request.method == "POST":
        # Default to "" so a missing field fails validation instead of
        # raising inside check_password_hash() or len().
        current_password = request.form.get("current_password", "")
        new_password = request.form.get("new_password", "")
        confirm_password = request.form.get("confirm_password", "")
        user_id = session.get("user_id")

        cur = db.execute("SELECT password FROM user WHERE id = ?", (user_id,))
        user = cur.fetchone()
        if not user:
            flash("Erreur utilisateur.")
            return render_template("admin_change_password.html")
        if not check_password_hash(user["password"], current_password):
            flash("Mot de passe actuel incorrect.")
            return render_template("admin_change_password.html")
        if new_password != confirm_password:
            flash("Les nouveaux mots de passe ne correspondent pas.")
            return render_template("admin_change_password.html")
        if len(new_password) < 6:
            flash("Le nouveau mot de passe doit faire au moins 6 caractères.")
            return render_template("admin_change_password.html")

        hashed_password = generate_password_hash(new_password)
        db.execute(
            "UPDATE user SET password = ? WHERE id = ?", (hashed_password, user_id)
        )
        db.commit()
        flash("✅ Mot de passe changé avec succès !")
        return redirect(url_for("admin_projects"))
    return render_template("admin_change_password.html")


@app.route("/login", methods=["GET", "POST"])
def login():
    """Show the login form and open a session on valid credentials."""
    if request.method == "POST":
        username = request.form.get("username", "")
        # Default to "" so a missing field is an invalid login, not a crash.
        password = request.form.get("password", "")
        user = check_login(username, password)
        if user:
            session["user_id"] = user["id"]
            session["username"] = user["username"]
            flash("Connexion réussie.")
            return redirect(url_for("dashboard"))
        flash("Identifiants invalides.")
    return render_template("login.html")


@app.route("/logout")
def logout():
    """Clear the session and return to the login page."""
    session.clear()
    flash("Déconnecté.")
    return redirect(url_for("login"))


if __name__ == "__main__":
    if not os.path.exists(DATABASE):
        with app.app_context():
            init_db()
    # The server listens on all interfaces, so the interactive debugger is
    # opt-in (FLASK_DEBUG=1) rather than always on.
    app.run(host="0.0.0.0", debug=os.environ.get("FLASK_DEBUG") == "1")