# Files
# startseite/app/app.py
# 2025-12-03 16:46:11 +00:00
#
# 843 lines
# 29 KiB
# Python

from flask import Flask, render_template, request, redirect, url_for, jsonify, session, flash
from urllib.parse import urljoin
import requests
import urllib3
import json
import sys
import time
import os
import uuid
from werkzeug.utils import secure_filename
from PIL import Image
from functools import wraps
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import func
# Disable SSL warnings for local services
# (outbound requests in this file are made with verify=False).
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
app = Flask(__name__)
# Directory that receives uploaded icons, backgrounds and logos.
UPLOAD_FOLDER = '/app/static/uploads'
# Extensions accepted by allowed_file(); compared case-insensitively there.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'svg', 'webp'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:////data/dashboard.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# NOTE(review): when SECRET_KEY is not set, the hard-coded fallback below is used
# as the session signing key. Anyone who knows it could presumably forge a
# 'logged_in' session -- confirm that deployments always set SECRET_KEY.
app.secret_key = os.environ.get('SECRET_KEY', 'a-secure-default-secret-key-for-development-only')
db = SQLAlchemy(app)
# --- Caching for Status Checks ---
# Maps a checked URL to a (status, timestamp) tuple; read and written by check_url().
URL_CACHE = {}
# Lifetime of a cache entry in seconds (compared against time.time() differences).
CACHE_DURATION = 300 # 5 minutes
# --- Custom Filters ---
@app.template_filter('json_loads')
def json_loads_filter(s):
    """Template filter: decode a JSON string.

    Returns an empty dict when ``s`` is falsy or cannot be decoded.
    """
    if s:
        try:
            decoded = json.loads(s)
        except Exception:
            decoded = {}
        return decoded
    return {}
# --- Database Models ---
class Setting(db.Model):
    """One dashboard setting stored as a key/value row."""
    # Setting name; get_all_data() uses it as the dictionary key.
    key = db.Column(db.String(50), primary_key=True)
    # Setting value, stored as a string.
    value = db.Column(db.String(255))
class Page(db.Model):
    """A dashboard page; index() renders one page at a time, chosen by position."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    # Either an uploaded file name from save_and_scale_icon() or free text from the form.
    icon = db.Column(db.String(255))
    # Sort position; get_all_data() lists pages ordered by this column.
    order = db.Column(db.Integer, default=0)
    # Child rows are deleted together with the page and come back sorted by their own order.
    groups = db.relationship('Group', backref='page', lazy=True, cascade="all, delete-orphan", order_by="Group.order")
    widgets = db.relationship('Widget', backref='page', lazy=True, cascade="all, delete-orphan", order_by="Widget.order")
class Group(db.Model):
    """A named group of links that belongs to exactly one page."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    # Either an uploaded file name from save_and_scale_icon() or free text from the form.
    icon = db.Column(db.String(255))
    # Sort position within the page (Page.groups is ordered by it).
    order = db.Column(db.Integer, default=0)
    page_id = db.Column(db.Integer, db.ForeignKey('page.id'), nullable=False)
    # Links are deleted together with the group and come back sorted by Link.order.
    links = db.relationship('Link', backref='group', lazy=True, cascade="all, delete-orphan", order_by="Link.order")
class Link(db.Model):
    """A single link entry inside a group."""
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(100), nullable=False)
    url = db.Column(db.String(255), nullable=False)
    # Either an uploaded file name from save_and_scale_icon() or free text from the form.
    icon = db.Column(db.String(255))
    subtitle = db.Column(db.String(255))
    # Column default is '_self'; note that add_link() falls back to '_blank' instead.
    target = db.Column(db.String(20), default='_self')
    # Sort position within the group (Group.links is ordered by it).
    order = db.Column(db.Integer, default=0)
    group_id = db.Column(db.Integer, db.ForeignKey('group.id'), nullable=False)
class Widget(db.Model):
    """A widget placed on a page; its type selects the handler in WIDGET_HANDLERS."""
    id = db.Column(db.Integer, primary_key=True)
    type = db.Column(db.String(50), nullable=False) # e.g., 'pihole_status', 'openmeteo_weather', 'iframe_widget'
    config = db.Column(db.Text, nullable=True) # JSON-formatted string, written with json.dumps() by add_widget()/edit_widget()
    # Sort position within the page (Page.widgets is ordered by it).
    order = db.Column(db.Integer, default=0)
    page_id = db.Column(db.Integer, db.ForeignKey('page.id'), nullable=False) # Changed from group_id to page_id; migrate_db() rebuilds older tables
