"""Self-hosted link dashboard built on Flask and Flask-SQLAlchemy (SQLite).

The module defines the database models (settings, pages, groups, links and
widgets), a small schema migration that runs at import time, the login
protected admin CRUD routes, and a few JSON utility endpoints (URL status
check, link search and widget data).
"""
import hmac
import json
import os
import sys
import time
import uuid
from functools import wraps
from urllib.parse import urljoin, urlparse

import requests
import urllib3
from flask import Flask, render_template, request, redirect, url_for, jsonify, session, flash
from flask_sqlalchemy import SQLAlchemy
from PIL import Image
from sqlalchemy import func, text
from werkzeug.utils import secure_filename

# Disable SSL warnings for local services
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

app = Flask(__name__)

UPLOAD_FOLDER = '/app/static/uploads'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'webp'}

# Fallback used when SECRET_KEY is not provided through the environment.
_DEV_SECRET_KEY = 'a-secure-default-secret-key-for-development-only'

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////data/dashboard.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.secret_key = os.environ.get('SECRET_KEY', _DEV_SECRET_KEY)
if app.secret_key == _DEV_SECRET_KEY:
    # NOTE(security): session cookies are signed with this key; with the
    # publicly known fallback anyone can forge a `logged_in` session.
    print("WARNING: SECRET_KEY is not set; using the insecure development default.",
          file=sys.stderr)

db = SQLAlchemy(app)

# --- Caching for Status Checks ---
URL_CACHE = {}  # url -> (status, timestamp)
CACHE_DURATION = 300  # 5 minutes
URL_CACHE_MAX_ENTRIES = 1024  # upper bound so arbitrary ?url= values cannot grow the cache forever


# --- Custom Filters ---
@app.template_filter('json_loads')
def json_loads_filter(s):
    """Parse a JSON string for use in templates; return ``{}`` for empty or invalid input."""
    if not s:
        return {}
    try:
        return json.loads(s)
    except Exception:
        return {}


# --- Database Models ---
class Setting(db.Model):
    """Key/value pair holding one dashboard setting."""
    key = db.Column(db.String(50), primary_key=True)
    value = db.Column(db.String(255))


class Page(db.Model):
    """A dashboard page; owns its groups and widgets (deleted together with the page)."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    icon = db.Column(db.String(255))
    order = db.Column(db.Integer, default=0)
    groups = db.relationship('Group', backref='page', lazy=True,
                             cascade="all, delete-orphan", order_by="Group.order")
    widgets = db.relationship('Widget', backref='page', lazy=True,
                              cascade="all, delete-orphan", order_by="Widget.order")


class Group(db.Model):
    """A named group of links on a page; owns its links."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    icon = db.Column(db.String(255))
    order = db.Column(db.Integer, default=0)
    page_id = db.Column(db.Integer, db.ForeignKey('page.id'), nullable=False)
    links = db.relationship('Link', backref='group', lazy=True,
                            cascade="all, delete-orphan", order_by="Link.order")


