Files
AdventCalendar/app.py
2025-12-02 14:51:30 +01:00

405 lines
15 KiB
Python

import os
import math
import csv
import sqlite3
import random
from datetime import date
from io import StringIO
from flask import (
Flask, g, render_template, redirect, url_for, flash,
request, session
)
from functools import wraps
from werkzeug.utils import secure_filename
DATABASE = os.path.join(os.path.dirname(__file__), "avent.db")
UPLOAD_FOLDER = os.path.join(os.path.dirname(__file__), "static", "uploads")
os.makedirs(UPLOAD_FOLDER, exist_ok=True)
ALLOWED_EXTENSIONS = {"png", "jpg", "jpeg", "gif"}
app = Flask(__name__)
app.secret_key = "change-me-super-secret-key-2025"
app.config["UPLOAD_FOLDER"] = UPLOAD_FOLDER
app.config["MAX_CONTENT_LENGTH"] = 5 * 1024 * 1024 # Max 5 MB
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def get_db():
if 'db' not in g:
g.db = sqlite3.connect(DATABASE)
g.db.row_factory = sqlite3.Row
return g.db
@app.teardown_appcontext
def close_db(error=None):
db = g.pop('db', None)
if db is not None:
db.close()
def init_db():
db = get_db()
with open(os.path.join(os.path.dirname(__file__), "schema.sql")) as f:
db.executescript(f.read())
cur = db.execute("SELECT COUNT(*) AS c FROM user")
if cur.fetchone()['c'] == 0:
db.execute("INSERT INTO user (username, password) VALUES (?, ?)", ("admin", "admin"))
cur = db.execute("SELECT COUNT(*) AS c FROM project")
if cur.fetchone()['c'] == 0:
project_id = db.execute(
"INSERT INTO project (name, description, image_url, total_days) VALUES (?, ?, ?, ?)",
(
"Calendrier de l'Avent 2025",
"Le calendrier de l'Avent officiel avec tirage aléatoire et gestion des utilisateurs.",
"https://cdn-icons-png.flaticon.com/512/616/616408.png",
24
)
).lastrowid
people_list = ["User1"]
for name in people_list:
db.execute("INSERT INTO people (project_id, name, draws, max_draws) VALUES (?, ?, 0, 0)", (project_id, name))
recalc_max_draws_for_project(project_id)
db.commit()
def get_user_by_username(username):
db = get_db()
cur = db.execute("SELECT * FROM user WHERE username = ?", (username,))
return cur.fetchone()
def check_login(username, password):
user = get_user_by_username(username)
if user and user["password"] == password:
return user
return None
def login_required(fn):
@wraps(fn)
def wrapped(*args, **kwargs):
if 'user_id' not in session:
return redirect(url_for('login'))
return fn(*args, **kwargs)
return wrapped
def recalc_max_draws_for_project(project_id):
db = get_db()
cur = db.execute("SELECT total_days FROM project WHERE id = ?", (project_id,))
project = cur.fetchone()
if not project:
return
total_days = project["total_days"]
cur = db.execute("SELECT COUNT(*) AS c FROM people WHERE project_id = ?", (project_id,))
count = cur.fetchone()["c"]
if count == 0:
return
max_draws = math.ceil(total_days / count)
db.execute("UPDATE people SET max_draws = ? WHERE project_id = ?", (max_draws, project_id))
db.commit()
def get_project(project_id=None):
db = get_db()
if project_id:
cur = db.execute("SELECT * FROM project WHERE id = ?", (project_id,))
return cur.fetchone()
cur = db.execute("SELECT * FROM project ORDER BY id ASC")
return cur.fetchall()
def get_people_stats(project_id):
db = get_db()
cur = db.execute("SELECT * FROM people WHERE project_id = ? ORDER BY name ASC", (project_id,))
return cur.fetchall()
def get_draw_for_project_day(project_id, day):
db = get_db()
cur = db.execute("""
SELECT draws.day, people.name, draws.draw_time
FROM draws JOIN people ON people.id = draws.person_id
WHERE draws.project_id = ? AND draws.day = ?
""", (project_id, day))
return cur.fetchone()
def get_all_draws_for_project(project_id):
db = get_db()
cur = db.execute("""
SELECT draws.day, people.name, draws.draw_time
FROM draws JOIN people ON people.id = draws.person_id
WHERE draws.project_id = ?
ORDER BY draws.day ASC
""", (project_id,))
return cur.fetchall()
def draw_for_project_day(project_id, day):
db = get_db()
existing = get_draw_for_project_day(project_id, day)
if existing:
return existing["name"], False
cur = db.execute("""
SELECT id, name, draws, max_draws
FROM people WHERE project_id = ? AND draws < max_draws
""", (project_id,))
candidates = cur.fetchall()
if not candidates:
return None, False
chosen = random.choice(candidates)
db.execute("UPDATE people SET draws = draws + 1 WHERE id = ?", (chosen["id"],))
db.execute("INSERT INTO draws (day, person_id, project_id) VALUES (?, ?, ?)",
(day, chosen["id"], project_id))
db.commit()
return chosen["name"], True
def catchup_draws(project_id):
db = get_db()
project = get_project(project_id)
if not project:
return 0
today = date.today()
today_day = today.day if today.month == 12 else 1
if today_day > project["total_days"]:
today_day = project["total_days"]
new_draws = 0
for day in range(1, today_day + 1):
existing = get_draw_for_project_day(project_id, day)
if not existing:
name, success = draw_for_project_day(project_id, day)
if success:
new_draws += 1
db.commit()
return new_draws
@app.route("/public")
def public_dashboard():
projects = get_project()
return render_template("public_projects.html", projects=projects)
@app.route("/public/project/<int:project_id>")
def public_project_view(project_id):
project = get_project(project_id)
if not project:
flash("Projet non trouvé.")
return redirect(url_for('public_dashboard'))
today = date.today()
today_day = today.day if today.month == 12 else 1
if today_day > project["total_days"]:
today_day = project["total_days"]
today_draw = get_draw_for_project_day(project_id, today_day)
all_draws = get_all_draws_for_project(project_id)
stats = get_people_stats(project_id)
return render_template("public_project_view.html", project=project, today_day=today_day,
today_draw=today_draw, all_draws=all_draws, stats=stats)
@app.route("/")
@login_required
def dashboard():
projects = get_project()
return render_template("projects.html", projects=projects)
@app.route("/project/<int:project_id>")
@login_required
def project_view(project_id):
project = get_project(project_id)
if not project:
flash("Projet non trouvé.")
return redirect(url_for('dashboard'))
today = date.today()
today_day = today.day if today.month == 12 else 1
if today_day > project["total_days"]:
today_day = project["total_days"]
today_draw = get_draw_for_project_day(project_id, today_day)
all_draws = get_all_draws_for_project(project_id)
stats = get_people_stats(project_id)
return render_template("project_view.html", project=project, today_day=today_day,
today_draw=today_draw, all_draws=all_draws, stats=stats)
@app.route("/project/<int:project_id>/draw-today")
@login_required
def draw_today(project_id):
today = date.today()
today_day = today.day if today.month == 12 else 1
project = get_project(project_id)
if not project or today_day > project["total_days"]:
flash("Projet ou jour invalide.")
return redirect(url_for('dashboard'))
name, is_new = draw_for_project_day(project_id, today_day)
if not name:
flash("Plus aucune personne disponible pour ce projet.")
else:
if is_new:
flash(f"Tirage du jour {today_day} pour '{project['name']}': {name}")
else:
flash(f"Le jour {today_day} a déjà été tiré: {name}")
return redirect(url_for("project_view", project_id=project_id))
@app.route("/project/<int:project_id>/catchup")
@login_required
def catchup_draws_route(project_id):
project = get_project(project_id)
if not project:
flash("Projet non trouvé.")
return redirect(url_for('dashboard'))
new_draws = catchup_draws(project_id)
if new_draws > 0:
flash(f"{new_draws} tirages effectués pour rattraper les jours manquants !")
else:
flash("Tous les jours ont déjà été tirés.")
return redirect(url_for("project_view", project_id=project_id))
@app.route("/admin/projects", methods=["GET", "POST"])
@login_required
def admin_projects():
db = get_db()
if request.method == "POST":
action = request.form.get("action")
if action in ("add", "update"):
name = request.form.get("name", "").strip()
description = request.form.get("description", "").strip()
total_days = int(request.form.get("total_days", 24))
image_url = None
if 'image_file' in request.files:
file = request.files['image_file']
if file and allowed_file(file.filename):
filename = secure_filename(file.filename)
filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
file.save(filepath)
image_url = f"/static/uploads/{filename}"
if action == "add":
if name:
db.execute(
"INSERT INTO project (name, description, image_url, total_days) VALUES (?, ?, ?, ?)",
(name, description, image_url, total_days)
)
db.commit()
flash("Projet ajouté avec succès.")
else: # update
project_id = int(request.form.get("project_id"))
if image_url is None:
cur = db.execute("SELECT image_url FROM project WHERE id = ?", (project_id,))
row = cur.fetchone()
image_url = row["image_url"] if row else None
db.execute(
"UPDATE project SET name = ?, description = ?, image_url = ?, total_days = ? WHERE id = ?",
(name, description, image_url, total_days, project_id)
)
recalc_max_draws_for_project(project_id)
db.commit()
flash("Projet mis à jour avec succès.")
elif action == "delete":
project_id = int(request.form.get("project_id"))
db.execute("DELETE FROM project WHERE id = ?", (project_id,))
db.commit()
flash("Projet supprimé.")
projects = get_project()
return render_template("admin_projects.html", projects=projects)
@app.route("/admin/project/<int:project_id>/people", methods=["GET", "POST"])
@login_required
def admin_project_people(project_id):
db = get_db()
project = get_project(project_id)
if not project:
flash("Projet non trouvé.")
return redirect(url_for('admin_projects'))
if request.method == "POST":
action = request.form.get("action")
if action == "add":
name = request.form.get("name", "").strip()
if name:
db.execute("INSERT INTO people (project_id, name, draws, max_draws) VALUES (?, ?, 0, 0)",
(project_id, name))
recalc_max_draws_for_project(project_id)
db.commit()
flash("Personne ajoutée.")
elif action == "import_csv":
if 'csv_file' in request.files:
csv_file = request.files['csv_file']
if csv_file.filename:
# Lecture UTF-8 avec gestion BOM
content = csv_file.read().decode('utf-8-sig')
reader = csv.DictReader(StringIO(content))
count = 0
for row in reader:
# CONSERVE les espaces INTERNES, supprime SEULEMENT avant/après
raw_name = row.get("name", "").strip() # UNIQUEMENT les bords
if raw_name:
# Pas de .title() pour conserver la casse exacte
name = raw_name # ESPACES INTERNES préservés !
# Vérifier doublons exacts (espaces inclus)
cur = db.execute(
"SELECT id FROM people WHERE name = ? AND project_id = ?",
(name, project_id)
)
if not cur.fetchone():
db.execute(
"INSERT INTO people (project_id, name, draws, max_draws) VALUES (?, ?, 0, 0)",
(project_id, name)
)
count += 1
recalc_max_draws_for_project(project_id)
db.commit()
flash(f"{count} personnes importées (espaces + accents préservés !)")
elif action == "delete":
person_id = int(request.form.get("person_id"))
db.execute("DELETE FROM people WHERE id = ? AND project_id = ?", (person_id, project_id))
recalc_max_draws_for_project(project_id)
db.commit()
flash("Personne supprimée.")
elif action == "reset_draws":
db.execute("UPDATE people SET draws = 0 WHERE project_id = ?", (project_id,))
db.execute("DELETE FROM draws WHERE project_id = ?", (project_id,))
db.commit()
flash("Tirages remis à zéro.")
people = get_people_stats(project_id)
return render_template("admin_project_people.html", project=project, people=people)
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form.get("username")
password = request.form.get("password")
user = check_login(username, password)
if user:
session["user_id"] = user["id"]
session["username"] = user["username"]
flash("Connexion réussie.")
return redirect(url_for("dashboard"))
else:
flash("Identifiants invalides.")
return render_template("login.html")
@app.route("/logout")
def logout():
session.clear()
flash("Déconnecté.")
return redirect(url_for("login"))
if __name__ == "__main__":
if not os.path.exists(DATABASE):
with app.app_context():
init_db()
app.run(host="0.0.0.0", debug=True)