def migrate_db():
    """Create missing tables and upgrade an older database schema in place.

    Each step probes for a column with a SELECT and treats a failing probe as
    "column missing". The steps are: add "order" to page, group and link;
    rebuild the widget table without its legacy group_id column; add "order"
    to widget. Any error escaping the individual steps is printed as a
    warning and not re-raised.
    """
    from sqlalchemy import text
    with app.app_context():
        db.create_all()
        try:
            with db.engine.connect() as conn:
                # Check Page order
                try:
                    conn.execute(text("SELECT \"order\" FROM page LIMIT 1"))
                except Exception:
                    print("Migrating DB: Adding order column to Page")
                    conn.execute(text("ALTER TABLE page ADD COLUMN \"order\" INTEGER DEFAULT 0"))
                    conn.commit()
                # Check Group order
                try:
                    conn.execute(text("SELECT \"order\" FROM \"group\" LIMIT 1"))
                except Exception:
                    print("Migrating DB: Adding order column to Group")
                    conn.execute(text("ALTER TABLE \"group\" ADD COLUMN \"order\" INTEGER DEFAULT 0"))
                    conn.commit()
                # Check Link order
                try:
                    conn.execute(text("SELECT \"order\" FROM link LIMIT 1"))
                except Exception:
                    print("Migrating DB: Adding order column to Link")
                    conn.execute(text("ALTER TABLE link ADD COLUMN \"order\" INTEGER DEFAULT 0"))
                    conn.commit()
                # Check Widget page_id migration and Cleanup group_id
                try:
                    # Check if group_id still exists by trying to select it.
                    # If this SELECT fails, control jumps to the handler further down.
                    conn.execute(text("SELECT group_id FROM widget LIMIT 1"))
                    print("Migrating DB: Removing group_id from Widget (Recreating Table)")
                    # 1. Create new table
                    conn.execute(text("""
CREATE TABLE widget_new (
id INTEGER NOT NULL,
type VARCHAR(50) NOT NULL,
config TEXT,
"order" INTEGER,
page_id INTEGER NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY(page_id) REFERENCES page (id)
)
"""))
                    # 2. Copy data
                    # If page_id was already populated by previous migration, use it.
                    # If not (failed partial migration), try to derive from group.
                    # We try to copy page_id first.
                    # NOTE(review): rows whose page_id is NULL are skipped by this INSERT
                    # without raising, so the fallback only runs when the statement
                    # itself fails (presumably when the page_id column is absent).
                    try:
                        conn.execute(text("INSERT INTO widget_new (id, type, config, \"order\", page_id) SELECT id, type, config, \"order\", page_id FROM widget WHERE page_id IS NOT NULL"))
                    except Exception:
                        # Fallback: Calculate page_id from group_id link if page_id was missing/null
                        conn.execute(text("INSERT INTO widget_new (id, type, config, \"order\", page_id) SELECT w.id, w.type, w.config, w.\"order\", g.page_id FROM widget w JOIN \"group\" g ON w.group_id = g.id"))
                    # 3. Drop old table
                    conn.execute(text("DROP TABLE widget"))
                    # 4. Rename new table
                    conn.execute(text("ALTER TABLE widget_new RENAME TO widget"))
                    conn.commit()
                    print("Widget table migrated and cleaned successfully.")
                except Exception as e:
                    # Reached when group_id does not exist (table already clean) and also
                    # when any rebuild step above failed.
                    # In both cases make sure page_id can be selected; if it cannot,
                    # report the error that brought us here.
                    try:
                        conn.execute(text("SELECT page_id FROM widget LIMIT 1"))
                    except:
                        print(f"Deep migration error: {e}")
                # Check Widget order
                try:
                    conn.execute(text("SELECT \"order\" FROM widget LIMIT 1"))
                except Exception:
                    print("Migrating DB: Adding order column to Widget")
                    conn.execute(text("ALTER TABLE widget ADD COLUMN \"order\" INTEGER DEFAULT 0"))
                    conn.commit()
        except Exception as e:
            # Best effort: a failed migration is reported but does not stop the module import.
            print(f"Migration general warning: {e}")