class Link(db.Model):
    """A single link tile inside a group."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    url = db.Column(db.String(255), nullable=False)
    icon = db.Column(db.String(255))
    subtitle = db.Column(db.String(255))
    target = db.Column(db.String(20), default='_self')
    order = db.Column(db.Integer, default=0)
    group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=False)


class Widget(db.Model):
    """A widget placed on a page; ``config`` holds a JSON-formatted string."""
    id = db.Column(db.Integer, primary_key=True)
    type = db.Column(db.String(50), nullable=False)  # e.g., 'pihole_status', 'openmeteo_weather', 'iframe_widget'
    config = db.Column(db.Text, nullable=True)  # JSON-formatted string
    order = db.Column(db.Integer, default=0)
    page_id = db.Column(db.Integer, db.ForeignKey('page.id'), nullable=False)  # Changed from group_id to page_id


# --- Schema Migration ---
def _table_columns(conn, table):
    """Return the set of column names of ``table`` as reported by SQLite.

    ``PRAGMA table_info`` is used instead of probing with ``SELECT "col"``:
    SQLite may treat a double-quoted name that matches no column as a string
    literal, so such a probe can succeed even though the column is missing.
    """
    rows = conn.execute(text(f'PRAGMA table_info("{table}")'))
    return {row[1] for row in rows}  # row[1] is the column name


def _ensure_order_column(conn, table, label):
    """Add the ``order`` column to ``table`` when it does not exist yet."""
    if 'order' in _table_columns(conn, table):
        return
    print(f"Migrating DB: Adding order column to {label}")
    conn.execute(text(f'ALTER TABLE "{table}" ADD COLUMN "order" INTEGER DEFAULT 0'))
    conn.commit()


def _rebuild_widget_table(conn, columns):
    """Recreate the widget table without the legacy ``group_id`` column.

    ``columns`` is the column set of the existing widget table. The page of
    each widget is taken from its own ``page_id`` when present and otherwise
    derived from the group it used to belong to; widgets for which no page
    can be determined are not copied.
    """
    order_expr = 'w."order"' if 'order' in columns else '0'
    page_expr = 'COALESCE(w.page_id, g.page_id)' if 'page_id' in columns else 'g.page_id'

    # A previously interrupted run may have left the scratch table behind.
    conn.execute(text("DROP TABLE IF EXISTS widget_new"))
    conn.execute(text("""
        CREATE TABLE widget_new (
            id INTEGER NOT NULL,
            type VARCHAR(50) NOT NULL,
            config TEXT,
            "order" INTEGER,
            page_id INTEGER NOT NULL,
            PRIMARY KEY (id),
            FOREIGN KEY(page_id) REFERENCES page (id)
        )
    """))
    conn.execute(text(
        'INSERT INTO widget_new (id, type, config, "order", page_id) '
        f'SELECT w.id, w.type, w.config, {order_expr}, {page_expr} '
        'FROM widget w LEFT JOIN "group" g ON w.group_id = g.id '
        f'WHERE {page_expr} IS NOT NULL'
    ))
    conn.execute(text("DROP TABLE widget"))
    conn.execute(text("ALTER TABLE widget_new RENAME TO widget"))
    conn.commit()


def migrate_db():
    """Create missing tables and bring an older database up to the current schema.

    Best effort: any failure is printed and swallowed so that the application
    can still start.
    """
    with app.app_context():
        db.create_all()
        try:
            with db.engine.connect() as conn:
                for table, label in (('page', 'Page'), ('group', 'Group'), ('link', 'Link')):
                    _ensure_order_column(conn, table, label)

                # Widgets used to hang off a group; they now belong to a page.
                widget_columns = _table_columns(conn, 'widget')
                if 'group_id' in widget_columns:
                    print("Migrating DB: Removing group_id from Widget (Recreating Table)")
                    try:
                        _rebuild_widget_table(conn, widget_columns)
                        print("Widget table migrated and cleaned successfully.")
                    except Exception as e:
                        conn.rollback()
                        print(f"Deep migration error: {e}")
                elif 'page_id' not in widget_columns:
                    print("Deep migration error: widget table has neither group_id nor page_id")

                _ensure_order_column(conn, 'widget', 'Widget')
        except Exception as e:
            print(f"Migration general warning: {e}")


migrate_db()


# --- Authentication ---
def login_required(f):
    """Decorator that redirects to the login page unless the session is logged in."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if not session.get('logged_in'):
            return redirect(url_for('login', next=request.url))
        return f(*args, **kwargs)
    return decorated_function


def _is_safe_redirect_target(target):
    """Return True when ``target`` resolves to an http(s) URL on this host.

    Prevents the ``next`` parameter from being abused as an open redirect.
    """
    if not target or '\\' in target:
        # Backslashes are rejected because some browsers treat them like '/'.
        return False
    own = urlparse(request.host_url)
    resolved = urlparse(urljoin(request.host_url, target))
    return resolved.scheme in ('http', 'https') and resolved.netloc == own.netloc


