import uuid from werkzeug.utils import secure_filename import os from urllib.parse import urlparse import urllib.parse import requests from bs4 import BeautifulSoup from flask import Flask import os from flask import render_template, request, redirect, url_for, flash, jsonify, abort from flask_login import LoginManager, login_user, logout_user, login_required, current_user from werkzeug.utils import secure_filename from werkzeug.security import generate_password_hash, check_password_hash from datetime import datetime from flask import Flask import os from flask import render_template, request, redirect, url_for, flash, jsonify, abort, make_response, session from flask import send_from_directory from config import Config from models import db, User, Category, Site from forms import LoginForm, RegisterForm, SiteForm, CategoryForm, ImportBookmarksForm, SearchForm from forms import UserForm # 确保这行存在 import tempfile import json import atexit import glob # 创建Flask应用 app = Flask(__name__) app.config.from_object(Config) # Ensure instance folder exists instance_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance') if not os.path.exists(instance_path): os.makedirs(instance_path) # 确保instance文件夹存在 try: os.makedirs(app.instance_path) except OSError: pass # 初始化数据库 db.init_app(app) with app.app_context(): print(f"Database URI: {app.config['SQLALCHEMY_DATABASE_URI']}") try: db.create_all() print("Database tables created successfully.") except Exception as e: print(f"Error creating database tables: {e}") # 配置登录管理 login_manager = LoginManager() login_manager.init_app(app) login_manager.login_view = 'login' login_manager.login_message = '请先登录以访问此页面' @login_manager.user_loader def load_user(user_id): return db.session.get(User, int(user_id)) # 工具函数 def get_favicon_url(url): """获取网站 favicon,如果失败返回默认图标""" try: parsed = urllib.parse.urlparse(url) if not parsed.scheme: url = 'http://' + url parsed = urllib.parse.urlparse(url) domain = parsed.netloc if not domain: return url_for('static', filename='images/default-icon.png') # 尝试标准位置 favicon_url = f"{parsed.scheme}://{domain}/favicon.ico" resp = requests.head(favicon_url, allow_redirects=True, timeout=3) if resp.status_code == 200: return favicon_url # 尝试网页中 link rel="icon" resp = requests.get(url, timeout=5) if resp.status_code == 200: soup = BeautifulSoup(resp.text, 'html.parser') icon_link = soup.find('link', rel=lambda x: x and 'icon' in x.lower()) if icon_link and icon_link.get('href'): icon_href = icon_link['href'] if icon_href.startswith('http'): return icon_href elif icon_href.startswith('/'): return f"{parsed.scheme}://{domain}{icon_href}" else: return f"{parsed.scheme}://{domain}/{icon_href}" return url_for('static', filename='images/default-icon.png') except: return url_for('static', filename='images/default-icon.png') def parse_bookmarks_html(file_path): """解析浏览器书签HTML文件""" with open(file_path, 'r', encoding='utf-8') as f: soup = BeautifulSoup(f.read(), 'html.parser') # 查找书签栏和文件夹 bookmarks = [] folders = {} # 查找所有h3标签(文件夹)和对应的dl(书签列表) for h3 in soup.find_all('h3'): folder_name = h3.string if not folder_name: continue dl = h3.find_next_sibling('dl') if not dl: continue folder_bookmarks = [] for a in dl.find_all('a'): href = a.get('href') name = a.string if not href or not name: continue # 获取描述(通常是添加日期或其他属性) description = a.get('add_date', '') or a.get('icon', '') or '' folder_bookmarks.append({ 'name': name.strip(), 'url': href.strip(), 'description': description.strip() }) if folder_bookmarks: folders[folder_name.strip()] = folder_bookmarks return folders def create_default_admin(): try: # 先检查是否已存在管理员账户 existing_admin = User.query.filter_by(email='admin@example.com').first() if existing_admin: print("默认管理员账户已存在") return # 创建新管理员 admin = User( username='admin', email='admin@example.com', is_admin=True ) admin.set_password('admin123') # 设置默认密码 db.session.add(admin) db.session.commit() print("成功创建默认管理员账户") except Exception as e: db.session.rollback() print(f"创建管理员账户时出错: {str(e)}") def create_default_categories(): # 如果已经有任何分类存在,说明已经初始化过了,不用重复创建 if Category.query.first() is not None: return # 找到默认管理员用户 admin = User.query.filter_by(email='admin@example.com').first() if not admin: print("[警告] 未找到默认管理员用户,无法创建默认分类") return default_categories = [ '搜索引擎', '社交媒体', '新闻资讯', '购物网站', '工具网站', '娱乐休闲' ] for cat_name in default_categories: category = Category( name=cat_name, user_id=admin.id, # ✅ 关键:指定管理员的 ID 作为分类拥有者 is_public=True ) db.session.add(category) db.session.commit() print(f"[INFO] 已成功创建 {len(default_categories)} 个默认分类,归属于管理员 {admin.username}") # API 路由定义 @app.route('/api/category/', methods=['GET']) def get_category(category_id): category = Category.query.get_or_404(category_id) return jsonify({ 'id': category.id, 'name': category.name, 'is_public': category.is_public }) @app.route('/api/category', methods=['POST']) def api_add_category(): data = request.get_json() if not data or 'name' not in data: return jsonify({'success': False, 'message': '分类名称不能为空'}), 400 category = Category( name=data['name'], user_id=current_user.id, is_public=data.get('is_public', False) ) db.session.add(category) db.session.commit() return jsonify({'success': True, 'message': '分类添加成功'}) @app.route('/api/category/', methods=['PUT']) def api_update_category(category_id): category = Category.query.get_or_404(category_id) data = request.get_json() if not data or 'name' not in data: return jsonify({'success': False, 'message': '分类名称不能为空'}), 400 category.name = data['name'] category.is_public = data.get('is_public', False) db.session.commit() return jsonify({'success': True, 'message': '分类更新成功'}) # 路由定义 # 首页路由 @app.route('/') def index(): search_form = SearchForm() # 公共站点:is_public=True 的站点 + 分类也是公开的 public_sites_query = Site.query.join(Category).filter( Site.is_public == True, Category.is_public == True ) if search_form.q.data and search_form.validate(): q = search_form.q.data.strip() public_sites_query = public_sites_query.filter( Site.name.contains(q) | Site.description.contains(q) | Site.url.contains(q) ) public_sites = public_sites_query.order_by(Site.created_at.asc()).all() # 公共分类 public_categories = Category.query.filter_by(is_public=True).order_by(Category.name).all() # 我的私有站点和分类(仅登录用户) my_sites = [] my_categories = [] if current_user.is_authenticated: my_sites = Site.query.filter_by(user_id=current_user.id).order_by(Site.created_at.asc()).all() my_categories = Category.query.filter_by(user_id=current_user.id).order_by(Category.name).all() saved_engine = request.cookies.get('search-engine', 'local') return render_template('index.html', public_sites=public_sites, public_categories=public_categories, my_sites=my_sites, my_categories=my_categories, search_form=search_form, saved_engine=saved_engine) @app.route('/register', methods=['GET', 'POST']) def register(): if current_user.is_authenticated: return redirect(url_for('index')) form = RegisterForm() if form.validate_on_submit(): # 检查用户名和邮箱是否已存在 if User.query.filter_by(username=form.username.data).first(): flash('用户名已存在', 'danger') return render_template('register.html', form=form) if User.query.filter_by(email=form.email.data).first(): flash('邮箱已存在', 'danger') return render_template('register.html', form=form) # 创建新用户 user = User( username=form.username.data, email=form.email.data ) user.set_password(form.password.data) db.session.add(user) db.session.commit() flash('注册成功,请登录', 'success') return redirect(url_for('login')) return render_template('register.html', form=form) @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: return redirect(url_for('index')) form = LoginForm() if form.validate_on_submit(): user = User.query.filter_by(username=form.username.data).first() if user is None or not user.check_password(form.password.data): flash('用户名或密码错误', 'danger') return render_template('login.html', form=form) login_user(user, remember=True) next_page = request.args.get('next') if not next_page or not next_page.startswith('/'): next_page = url_for('index') return redirect(next_page) return render_template('login.html', form=form) # 添加成员路由 @app.route('/admin/user/add', methods=['GET', 'POST']) @login_required def add_user(): if not current_user.is_admin: abort(403) form = UserForm() if form.validate_on_submit(): user = User( username=form.username.data, email=form.email.data, is_admin=form.is_admin.data ) user.set_password(form.password.data) db.session.add(user) db.session.commit() flash('用户添加成功', 'success') return redirect(url_for('admin')) # 修改为admin return render_template('manage_user.html', form=form, action='add') # 同样修改其他相关视图函数中的重定向 @app.route('/admin/user//edit', methods=['GET', 'POST']) @login_required def edit_user(user_id): if not current_user.is_admin: abort(403) user = User.query.get_or_404(user_id) form = UserForm(obj=user) if form.validate_on_submit(): user.username = form.username.data user.email = form.email.data user.is_admin = form.is_admin.data if form.password.data: user.set_password(form.password.data) db.session.commit() flash('用户信息更新成功', 'success') return redirect(url_for('admin')) # 修改为admin return render_template('manage_user.html', form=form, action='edit', user=user, title=f'编辑用户 - {user.username}') # 删除成员路由 @app.route('/admin/user//delete', methods=['POST']) @login_required def delete_user(user_id): if not current_user.is_admin: abort(403) user = User.query.get_or_404(user_id) if user.id == current_user.id: flash('不能删除当前登录用户', 'danger') return redirect(url_for('admin')) db.session.delete(user) db.session.commit() flash('用户删除成功', 'success') return redirect(url_for('admin')) @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('index')) @app.route('/dashboard') @login_required def dashboard(): # 获取用户的分类和站点 categories = Category.query.filter_by(user_id=current_user.id).order_by(Category.name).all() sites = Site.query.filter_by(user_id=current_user.id).order_by(Site.created_at.desc()).all() # 获取用户收藏的公共分类和站点 favorite_categories = current_user.favorite_categories favorite_sites = current_user.favorite_sites return render_template('dashboard.html', categories=categories, sites=sites, favorite_categories=favorite_categories, favorite_sites=favorite_sites) # 在admin路由中修改查询条件 @app.route('/admin') @login_required def admin(): if not current_user.is_admin: abort(403) users = User.query.all() # 确保查询的是公共分类 public_categories = Category.query.filter_by(is_public=True).order_by(Category.name).all() # 确保查询的是公共站点 public_sites = Site.query.filter_by(is_public=True).order_by(Site.created_at.desc()).all() return render_template('admin_dashboard.html', users=users, public_categories=public_categories, public_sites=public_sites) @app.route('/category/add', methods=['GET', 'POST']) @login_required def add_category(): form = CategoryForm() if form.validate_on_submit(): category = Category( name=form.name.data, user_id=current_user.id, is_public=form.is_public.data ) db.session.add(category) db.session.commit() flash('分类添加成功', 'success') return redirect(url_for('dashboard')) return render_template('manage_categories.html', form=form, action='add') @app.route('/site/add', methods=['GET', 'POST']) @login_required def add_site(): form = SiteForm() # 使用单个查询获取用户的分类以及公共分类,避免重复 from sqlalchemy import or_ all_categories = Category.query.filter( or_( Category.user_id == current_user.id, Category.is_public == True ) ).all() form.category_id.choices = [(c.id, c.name) for c in all_categories] if form.validate_on_submit(): site = Site( name=form.name.data, url=form.url.data, description=form.description.data, category_id=form.category_id.data, user_id=current_user.id, is_public=form.is_public.data ) # 处理自定义图标 if form.custom_icon.data: upload_folder = os.path.join(app.root_path, 'static', 'uploads', 'icons') os.makedirs(upload_folder, exist_ok=True) file_ext = os.path.splitext(form.custom_icon.data.filename)[1] filename = f"icon_{current_user.id}_{form.name.data}_{int(datetime.now().timestamp())}{file_ext}" filename = secure_filename(filename) file_path = os.path.join(upload_folder, filename) form.custom_icon.data.save(file_path) site.custom_icon = url_for('static', filename=f'uploads/icons/{filename}') site.icon = None else: # 自动抓取 favicon site.icon = get_favicon_url(site.url) site.custom_icon = None db.session.add(site) db.session.commit() flash('站点添加成功', 'success') return redirect(url_for('dashboard')) return render_template('manage_sites.html', form=form, action='add') @app.route('/site//edit', methods=['GET', 'POST']) @login_required def edit_site(site_id): site = Site.query.get_or_404(site_id) # 权限检查:站点所有者或管理员 if site.user_id != current_user.id and not current_user.is_admin: abort(403) form = SiteForm() # 获取用户分类 + 公共分类 user_categories = Category.query.filter_by(user_id=current_user.id).all() public_categories = Category.query.filter_by(is_public=True).all() all_categories = user_categories + public_categories form.category_id.choices = [(c.id, c.name) for c in all_categories] if form.validate_on_submit(): custom_icon_url = site.custom_icon # 默认保持原图标 # 如果上传了新自定义图标 if form.custom_icon.data: upload_folder = os.path.join(app.root_path, 'static', 'uploads', 'icons') os.makedirs(upload_folder, exist_ok=True) file_ext = os.path.splitext(form.custom_icon.data.filename)[1] filename = f"icon_{current_user.id}_{form.name.data}_{int(datetime.now().timestamp())}{file_ext}" filename = secure_filename(filename) file_path = os.path.join(upload_folder, filename) form.custom_icon.data.save(file_path) custom_icon_url = url_for('static', filename=f'uploads/icons/{filename}') # 更新站点信息 site.name = form.name.data site.url = form.url.data site.description = form.description.data site.category_id = form.category_id.data site.is_public = form.is_public.data if custom_icon_url: # 优先使用自定义图标 site.custom_icon = custom_icon_url site.icon = None else: # 如果没有自定义图标,则抓取网站 favicon site.icon = get_favicon_url(site.url) site.custom_icon = None db.session.commit() flash('站点更新成功', 'success') return redirect(url_for('dashboard')) # GET 请求时预填充表单 if request.method == 'GET': form.name.data = site.name form.url.data = site.url form.description.data = site.description form.category_id.data = site.category_id form.is_public.data = site.is_public return render_template('manage_sites.html', form=form, action='edit', site=site) @app.route('/site//delete', methods=['POST']) @login_required def delete_site(site_id): site = Site.query.get_or_404(site_id) # 只允许站点所有者或管理员删除 if site.user_id != current_user.id and not current_user.is_admin: abort(403) db.session.delete(site) db.session.commit() flash('站点删除成功', 'success') return redirect(url_for('dashboard')) @app.route('/category//delete', methods=['POST']) @login_required def delete_category(category_id): category = Category.query.get_or_404(category_id) # 只允许分类所有者或管理员删除 if category.user_id != current_user.id and not current_user.is_admin: abort(403) # 确保不删除公共分类(除非是管理员) if category.is_public and not current_user.is_admin: abort(403) db.session.delete(category) db.session.commit() flash('分类删除成功', 'success') return redirect(url_for('dashboard')) import json from flask import jsonify, request from sqlalchemy import or_ @app.route('/import-bookmarks', methods=['GET', 'POST']) @login_required def import_bookmarks(): form = ImportBookmarksForm() if form.validate_on_submit(): # 保存上传的文件 f = form.bookmark_file.data filename = secure_filename(f.filename) # 使用临时文件 import uuid unique_filename = f"{uuid.uuid4().hex}_{filename}" file_path = os.path.join(tempfile.gettempdir(), unique_filename) # 确保目录存在 os.makedirs(os.path.dirname(file_path), exist_ok=True) f.save(file_path) try: # 解析书签文件 bookmarks = parse_bookmarks_html(file_path) # 准备预览数据 preview_data = { 'file_token': unique_filename, 'total_count': 0, 'folders': [], 'import_as_public': form.import_as_public.data and current_user.is_admin } # 分析重复项 existing_urls = {site.url for site in Site.query.filter_by(user_id=current_user.id).all()} for folder_name, folder_bookmarks in bookmarks.items(): folder_data = { 'name': folder_name, 'bookmarks': [], 'new_count': 0, 'duplicate_count': 0 } for bm in folder_bookmarks: is_duplicate = bm['url'] in existing_urls bookmark_data = { 'name': bm['name'], 'url': bm['url'], 'description': bm.get('description', ''), 'is_duplicate': is_duplicate, 'selected': not is_duplicate } folder_data['bookmarks'].append(bookmark_data) if not is_duplicate: folder_data['new_count'] += 1 else: folder_data['duplicate_count'] += 1 preview_data['total_count'] += 1 preview_data['folders'].append(folder_data) # 将预览数据保存到临时文件,而不是session preview_file_path = os.path.join(tempfile.gettempdir(), f"preview_{unique_filename}.json") with open(preview_file_path, 'w', encoding='utf-8') as f: json.dump(preview_data, f, ensure_ascii=False, indent=2) # 只在session中存储文件路径 session['preview_file'] = preview_file_path # 重定向到预览页面 return redirect(url_for('preview_bookmarks')) except Exception as e: app.logger.error(f"书签解析失败: {str(e)}") flash(f'文件解析失败: {str(e)}', 'danger') finally: # 保留文件用于后续导入 pass return render_template('import_bookmarks.html', form=form) def parse_bookmarks_html(file_path): """解析浏览器书签HTML文件""" with open(file_path, 'r', encoding='utf-8') as f: soup = BeautifulSoup(f.read(), 'html.parser') bookmarks = {} # 查找所有书签链接 for a in soup.find_all('a'): href = a.get('href') name = a.text.strip() if href and name: # 获取父级文件夹名称 folder_name = '导入的书签' # 默认文件夹名 parent = a.find_parent('dl') if parent: h3 = parent.find_previous_sibling('h3') if h3: folder_name = h3.text.strip() if folder_name not in bookmarks: bookmarks[folder_name] = [] bookmarks[folder_name].append({ 'name': name, 'url': href, 'description': a.get('add_date', '') or a.get('last_modified', '') }) return bookmarks @app.route('/preview-bookmarks') @login_required def preview_bookmarks(): """书签导入预览页面""" preview_file_path = session.get('preview_file') if not preview_file_path or not os.path.exists(preview_file_path): flash('请先上传书签文件', 'warning') return redirect(url_for('import_bookmarks')) try: # 从文件加载预览数据 with open(preview_file_path, 'r', encoding='utf-8') as f: preview_data = json.load(f) except Exception as e: app.logger.error(f"加载预览数据失败: {str(e)}") flash('预览数据加载失败,请重新上传', 'warning') return redirect(url_for('import_bookmarks')) # 获取用户现有分类,用于编辑时选择 user_categories = Category.query.filter_by(user_id=current_user.id).all() return render_template('preview_bookmarks.html', preview_data=preview_data, user_categories=user_categories) @app.route('/api/import-bookmarks', methods=['POST']) @login_required def api_import_bookmarks(): """API接口:执行书签导入(支持进度查询)""" print("收到导入请求") if request.method == 'POST': try: data = request.get_json() print(f"请求数据: {data}") selected_bookmarks = data.get('selected_bookmarks', []) import_as_public = data.get('import_as_public', False) print(f"要导入的书签数量: {len(selected_bookmarks)}") print(f"导入为公开: {import_as_public}") # 创建导入任务ID import_id = str(uuid.uuid4()) print(f"创建任务ID: {import_id}") # 将导入任务信息存储在临时文件中,而不是session import_task = { 'id': import_id, 'total': len(selected_bookmarks), 'processed': 0, 'status': 'processing', 'results': { 'success': 0, 'failed': 0, 'errors': [] } } # 保存导入任务到临时文件 task_file_path = os.path.join(tempfile.gettempdir(), f"task_{import_id}.json") with open(task_file_path, 'w', encoding='utf-8') as f: json.dump(import_task, f, ensure_ascii=False, indent=2) # 在session中只存储任务ID session['import_task_id'] = import_id # 异步执行导入(这里简化为同步) print("开始执行导入...") execute_import(import_id, selected_bookmarks, import_as_public, current_user.id) print("导入执行完成") return jsonify({ 'import_id': import_id, 'status': 'started', 'total': len(selected_bookmarks) }) except Exception as e: print(f"API处理错误: {e}") return jsonify({'error': str(e)}), 500 @app.route('/api/import-progress/') @login_required def api_import_progress(import_id): """查询导入进度""" print(f"查询进度: {import_id}") # 从临时文件加载任务信息 task_file_path = os.path.join(tempfile.gettempdir(), f"task_{import_id}.json") print(f"任务文件路径: {task_file_path}") if not os.path.exists(task_file_path): print("任务文件不存在") return jsonify({'error': '任务不存在'}), 404 try: with open(task_file_path, 'r', encoding='utf-8') as f: task = json.load(f) print(f"任务状态: {task}") return jsonify({ 'status': task['status'], 'processed': task['processed'], 'total': task['total'], 'results': task['results'] }) except Exception as e: print(f"读取任务文件错误: {e}") return jsonify({'error': '任务数据损坏'}), 500 def execute_import(import_id, selected_bookmarks, import_as_public, user_id): """执行实际的书签导入""" # 从临时文件加载任务信息 task_file_path = os.path.join(tempfile.gettempdir(), f"task_{import_id}.json") try: with open(task_file_path, 'r', encoding='utf-8') as f: task = json.load(f) except Exception as e: print(f"无法加载任务文件: {e}") return try: categories_cache = {} # 缓存分类对象 for i, bookmark in enumerate(selected_bookmarks): # 更新进度 task['processed'] = i + 1 # 保存进度到文件 with open(task_file_path, 'w', encoding='utf-8') as f: json.dump(task, f, ensure_ascii=False, indent=2) try: folder_name = bookmark.get('folder', '导入的书签') custom_category = bookmark.get('custom_category') # 确定分类 if custom_category: folder_name = custom_category # 获取或创建分类 if folder_name not in categories_cache: category = Category.query.filter_by( name=folder_name, user_id=user_id ).first() if not category: category = Category( name=folder_name, user_id=user_id, is_public=import_as_public ) db.session.add(category) db.session.flush() # 获取ID但不提交事务 categories_cache[folder_name] = category category = categories_cache[folder_name] # 检查是否已存在 existing_site = Site.query.filter_by( url=bookmark['url'], user_id=user_id ).first() if existing_site: task['results']['failed'] += 1 task['results']['errors'].append(f"书签已存在: {bookmark['name']}") continue # 创建新站点 site = Site( name=bookmark['name'][:100], url=bookmark['url'][:500], description=bookmark.get('description', '')[:200], category_id=category.id, user_id=user_id, is_public=import_as_public ) # 获取favicon site.icon = get_favicon_url(bookmark['url']) db.session.add(site) task['results']['success'] += 1 except Exception as e: task['results']['failed'] += 1 task['results']['errors'].append(f"导入失败 {bookmark['name']}: {str(e)}") db.session.commit() task['status'] = 'completed' # 保存最终状态 with open(task_file_path, 'w', encoding='utf-8') as f: json.dump(task, f, ensure_ascii=False, indent=2) print(f"导入完成: 成功 {task['results']['success']}, 失败 {task['results']['failed']}") except Exception as e: db.session.rollback() task['status'] = 'failed' task['results']['errors'].append(f"导入过程出错: {str(e)}") # 保存错误状态 with open(task_file_path, 'w', encoding='utf-8') as f: json.dump(task, f, ensure_ascii=False, indent=2) print(f"导入失败: {e}") @app.route('/site/') def site_detail(site_id): site = Site.query.get_or_404(site_id) return render_template('site_detail.html', site=site) def cleanup_temp_files(): """清理临时文件""" temp_dir = tempfile.gettempdir() # 清理预览文件 for file in glob.glob(os.path.join(temp_dir, "preview_*.json")): try: os.remove(file) except: pass # 清理任务文件 for file in glob.glob(os.path.join(temp_dir, "task_*.json")): try: os.remove(file) except: pass # 注册退出时的清理函数 atexit.register(cleanup_temp_files) # 搜索路由 @app.route('/search') def search(): query_text = request.args.get('q', '').strip() search_engine = request.args.get('search-engine', 'local') resp = make_response() # 保存用户选择的搜索引擎到 cookie (30 天) resp.set_cookie('search-engine', search_engine, max_age=30*24*3600) if search_engine == 'local': if not query_text: flash('请输入搜索关键词', 'warning') return redirect(url_for('index')) sites = Site.query.filter( Site.is_public == True ).join(Category).filter_by(is_public=True).filter( Site.name.contains(query_text) | Site.description.contains(query_text) | Site.url.contains(query_text) ).all() categories = Category.query.filter_by(is_public=True).order_by(Category.name).all() resp.response = render_template('index.html', sites=sites, categories=categories, search_query=query_text, saved_engine=search_engine) return resp else: # 外部搜索跳转 engines = { 'baidu': f'https://www.baidu.com/s?wd={query_text}', 'bing': f'https://www.bing.com/search?q={query_text}', 'google': f'https://www.google.com/search?q={query_text}' } if search_engine in engines: return redirect(engines[search_engine]) else: flash('不支持的搜索引擎', 'warning') return redirect(url_for('index')) # 错误处理 @app.errorhandler(404) def page_not_found(e): return render_template('404.html'), 404 @app.errorhandler(403) def forbidden(e): return render_template('404.html', message='您没有权限访问此页面'), 403 # 创建数据库表和初始数据 @app.before_first_request def create_tables(): db.create_all() create_default_admin() create_default_categories() # 编辑用户角色(提升/降级管理员) @app.route('/admin/user//toggle_admin', methods=['POST']) @login_required def toggle_admin(user_id): if not current_user.is_admin: abort(403) user = User.query.get_or_404(user_id) if user.id == current_user.id: flash('您不能更改自己的管理员状态', 'warning') else: user.is_admin = not user.is_admin db.session.commit() status = '管理员' if user.is_admin else '普通用户' flash(f'已将用户 "{user.username}" 设置为 {status}', 'success') return redirect(url_for('admin')) @app.route('/admin/category//edit', methods=['GET', 'POST']) @login_required def edit_public_category(category_id): if not current_user.is_admin: flash('您没有管理员权限', 'danger') return redirect(url_for('admin')) category = Category.query.get_or_404(category_id) # 修改权限检查:管理员可以编辑所有公共分类 if not category.is_public: flash('只能编辑公共分类', 'danger') return redirect(url_for('admin')) form = CategoryForm(obj=category) if form.validate_on_submit(): category.name = form.name.data category.is_public = True # 保持为公共分类 db.session.commit() flash('分类更新成功', 'success') return redirect(url_for('admin')) return render_template('edit_public_category.html', form=form, category=category) # 修改删除公共分类路由 @app.route('/admin/category//delete', methods=['POST']) @login_required def delete_public_category(category_id): if not current_user.is_admin: abort(403) category = Category.query.get_or_404(category_id) # 修改权限检查 if not category.is_public: flash('只能删除公共分类', 'danger') return redirect(url_for('admin')) db.session.delete(category) db.session.commit() flash('分类删除成功', 'success') return redirect(url_for('admin')) # 编辑公共站点 # 编辑公共站点路由 @app.route('/admin/site//edit', methods=['GET', 'POST']) @login_required def edit_public_site(site_id): if not current_user.is_admin: abort(403) site = Site.query.get_or_404(site_id) # 修改权限检查:管理员可以编辑所有公共站点 if not site.is_public: flash('只能编辑公共站点', 'danger') return redirect(url_for('admin')) form = SiteForm(obj=site) form.category_id.choices = [(c.id, c.name) for c in Category.query.filter_by(is_public=True).all()] if form.validate_on_submit(): site.name = form.name.data site.url = form.url.data site.description = form.description.data site.category_id = form.category_id.data site.is_public = True db.session.commit() flash('站点更新成功', 'success') return redirect(url_for('admin')) return render_template('edit_public_site.html', form=form, site=site) @app.route('/edit_category/', methods=['GET', 'POST']) @login_required def edit_category(category_id): category = Category.query.get_or_404(category_id) # 确保当前用户有权编辑此分类 if category.user_id != current_user.id and not current_user.is_admin: abort(403) form = CategoryForm(obj=category) if form.validate_on_submit(): # 记录原始公共状态 original_is_public = category.is_public # 更新分类信息 category.name = form.name.data category.is_public = form.is_public.data # 如果公共属性发生变化,更新该分类下所有站点的公共属性 if form.is_public.data != original_is_public: # 获取该分类下的所有站点 sites = Site.query.filter_by(category_id=category.id).all() # 更新每个站点的公共属性 for site in sites: site.is_public = category.is_public flash(f'已更新该分类下 {len(sites)} 个站点的公共属性', 'info') db.session.commit() flash('分类更新成功', 'success') return redirect(url_for('dashboard')) return render_template('edit_category.html', form=form, category=category, action='edit') # 修改删除公共站点路由 @app.route('/admin/site//delete', methods=['POST']) @login_required def delete_public_site(site_id): if not current_user.is_admin: abort(403) site = Site.query.get_or_404(site_id) # 修改权限检查 if not site.is_public: flash('只能删除公共站点', 'danger') return redirect(url_for('admin')) db.session.delete(site) db.session.commit() flash('站点删除成功', 'success') return redirect(url_for('admin')) @app.route('/favicon.ico') def favicon(): return send_from_directory(os.path.join(app.root_path, 'static'), 'favicon.ico', mimetype='image/vnd.microsoft.icon') if __name__ == '__main__': app.run(debug=False,port=6565)