# Runs at import time, before any route is served.
migrate_db()
# --- Authentication ---
def login_required(f):
    """Decorator that redirects to the login page unless the session is logged in.

    The URL of the blocked request is handed to the login view as ``next``.
    """
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if session.get('logged_in'):
            return f(*args, **kwargs)
        login_url = url_for('login', next=request.url)
        return redirect(login_url)
    return decorated_function
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and authenticate the administrator.

    GET renders ``login.html``. POST compares the ``username`` and ``password``
    form fields with the ``ADMIN_USERNAME`` / ``ADMIN_PASSWORD`` environment
    variables. On success the session is marked as logged in and the client is
    redirected to the ``next`` query parameter if it points at this host,
    otherwise to the admin page. On failure an error is flashed and the form
    is rendered again.
    """
    # Function-scope imports keep this block independent of the file's import list.
    import hmac
    from urllib.parse import urlparse

    def _matches(submitted, expected):
        # An unset environment variable must never authenticate anyone.
        if expected is None:
            return False
        # Constant-time comparison; bytes avoid compare_digest's ASCII-only str limit.
        return hmac.compare_digest(submitted.encode('utf-8'), expected.encode('utf-8'))

    def _is_local_target(target):
        # Only follow redirects that stay on this host, otherwise a crafted
        # ?next=https://evil.example link would turn the login into an open redirect.
        if not target:
            return False
        own = urlparse(request.host_url)
        # Browsers treat backslashes like slashes, so normalise them before resolving.
        resolved = urlparse(urljoin(request.host_url, target.replace('\\', '/')))
        return resolved.scheme in ('http', 'https') and resolved.netloc == own.netloc

    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        # Evaluate both checks so timing does not reveal which field was wrong.
        username_ok = _matches(username, os.environ.get('ADMIN_USERNAME'))
        password_ok = _matches(password, os.environ.get('ADMIN_PASSWORD'))
        if username_ok and password_ok:
            session['logged_in'] = True
            flash('Login erfolgreich!', 'success')
            next_url = request.args.get('next')
            if not _is_local_target(next_url):
                next_url = None
            return redirect(next_url or url_for('admin'))
        else:
            flash('Falscher Benutzername oder Passwort.', 'danger')
    return render_template('login.html')
@app.route('/logout')
def logout():
    """Drop the login flag from the session and go back to the start page."""
    session.pop('logged_in', None)
    flash('Du wurdest ausgeloggt.', 'info')
    start_page = url_for('index')
    return redirect(start_page)
# --- Helper & Image Processing ---
def allowed_file(filename):
    """Return True when ``filename`` has an extension listed in ALLOWED_EXTENSIONS."""
    _, dot, extension = filename.rpartition('.')
    # No dot at all means there is no extension to check.
    return bool(dot) and extension.lower() in ALLOWED_EXTENSIONS
def save_and_scale_icon(request_file):
    """Store an uploaded image as a 48x48 RGBA PNG in the upload folder.

    Returns the generated file name, or None when nothing usable was uploaded
    or the image could not be processed.
    """
    if not request_file or request_file.filename == '' or not allowed_file(request_file.filename):
        return None
    try:
        unique_filename = f"icon_{uuid.uuid4().hex}.png"
        temp_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
        icon = Image.open(request_file.stream)
        icon = icon.resize((48, 48), Image.Resampling.LANCZOS)
        if icon.mode != 'RGBA':
            icon = icon.convert('RGBA')
        icon.save(temp_path, 'PNG')
    except Exception as e:
        print(f"Fehler bei der Bildverarbeitung: {e}")
        return None
    return unique_filename
def save_background_or_logo(request_file, prefix):
    """Save an uploaded file unchanged under ``<prefix>_<original name>``.

    Returns the sanitised file name, or None when nothing usable was uploaded
    or saving failed.
    """
    if not request_file or request_file.filename == '' or not allowed_file(request_file.filename):
        return None
    try:
        filename = secure_filename(f"{prefix}_{request_file.filename}")
        destination = os.path.join(app.config['UPLOAD_FOLDER'], filename)
        request_file.save(destination)
    except Exception as e:
        print(f"Fehler beim Speichern: {e}")
        return None
    return filename
# --- Database Helper ---
def get_all_data():
    """Collect everything the templates need: settings (with defaults) and ordered pages."""
    settings = {row.key: row.value for row in Setting.query.all()}
    # Stored values win; defaults only fill keys that are not in the database.
    for key, value in default_settings().items():
        settings.setdefault(key, value)
    ordered_pages = Page.query.order_by(Page.order).all()
    return {"settings": settings, "pages": ordered_pages}
def default_settings():
    """Return a fresh dict with the fallback value of every known setting."""
    return dict(
        title="Mein Dashboard",
        bg_image="",
        logo_image="",
        header_color="#363636",
        header_font_color="#FFFFFF",
        header_inactive_font_color="#C0C0C0",
        group_title_font_color="#FFFFFF",
        columns="is-one-quarter-desktop",
        theme_color="#3273dc",
        overlay_opacity="0.85",
    )
# --- Routes ---
def hex_to_rgb(hex_color):
    """Convert a hex colour into a comma-separated ``"r,g,b"`` string.

    Accepts the six-digit form (``#3273dc``) and the CSS shorthand forms with
    three or four digits (``#fff``, ``#ffff``). The leading ``#`` is optional
    and surrounding whitespace is ignored. Only the first three channels are
    used, so an alpha channel is dropped.

    Raises:
        ValueError: if the value contains non-hex characters or is too short.
    """
    digits = hex_color.strip().lstrip('#')
    if len(digits) in (3, 4):
        # Shorthand: every digit stands for a doubled pair (#abc == #aabbcc).
        digits = ''.join(ch * 2 for ch in digits)
    return ",".join(str(int(digits[i:i + 2], 16)) for i in (0, 2, 4))
@app.route('/')
def index():
    """Render the dashboard start page.

    The ``page`` query parameter selects the page by position. Missing,
    non-numeric or out-of-range values (including negative ones) fall back to
    the first page. With no pages at all, ``empty.html`` is rendered instead.
    """
    data = get_all_data()
    pages = data['pages']
    if not pages:
        return render_template('empty.html')
    active_page_index = request.args.get('page', 0, type=int)
    # Negative indices would silently wrap to another page or raise IndexError.
    if not 0 <= active_page_index < len(pages):
        active_page_index = 0
    active_page = pages[active_page_index]
    settings = data['settings']
    if 'group_bg_color' in settings:
        try:
            settings['group_bg_color_rgb'] = hex_to_rgb(settings['group_bg_color'])
        except (ValueError, AttributeError) as e:
            # A malformed stored colour must not take the whole start page down.
            print(f"Invalid group_bg_color setting ignored: {e}")
    return render_template('index.html', data=data, active_page=active_page, active_index=active_page_index)
@app.route('/admin')
@login_required
def admin():
    """Render the administration page with all settings and pages."""
    return render_template('admin.html', data=get_all_data())
@app.route('/admin/local_icons')
@login_required
def local_icons():
    """Return the names of uploaded files that can be used as icons, as a JSON list."""
    icon_prefixes = ('icon_', 'upload_', 'link_')
    names = [entry for entry in os.listdir(UPLOAD_FOLDER) if entry.startswith(icon_prefixes)]
    return jsonify(names)
@app.route('/admin/media_manager')
@login_required
def media_manager():
    """List every regular file in the upload folder with its size and kind."""
    def classify(name):
        # The kind is derived from the file name prefix alone.
        if name.startswith('icon_'):
            return 'Icon'
        if name.startswith('bg_'):
            return 'Hintergrund'
        if name.startswith('logo_'):
            return 'Logo'
        return 'Unbekannt'

    files = []
    for entry in os.listdir(UPLOAD_FOLDER):
        full_path = os.path.join(UPLOAD_FOLDER, entry)
        try:
            if not os.path.isfile(full_path):
                continue
            size_kb = os.path.getsize(full_path) / 1024
            files.append({
                'name': entry,
                'size': f"{size_kb:.2f} KB",
                'type': classify(entry),
            })
        except Exception:
            # Entries that cannot be inspected are left out of the listing.
            continue
    return render_template('media.html', files=files)
def _update_setting(key, value):
    """Set ``key`` to ``value`` in the session, inserting the row if needed.

    The caller is responsible for committing.
    """
    existing = Setting.query.filter_by(key=key).first()
    if not existing:
        db.session.add(Setting(key=key, value=value))
        return
    existing.value = value
@app.route('/admin/update_settings', methods=['POST'])
@login_required
def update_settings():
    """Persist every submitted form field as a setting, plus optional image uploads."""
    for key, value in request.form.items():
        _update_setting(key, value)
    # (form file field, file name prefix, setting that stores the saved name)
    uploads = (
        ('bg_file', 'bg', 'bg_image'),
        ('logo_file', 'logo', 'logo_image'),
    )
    for field, prefix, setting_key in uploads:
        stored_name = save_background_or_logo(request.files.get(field), prefix)
        if stored_name:
            _update_setting(setting_key, stored_name)
    _update_setting('settings_configured', 'true')
    db.session.commit()
    flash('Einstellungen gespeichert!', 'success')
    return redirect(url_for('admin', tab='settings'))
@app.route('/admin/delete_media/<string:filename>')
@login_required
def delete_media(filename):
    """Delete an uploaded file and clear settings that still point at it.

    Names containing ``..`` or a path separator are rejected with HTTP 400.
    """
    if '..' in filename or os.path.sep in filename:
        return "Ungültiger Dateiname", 400
    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    if not os.path.exists(filepath):
        return redirect(url_for('media_manager'))
    os.remove(filepath)
    # Reset the background/logo setting if it referenced the deleted file.
    for setting_key in ('bg_image', 'logo_image'):
        in_use = Setting.query.filter_by(key=setting_key, value=filename).first()
        if in_use:
            in_use.value = ""
    db.session.commit()
    flash(f'Datei {filename} gelöscht.', 'info')
    return redirect(url_for('media_manager'))
# --- CRUD Routes (Pages, Groups, Links, Widgets) ---
@app.route('/admin/add_page', methods=['POST'])
@login_required
def add_page():
    """Create a page from the submitted name and optional icon; ignore empty names."""
    name = request.form.get('name')
    if not name:
        return redirect(url_for('admin'))
    # An uploaded icon file takes precedence over the free-text icon.
    icon_val = save_and_scale_icon(request.files.get('icon_file'))
    if not icon_val:
        icon_val = request.form.get('icon_text', '')
    db.session.add(Page(name=name, icon=icon_val))
    db.session.commit()
    return redirect(url_for('admin'))
@app.route('/admin/delete_page/<int:page_id>')
@login_required
def delete_page(page_id):
    """Delete the page with the given id (404 if unknown) and return to the admin page."""
    doomed = Page.query.get_or_404(page_id)
    db.session.delete(doomed)
    db.session.commit()
    admin_url = url_for('admin')
    return redirect(admin_url)
@app.route('/admin/add_group/<int:page_id>', methods=['POST'])
@login_required
def add_group(page_id):
    """Create a group on an existing page (404 if the page is unknown)."""
    page = Page.query.get_or_404(page_id)
    name = request.form.get('name')
    if not name:
        return redirect(url_for('admin'))
    # An uploaded icon file takes precedence over the free-text icon.
    icon_val = save_and_scale_icon(request.files.get('icon_file'))
    if not icon_val:
        icon_val = request.form.get('icon_text', '')
    db.session.add(Group(name=name, icon=icon_val, page_id=page.id))
    db.session.commit()
    return redirect(url_for('admin'))
@app.route('/admin/delete_group/<int:group_id>')
@login_required
def delete_group(group_id):
    """Delete the group with the given id (404 if unknown) and return to the admin page."""
    doomed = Group.query.get_or_404(group_id)
    db.session.delete(doomed)
    db.session.commit()
    admin_url = url_for('admin')
    return redirect(admin_url)
@app.route('/admin/add_link/<int:group_id>', methods=['POST'])
@login_required
def add_link(group_id):
    """Create a link in an existing group (404 if unknown); name and URL are required."""
    group = Group.query.get_or_404(group_id)
    form = request.form
    name = form.get('name')
    url = form.get('url')
    if not (name and url):
        return redirect(url_for('admin'))
    # An uploaded icon file takes precedence over the free-text icon.
    icon_val = save_and_scale_icon(request.files.get('icon_file'))
    if not icon_val:
        icon_val = form.get('icon_text', '')
    fields = {
        'name': name,
        'url': url,
        'icon': icon_val,
        'subtitle': form.get('subtitle'),
        'target': form.get('target', '_blank'),
        'group_id': group.id,
    }
    db.session.add(Link(**fields))
    db.session.commit()
    return redirect(url_for('admin'))
@app.route('/admin/delete_link/<int:link_id>')
@login_required
def delete_link(link_id):
    """Delete the link with the given id (404 if unknown) and return to the admin page."""
    doomed = Link.query.get_or_404(link_id)
    db.session.delete(doomed)
    db.session.commit()
    admin_url = url_for('admin')
    return redirect(admin_url)
@app.route('/admin/add_widget/<int:page_id>', methods=['POST'])
@login_required
def add_widget(page_id):
    """Create a widget on an existing page from the submitted form.

    The ``type`` form field selects which form fields are copied into the
    widget's JSON config. Unknown types are stored with an empty config.
    Responds with 404 when the page does not exist and flashes an error when
    no type was submitted.
    """
    # Validate the page up front, as add_group() does; without this check a
    # widget could be stored for a page id that does not exist.
    page = Page.query.get_or_404(page_id)
    widget_type = request.form.get('type')
    if not widget_type:
        flash('Widget-Typ nicht angegeben.', 'danger')
        return redirect(url_for('admin'))
    # widget type -> {config key: form field that supplies it}
    form_fields_by_type = {
        'pihole_status': {'api_url': 'pihole_api_url', 'api_key': 'pihole_api_key'},
        'openmeteo_weather': {'latitude': 'weather_lat', 'longitude': 'weather_lon'},
        'iframe_widget': {'url': 'iframe_url', 'height': 'iframe_height'},
    }
    field_map = form_fields_by_type.get(widget_type, {})
    config = {key: request.form.get(field) for key, field in field_map.items()}
    new_widget = Widget(
        page_id=page.id,
        type=widget_type,
        config=json.dumps(config)
    )
    db.session.add(new_widget)
    db.session.commit()
    flash('Widget erfolgreich hinzugefügt!', 'success')
    return redirect(url_for('admin'))
@app.route('/admin/edit_widget/<int:widget_id>', methods=['POST'])
@login_required
def edit_widget(widget_id):
    """Replace the JSON config of an existing widget (404 if unknown) from the form."""
    widget = Widget.query.get_or_404(widget_id)
    # widget type -> {config key: form field that supplies it}
    form_fields_by_type = {
        'pihole_status': {'api_url': 'pihole_api_url', 'api_key': 'pihole_api_key'},
        'openmeteo_weather': {'latitude': 'weather_lat', 'longitude': 'weather_lon'},
        'iframe_widget': {'url': 'iframe_url', 'height': 'iframe_height'},
    }
    # Types without an entry end up with an empty config.
    field_map = form_fields_by_type.get(widget.type, {})
    config = {key: request.form.get(field) for key, field in field_map.items()}
    widget.config = json.dumps(config)
    db.session.commit()
    flash('Widget erfolgreich aktualisiert!', 'success')
    return redirect(url_for('admin'))
@app.route('/admin/delete_widget/<int:widget_id>')
@login_required
def delete_widget(widget_id):
    """Delete the widget with the given id (404 if unknown) and return to the admin page."""
    doomed = Widget.query.get_or_404(widget_id)
    db.session.delete(doomed)
    db.session.commit()
    flash('Widget gelöscht.', 'info')
    admin_url = url_for('admin')
    return redirect(admin_url)
@app.route('/admin/edit_link/<int:link_id>', methods=['POST'])
@login_required
def edit_link(link_id):
    """Update a link's text fields, optionally move it to another group and change its icon."""
    link = Link.query.get_or_404(link_id)
    form = request.form
    link.name = form.get('name')
    link.url = form.get('url')
    link.subtitle = form.get('subtitle')
    link.target = form.get('target')
    new_group_id = form.get('group_id')
    if new_group_id:
        target_group = int(new_group_id)
        if target_group != link.group_id:
            link.group_id = target_group
    # An uploaded file wins over typed text; with neither, the icon stays as it is.
    uploaded_icon = save_and_scale_icon(request.files.get('icon_file'))
    typed_icon = form.get('icon_text')
    chosen_icon = uploaded_icon or typed_icon
    if chosen_icon:
        link.icon = chosen_icon
    db.session.commit()
    return redirect(url_for('admin'))
@app.route('/admin/edit_page/<int:page_id>', methods=['POST'])
@login_required
def edit_page(page_id):
    """Rename a page (404 if unknown) and update or clear its icon."""
    page = Page.query.get_or_404(page_id)
    page.name = request.form.get('name')
    uploaded_icon = save_and_scale_icon(request.files.get('icon_file'))
    typed_icon = request.form.get('icon_text')
    if uploaded_icon:
        page.icon = uploaded_icon
    elif 'icon_text' in request.form:
        # A submitted but empty text field clears the icon.
        page.icon = typed_icon
    db.session.commit()
    flash('Seite erfolgreich aktualisiert!', 'success')
    return redirect(url_for('admin'))
@app.route('/admin/edit_group/<int:group_id>', methods=['POST'])
@login_required
def edit_group(group_id):
    """Rename a group (404 if unknown), optionally move it to another page, update its icon."""
    group = Group.query.get_or_404(group_id)
    group.name = request.form.get('name')
    new_page_id = request.form.get('page_id')
    if new_page_id:
        target_page = int(new_page_id)
        if target_page != group.page_id:
            group.page_id = target_page
    uploaded_icon = save_and_scale_icon(request.files.get('icon_file'))
    typed_icon = request.form.get('icon_text')
    if uploaded_icon:
        group.icon = uploaded_icon
    elif 'icon_text' in request.form:
        # A submitted but empty text field clears the icon.
        group.icon = typed_icon
    db.session.commit()
    flash('Gruppe erfolgreich aktualisiert!', 'success')
    return redirect(url_for('admin'))
# --- Reordering ---
@app.route('/admin/reorder_pages', methods=['POST'])
@login_required
def reorder_pages():
    """Store a new page order from a JSON body ``{"order": [page_id, ...]}``.

    Each page receives its list position as ``order``; unknown ids are skipped.
    Responds with HTTP 400 when ``order`` is missing or empty.
    """
    order_data = request.json.get('order')
    if not order_data:
        return jsonify({'status': 'error'}), 400
    for position, page_id in enumerate(order_data):
        page = Page.query.get(page_id)
        if not page:
            continue
        page.order = position
    db.session.commit()
    return jsonify({'status': 'success'})
@app.route('/admin/reorder_groups', methods=['POST'])
@login_required
def reorder_groups():
    """Store a new group order from JSON ``{"order": [group_id, ...], "page_id": ...}``.

    Each group receives its list position as ``order`` and, when ``page_id``
    is given, is moved to that page. Unknown ids are skipped. Responds with
    HTTP 400 when ``order`` is missing or empty.
    """
    payload = request.json
    order_data = payload.get('order')
    page_id = payload.get('page_id')
    if not order_data:
        return jsonify({'status': 'error'}), 400
    for position, group_id in enumerate(order_data):
        group = Group.query.get(group_id)
        if not group:
            continue
        group.order = position
        if page_id:
            group.page_id = int(page_id)
    db.session.commit()
    return jsonify({'status': 'success'})
@app.route('/admin/reorder_links', methods=['POST'])
@login_required
def reorder_links():
    """Store a new link order from JSON ``{"order": [link_id, ...], "group_id": ...}``.

    Each link receives its list position as ``order`` and, when ``group_id``
    is given, is moved to that group. Unknown ids are skipped. Responds with
    HTTP 400 when ``order`` is missing or empty.
    """
    payload = request.json
    order_data = payload.get('order')
    group_id = payload.get('group_id')
    if not order_data:
        return jsonify({'status': 'error'}), 400
    for position, link_id in enumerate(order_data):
        link = Link.query.get(link_id)
        if not link:
            continue
        link.order = position
        if group_id:
            link.group_id = int(group_id)
    db.session.commit()
    return jsonify({'status': 'success'})
# --- Utility Routes ---
@app.route('/check_url')
def check_url():
    """Report whether the URL in the ``url`` query parameter is reachable.

    Responds with JSON ``{'status': 'online' | 'offline'}``. Results are kept
    in URL_CACHE for CACHE_DURATION seconds; cached answers also carry
    ``'cached': True``. A missing URL yields HTTP 400.

    Any HTTP response, or a failed TLS handshake, counts as online. If HEAD
    fails for another reason, one GET is tried before reporting offline.

    NOTE(review): this endpoint is not behind login_required and requests
    whatever URL the caller supplies -- confirm that this is acceptable for
    the deployment (server-side request forgery risk).
    """
    url = request.args.get('url')
    if not url:
        return jsonify({'status': 'offline', 'reason': 'No URL provided'}), 400
    # Check Cache
    current_time = time.time()
    cached = URL_CACHE.get(url)
    if cached is not None:
        status, timestamp = cached
        if current_time - timestamp < CACHE_DURATION:
            return jsonify({'status': status, 'cached': True})
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    status = 'offline'
    try:
        # Use a short timeout, and don't verify SSL certs for local services
        requests.head(url, headers=headers, timeout=5, verify=False, allow_redirects=True)
        # If we get ANY response, the server is online/reachable.
        status = 'online'
    except requests.exceptions.SSLError:
        # SSL Handshake failed, but server is there.
        status = 'online'
    except requests.exceptions.RequestException:
        # Connection failed (Timeout, DNS, Refused)
        # Try one last time with GET, sometimes HEAD is blocked/buggy
        try:
            # stream=True skips the body download; the context manager releases
            # the connection, which previously stayed open.
            with requests.get(url, headers=headers, timeout=5, verify=False, stream=True):
                status = 'online'
        except Exception as e2:
            # Log only actual failures
            print(f"Check URL failed for {url}: {e2}", file=sys.stdout, flush=True)
            status = 'offline'
    # Drop expired entries so the cache cannot grow without bound. Iterate over
    # a snapshot because other requests may modify the dict concurrently.
    for cached_url, (_, cached_at) in list(URL_CACHE.items()):
        if current_time - cached_at >= CACHE_DURATION:
            URL_CACHE.pop(cached_url, None)
    # Update Cache
    URL_CACHE[url] = (status, current_time)
    return jsonify({'status': status})
@app.route('/search')
def search():
    """Case-insensitive substring search over link names and subtitles.

    Reads the term from ``q``. Terms shorter than two characters yield an
    empty list. Each hit carries the link fields plus its group and page name.
    """
    query = request.args.get('q', '').strip().lower()
    if len(query) < 2:
        return jsonify([])
    pattern = f"%{query}%"
    name_hit = func.lower(Link.name).like(pattern)
    subtitle_hit = func.lower(Link.subtitle).like(pattern)
    rows = (
        db.session.query(Link, Group.name, Page.name)
        .join(Group, Link.group_id == Group.id)
        .join(Page, Group.page_id == Page.id)
        .filter(name_hit | subtitle_hit)
        .all()
    )
    output = []
    for link, group_name, page_name in rows:
        output.append({
            'name': link.name,
            'url': link.url,
            'subtitle': link.subtitle,
            'icon': link.icon,
            'group_name': group_name,
            'page_name': page_name,
        })
    return jsonify(output)
# --- Widget Data Handlers ---
def get_pihole_status(config):
    """Fetch blocking status and summary statistics from a Pi-hole instance.

    Reads ``api_url`` and ``api_key`` from ``config``. The session-based API
    (called "v6" in this code) is tried first. If anything in that attempt
    raises, the older ``admin/api.php`` endpoint ("v5") is queried instead.

    Returns a dict. From the v6 path it has the keys ``status``,
    ``domains_being_blocked``, ``dns_queries_today`` and
    ``ads_percentage_today``. From the v5 path it is the decoded JSON response
    as received. On failure it has a single ``error`` key.
    """
    user_url = config.get('api_url', '').rstrip('/')
    password = config.get('api_key')
    if not user_url:
        return {'error': "Pi-hole API URL not configured."}
    try:
        # NOTE(review): user_url has no trailing slash here, so urljoin() replaces
        # its last path segment: 'http://host/admin' resolves to 'http://host/api/'.
        # Confirm this is intended for installs served under a sub-path.
        if user_url.endswith('/api'):
            v6_api_base = user_url + '/'
        else:
            v6_api_base = urljoin(user_url, 'api/')
        # Step 1: Authenticate
        auth_url = urljoin(v6_api_base, 'auth')
        auth_response = requests.post(auth_url, json={'password': password}, verify=False, timeout=5)
        if auth_response.status_code == 404:
            # Raised only to reach the v5 fallback in the handler below.
            raise requests.exceptions.RequestException("Auth endpoint not found (404), falling back to v5.")
        auth_response.raise_for_status()
        sid = auth_response.json().get('session', {}).get('sid')
        if not sid:
            # Returned directly: the v5 fallback is not attempted in this case.
            return {'error': 'Pi-hole v6 auth successful, but no SID returned.'}
        # Step 2a: Get blocking status
        blocking_url = urljoin(v6_api_base, 'dns/blocking')
        blocking_response = requests.get(blocking_url, params={'sid': sid}, verify=False, timeout=5)
        blocking_response.raise_for_status()
        blocking_data = blocking_response.json()
        is_enabled = blocking_data.get('blocking') == 'enabled'
        status = 'enabled' if is_enabled else 'disabled'
        # Step 2b: Get summary stats
        summary_url = urljoin(v6_api_base, 'stats/summary')
        summary_response = requests.get(summary_url, params={'sid': sid}, verify=False, timeout=5)
        summary_response.raise_for_status()
        summary_data = summary_response.json()
        # Parse nested v6 structure
        queries_data = summary_data.get('queries', {})
        gravity_data = summary_data.get('gravity', {})
        domains = gravity_data.get('domains_being_blocked', 0)
        queries = queries_data.get('total', 0)
        percent = queries_data.get('percent_blocked', 0.0)
        # Keys are named like the v5 summary fields so both paths share one result shape.
        return {
            'status': status,
            'domains_being_blocked': domains,
            'dns_queries_today': queries,
            'ads_percentage_today': percent
        }
    except Exception as e:
        # Fallback for v5: any failure above (network error, HTTP error status,
        # undecodable JSON) lands here. The original exception is not reported.
        try:
            params = {'summary': ''}
            if password:
                params['auth'] = password
            v5_base_url = user_url
            if v5_base_url.endswith('/api'):
                # Strip the '/api' suffix before appending the v5 path.
                v5_base_url = v5_base_url[:-4]
            v5_url = f"{v5_base_url.rstrip('/')}/admin/api.php"
            response = requests.get(v5_url, params=params, timeout=5, verify=False)
            response.raise_for_status()
            v5_data = response.json()
            # Convert a string percentage to float, accepting a decimal comma.
            if 'ads_percentage_today' in v5_data and isinstance(v5_data['ads_percentage_today'], str):
                v5_data['ads_percentage_today'] = float(v5_data['ads_percentage_today'].replace(',', '.'))
            return v5_data
        except Exception as e2:
            return {'error': f'Pi-hole connection failed: {e2}'}
def get_openmeteo_weather(config):
    """Fetch current and daily weather from Open-Meteo for the configured coordinates.

    Returns a dict with ``current``, ``daily`` and ``units``. When coordinates
    are missing or the request fails, the dict has a single ``error`` key.
    """
    lat, lon = config.get('latitude'), config.get('longitude')
    if not (lat and lon):
        return {'error': 'Latitude/Longitude not configured.'}
    try:
        endpoint = "https://api.open-meteo.com/v1/forecast"
        query = {
            "latitude": lat,
            "longitude": lon,
            "current": "temperature_2m,weather_code,wind_speed_10m",
            "daily": "temperature_2m_max,temperature_2m_min,sunrise,sunset",
            "timezone": "auto",
        }
        response = requests.get(endpoint, params=query, timeout=5)
        response.raise_for_status()
        payload = response.json()
        result = {'current': payload.get('current', {})}
        result['daily'] = payload.get('daily', {})
        result['units'] = payload.get('current_units', {})
        return result
    except Exception as e:
        return {'error': f"Weather API failed: {e}"}
def get_iframe_widget(config):
    """Return the iframe URL and height from ``config``; height defaults to ``'300'``."""
    frame = {'url': config.get('url')}
    frame['height'] = config.get('height', '300')
    return frame
# Maps a Widget.type value to the function that produces that widget's data.
WIDGET_HANDLERS = dict(
    pihole_status=get_pihole_status,
    openmeteo_weather=get_openmeteo_weather,
    iframe_widget=get_iframe_widget,
)
@app.route('/api/widget_data/<int:widget_id>')
def widget_data(widget_id):
    """Return the live data of one widget as JSON.

    Responds with 404 for an unknown widget id or a type without a handler,
    and with 500 when the stored config is not valid JSON or the handler
    reports an ``error``.
    """
    widget = Widget.query.get_or_404(widget_id)
    raw_config = widget.config
    try:
        config = json.loads(raw_config) if raw_config else {}
    except json.JSONDecodeError:
        return jsonify({'error': 'Invalid widget configuration format.'}), 500
    handler = WIDGET_HANDLERS.get(widget.type)
    if not handler:
        return jsonify({'error': f"No handler for widget type '{widget.type}'."}), 404
    payload = handler(config)
    if 'error' not in payload:
        return jsonify(payload)
    return jsonify(payload), 500
# Entry point when the file is executed directly.
# NOTE(review): the upload folder is only created on this path. If the app is
# started by importing this module instead, the folder presumably has to exist
# already -- confirm against the deployment.
if __name__ == '__main__':
    os.makedirs(UPLOAD_FOLDER, exist_ok=True)
    # Listen on all interfaces, port 8080.
    app.run(host='0.0.0.0', port=8080)