def _credentials_match(username, password):
    """Compare the submitted credentials with ADMIN_USERNAME / ADMIN_PASSWORD.

    Returns False when either environment variable is unset. Both values are
    always compared (no short-circuit) using a constant-time comparison.
    """
    expected_user = os.environ.get('ADMIN_USERNAME')
    expected_password = os.environ.get('ADMIN_PASSWORD')
    if expected_user is None or expected_password is None:
        return False
    user_ok = hmac.compare_digest(username.encode('utf-8'), expected_user.encode('utf-8'))
    password_ok = hmac.compare_digest(password.encode('utf-8'), expected_password.encode('utf-8'))
    return user_ok and password_ok


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form and, on POST, log the admin in."""
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        if _credentials_match(username, password):
            session['logged_in'] = True
            flash('Login erfolgreich!', 'success')
            next_url = request.args.get('next')
            if not _is_safe_redirect_target(next_url):
                next_url = None
            return redirect(next_url or url_for('admin'))
        else:
            flash('Falscher Benutzername oder Passwort.', 'danger')
    return render_template('login.html')


@app.route('/logout')
def logout():
    """Clear the login flag and return to the dashboard."""
    session.pop('logged_in', None)
    flash('Du wurdest ausgeloggt.', 'info')
    return redirect(url_for('index'))


# --- Helper & Image Processing ---
def allowed_file(filename):
    """Return True when ``filename`` has an extension listed in ALLOWED_EXTENSIONS."""
    _, dot, extension = filename.rpartition('.')
    return dot != '' and extension.lower() in ALLOWED_EXTENSIONS


def _is_acceptable_upload(request_file):
    """Return True when an upload is present, named and has an allowed extension."""
    if not request_file or not request_file.filename:
        return False
    return allowed_file(request_file.filename)


def save_and_scale_icon(request_file):
    """Store an uploaded icon as a 48x48 RGBA PNG.

    Returns the generated file name, or None when nothing usable was uploaded
    or the image could not be processed.
    """
    if not _is_acceptable_upload(request_file):
        return None
    try:
        # The folder is otherwise only created when the module runs as a script.
        os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
        unique_filename = f"icon_{uuid.uuid4().hex}.png"
        target_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
        with Image.open(request_file.stream) as source:
            img = source.resize((48, 48), Image.Resampling.LANCZOS)
        if img.mode != 'RGBA':
            img = img.convert('RGBA')
        img.save(target_path, 'PNG')
        return unique_filename
    except Exception as e:
        print(f"Fehler bei der Bildverarbeitung: {e}")
        return None


def save_background_or_logo(request_file, prefix):
    """Store an uploaded background/logo unchanged as ``<prefix>_<original name>``.

    Returns the sanitized file name, or None when nothing usable was uploaded
    or saving failed.
    """
    if not _is_acceptable_upload(request_file):
        return None
    try:
        os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
        filename = secure_filename(f"{prefix}_{request_file.filename}")
        filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        request_file.save(filepath)
        return filename
    except Exception as e:
        print(f"Fehler beim Speichern: {e}")
        return None


def _icon_from_request():
    """Return the uploaded icon's file name, or else the ``icon_text`` form value."""
    return save_and_scale_icon(request.files.get('icon_file')) or request.form.get('icon_text', '')


def _list_upload_dir():
    """List the upload folder; an absent folder counts as empty."""
    try:
        return os.listdir(UPLOAD_FOLDER)
    except FileNotFoundError:
        return []


# --- Database Helper ---
def get_all_data():
    """Return the stored settings (filled up with defaults) and all pages in display order."""
    settings = default_settings()
    settings.update({s.key: s.value for s in Setting.query.all()})
    pages = Page.query.order_by(Page.order).all()
    return {
        "settings": settings,
        "pages": pages
    }


def default_settings():
    """Return the built-in default values for all known settings."""
    return {
        "title": "Mein Dashboard",
        "bg_image": "",
        "logo_image": "",
        "header_color": "#363636",
        "header_font_color": "#FFFFFF",
        "header_inactive_font_color": "#C0C0C0",
        "group_title_font_color": "#FFFFFF",
        "columns": "is-one-quarter-desktop",
        "theme_color": "#3273dc",
        "overlay_opacity": "0.85"
    }


# --- Routes ---
def hex_to_rgb(hex_color):
    """Convert ``#rrggbb`` (or shorthand ``#rgb``) to the string ``"r,g,b"``.

    Raises ValueError when the value does not contain valid hex digits.
    """
    digits = hex_color.lstrip('#')
    if len(digits) == 3:
        digits = ''.join(ch * 2 for ch in digits)
    return ",".join(str(int(digits[i:i + 2], 16)) for i in (0, 2, 4))


