||
- import uuid
- from werkzeug.utils import secure_filename
- import os
- from urllib.parse import urlparse
- import urllib.parse
- import requests
- from bs4 import BeautifulSoup
- from flask import Flask
- import os
- from flask import render_template, request, redirect, url_for, flash, jsonify, abort
- from flask_login import LoginManager, login_user, logout_user, login_required, current_user
- from werkzeug.utils import secure_filename
- from werkzeug.security import generate_password_hash, check_password_hash
- from datetime import datetime
- from flask import Flask
- import os
- from flask import render_template, request, redirect, url_for, flash, jsonify, abort, make_response, session
- from flask import send_from_directory
- from config import Config
- from models import db, User, Category, Site
- from forms import LoginForm, RegisterForm, SiteForm, CategoryForm, ImportBookmarksForm, SearchForm
- from forms import UserForm # 确保这行存在
- import tempfile
- import json
- import atexit
- import glob
- # 创建Flask应用
- app = Flask(__name__)
- app.config.from_object(Config)
- # Ensure instance folder exists
- instance_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'instance')
- if not os.path.exists(instance_path):
- os.makedirs(instance_path)
- # 确保instance文件夹存在
- try:
- os.makedirs(app.instance_path)
- except OSError:
- pass
- # 初始化数据库
- db.init_app(app)
- with app.app_context():
- print(f"Database URI: {app.config['SQLALCHEMY_DATABASE_URI']}")
- try:
- db.create_all()
- print("Database tables created successfully.")
- except Exception as e:
- print(f"Error creating database tables: {e}")
- # 配置登录管理
- login_manager = LoginManager()
- login_manager.init_app(app)
- login_manager.login_view = 'login'
- login_manager.login_message = '请先登录以访问此页面'
- @login_manager.user_loader
- def load_user(user_id):
- return db.session.get(User, int(user_id))
- # 工具函数
- def get_favicon_url(url):
- """获取网站 favicon,如果失败返回默认图标"""
- try:
- parsed = urllib.parse.urlparse(url)
- if not parsed.scheme:
- url = 'http://' + url
- parsed = urllib.parse.urlparse(url)
- domain = parsed.netloc
- if not domain:
- return url_for('static', filename='images/default-icon.png')
- # 尝试标准位置
- favicon_url = f"{parsed.scheme}://{domain}/favicon.ico"
- resp = requests.head(favicon_url, allow_redirects=True, timeout=3)
- if resp.status_code == 200:
- return favicon_url
- # 尝试网页中 link rel="icon"
- resp = requests.get(url, timeout=5)
- if resp.status_code == 200:
- soup = BeautifulSoup(resp.text, 'html.parser')
- icon_link = soup.find('link', rel=lambda x: x and 'icon' in x.lower())
- if icon_link and icon_link.get('href'):
- icon_href = icon_link['href']
- if icon_href.startswith('http'):
- return icon_href
- elif icon_href.startswith('/'):
- return f"{parsed.scheme}://{domain}{icon_href}"
- else:
- return f"{parsed.scheme}://{domain}/{icon_href}"
- return url_for('static', filename='images/default-icon.png')
- except:
- return url_for('static', filename='images/default-icon.png')
- def parse_bookmarks_html(file_path):
- """解析浏览器书签HTML文件"""
- with open(file_path, 'r', encoding='utf-8') as f:
- soup = BeautifulSoup(f.read(), 'html.parser')
- # 查找书签栏和文件夹
- bookmarks = []
- folders = {}
- # 查找所有h3标签(文件夹)和对应的dl(书签列表)
- for h3 in soup.find_all('h3'):
- folder_name = h3.string
- if not folder_name:
- continue
- dl = h3.find_next_sibling('dl')
- if not dl:
- continue
- folder_bookmarks = []
- for a in dl.find_all('a'):
- href = a.get('href')
- name = a.string
- if not href or not name:
- continue
- # 获取描述(通常是添加日期或其他属性)
- description = a.get('add_date', '') or a.get('icon', '') or ''
- folder_bookmarks.append({
- 'name': name.strip(),
- 'url': href.strip(),
- 'description': description.strip()
- })
- if folder_bookmarks:
- folders[folder_name.strip()] = folder_bookmarks
- return folders
- def create_default_admin():
- try:
- # 先检查是否已存在管理员账户
- existing_admin = User.query.filter_by(email='admin@example.com').first()
- if existing_admin:
- print("默认管理员账户已存在")
- return
- # 创建新管理员
- admin = User(
- username='admin',
- email='admin@example.com',
- is_admin=True
- )
- admin.set_password('admin123') # 设置默认密码
- db.session.add(admin)
- db.session.commit()
- print("成功创建默认管理员账户")
- except Exception as e:
- db.session.rollback()
- print(f"创建管理员账户时出错: {str(e)}")
- def create_default_categories():
- # 如果已经有任何分类存在,说明已经初始化过了,不用重复创建
- if Category.query.first() is not None:
- return
- # 找到默认管理员用户
- admin = User.query.filter_by(email='admin@example.com').first()
- if not admin:
- print("[警告] 未找到默认管理员用户,无法创建默认分类")
- return
- default_categories = [
- '搜索引擎', '社交媒体', '新闻资讯', '购物网站', '工具网站', '娱乐休闲'
- ]
- for cat_name in default_categories:
- category = Category(
- name=cat_name,
- user_id=admin.id, # ✅ 关键:指定管理员的 ID 作为分类拥有者
- is_public=True
- )
- db.session.add(category)
- db.session.commit()
- print(f"[INFO] 已成功创建 {len(default_categories)} 个默认分类,归属于管理员 {admin.username}")
- # API 路由定义
- @app.route('/api/category/<int:category_id>', methods=['GET'])
- def get_category(category_id):
- category = Category.query.get_or_404(category_id)
- return jsonify({
- 'id': category.id,
- 'name': category.name,
- 'is_public': category.is_public
- })
- @app.route('/api/category', methods=['POST'])
- def api_add_category():
- data = request.get_json()
- if not data or 'name' not in data:
- return jsonify({'success': False, 'message': '分类名称不能为空'}), 400
- category = Category(
- name=data['name'],
- user_id=current_user.id,
- is_public=data.get('is_public', False)
- )
- db.session.add(category)
- db.session.commit()
- return jsonify({'success': True, 'message': '分类添加成功'})
- @app.route('/api/category/<int:category_id>', methods=['PUT'])
- def api_update_category(category_id):
- category = Category.query.get_or_404(category_id)
- data = request.get_json()
- if not data or 'name' not in data:
- return jsonify({'success': False, 'message': '分类名称不能为空'}), 400
- category.name = data['name']
- category.is_public = data.get('is_public', False)
- db.session.commit()
- return jsonify({'success': True, 'message': '分类更新成功'})
- # 路由定义
- # 首页路由
- @app.route('/')
- def index():
- search_form = SearchForm()
- # 公共站点:is_public=True 的站点 + 分类也是公开的
- public_sites_query = Site.query.join(Category).filter(
- Site.is_public == True,
- Category.is_public == True
- )
- if search_form.q.data and search_form.validate():
- q = search_form.q.data.strip()
- public_sites_query = public_sites_query.filter(
- Site.name.contains(q) |
- Site.description.contains(q) |
- Site.url.contains(q)
- )
- public_sites = public_sites_query.order_by(Site.created_at.asc()).all()
- # 公共分类
- public_categories = Category.query.filter_by(is_public=True).order_by(Category.name).all()
- # 我的私有站点和分类(仅登录用户)
- my_sites = []
- my_categories = []
- if current_user.is_authenticated:
- my_sites = Site.query.filter_by(user_id=current_user.id).order_by(Site.created_at.asc()).all()
- my_categories = Category.query.filter_by(user_id=current_user.id).order_by(Category.name).all()
- saved_engine = request.cookies.get('search-engine', 'local')
- return render_template('index.html',
- public_sites=public_sites,
- public_categories=public_categories,
- my_sites=my_sites,
- my_categories=my_categories,
- search_form=search_form,
- saved_engine=saved_engine)
- @app.route('/register', methods=['GET', 'POST'])
- def register():
- if current_user.is_authenticated:
- return redirect(url_for('index'))
- form = RegisterForm()
- if form.validate_on_submit():
- # 检查用户名和邮箱是否已存在
- if User.query.filter_by(username=form.username.data).first():
- flash('用户名已存在', 'danger')
- return render_template('register.html', form=form)
- if User.query.filter_by(email=form.email.data).first():
- flash('邮箱已存在', 'danger')
- return render_template('register.html', form=form)
- # 创建新用户
- user = User(
- username=form.username.data,
- email=form.email.data
- )
- user.set_password(form.password.data)
- db.session.add(user)
- db.session.commit()
- flash('注册成功,请登录', 'success')
- return redirect(url_for('login'))
- return render_template('register.html', form=form)
- @app.route('/login', methods=['GET', 'POST'])
- def login():
- if current_user.is_authenticated:
- return redirect(url_for('index'))
- form = LoginForm()
- if form.validate_on_submit():
- user = User.query.filter_by(username=form.username.data).first()
- if user is None or not user.check_password(form.password.data):
- flash('用户名或密码错误', 'danger')
- return render_template('login.html', form=form)
- login_user(user, remember=True)
- next_page = request.args.get('next')
- if not next_page or not next_page.startswith('/'):
- next_page = url_for('index')
- return redirect(next_page)
- return render_template('login.html', form=form)
- # 添加成员路由
- @app.route('/admin/user/add', methods=['GET', 'POST'])
- @login_required
- def add_user():
- if not current_user.is_admin:
- abort(403)
- form = UserForm()
- if form.validate_on_submit():
- user = User(
- username=form.username.data,
- email=form.email.data,
- is_admin=form.is_admin.data
- )
- user.set_password(form.password.data)
- db.session.add(user)
- db.session.commit()
- flash('用户添加成功', 'success')
- return redirect(url_for('admin')) # 修改为admin
- return render_template('manage_user.html', form=form, action='add')
- # 同样修改其他相关视图函数中的重定向
- @app.route('/admin/user/<int:user_id>/edit', methods=['GET', 'POST'])
- @login_required
- def edit_user(user_id):
- if not current_user.is_admin:
- abort(403)
- user = User.query.get_or_404(user_id)
- form = UserForm(obj=user)
- if form.validate_on_submit():
- user.username = form.username.data
- user.email = form.email.data
- user.is_admin = form.is_admin.data
- if form.password.data:
- user.set_password(form.password.data)
- db.session.commit()
- flash('用户信息更新成功', 'success')
- return redirect(url_for('admin')) # 修改为admin
- return render_template('manage_user.html',
- form=form,
- action='edit',
- user=user,
- title=f'编辑用户 - {user.username}')
- # 删除成员路由
- @app.route('/admin/user/<int:user_id>/delete', methods=['POST'])
- @login_required
- def delete_user(user_id):
- if not current_user.is_admin:
- abort(403)
- user = User.query.get_or_404(user_id)
- if user.id == current_user.id:
- flash('不能删除当前登录用户', 'danger')
- return redirect(url_for('admin'))
- db.session.delete(user)
- db.session.commit()
- flash('用户删除成功', 'success')
- return redirect(url_for('admin'))
- @app.route('/logout')
- @login_required
- def logout():
- logout_user()
- return redirect(url_for('index'))
- @app.route('/dashboard')
- @login_required
- def dashboard():
- # 获取用户的分类和站点
- categories = Category.query.filter_by(user_id=current_user.id).order_by(Category.name).all()
- sites = Site.query.filter_by(user_id=current_user.id).order_by(Site.created_at.desc()).all()
- # 获取用户收藏的公共分类和站点
- favorite_categories = current_user.favorite_categories
- favorite_sites = current_user.favorite_sites
- return render_template('dashboard.html',
- categories=categories,
- sites=sites,
- favorite_categories=favorite_categories,
- favorite_sites=favorite_sites)
- # 在admin路由中修改查询条件
- @app.route('/admin')
- @login_required
- def admin():
- if not current_user.is_admin:
- abort(403)
- users = User.query.all()
- # 确保查询的是公共分类
- public_categories = Category.query.filter_by(is_public=True).order_by(Category.name).all()
- # 确保查询的是公共站点
- public_sites = Site.query.filter_by(is_public=True).order_by(Site.created_at.desc()).all()
- return render_template('admin_dashboard.html',
- users=users,
- public_categories=public_categories,
- public_sites=public_sites)
- @app.route('/category/add', methods=['GET', 'POST'])
- @login_required
- def add_category():
- form = CategoryForm()
- if form.validate_on_submit():
- category = Category(
- name=form.name.data,
- user_id=current_user.id,
- is_public=form.is_public.data
- )
- db.session.add(category)
- db.session.commit()
- flash('分类添加成功', 'success')
- return redirect(url_for('dashboard'))
- return render_template('manage_categories.html', form=form, action='add')
- @app.route('/site/add', methods=['GET', 'POST'])
- @login_required
- def add_site():
- form = SiteForm()
- # 使用单个查询获取用户的分类以及公共分类,避免重复
- from sqlalchemy import or_
- all_categories = Category.query.filter(
- or_(
- Category.user_id == current_user.id,
- Category.is_public == True
- )
- ).all()
- form.category_id.choices = [(c.id, c.name) for c in all_categories]
- if form.validate_on_submit():
- site = Site(
- name=form.name.data,
- url=form.url.data,
- description=form.description.data,
- category_id=form.category_id.data,
- user_id=current_user.id,
- is_public=form.is_public.data
- )
- # 处理自定义图标
- if form.custom_icon.data:
- upload_folder = os.path.join(app.root_path, 'static', 'uploads', 'icons')
- os.makedirs(upload_folder, exist_ok=True)
- file_ext = os.path.splitext(form.custom_icon.data.filename)[1]
- filename = f"icon_{current_user.id}_{form.name.data}_{int(datetime.now().timestamp())}{file_ext}"
- filename = secure_filename(filename)
- file_path = os.path.join(upload_folder, filename)
- form.custom_icon.data.save(file_path)
- site.custom_icon = url_for('static', filename=f'uploads/icons/{filename}')
- site.icon = None
- else:
- # 自动抓取 favicon
- site.icon = get_favicon_url(site.url)
- site.custom_icon = None
- db.session.add(site)
- db.session.commit()
- flash('站点添加成功', 'success')
- return redirect(url_for('dashboard'))
- return render_template('manage_sites.html', form=form, action='add')
- @app.route('/site/<int:site_id>/edit', methods=['GET', 'POST'])
- @login_required
- def edit_site(site_id):
- site = Site.query.get_or_404(site_id)
- # 权限检查:站点所有者或管理员
- if site.user_id != current_user.id and not current_user.is_admin:
- abort(403)
- form = SiteForm()
- # 获取用户分类 + 公共分类
- user_categories = Category.query.filter_by(user_id=current_user.id).all()
- public_categories = Category.query.filter_by(is_public=True).all()
- all_categories = user_categories + public_categories
- form.category_id.choices = [(c.id, c.name) for c in all_categories]
- if form.validate_on_submit():
- custom_icon_url = site.custom_icon # 默认保持原图标
- # 如果上传了新自定义图标
- if form.custom_icon.data:
- upload_folder = os.path.join(app.root_path, 'static', 'uploads', 'icons')
- os.makedirs(upload_folder, exist_ok=True)
- file_ext = os.path.splitext(form.custom_icon.data.filename)[1]
- filename = f"icon_{current_user.id}_{form.name.data}_{int(datetime.now().timestamp())}{file_ext}"
- filename = secure_filename(filename)
- file_path = os.path.join(upload_folder, filename)
- form.custom_icon.data.save(file_path)
- custom_icon_url = url_for('static', filename=f'uploads/icons/{filename}')
- # 更新站点信息
- site.name = form.name.data
- site.url = form.url.data
- site.description = form.description.data
- site.category_id = form.category_id.data
- site.is_public = form.is_public.data
- if custom_icon_url:
- # 优先使用自定义图标
- site.custom_icon = custom_icon_url
- site.icon = None
- else:
- # 如果没有自定义图标,则抓取网站 favicon
- site.icon = get_favicon_url(site.url)
- site.custom_icon = None
- db.session.commit()
- flash('站点更新成功', 'success')
- return redirect(url_for('dashboard'))
- # GET 请求时预填充表单
- if request.method == 'GET':
- form.name.data = site.name
- form.url.data = site.url
- form.description.data = site.description
- form.category_id.data = site.category_id
- form.is_public.data = site.is_public
- return render_template('manage_sites.html', form=form, action='edit', site=site)
- @app.route('/site/<int:site_id>/delete', methods=['POST'])
- @login_required
- def delete_site(site_id):
- site = Site.query.get_or_404(site_id)
- # 只允许站点所有者或管理员删除
- if site.user_id != current_user.id and not current_user.is_admin:
- abort(403)
- db.session.delete(site)
- db.session.commit()
- flash('站点删除成功', 'success')
- return redirect(url_for('dashboard'))
- @app.route('/category/<int:category_id>/delete', methods=['POST'])
- @login_required
- def delete_category(category_id):
- category = Category.query.get_or_404(category_id)
- # 只允许分类所有者或管理员删除
- if category.user_id != current_user.id and not current_user.is_admin:
- abort(403)
- # 确保不删除公共分类(除非是管理员)
- if category.is_public and not current_user.is_admin:
- abort(403)
- db.session.delete(category)
- db.session.commit()
- flash('分类删除成功', 'success')
- return redirect(url_for('dashboard'))
- import json
- from flask import jsonify, request
- from sqlalchemy import or_
- @app.route('/import-bookmarks', methods=['GET', 'POST'])
- @login_required
- def import_bookmarks():
- form = ImportBookmarksForm()
- if form.validate_on_submit():
- # 保存上传的文件
- f = form.bookmark_file.data
- filename = secure_filename(f.filename)
- # 使用临时文件
- import uuid
- unique_filename = f"{uuid.uuid4().hex}_{filename}"
- file_path = os.path.join(tempfile.gettempdir(), unique_filename)
- # 确保目录存在
- os.makedirs(os.path.dirname(file_path), exist_ok=True)
- f.save(file_path)
- try:
- # 解析书签文件
- bookmarks = parse_bookmarks_html(file_path)
- # 准备预览数据
- preview_data = {
- 'file_token': unique_filename,
- 'total_count': 0,
- 'folders': [],
- 'import_as_public': form.import_as_public.data and current_user.is_admin
- }
- # 分析重复项
- existing_urls = {site.url for site in Site.query.filter_by(user_id=current_user.id).all()}
- for folder_name, folder_bookmarks in bookmarks.items():
- folder_data = {
- 'name': folder_name,
- 'bookmarks': [],
- 'new_count': 0,
- 'duplicate_count': 0
- }
- for bm in folder_bookmarks:
- is_duplicate = bm['url'] in existing_urls
- bookmark_data = {
- 'name': bm['name'],
- 'url': bm['url'],
- 'description': bm.get('description', ''),
- 'is_duplicate': is_duplicate,
- 'selected': not is_duplicate
- }
- folder_data['bookmarks'].append(bookmark_data)
- if not is_duplicate:
- folder_data['new_count'] += 1
- else:
- folder_data['duplicate_count'] += 1
- preview_data['total_count'] += 1
- preview_data['folders'].append(folder_data)
- # 将预览数据保存到临时文件,而不是session
- preview_file_path = os.path.join(tempfile.gettempdir(), f"preview_{unique_filename}.json")
- with open(preview_file_path, 'w', encoding='utf-8') as f:
- json.dump(preview_data, f, ensure_ascii=False, indent=2)
- # 只在session中存储文件路径
- session['preview_file'] = preview_file_path
- # 重定向到预览页面
- return redirect(url_for('preview_bookmarks'))
- except Exception as e:
- app.logger.error(f"书签解析失败: {str(e)}")
- flash(f'文件解析失败: {str(e)}', 'danger')
- finally:
- # 保留文件用于后续导入
- pass
- return render_template('import_bookmarks.html', form=form)
- def parse_bookmarks_html(file_path):
- """解析浏览器书签HTML文件"""
- with open(file_path, 'r', encoding='utf-8') as f:
- soup = BeautifulSoup(f.read(), 'html.parser')
- bookmarks = {}
- # 查找所有书签链接
- for a in soup.find_all('a'):
- href = a.get('href')
- name = a.text.strip()
- if href and name:
- # 获取父级文件夹名称
- folder_name = '导入的书签' # 默认文件夹名
- parent = a.find_parent('dl')
- if parent:
- h3 = parent.find_previous_sibling('h3')
- if h3:
- folder_name = h3.text.strip()
- if folder_name not in bookmarks:
- bookmarks[folder_name] = []
- bookmarks[folder_name].append({
- 'name': name,
- 'url': href,
- 'description': a.get('add_date', '') or a.get('last_modified', '')
- })
- return bookmarks
- @app.route('/preview-bookmarks')
- @login_required
- def preview_bookmarks():
- """书签导入预览页面"""
- preview_file_path = session.get('preview_file')
- if not preview_file_path or not os.path.exists(preview_file_path):
- flash('请先上传书签文件', 'warning')
- return redirect(url_for('import_bookmarks'))
- try:
- # 从文件加载预览数据
- with open(preview_file_path, 'r', encoding='utf-8') as f:
- preview_data = json.load(f)
- except Exception as e:
- app.logger.error(f"加载预览数据失败: {str(e)}")
- flash('预览数据加载失败,请重新上传', 'warning')
- return redirect(url_for('import_bookmarks'))
- # 获取用户现有分类,用于编辑时选择
- user_categories = Category.query.filter_by(user_id=current_user.id).all()
- return render_template('preview_bookmarks.html',
- preview_data=preview_data,
- user_categories=user_categories)
- @app.route('/api/import-bookmarks', methods=['POST'])
- @login_required
- def api_import_bookmarks():
- """API接口:执行书签导入(支持进度查询)"""
- print("收到导入请求")
- if request.method == 'POST':
- try:
- data = request.get_json()
- print(f"请求数据: {data}")
- selected_bookmarks = data.get('selected_bookmarks', [])
- import_as_public = data.get('import_as_public', False)
- print(f"要导入的书签数量: {len(selected_bookmarks)}")
- print(f"导入为公开: {import_as_public}")
- # 创建导入任务ID
- import_id = str(uuid.uuid4())
- print(f"创建任务ID: {import_id}")
- # 将导入任务信息存储在临时文件中,而不是session
- import_task = {
- 'id': import_id,
- 'total': len(selected_bookmarks),
- 'processed': 0,
- 'status': 'processing',
- 'results': {
- 'success': 0,
- 'failed': 0,
- 'errors': []
- }
- }
- # 保存导入任务到临时文件
- task_file_path = os.path.join(tempfile.gettempdir(), f"task_{import_id}.json")
- with open(task_file_path, 'w', encoding='utf-8') as f:
- json.dump(import_task, f, ensure_ascii=False, indent=2)
- # 在session中只存储任务ID
- session['import_task_id'] = import_id
- # 异步执行导入(这里简化为同步)
- print("开始执行导入...")
- execute_import(import_id, selected_bookmarks, import_as_public, current_user.id)
- print("导入执行完成")
- return jsonify({
- 'import_id': import_id,
- 'status': 'started',
- 'total': len(selected_bookmarks)
- })
- except Exception as e:
- print(f"API处理错误: {e}")
- return jsonify({'error': str(e)}), 500
- @app.route('/api/import-progress/<import_id>')
- @login_required
- def api_import_progress(import_id):
- """查询导入进度"""
- print(f"查询进度: {import_id}")
- # 从临时文件加载任务信息
- task_file_path = os.path.join(tempfile.gettempdir(), f"task_{import_id}.json")
- print(f"任务文件路径: {task_file_path}")
- if not os.path.exists(task_file_path):
- print("任务文件不存在")
- return jsonify({'error': '任务不存在'}), 404
- try:
- with open(task_file_path, 'r', encoding='utf-8') as f:
- task = json.load(f)
- print(f"任务状态: {task}")
- return jsonify({
- 'status': task['status'],
- 'processed': task['processed'],
- 'total': task['total'],
- 'results': task['results']
- })
- except Exception as e:
- print(f"读取任务文件错误: {e}")
- return jsonify({'error': '任务数据损坏'}), 500
- def execute_import(import_id, selected_bookmarks, import_as_public, user_id):
- """执行实际的书签导入"""
- # 从临时文件加载任务信息
- task_file_path = os.path.join(tempfile.gettempdir(), f"task_{import_id}.json")
- try:
- with open(task_file_path, 'r', encoding='utf-8') as f:
- task = json.load(f)
- except Exception as e:
- print(f"无法加载任务文件: {e}")
- return
- try:
- categories_cache = {} # 缓存分类对象
- for i, bookmark in enumerate(selected_bookmarks):
- # 更新进度
- task['processed'] = i + 1
- # 保存进度到文件
- with open(task_file_path, 'w', encoding='utf-8') as f:
- json.dump(task, f, ensure_ascii=False, indent=2)
- try:
- folder_name = bookmark.get('folder', '导入的书签')
- custom_category = bookmark.get('custom_category')
- # 确定分类
- if custom_category:
- folder_name = custom_category
- # 获取或创建分类
- if folder_name not in categories_cache:
- category = Category.query.filter_by(
- name=folder_name,
- user_id=user_id
- ).first()
- if not category:
- category = Category(
- name=folder_name,
- user_id=user_id,
- is_public=import_as_public
- )
- db.session.add(category)
- db.session.flush() # 获取ID但不提交事务
- categories_cache[folder_name] = category
- category = categories_cache[folder_name]
- # 检查是否已存在
- existing_site = Site.query.filter_by(
- url=bookmark['url'],
- user_id=user_id
- ).first()
- if existing_site:
- task['results']['failed'] += 1
- task['results']['errors'].append(f"书签已存在: {bookmark['name']}")
- continue
- # 创建新站点
- site = Site(
- name=bookmark['name'][:100],
- url=bookmark['url'][:500],
- description=bookmark.get('description', '')[:200],
- category_id=category.id,
- user_id=user_id,
- is_public=import_as_public
- )
- # 获取favicon
- site.icon = get_favicon_url(bookmark['url'])
- db.session.add(site)
- task['results']['success'] += 1
- except Exception as e:
- task['results']['failed'] += 1
- task['results']['errors'].append(f"导入失败 {bookmark['name']}: {str(e)}")
- db.session.commit()
- task['status'] = 'completed'
- # 保存最终状态
- with open(task_file_path, 'w', encoding='utf-8') as f:
- json.dump(task, f, ensure_ascii=False, indent=2)
- print(f"导入完成: 成功 {task['results']['success']}, 失败 {task['results']['failed']}")
- except Exception as e:
- db.session.rollback()
- task['status'] = 'failed'
- task['results']['errors'].append(f"导入过程出错: {str(e)}")
- # 保存错误状态
- with open(task_file_path, 'w', encoding='utf-8') as f:
- json.dump(task, f, ensure_ascii=False, indent=2)
- print(f"导入失败: {e}")
- @app.route('/site/<int:site_id>')
- def site_detail(site_id):
- site = Site.query.get_or_404(site_id)
- return render_template('site_detail.html', site=site)
- def cleanup_temp_files():
- """清理临时文件"""
- temp_dir = tempfile.gettempdir()
- # 清理预览文件
- for file in glob.glob(os.path.join(temp_dir, "preview_*.json")):
- try:
- os.remove(file)
- except:
- pass
- # 清理任务文件
- for file in glob.glob(os.path.join(temp_dir, "task_*.json")):
- try:
- os.remove(file)
- except:
- pass
- # 注册退出时的清理函数
- atexit.register(cleanup_temp_files)
- # 搜索路由
- @app.route('/search')
- def search():
- query_text = request.args.get('q', '').strip()
- search_engine = request.args.get('search-engine', 'local')
- resp = make_response()
- # 保存用户选择的搜索引擎到 cookie (30 天)
- resp.set_cookie('search-engine', search_engine, max_age=30*24*3600)
- if search_engine == 'local':
- if not query_text:
- flash('请输入搜索关键词', 'warning')
- return redirect(url_for('index'))
- sites = Site.query.filter(
- Site.is_public == True
- ).join(Category).filter_by(is_public=True).filter(
- Site.name.contains(query_text) |
- Site.description.contains(query_text) |
- Site.url.contains(query_text)
- ).all()
- categories = Category.query.filter_by(is_public=True).order_by(Category.name).all()
- resp.response = render_template('index.html',
- sites=sites,
- categories=categories,
- search_query=query_text,
- saved_engine=search_engine)
- return resp
- else:
- # 外部搜索跳转
- engines = {
- 'baidu': f'https://www.baidu.com/s?wd={query_text}',
- 'bing': f'https://www.bing.com/search?q={query_text}',
- 'google': f'https://www.google.com/search?q={query_text}'
- }
- if search_engine in engines:
- return redirect(engines[search_engine])
- else:
- flash('不支持的搜索引擎', 'warning')
- return redirect(url_for('index'))
- # 错误处理
- @app.errorhandler(404)
- def page_not_found(e):
- return render_template('404.html'), 404
- @app.errorhandler(403)
- def forbidden(e):
- return render_template('404.html', message='您没有权限访问此页面'), 403
- # 创建数据库表和初始数据
- @app.before_first_request
- def create_tables():
- db.create_all()
- create_default_admin()
- create_default_categories()
- # 编辑用户角色(提升/降级管理员)
- @app.route('/admin/user/<int:user_id>/toggle_admin', methods=['POST'])
- @login_required
- def toggle_admin(user_id):
- if not current_user.is_admin:
- abort(403)
- user = User.query.get_or_404(user_id)
- if user.id == current_user.id:
- flash('您不能更改自己的管理员状态', 'warning')
- else:
- user.is_admin = not user.is_admin
- db.session.commit()
- status = '管理员' if user.is_admin else '普通用户'
- flash(f'已将用户 "{user.username}" 设置为 {status}', 'success')
- return redirect(url_for('admin'))
- @app.route('/admin/category/<int:category_id>/edit', methods=['GET', 'POST'])
- @login_required
- def edit_public_category(category_id):
- if not current_user.is_admin:
- flash('您没有管理员权限', 'danger')
- return redirect(url_for('admin'))
- category = Category.query.get_or_404(category_id)
- # 修改权限检查:管理员可以编辑所有公共分类
- if not category.is_public:
- flash('只能编辑公共分类', 'danger')
- return redirect(url_for('admin'))
- form = CategoryForm(obj=category)
- if form.validate_on_submit():
- category.name = form.name.data
- category.is_public = True # 保持为公共分类
- db.session.commit()
- flash('分类更新成功', 'success')
- return redirect(url_for('admin'))
- return render_template('edit_public_category.html', form=form, category=category)
- # 修改删除公共分类路由
- @app.route('/admin/category/<int:category_id>/delete', methods=['POST'])
- @login_required
- def delete_public_category(category_id):
- if not current_user.is_admin:
- abort(403)
- category = Category.query.get_or_404(category_id)
- # 修改权限检查
- if not category.is_public:
- flash('只能删除公共分类', 'danger')
- return redirect(url_for('admin'))
- db.session.delete(category)
- db.session.commit()
- flash('分类删除成功', 'success')
- return redirect(url_for('admin'))
- # 编辑公共站点
- # 编辑公共站点路由
- @app.route('/admin/site/<int:site_id>/edit', methods=['GET', 'POST'])
- @login_required
- def edit_public_site(site_id):
- if not current_user.is_admin:
- abort(403)
- site = Site.query.get_or_404(site_id)
- # 修改权限检查:管理员可以编辑所有公共站点
- if not site.is_public:
- flash('只能编辑公共站点', 'danger')
- return redirect(url_for('admin'))
- form = SiteForm(obj=site)
- form.category_id.choices = [(c.id, c.name) for c in Category.query.filter_by(is_public=True).all()]
- if form.validate_on_submit():
- site.name = form.name.data
- site.url = form.url.data
- site.description = form.description.data
- site.category_id = form.category_id.data
- site.is_public = True
- db.session.commit()
- flash('站点更新成功', 'success')
- return redirect(url_for('admin'))
- return render_template('edit_public_site.html', form=form, site=site)
- @app.route('/edit_category/<int:category_id>', methods=['GET', 'POST'])
- @login_required
- def edit_category(category_id):
- category = Category.query.get_or_404(category_id)
- # 确保当前用户有权编辑此分类
- if category.user_id != current_user.id and not current_user.is_admin:
- abort(403)
- form = CategoryForm(obj=category)
- if form.validate_on_submit():
- # 记录原始公共状态
- original_is_public = category.is_public
- # 更新分类信息
- category.name = form.name.data
- category.is_public = form.is_public.data
- # 如果公共属性发生变化,更新该分类下所有站点的公共属性
- if form.is_public.data != original_is_public:
- # 获取该分类下的所有站点
- sites = Site.query.filter_by(category_id=category.id).all()
- # 更新每个站点的公共属性
- for site in sites:
- site.is_public = category.is_public
- flash(f'已更新该分类下 {len(sites)} 个站点的公共属性', 'info')
- db.session.commit()
- flash('分类更新成功', 'success')
- return redirect(url_for('dashboard'))
- return render_template('edit_category.html', form=form, category=category, action='edit')
- # 修改删除公共站点路由
- @app.route('/admin/site/<int:site_id>/delete', methods=['POST'])
- @login_required
- def delete_public_site(site_id):
- if not current_user.is_admin:
- abort(403)
- site = Site.query.get_or_404(site_id)
- # 修改权限检查
- if not site.is_public:
- flash('只能删除公共站点', 'danger')
- return redirect(url_for('admin'))
- db.session.delete(site)
- db.session.commit()
- flash('站点删除成功', 'success')
- return redirect(url_for('admin'))
- @app.route('/favicon.ico')
- def favicon():
- return send_from_directory(os.path.join(app.root_path, 'static'),
- 'favicon.ico', mimetype='image/vnd.microsoft.icon')
- if __name__ == '__main__':
- app.run(debug=False,port=6565)
|