@app.route('/')
def index():
    """Render the dashboard for the page selected through ``?page=<index>``."""
    data = get_all_data()
    pages = data['pages']
    if not pages:
        return render_template('empty.html')

    active_page_index = request.args.get('page', 0, type=int)
    if not 0 <= active_page_index < len(pages):
        active_page_index = 0
    active_page = pages[active_page_index]

    group_bg_color = data['settings'].get('group_bg_color')
    if group_bg_color:
        try:
            data['settings']['group_bg_color_rgb'] = hex_to_rgb(group_bg_color)
        except ValueError:
            # A malformed stored color must not take the whole dashboard down.
            pass

    return render_template('index.html', data=data, active_page=active_page,
                           active_index=active_page_index)


@app.route('/admin')
@login_required
def admin():
    """Render the admin overview."""
    data = get_all_data()
    return render_template('admin.html', data=data)


@app.route('/admin/local_icons')
@login_required
def local_icons():
    """Return the names of uploaded files usable as icons as a JSON list."""
    icon_prefixes = ('icon_', 'upload_', 'link_')
    return jsonify([name for name in _list_upload_dir() if name.startswith(icon_prefixes)])


def _media_kind(name):
    """Classify an uploaded file by its name prefix."""
    if name.startswith('icon_'):
        return 'Icon'
    if name.startswith('bg_'):
        return 'Hintergrund'
    if name.startswith('logo_'):
        return 'Logo'
    return 'Unbekannt'


@app.route('/admin/media_manager')
@login_required
def media_manager():
    """Render the list of uploaded files with their size and kind."""
    files = []
    for name in _list_upload_dir():
        filepath = os.path.join(UPLOAD_FOLDER, name)
        try:
            if not os.path.isfile(filepath):
                continue
            size_kb = os.path.getsize(filepath) / 1024
        except OSError:
            # Unreadable entries (e.g. removed in the meantime) are skipped.
            continue
        files.append({
            'name': name,
            'size': f"{size_kb:.2f} KB",
            'type': _media_kind(name)
        })
    return render_template('media.html', files=files)


def _update_setting(key, value):
    """Insert or update one setting in the current session (no commit)."""
    setting = Setting.query.filter_by(key=key).first()
    if setting:
        setting.value = value
    else:
        db.session.add(Setting(key=key, value=value))


@app.route('/admin/update_settings', methods=['POST'])
@login_required
def update_settings():
    """Store every submitted form field as a setting, plus uploaded background/logo."""
    for key, value in request.form.items():
        _update_setting(key, value)

    bg_filename = save_background_or_logo(request.files.get('bg_file'), 'bg')
    if bg_filename:
        _update_setting('bg_image', bg_filename)

    logo_filename = save_background_or_logo(request.files.get('logo_file'), 'logo')
    if logo_filename:
        _update_setting('logo_image', logo_filename)

    _update_setting('settings_configured', 'true')
    db.session.commit()
    flash('Einstellungen gespeichert!', 'success')
    return redirect(url_for('admin', tab='settings'))


@app.route('/admin/delete_media/<path:filename>')
@login_required
def delete_media(filename):
    """Delete an uploaded file and clear settings that referenced it."""
    # Reject anything that could escape the upload folder.
    if '..' in filename or os.path.sep in filename:
        return "Ungültiger Dateiname", 400

    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    if os.path.exists(filepath):
        os.remove(filepath)
        # Check if this file was used as the background or logo
        for setting_key in ('bg_image', 'logo_image'):
            setting = Setting.query.filter_by(key=setting_key, value=filename).first()
            if setting:
                setting.value = ""
        db.session.commit()
        flash(f'Datei {filename} gelöscht.', 'info')
    return redirect(url_for('media_manager'))


# --- CRUD Routes (Pages, Groups, Links, Widgets) ---
@app.route('/admin/add_page', methods=['POST'])
@login_required
def add_page():
    """Create a page from the submitted name and optional icon."""
    name = request.form.get('name')
    if name:
        db.session.add(Page(name=name, icon=_icon_from_request()))
        db.session.commit()
    return redirect(url_for('admin'))


@app.route('/admin/delete_page/<int:page_id>')
@login_required
def delete_page(page_id):
    """Delete a page (its groups and widgets are removed by cascade)."""
    page = Page.query.get_or_404(page_id)
    db.session.delete(page)
    db.session.commit()
    return redirect(url_for('admin'))


@app.route('/admin/add_group/<int:page_id>', methods=['POST'])
@login_required
def add_group(page_id):
    """Create a group on the given page."""
    page = Page.query.get_or_404(page_id)
    name = request.form.get('name')
    if name:
        db.session.add(Group(name=name, icon=_icon_from_request(), page_id=page.id))
        db.session.commit()
    return redirect(url_for('admin'))


@app.route('/admin/delete_group/<int:group_id>')
@login_required
def delete_group(group_id):
    """Delete a group (its links are removed by cascade)."""
    group = Group.query.get_or_404(group_id)
    db.session.delete(group)
    db.session.commit()
    return redirect(url_for('admin'))


@app.route('/admin/add_link/<int:group_id>', methods=['POST'])
@login_required
def add_link(group_id):
    """Create a link in the given group; name and URL are required."""
    group = Group.query.get_or_404(group_id)
    name = request.form.get('name')
    url = request.form.get('url')
    if name and url:
        new_link = Link(
            name=name,
            url=url,
            icon=_icon_from_request(),
            subtitle=request.form.get('subtitle'),
            target=request.form.get('target', '_blank'),
            group_id=group.id
        )
        db.session.add(new_link)
        db.session.commit()
    return redirect(url_for('admin'))


@app.route('/admin/delete_link/<int:link_id>')
@login_required
def delete_link(link_id):
    """Delete a link."""
    link = Link.query.get_or_404(link_id)
    db.session.delete(link)
    db.session.commit()
    return redirect(url_for('admin'))


def _widget_config_from_form(widget_type):
    """Collect the config dict for ``widget_type`` from the submitted form.

    Unknown widget types yield an empty config.
    """
    form = request.form
    if widget_type == 'pihole_status':
        return {
            'api_url': form.get('pihole_api_url'),
            'api_key': form.get('pihole_api_key')
        }
    if widget_type == 'openmeteo_weather':
        return {
            'latitude': form.get('weather_lat'),
            'longitude': form.get('weather_lon')
        }
    if widget_type == 'iframe_widget':
        return {
            'url': form.get('iframe_url'),
            'height': form.get('iframe_height')
        }
    return {}


@app.route('/admin/add_widget/<int:page_id>', methods=['POST'])
@login_required
def add_widget(page_id):
    """Create a widget of the submitted type on the given page."""
    # Same existence check as the sibling add_* routes, so no orphan rows are created.
    page = Page.query.get_or_404(page_id)

    widget_type = request.form.get('type')
    if not widget_type:
        flash('Widget-Typ nicht angegeben.', 'danger')
        return redirect(url_for('admin'))

    new_widget = Widget(
        page_id=page.id,
        type=widget_type,
        config=json.dumps(_widget_config_from_form(widget_type))
    )
    db.session.add(new_widget)
    db.session.commit()
    flash('Widget erfolgreich hinzugefügt!', 'success')
    return redirect(url_for('admin'))


@app.route('/admin/edit_widget/<int:widget_id>', methods=['POST'])
@login_required
def edit_widget(widget_id):
    """Replace a widget's config with the submitted form values."""
    widget = Widget.query.get_or_404(widget_id)
    widget.config = json.dumps(_widget_config_from_form(widget.type))
    db.session.commit()
    flash('Widget erfolgreich aktualisiert!', 'success')
    return redirect(url_for('admin'))


@app.route('/admin/delete_widget/<int:widget_id>')
@login_required
def delete_widget(widget_id):
    """Delete a widget."""
    widget = Widget.query.get_or_404(widget_id)
    db.session.delete(widget)
    db.session.commit()
    flash('Widget gelöscht.', 'info')
    return redirect(url_for('admin'))


@app.route('/admin/edit_link/<int:link_id>', methods=['POST'])
@login_required
def edit_link(link_id):
    """Update a link's fields, optionally moving it to another group."""
    link = Link.query.get_or_404(link_id)

    # name and url are NOT NULL columns: keep the stored value when the
    # field is missing or empty (add_link requires both as well).
    name = request.form.get('name')
    if name:
        link.name = name
    url = request.form.get('url')
    if url:
        link.url = url
    link.subtitle = request.form.get('subtitle')
    link.target = request.form.get('target')

    # type=int yields None for missing or non-numeric input.
    new_group_id = request.form.get('group_id', type=int)
    if new_group_id is not None and new_group_id != link.group_id:
        if Group.query.get(new_group_id) is not None:
            link.group_id = new_group_id

    new_icon_file = save_and_scale_icon(request.files.get('icon_file'))
    new_icon_text = request.form.get('icon_text')
    if new_icon_file:
        link.icon = new_icon_file
    elif new_icon_text:
        link.icon = new_icon_text

    db.session.commit()
    return redirect(url_for('admin'))


@app.route('/admin/edit_page/<int:page_id>', methods=['POST'])
@login_required
def edit_page(page_id):
    """Update a page's name and icon."""
    page = Page.query.get_or_404(page_id)

    name = request.form.get('name')
    if name:  # NOT NULL column: ignore a missing/empty name
        page.name = name

    new_icon_file = save_and_scale_icon(request.files.get('icon_file'))
    if new_icon_file:
        page.icon = new_icon_file
    elif 'icon_text' in request.form:
        # A submitted but empty field clears the icon.
        page.icon = request.form.get('icon_text')

    db.session.commit()
    flash('Seite erfolgreich aktualisiert!', 'success')
    return redirect(url_for('admin'))


@app.route('/admin/edit_group/<int:group_id>', methods=['POST'])
@login_required
def edit_group(group_id):
    """Update a group's name and icon, optionally moving it to another page."""
    group = Group.query.get_or_404(group_id)

    name = request.form.get('name')
    if name:  # NOT NULL column: ignore a missing/empty name
        group.name = name

    new_page_id = request.form.get('page_id', type=int)
    if new_page_id is not None and new_page_id != group.page_id:
        if Page.query.get(new_page_id) is not None:
            group.page_id = new_page_id

    new_icon_file = save_and_scale_icon(request.files.get('icon_file'))
    if new_icon_file:
        group.icon = new_icon_file
    elif 'icon_text' in request.form:
        # A submitted but empty field clears the icon.
        group.icon = request.form.get('icon_text')

    db.session.commit()
    flash('Gruppe erfolgreich aktualisiert!', 'success')
    return redirect(url_for('admin'))


# --- Reordering ---
def _reorder(model, parent_field=None, parent_key=None):
    """Shared implementation of the reorder endpoints.

    Reads ``{"order": [id, ...]}`` from the JSON body and stores each row's
    list position in its ``order`` column; unknown ids are skipped. When
    ``parent_field`` is given and the body carries a truthy ``parent_key``,
    every listed row is also moved to that parent. Answers with HTTP 400 for
    a missing/invalid body instead of failing with a server error.
    """
    error = (jsonify({'status': 'error'}), 400)

    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return error
    order_data = payload.get('order')
    if not isinstance(order_data, list) or not order_data:
        return error

    parent_id = None
    if parent_field and payload.get(parent_key):
        try:
            parent_id = int(payload[parent_key])
        except (TypeError, ValueError):
            return error

    for position, row_id in enumerate(order_data):
        row = model.query.get(row_id)
        if row is None:
            continue
        row.order = position
        if parent_id is not None:
            setattr(row, parent_field, parent_id)
    db.session.commit()
    return jsonify({'status': 'success'})


@app.route('/admin/reorder_pages', methods=['POST'])
@login_required
def reorder_pages():
    """Persist a new page order."""
    return _reorder(Page)


@app.route('/admin/reorder_groups', methods=['POST'])
@login_required
def reorder_groups():
    """Persist a new group order, optionally moving the groups to ``page_id``."""
    return _reorder(Group, parent_field='page_id', parent_key='page_id')


@app.route('/admin/reorder_links', methods=['POST'])
@login_required
def reorder_links():
    """Persist a new link order, optionally moving the links to ``group_id``."""
    return _reorder(Link, parent_field='group_id', parent_key='group_id')


# --- Utility Routes ---
def _prune_url_cache(now):
    """Keep URL_CACHE below URL_CACHE_MAX_ENTRIES.

    Expired entries are dropped first; if the cache is still full it is
    cleared entirely.
    """
    if len(URL_CACHE) < URL_CACHE_MAX_ENTRIES:
        return
    for cached_url, (_, timestamp) in list(URL_CACHE.items()):
        if now - timestamp >= CACHE_DURATION:
            URL_CACHE.pop(cached_url, None)
    if len(URL_CACHE) >= URL_CACHE_MAX_ENTRIES:
        URL_CACHE.clear()


@app.route('/check_url')
def check_url():
    """Report whether ``?url=`` is reachable; results are cached for CACHE_DURATION seconds."""
    # NOTE(security): this endpoint is not login protected and makes the
    # server request an arbitrary caller-supplied URL.
    url = request.args.get('url')
    if not url:
        return jsonify({'status': 'offline', 'reason': 'No URL provided'}), 400

    # Check Cache
    current_time = time.time()
    cached = URL_CACHE.get(url)
    if cached is not None:
        status, timestamp = cached
        if current_time - timestamp < CACHE_DURATION:
            return jsonify({'status': status, 'cached': True})

    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    status = 'offline'
    try:
        # Use a short timeout, and don't verify SSL certs for local services
        response = requests.head(url, headers=headers, timeout=5, verify=False, allow_redirects=True)
        response.close()
        # If we get ANY response, the server is online/reachable.
        status = 'online'
    except requests.exceptions.SSLError:
        # SSL Handshake failed, but server is there.
        status = 'online'
    except requests.exceptions.RequestException:
        # Connection failed (Timeout, DNS, Refused)
        # Try one last time with GET, sometimes HEAD is blocked/buggy
        try:
            # stream=True avoids downloading the body; close() releases the connection.
            requests.get(url, headers=headers, timeout=5, verify=False, stream=True).close()
            status = 'online'
        except Exception as e2:
            # Log only actual failures
            print(f"Check URL failed for {url}: {e2}", file=sys.stdout, flush=True)
            status = 'offline'

    # Update Cache
    _prune_url_cache(current_time)
    URL_CACHE[url] = (status, current_time)
    return jsonify({'status': status})


@app.route('/search')
def search():
    """Return links whose name or subtitle contains ``?q=`` (case-insensitive, min. 2 chars)."""
    query = request.args.get('q', '').strip().lower()
    if len(query) < 2:
        return jsonify([])

    search_term = f"%{query}%"
    results = (
        db.session.query(Link, Group.name, Page.name)
        .join(Group, Link.group_id == Group.id)
        .join(Page, Group.page_id == Page.id)
        .filter(
            (func.lower(Link.name).like(search_term)) |
            (func.lower(Link.subtitle).like(search_term))
        )
        .all()
    )

    output = [{
        'name': link.name,
        'url': link.url,
        'subtitle': link.subtitle,
        'icon': link.icon,
        'group_name': group_name,
        'page_name': page_name
    } for link, group_name, page_name in results]
    return jsonify(output)


# --- Widget Data Handlers ---
def _pihole_v6_status(user_url, password):
    """Query a Pi-hole through the session based (v6) API.

    Returns the status dict, or an error dict when no session id is issued.
    Raises on any HTTP/connection problem so the caller can fall back to v5.
    """
    if user_url.endswith('/api'):
        v6_api_base = user_url + '/'
    else:
        v6_api_base = urljoin(user_url, 'api/')

    # Step 1: Authenticate
    auth_url = urljoin(v6_api_base, 'auth')
    auth_response = requests.post(auth_url, json={'password': password}, verify=False, timeout=5)
    if auth_response.status_code == 404:
        raise requests.exceptions.RequestException("Auth endpoint not found (404), falling back to v5.")
    auth_response.raise_for_status()
    sid = auth_response.json().get('session', {}).get('sid')
    if not sid:
        return {'error': 'Pi-hole v6 auth successful, but no SID returned.'}

    try:
        # Step 2a: Get blocking status
        blocking_url = urljoin(v6_api_base, 'dns/blocking')
        blocking_response = requests.get(blocking_url, params={'sid': sid}, verify=False, timeout=5)
        blocking_response.raise_for_status()
        is_enabled = blocking_response.json().get('blocking') == 'enabled'

        # Step 2b: Get summary stats
        summary_url = urljoin(v6_api_base, 'stats/summary')
        summary_response = requests.get(summary_url, params={'sid': sid}, verify=False, timeout=5)
        summary_response.raise_for_status()
        summary_data = summary_response.json()

        # Parse nested v6 structure
        queries_data = summary_data.get('queries', {})
        gravity_data = summary_data.get('gravity', {})
        return {
            'status': 'enabled' if is_enabled else 'disabled',
            'domains_being_blocked': gravity_data.get('domains_being_blocked', 0),
            'dns_queries_today': queries_data.get('total', 0),
            'ads_percentage_today': queries_data.get('percent_blocked', 0.0)
        }
    finally:
        # Log the session out again so repeated polling does not pile up
        # open sessions on the Pi-hole. Best effort: a failed logout must
        # not hide the result (or the original error) of the request above.
        try:
            requests.delete(auth_url, params={'sid': sid}, verify=False, timeout=5)
        except requests.exceptions.RequestException:
            pass


def _pihole_v5_status(user_url, password):
    """Query a Pi-hole through the legacy ``/admin/api.php`` endpoint and return its JSON."""
    params = {'summary': ''}
    if password:
        params['auth'] = password

    v5_base_url = user_url
    if v5_base_url.endswith('/api'):
        v5_base_url = v5_base_url[:-4]
    v5_url = f"{v5_base_url.rstrip('/')}/admin/api.php"

    response = requests.get(v5_url, params=params, timeout=5, verify=False)
    response.raise_for_status()
    v5_data = response.json()
    # The percentage may arrive as a string with a decimal comma.
    if 'ads_percentage_today' in v5_data and isinstance(v5_data['ads_percentage_today'], str):
        v5_data['ads_percentage_today'] = float(v5_data['ads_percentage_today'].replace(',', '.'))
    return v5_data


def get_pihole_status(config):
    """Return Pi-hole statistics for the widget described by ``config``.

    Tries the v6 API first and falls back to the v5 endpoint on any failure.
    Returns a dict with an ``error`` key when the URL is not configured or
    both attempts fail.
    """
    # The stored value can be null when the form field was not submitted.
    user_url = (config.get('api_url') or '').rstrip('/')
    password = config.get('api_key')
    if not user_url:
        return {'error': "Pi-hole API URL not configured."}

    try:
        return _pihole_v6_status(user_url, password)
    except Exception:
        # Fallback for v5
        try:
            return _pihole_v5_status(user_url, password)
        except Exception as e2:
            return {'error': f'Pi-hole connection failed: {e2}'}


def get_openmeteo_weather(config):
    """Return current and daily weather from Open-Meteo for the configured coordinates."""
    lat = config.get('latitude')
    lon = config.get('longitude')
    if not lat or not lon:
        return {'error': 'Latitude/Longitude not configured.'}
    try:
        url = "https://api.open-meteo.com/v1/forecast"
        params = {
            "latitude": lat,
            "longitude": lon,
            "current": "temperature_2m,weather_code,wind_speed_10m",
            "daily": "temperature_2m_max,temperature_2m_min,sunrise,sunset",
            "timezone": "auto"
        }
        response = requests.get(url, params=params, timeout=5)
        response.raise_for_status()
        data = response.json()
        return {
            'current': data.get('current', {}),
            'daily': data.get('daily', {}),
            'units': data.get('current_units', {})
        }
    except Exception as e:
        return {'error': f"Weather API failed: {e}"}


def get_iframe_widget(config):
    """Return the iframe URL and height (default ``'300'``) from ``config``."""
    return {
        'url': config.get('url'),
        'height': config.get('height', '300')
    }


WIDGET_HANDLERS = {
    'pihole_status': get_pihole_status,
    'openmeteo_weather': get_openmeteo_weather,
    'iframe_widget': get_iframe_widget
}


@app.route('/api/widget_data/<int:widget_id>')
def widget_data(widget_id):
    """Return the live data of one widget as JSON (HTTP 500 when the handler reports an error)."""
    widget = Widget.query.get_or_404(widget_id)
    try:
        config = json.loads(widget.config) if widget.config else {}
    except json.JSONDecodeError:
        return jsonify({'error': 'Invalid widget configuration format.'}), 500
    if not isinstance(config, dict):
        # Handlers read the config with .get(); anything else is malformed.
        return jsonify({'error': 'Invalid widget configuration format.'}), 500

    handler = WIDGET_HANDLERS.get(widget.type)
    if not handler:
        return jsonify({'error': f"No handler for widget type '{widget.type}'."}), 404

    data = handler(config)
    if 'error' in data:
        return jsonify(data), 500
    return jsonify(data)


if __name__ == '__main__':
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)
    app.run(host='0.0.0.0', port=8080)