import uuid import sys import json import atexit import threading from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, jsonify, abort, current_app from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user from werkzeug.security import generate_password_hash, check_password_hash from werkzeug.utils import secure_filename from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.interval import IntervalTrigger from sqlalchemy import func from sqlalchemy.exc import SQLAlchemyError from models import db, SystemConfig, User, Department, CompanyEntity, ContractType, Contract, Counterparty, ContractAttachment, ContractVersion from flask import url_for, render_template_string from flask_mail import Mail, Message from datetime import datetime, timedelta from apscheduler.triggers.cron import CronTrigger import pytz from apscheduler.triggers.cron import CronTrigger from pytz import timezone from flask import send_file, make_response, abort import os from urllib.parse import quote # ----------------------------- # 路径处理(PyInstaller 兼容) # ----------------------------- if getattr(sys, 'frozen', False): # PyInstaller 打包后 base_path = sys._MEIPASS instance_path = os.path.join(os.path.dirname(sys.executable), 'instance') else: base_path = os.path.abspath(".") instance_path = os.path.join(base_path, 'instance') os.makedirs(instance_path, exist_ok=True) # ----------------------------- # Flask 初始化 # ----------------------------- app = Flask(__name__, template_folder=os.path.join(base_path, 'templates'), static_folder=os.path.join(base_path, 'static')) app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(instance_path, 'contracts.db') app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False app.config['SECRET_KEY'] = 'dev-secret-key' app.config['UPLOAD_FOLDER'] = os.path.join(instance_path, 'uploads') app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 
16MB upload limit # ✅ 允许 APScheduler 在无请求上下文下生成完整 URL(解决 url_for 报错) app.config['SERVER_NAME'] = '127.0.0.1:8082' app.config['PREFERRED_URL_SCHEME'] = 'http' # 创建上传目录 os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) os.makedirs(os.path.join(app.config['UPLOAD_FOLDER'], 'icons'), exist_ok=True) os.makedirs(os.path.join(app.config['UPLOAD_FOLDER'], 'contracts'), exist_ok=True) # SQLAlchemy 初始化 db.init_app(app) # Flask-Login login_manager = LoginManager(app) login_manager.login_view = 'login' # ----------------------------- # 用户加载 # ----------------------------- @login_manager.user_loader def load_user(user_id): return db.session.get(User, int(user_id)) # ----------------------------- # 数据库初始化 # ----------------------------- def init_db(): with app.app_context(): db.create_all() if not SystemConfig.query.first(): print("数据库已创建,请访问 /initial_setup 初始化系统") def generate_contract_number(): """生成新合同编号,格式 HT-YYYY-XXXX""" from sqlalchemy import func year = datetime.now().year prefix = f"HT-{year}-" # 查询今年已有的合同数量,用作流水号 last_contract = Contract.query.filter(Contract.contract_number.like(f"{prefix}%")) \ .order_by(Contract.contract_number.desc()).first() if last_contract and last_contract.contract_number: try: last_number = int(last_contract.contract_number.split('-')[-1]) except ValueError: last_number = 0 else: last_number = 0 new_number = last_number + 1 return f"{prefix}{str(new_number).zfill(4)}" # ----------------------------- # 合同到期检查(已优化) # ----------------------------- def generate_contract_number_by_type(type_id): """根据合同类型生成编号,优先使用 prefix""" ctype = ContractType.query.get(type_id) if not ctype: return "HT-XXXX" # 优先使用 prefix,其次使用 code,兜底用 HT+ID type_code = ctype.prefix or ctype.code or f"HT{ctype.id}" year = datetime.now().year prefix = f"{type_code}-{year}-" # 查询该类型已有合同数量,获取流水号 last_contract = Contract.query.filter( Contract.type_id == type_id, Contract.contract_number.like(f"{prefix}%") ).order_by(Contract.contract_number.desc()).first() if last_contract and 
last_contract.contract_number: try: last_number = int(last_contract.contract_number.split('-')[-1]) except ValueError: last_number = 0 else: last_number = 0 new_number = last_number + 1 return f"{prefix}{str(new_number).zfill(4)}" # 先创建 Mail 对象,不传 app mail = Mail() def load_mail_config(): """ 从 SystemConfig 加载邮件配置并初始化 Mail(自动处理 TLS/SSL) """ try: config = SystemConfig.query.first() except Exception as e: app.logger.error(f"读取 SystemConfig 时出错: {e}") config = None if not config: print("⚠️ 未找到邮箱配置(SystemConfig 为空),邮件功能未启用") return try: mail_port = int(getattr(config, 'mail_port', 0)) except Exception: mail_port = 0 mail_server = getattr(config, 'mail_server', None) mail_username = getattr(config, 'mail_username', None) mail_password = getattr(config, 'mail_password', None) # 自动判断 TLS/SSL mail_use_tls = False mail_use_ssl = False if mail_port == 465: # SSL端口 mail_use_ssl = True mail_use_tls = False elif mail_port == 587: # STARTTLS端口 mail_use_tls = True mail_use_ssl = False else: # 普通SMTP端口 mail_use_tls = False mail_use_ssl = False app.config.update( MAIL_SERVER=mail_server, MAIL_PORT=mail_port, MAIL_USERNAME=mail_username, MAIL_PASSWORD=mail_password, MAIL_USE_TLS=mail_use_tls, MAIL_USE_SSL=mail_use_ssl ) try: mail.init_app(app) print(f"✅ 邮件模块已初始化,SERVER={mail_server} PORT={mail_port} TLS={mail_use_tls} SSL={mail_use_ssl}") if mail_use_tls and mail_use_ssl: print("⚠️ 警告:TLS和SSL同时启用,可能导致邮件发送失败") except Exception as e: print(f"❌ 邮件模块初始化失败: {e}") def check_expiring_contracts(): with app.app_context(): today = datetime.now().date() in_30_days = today + timedelta(days=30) # 查询合同 expiring_contracts = Contract.query.filter( Contract.end_date <= in_30_days, Contract.end_date >= today, Contract.is_active == True ).all() expired_contracts = Contract.query.filter( Contract.end_date < today, Contract.is_active == True ).all() # 构建合同链接 def get_contract_url(contract_id): try: return url_for('view_contract', contract_id=contract_id, _external=True) except RuntimeError: return 
f"http://127.0.0.1:8082/view_contract/{contract_id}" # 处理合同数据,计算剩余天数或逾期天数 def build_contract_data(contract, expired=False): delta_days = (contract.end_date - today).days return { 'name': contract.name, 'contract_number': getattr(contract, 'contract_number', ''), # 改这里 'end_date': contract.end_date.strftime('%Y-%m-%d'), 'days_left': max(delta_days, 0), 'days_overdue': abs(min(delta_days, 0)), 'url': get_contract_url(contract.id) } expiring_data = [build_contract_data(c) for c in expiring_contracts] expired_data = [build_contract_data(c, expired=True) for c in expired_contracts] if not expiring_data and not expired_data: print("[定时任务] 无到期或即将到期合同。") return print(f"[定时任务] 已找到 {len(expiring_contracts)} 个即将到期合同,{len(expired_contracts)} 个已到期合同。") # 渲染邮件 HTML html_content = render_template_string("""

合同到期提醒 - 汇总

管理员您好,系统在 {{ now_date }} 检测到以下合同的到期情况:

{% if expiring %}

📅 即将到期(30天内)

{% for c in expiring %} {% endfor %}
合同名称 合同编号 到期日期 剩余天数 操作
{{ c.name }} {{ c.contract_number }} {{ c.end_date }} {{ c.days_left }} 天 查看
{% endif %} {% if expired %}

⚠️ 已到期(请尽快处理)

{% for c in expired %} {% endfor %}
合同名称 合同编号 到期日期 逾期天数 操作
{{ c.name }} {{ c.contract_number }} {{ c.end_date }} {{ c.days_overdue }} 天 查看
{% endif %}

此邮件由合同管理系统自动发送。如有问题请登录系统查看或联系系统管理员。
{{ now_full }}

""", expiring=expiring_data, expired=expired_data, now_date=today.strftime("%Y-%m-%d"), now_full=datetime.now().strftime("%Y-%m-%d %H:%M:%S")) # 邮件发送 try: subject = f"【合同到期提醒】{today.strftime('%Y-%m-%d')} - 即将到期 {len(expiring_data)} / 已到期 {len(expired_data)}" sender = app.config.get('MAIL_USERNAME') or f"no-reply@{app.config.get('MAIL_SERVER') or 'local'}" # 从数据库获取管理员邮箱 admin_users = User.query.filter_by(is_admin=True).all() recipients = [u.email for u in admin_users if u.email] if not recipients: print("⚠️ 没有找到管理员邮箱,邮件发送被跳过") return msg = Message(subject=subject, recipients=recipients, html=html_content, sender=sender) def _send(): try: with app.app_context(): mail.send(msg) print( f"✅ 已发送到期提醒邮件给 {recipients}(即将到期: {len(expiring_data)},已到期: {len(expired_data)})") except Exception as e: print(f"❌ 邮件发送失败: {e}") t = threading.Thread(target=_send, daemon=True) t.start() except Exception as e: print(f"❌ 构造或发送邮件时发生异常: {e}") app.logger.error(f"check_expiring_contracts error: {e}") # ----------------------------- # APScheduler 调度(根据配置动态设置) # ----------------------------- scheduler = BackgroundScheduler(timezone=pytz.timezone('Asia/Shanghai')) # ----------------------------- # APScheduler 调度(根据配置动态设置) # ----------------------------- scheduler = BackgroundScheduler(timezone=pytz.timezone('Asia/Shanghai')) def get_scheduler_config(): """从系统配置获取定时任务设置""" # 确保在应用上下文中执行数据库查询 with app.app_context(): config = SystemConfig.query.first() if not config: # 默认值 return { 'frequency': 'daily', 'weekdays': [], 'month_day': 1, 'hour': 9, 'minute': 0 } return { 'frequency': config.scheduler_frequency or 'daily', 'weekdays': config.scheduler_weekdays or [], 'month_day': config.scheduler_month_day or 1, 'hour': config.scheduler_hour or 9, 'minute': config.scheduler_minute or 0 } def create_scheduler_trigger(): """根据配置创建触发器""" config = get_scheduler_config() if config['frequency'] == 'daily': # 每天执行 return CronTrigger( hour=config['hour'], minute=config['minute'], timezone='Asia/Shanghai' ) 
elif config['frequency'] == 'weekly': # 每周执行 return CronTrigger( day_of_week=','.join(map(str, config['weekdays'])), hour=config['hour'], minute=config['minute'], timezone='Asia/Shanghai' ) elif config['frequency'] == 'monthly': # 每月执行 return CronTrigger( day=config['month_day'], hour=config['hour'], minute=config['minute'], timezone='Asia/Shanghai' ) else: # 默认每天执行 return CronTrigger( hour=9, minute=0, timezone='Asia/Shanghai' ) # 添加定时任务 scheduler.add_job( check_expiring_contracts, create_scheduler_trigger(), id='daily_contract_check', replace_existing=True ) # 开始调度 if not hasattr(app, 'scheduler_started'): try: scheduler.start() app.scheduler_started = True # 打印定时任务配置 config = get_scheduler_config() if config['frequency'] == 'daily': print(f"⏰ Scheduler 已启动(每天 {config['hour']}:{config['minute']} 检查合同到期)") elif config['frequency'] == 'weekly': weekdays = ', '.join( [['周一', '周二', '周三', '周四', '周五', '周六', '周日'][int(d)] for d in config['weekdays']]) print(f"⏰ Scheduler 已启动(每周 {weekdays} {config['hour']}:{config['minute']} 检查合同到期)") elif config['frequency'] == 'monthly': print(f"⏰ Scheduler 已启动(每月 {config['month_day']}号 {config['hour']}:{config['minute']} 检查合同到期)") except Exception as e: print(f"⚠️ Scheduler 启动时出错: {e}") # 程序退出时关闭调度器 atexit.register(lambda: scheduler.shutdown(wait=False)) # ----------------------------- # 路由 # ----------------------------- @app.route('/') def index(): if not SystemConfig.query.first(): return redirect(url_for('initial_setup')) return redirect(url_for('login')) @app.route('/initial_setup', methods=['GET', 'POST']) def initial_setup(): if SystemConfig.query.first(): return redirect(url_for('index')) if request.method == 'POST': config = SystemConfig( site_name=request.form['site_name'], mail_server=request.form['mail_server'], mail_port=int(request.form['mail_port']), mail_username=request.form['mail_username'], mail_password=request.form['mail_password'], mail_use_tls='mail_use_tls' in request.form ) db.session.add(config) admin = User( 
username=request.form['admin_username'], password=generate_password_hash(request.form['admin_password']), name=request.form['admin_name'], department=request.form['admin_department'], email=request.form['admin_email'], is_admin=True, can_create=True, can_view=True, can_edit=True, can_delete=True ) db.session.add(admin) departments = ['行政部', '财务部', '法务部', '市场部'] for dept in departments: db.session.add(Department(name=dept)) entities = ['总公司', '分公司A', '分公司B'] for entity in entities: db.session.add(CompanyEntity(name=entity)) contract_types = ['采购合同', '销售合同', '服务合同', '租赁合同'] for ct in contract_types: db.session.add(ContractType(name=ct)) db.session.commit() flash('系统初始化成功,请使用管理员账号登录', 'success') return redirect(url_for('login')) return render_template('initial_setup.html') # ----------------------------- # 登录/登出 # ----------------------------- @app.route('/login', methods=['GET', 'POST']) def login(): if current_user.is_authenticated: return redirect(url_for('dashboard')) if request.method == 'POST': username = request.form['username'] password = request.form['password'] user = User.query.filter_by(username=username).first() if user and check_password_hash(user.password, password): login_user(user) next_page = request.args.get('next') return redirect(next_page or url_for('dashboard')) else: flash('用户名或密码错误', 'danger') return render_template('login.html') @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('login')) # ----------------------------- # 仪表盘 # ----------------------------- @app.route('/dashboard') @login_required def dashboard(): # 使用东八区时间 tz = timezone('Asia/Shanghai') today = datetime.now(tz).date() # 使用多个单独的查询来统计,避免复杂的SQL CASE语句 total = Contract.query.count() active = Contract.query.filter(Contract.is_active == True).count() expiring = Contract.query.filter( Contract.end_date.between(today, today + timedelta(days=30)), Contract.is_active == True ).count() expired_already = Contract.query.filter( Contract.end_date < 
today, Contract.is_active == True ).count() uncollected = Contract.query.filter( Contract.collected_date.is_(None), Contract.is_active == True ).count() terminated = Contract.query.filter(Contract.is_active == False).count() stats = { 'total': total, 'active': active, 'expiring': expiring, 'expired_already': expired_already, 'uncollected': uncollected, 'terminated': terminated } # 即将到期合同(30天内)- 只包含激活状态的合同 expiring_soon = Contract.query.filter( Contract.end_date.between(today, today + timedelta(days=30)), Contract.is_active == True ).order_by(Contract.end_date).limit(5).all() # 已到期合同 - 只包含激活状态的合同 expired_contracts = Contract.query.filter( Contract.end_date < today, Contract.is_active == True ).order_by(Contract.end_date.desc()).limit(5).all() # 已终止合同 - 状态为不激活 terminated_contracts = Contract.query.filter( Contract.is_active == False ).order_by(Contract.end_date.desc()).limit(5).all() # 最近添加的合同 recent_contracts = Contract.query.order_by(Contract.created_at.desc()).limit(5).all() # 待收回合同 - 只包含激活状态的合同 uncollected_contracts = Contract.query.filter( Contract.collected_date.is_(None), Contract.is_active == True ).order_by(Contract.end_date.desc()).limit(5).all() # 公司证件类合同 - 只包含激活状态的合同 company_docs = Contract.query.join(ContractType).filter( ContractType.name == "公司证件", Contract.is_active == True ).order_by(Contract.end_date.desc()).limit(5).all() # 公用模板合同 - 只包含激活状态的合同 public_templates = Contract.query.join(ContractType).filter( ContractType.name == "公用模板", Contract.is_active == True ).order_by(Contract.created_at.desc()).limit(5).all() return render_template( 'dashboard.html', stats=stats, expiring_soon=expiring_soon, expired_contracts=expired_contracts, terminated_contracts=terminated_contracts, recent_contracts=recent_contracts, uncollected_contracts=uncollected_contracts, company_docs=company_docs, public_templates=public_templates, today=today ) @app.route('/contracts') @login_required # 添加登录要求 def contract_list(): if not current_user.can_view: flash('您没有查看合同的权限', 
'danger') return redirect(url_for('dashboard')) # 获取 type 参数 type_filter = request.args.get('type') today = datetime.now().date() if type_filter == 'all': # 全部合同 contracts = Contract.query.order_by(Contract.end_date.desc()).all() elif type_filter == 'active': # 有效合同 contracts = Contract.query.filter_by(is_active=True).order_by(Contract.end_date.desc()).all() elif type_filter == 'expiring': # 即将到期合同(30天内) end_date_limit = today + timedelta(days=30) contracts = Contract.query.filter( Contract.end_date >= today, Contract.end_date <= end_date_limit, Contract.is_active == True ).order_by(Contract.end_date.asc()).all() elif type_filter == 'expired': # 已到期合同 contracts = Contract.query.filter( Contract.end_date < today, Contract.is_active == True ).order_by(Contract.end_date.desc()).all() elif type_filter == 'uncollected': # 待收回合同 contracts = Contract.query.filter( Contract.collected_date.is_(None), Contract.is_active == True ).order_by(Contract.end_date.desc()).all() elif type_filter == 'terminated': # 已终止合同 contracts = Contract.query.filter_by(is_active=False).order_by(Contract.end_date.desc()).all() elif type_filter == 'recent': # 最近添加的合同 contracts = Contract.query.order_by(Contract.created_at.desc()).limit(10).all() elif type_filter == 'company_doc': # 公司证件 contracts = Contract.query.join(ContractType).filter( ContractType.name == '公司证件', Contract.is_active == True ).order_by(Contract.end_date.desc()).all() elif type_filter == 'template': # 公用模板 contracts = Contract.query.join(ContractType).filter( ContractType.name == '公用模板', Contract.is_active == True ).order_by(Contract.created_at.desc()).all() else: # 默认:全部合同 contracts = Contract.query.order_by(Contract.end_date.desc()).all() # 准备合同数据 contracts_data = [] for contract in contracts: attachments = [ { 'id': att.id, 'filename': att.filename, 'url': url_for('view_attachment', attachment_id=att.id), 'download_url': url_for('download_file', attachment_id=att.id) } for att in contract.attachments ] contracts_data.append({ 
'contract': contract, 'attachments': attachments }) return render_template('contract_list.html', contracts=contracts_data) # ----------------------------- # 创建合同 # ----------------------------- @app.route('/contract/create', methods=['GET', 'POST']) @login_required def create_contract(): if not current_user.can_create: flash('您没有创建合同的权限', 'danger') return redirect(url_for('contract_list')) contract_types = ContractType.query.all() company_entities = CompanyEntity.query.all() default_number = "" if contract_types: # 默认使用第一个合同类型生成编号 default_number = generate_contract_number_by_type(contract_types[0].id) if request.method == 'POST': try: contract_number = request.form['contract_number'] or default_number contract = Contract( contract_number=contract_number, name=request.form['name'], type_id=request.form['type_id'], company_entity_id=request.form['company_entity_id'], signer_id=current_user.id, start_date=datetime.strptime(request.form['start_date'], '%Y-%m-%d').date(), end_date=datetime.strptime(request.form['end_date'], '%Y-%m-%d').date(), remind_before=int(request.form['remind_before']) if request.form['remind_before'] else None, notes=request.form['notes'], creator_id=current_user.id, signing_method=request.form.get('signing_method'), collected_date=datetime.strptime(request.form['collected_date'], '%Y-%m-%d').date() if request.form.get('collected_date') else None, storage_box=request.form.get('storage_box') ) db.session.add(contract) db.session.flush() # 获取合同ID以便添加附件 # 保存合同方 counterparties = request.form.getlist('counterparty_name') for cp in counterparties: if cp.strip(): db.session.add(Counterparty(name=cp.strip(), contract_id=contract.id)) # 保存初始版本 save_version(contract, current_user.id) # ------------------------------ # 处理附件上传 - 使用UUID生成文件名 files = request.files.getlist('new_attachments') # 注意字段名改为 new_attachments upload_folder = os.path.join(app.config['UPLOAD_FOLDER'], 'contracts') os.makedirs(upload_folder, exist_ok=True) # 文件类型白名单 ALLOWED_EXTENSIONS = 
{'pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx'} MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB for file in files: if file.filename == '': continue # 验证文件扩展名 filename = file.filename ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else '' if ext not in ALLOWED_EXTENSIONS: flash(f'文件 {filename} 类型不被允许', 'warning') continue # 验证文件大小 file.seek(0, os.SEEK_END) file_size = file.tell() file.seek(0) if file_size > MAX_FILE_SIZE: flash(f'文件 {filename} 超过5MB大小限制', 'warning') continue # 使用UUID生成唯一文件名 unique_id = uuid.uuid4().hex safe_filename = secure_filename(f"{unique_id}_{filename}") filepath = os.path.join(upload_folder, safe_filename) try: file.save(filepath) except Exception as e: flash(f'保存文件 {filename} 失败: {str(e)}', 'danger') continue # 创建附件记录 attachment = ContractAttachment( contract_id=contract.id, filename=filename, filepath=filepath ) db.session.add(attachment) db.session.commit() flash('合同创建成功', 'success') return redirect(url_for('view_contract', contract_id=contract.id)) except Exception as e: db.session.rollback() flash(f'创建合同失败: {str(e)}', 'danger') app.logger.error(f'Contract creation failed: {str(e)}') return render_template('contract_form.html', contract_types=contract_types, company_entities=company_entities, default_number=default_number) @app.route('/contract/generate_number/') @login_required def generate_number(type_id): """根据合同类型生成编号(线程安全)""" ctype = ContractType.query.get(type_id) if not ctype: return jsonify({"number": "HT-XXXX"}) type_code = ctype.prefix or ctype.code or f"HT{ctype.id}" year = datetime.now().year prefix = f"{type_code}-{year}-" try: # 事务锁定,避免并发重复 last_number = db.session.query( func.max(func.substr(Contract.contract_number, -4, 4).cast(db.Integer)) ).filter( Contract.contract_number.like(f"{prefix}%") ).scalar() or 0 new_number = last_number + 1 return jsonify({"number": f"{prefix}{str(new_number).zfill(4)}"}) except SQLAlchemyError as e: db.session.rollback() return 
jsonify({"number": f"{prefix}0001"}) @app.route('/contract/') @login_required def view_contract(contract_id): contract = Contract.query.get_or_404(contract_id) if not current_user.can_view: flash('您没有查看合同的权限', 'danger') return redirect(url_for('contract_list')) # 获取合同方 counterparties = Counterparty.query.filter_by(contract_id=contract_id).all() # 获取附件,并为前端预览添加 url/download_url attachments = ContractAttachment.query.filter_by(contract_id=contract_id).all() for att in attachments: att.url = url_for('view_attachment', attachment_id=att.id) # 浏览器直接预览 att.download_url = url_for('download_file', attachment_id=att.id) # 下载用 return render_template( 'contract_view.html', contract=contract, counterparties=counterparties, attachments=attachments ) @app.route('/attachments/') @login_required def view_attachment(attachment_id): attachment = ContractAttachment.query.get_or_404(attachment_id) filename = os.path.basename(attachment.filepath) upload_folder = os.path.join(app.config['UPLOAD_FOLDER'], 'contracts') full_path = os.path.join(upload_folder, filename) if not os.path.exists(full_path): abort(404) ext = os.path.splitext(filename)[1].lower() if ext in ['.pdf', '.jpg', '.jpeg', '.png', '.gif']: # 直接浏览器预览 return send_from_directory(upload_folder, filename) else: # 其他文件下载 return send_from_directory(upload_folder, filename, as_attachment=True) # ----------------------------- # 编辑合同 # ----------------------------- @app.route('/contract/edit/', methods=['GET', 'POST']) @login_required def edit_contract(contract_id): contract = Contract.query.get_or_404(contract_id) if not current_user.can_edit: flash('您没有编辑合同的权限', 'danger') return redirect(url_for('view_contract', contract_id=contract.id)) if request.method == 'POST': try: # 保存历史版本 save_version(contract, current_user.id) # 更新合同字段 contract.contract_number = request.form['contract_number'] contract.name = request.form['name'] contract.type_id = request.form['type_id'] contract.company_entity_id = request.form['company_entity_id'] 
contract.start_date = datetime.strptime(request.form['start_date'], '%Y-%m-%d').date() contract.end_date = datetime.strptime(request.form['end_date'], '%Y-%m-%d').date() contract.remind_before = int(request.form['remind_before']) if request.form['remind_before'] else None contract.notes = request.form['notes'] contract.signing_method = request.form.get('signing_method') contract.collected_date = datetime.strptime(request.form['collected_date'], '%Y-%m-%d').date() if request.form.get( 'collected_date') else None contract.storage_box = request.form.get('storage_box') # 更新合同方 Counterparty.query.filter_by(contract_id=contract.id).delete() counterparties = request.form.getlist('counterparty_name') for cp in counterparties: if cp.strip(): db.session.add(Counterparty(name=cp.strip(), contract_id=contract.id)) # 删除选中的旧附件 delete_ids = request.form.getlist('delete_attachment') for att_id in delete_ids: attachment = ContractAttachment.query.get(att_id) if attachment and attachment.contract_id == contract.id: try: if os.path.exists(attachment.filepath): os.remove(attachment.filepath) except Exception as e: app.logger.error(f"删除附件文件失败: {attachment.filepath}, 错误: {str(e)}") db.session.delete(attachment) # 上传新附件 - 关键修复点 files = request.files.getlist('new_attachments') upload_folder = os.path.join(app.config['UPLOAD_FOLDER'], 'contracts') os.makedirs(upload_folder, exist_ok=True) # 文件类型白名单 ALLOWED_EXTENSIONS = {'pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx'} MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB for file in files: if file.filename == '': continue # 验证文件扩展名 filename = file.filename ext = filename.rsplit('.', 1)[1].lower() if '.' 
in filename else '' if ext not in ALLOWED_EXTENSIONS: flash(f'文件 {filename} 类型不被允许', 'warning') continue # 验证文件大小 file.seek(0, os.SEEK_END) file_size = file.tell() file.seek(0) if file_size > MAX_FILE_SIZE: flash(f'文件 {filename} 超过5MB大小限制', 'warning') continue # 使用UUID生成唯一文件名 unique_id = uuid.uuid4().hex safe_filename = secure_filename(f"{unique_id}_{filename}") filepath = os.path.join(upload_folder, safe_filename) try: file.save(filepath) except Exception as e: flash(f'保存文件 {filename} 失败: {str(e)}', 'danger') continue # 创建附件记录 new_attachment = ContractAttachment( contract_id=contract.id, filename=filename, # 原始文件名 filepath=filepath # 服务器存储路径 ) db.session.add(new_attachment) db.session.commit() flash('合同更新成功', 'success') return redirect(url_for('view_contract', contract_id=contract.id)) except Exception as e: db.session.rollback() flash(f'更新合同失败: {str(e)}', 'danger') app.logger.error(f'Contract update failed: {str(e)}') # 渲染编辑页面 contract_types = ContractType.query.all() company_entities = CompanyEntity.query.all() counterparties = Counterparty.query.filter_by(contract_id=contract.id).all() attachments = ContractAttachment.query.filter_by(contract_id=contract.id).all() return render_template( 'contract_form.html', contract=contract, contract_types=contract_types, company_entities=company_entities, counterparties=counterparties, attachments=attachments ) def save_version(contract, user_id): """Save current contract state as a version""" data = { 'contract_number': contract.contract_number, 'name': contract.name, 'type_id': contract.type_id, 'company_entity_id': contract.company_entity_id, 'start_date': str(contract.start_date), 'end_date': str(contract.end_date), 'remind_before': contract.remind_before, 'notes': contract.notes, 'counterparties': [cp.name for cp in contract.counterparties], # 新增字段 'signing_method': contract.signing_method, 'collected_date': str(contract.collected_date) if contract.collected_date else None, 'storage_box': contract.storage_box, 
'is_active': contract.is_active # 添加合同状态 } max_version = db.session.query(db.func.max(ContractVersion.version)).filter_by( contract_id=contract.id).scalar() or 0 version = ContractVersion( contract_id=contract.id, version=max_version + 1, data=json.dumps(data, ensure_ascii=False, indent=2), modified_by=user_id ) db.session.add(version) @app.route('/contract/delete/', methods=['POST']) @login_required def delete_contract(contract_id): contract = Contract.query.get_or_404(contract_id) if not current_user.can_delete: flash('您没有删除合同的权限', 'danger') return redirect(url_for('view_contract', contract_id=contract_id)) try: contract.is_active = False db.session.commit() flash('合同已标记为终止', 'success') except Exception as e: db.session.rollback() flash(f'终止合同失败: {str(e)}', 'danger') app.logger.error(f'Contract termination failed: {str(e)}') return redirect(url_for('contract_list')) @app.route('/contract/renew/', methods=['GET', 'POST']) @login_required def renew_contract(contract_id): original_contract = Contract.query.get_or_404(contract_id) if not current_user.can_create: flash('您没有创建合同的权限', 'danger') return redirect(url_for('view_contract', contract_id=contract_id)) if request.method == 'POST': try: # 自动生成续签合同编号 existing_renewals = Contract.query.filter( Contract.contract_number.like(f"{original_contract.contract_number}-续签%") ).all() renewal_number = len(existing_renewals) + 1 new_contract_number = f"{original_contract.contract_number}-续签{renewal_number:02d}" contract = Contract( contract_number=new_contract_number, name=request.form['name'], type_id=request.form['type_id'], company_entity_id=request.form['company_entity_id'], signer_id=current_user.id, start_date=datetime.strptime(request.form['start_date'], '%Y-%m-%d').date(), end_date=datetime.strptime(request.form['end_date'], '%Y-%m-%d').date(), remind_before=int(request.form['remind_before']) if request.form['remind_before'] else None, notes=request.form['notes'], creator_id=current_user.id, 
original_contract_id=original_contract.id, # 新增字段(可以默认继承原合同) signing_method=original_contract.signing_method, collected_date=original_contract.collected_date, storage_box=original_contract.storage_box ) db.session.add(contract) db.session.commit() # 复制合同方 counterparties = Counterparty.query.filter_by(contract_id=original_contract.id).all() for cp in counterparties: db.session.add(Counterparty(name=cp.name, contract_id=contract.id)) # 保存初始版本 save_version(contract, current_user.id) # 处理附件上传 - 使用UUID生成文件名 files = request.files.getlist('new_attachments') # 注意字段名改为 new_attachments upload_folder = os.path.join(app.config['UPLOAD_FOLDER'], 'contracts') os.makedirs(upload_folder, exist_ok=True) # 文件类型白名单 ALLOWED_EXTENSIONS = {'pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp', 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx'} MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB for file in files: if file.filename == '': continue # 验证文件扩展名 filename = file.filename ext = filename.rsplit('.', 1)[1].lower() if '.' 
in filename else '' if ext not in ALLOWED_EXTENSIONS: flash(f'文件 {filename} 类型不被允许', 'warning') continue # 验证文件大小 file.seek(0, os.SEEK_END) file_size = file.tell() file.seek(0) if file_size > MAX_FILE_SIZE: flash(f'文件 {filename} 超过5MB大小限制', 'warning') continue # 使用UUID生成唯一文件名 unique_id = uuid.uuid4().hex safe_filename = secure_filename(f"{unique_id}_{filename}") filepath = os.path.join(upload_folder, safe_filename) try: file.save(filepath) except Exception as e: flash(f'保存文件 {filename} 失败: {str(e)}', 'danger') continue # 创建附件记录 attachment = ContractAttachment( contract_id=contract.id, filename=filename, filepath=filepath ) db.session.add(attachment) db.session.commit() flash('合同续签成功', 'success') return redirect(url_for('view_contract', contract_id=contract.id)) except Exception as e: db.session.rollback() flash(f'续签合同失败: {str(e)}', 'danger') app.logger.error(f'Contract renewal failed: {str(e)}') # GET 请求预填充 contract_types = ContractType.query.all() company_entities = CompanyEntity.query.all() counterparties = Counterparty.query.filter_by(contract_id=contract_id).all() new_start_date = original_contract.end_date + timedelta(days=1) new_end_date = new_start_date + timedelta(days=365) return render_template('contract_form.html', contract=original_contract, contract_types=contract_types, company_entities=company_entities, counterparties=counterparties, renew=True, new_start_date=new_start_date.strftime('%Y-%m-%d'), new_end_date=new_end_date.strftime('%Y-%m-%d')) @app.route('/contract//versions') @login_required def contract_versions(contract_id): contract = Contract.query.get_or_404(contract_id) # 相关合同 IDs contract_ids = set([contract.id]) if contract.original_contract_id: contract_ids.add(contract.original_contract_id) # 加上续签合同 renewals = Contract.query.filter(Contract.original_contract_id == contract.id).all() contract_ids.update([c.id for c in renewals]) # 查询所有版本 versions = ContractVersion.query.filter( ContractVersion.contract_id.in_(contract_ids) 
).join(Contract).order_by( Contract.contract_number.asc(), ContractVersion.version.desc() ).all() # 生成 parsed_versions parsed_versions = [] for v in versions: try: data_dict = json.loads(v.data) except Exception as e: data_dict = {"error": f"解析失败: {str(e)}"} parsed_versions.append({ 'version_obj': v, 'data': data_dict, 'belongs_to': v.contract }) # ====== 调试输出 ====== print("=== 调试: parsed_versions 内容 ===") for idx, pv in enumerate(parsed_versions): print(f"{idx+1}. 合同编号: {pv['belongs_to'].contract_number}, 版本: {pv['version_obj'].version}, 修改人: {pv['version_obj'].modifier.name}") print(f" 数据: {pv['data']}") print("=== 调试结束 ===") contract_types = {t.id: t.name for t in ContractType.query.all()} company_entities = {e.id: e.name for e in CompanyEntity.query.all()} return render_template( 'contract_versions.html', contract=contract, parsed_versions=parsed_versions, contract_types=contract_types, company_entities=company_entities ) @app.route('/contract/attachment/delete/', methods=['POST']) @login_required def delete_attachment(attachment_id): attachment = ContractAttachment.query.get_or_404(attachment_id) contract_id = attachment.contract_id if not current_user.can_edit: flash('您没有编辑合同的权限', 'danger') return redirect(url_for('view_contract', contract_id=contract_id)) try: # Delete file from filesystem if os.path.exists(attachment.filepath): os.remove(attachment.filepath) # Delete record from database db.session.delete(attachment) db.session.commit() flash('附件已删除', 'success') except Exception as e: db.session.rollback() flash(f'删除附件失败: {str(e)}', 'danger') app.logger.error(f'Attachment deletion failed: {str(e)}') return redirect(url_for('view_contract', contract_id=contract_id)) # ----------------------------- # 下载附件 # ----------------------------- @app.route('/download/') @login_required def download_file(attachment_id): # 获取附件对象 attachment = ContractAttachment.query.get_or_404(attachment_id) # 构建文件路径 filename = os.path.basename(attachment.filepath) upload_folder = 
os.path.join(app.config['UPLOAD_FOLDER'], 'contracts') file_path = os.path.join(upload_folder, filename) # 检查文件是否存在 if not os.path.exists(file_path): abort(404) # 使用 download_name 参数设置用户友好的文件名 return send_file( file_path, as_attachment=True, download_name=attachment.filename ) @app.route('/users') @login_required def user_list(): if not current_user.is_admin: flash('您没有权限访问此页面', 'danger') return redirect(url_for('dashboard')) users = User.query.order_by(User.is_admin.desc(), User.username).all() return render_template('user_list.html', users=users) @app.route('/user/create', methods=['GET', 'POST']) @login_required def create_user(): if not current_user.is_admin: flash('您没有权限访问此页面', 'danger') return redirect(url_for('dashboard')) if request.method == 'POST': try: user = User( username=request.form['username'], password=generate_password_hash(request.form['password']), name=request.form['name'], department=request.form['department'], email=request.form['email'], is_admin='is_admin' in request.form, can_create='can_create' in request.form, can_view='can_view' in request.form, can_edit='can_edit' in request.form, can_delete='can_delete' in request.form ) db.session.add(user) db.session.commit() flash('用户创建成功', 'success') return redirect(url_for('user_list')) except Exception as e: db.session.rollback() flash(f'创建用户失败: {str(e)}', 'danger') app.logger.error(f'User creation failed: {str(e)}') return render_template('user_form.html') @app.route('/user/edit/', methods=['GET', 'POST']) @login_required def edit_user(user_id): if not current_user.is_admin: flash('您没有权限访问此页面', 'danger') return redirect(url_for('dashboard')) user = User.query.get_or_404(user_id) if request.method == 'POST': try: user.username = request.form['username'] if request.form['password']: user.password = generate_password_hash(request.form['password']) user.name = request.form['name'] user.department = request.form['department'] user.email = request.form['email'] user.is_admin = 'is_admin' in 
request.form user.can_create = 'can_create' in request.form user.can_view = 'can_view' in request.form user.can_edit = 'can_edit' in request.form user.can_delete = 'can_delete' in request.form db.session.commit() flash('用户信息更新成功', 'success') return redirect(url_for('user_list')) except Exception as e: db.session.rollback() flash(f'更新用户信息失败: {str(e)}', 'danger') app.logger.error(f'User update failed: {str(e)}') return render_template('user_form.html', user=user) @app.route('/user/delete/', methods=['POST']) @login_required def delete_user(user_id): if not current_user.is_admin: flash('您没有权限访问此页面', 'danger') return redirect(url_for('dashboard')) if current_user.id == user_id: flash('不能删除当前登录的用户', 'danger') return redirect(url_for('user_list')) user = User.query.get_or_404(user_id) try: db.session.delete(user) db.session.commit() flash('用户删除成功', 'success') except Exception as e: db.session.rollback() flash(f'删除用户失败: {str(e)}', 'danger') app.logger.error(f'User deletion failed: {str(e)}') return redirect(url_for('user_list')) @app.route('/system/config', methods=['GET', 'POST']) @login_required def system_config(): if not current_user.is_admin: flash('您没有权限访问此页面', 'danger') return redirect(url_for('dashboard')) config = SystemConfig.query.first() if request.method == 'POST': config.site_name = request.form['site_name'] config.mail_server = request.form['mail_server'] config.mail_port = int(request.form['mail_port']) config.mail_username = request.form['mail_username'] config.mail_password = request.form['mail_password'] config.mail_use_tls = 'mail_use_tls' in request.form db.session.commit() flash('系统配置更新成功', 'success') return redirect(url_for('system_config')) departments = Department.query.all() entities = CompanyEntity.query.all() contract_types = ContractType.query.all() return render_template('system_config.html', config=config, departments=departments, entities=entities, contract_types=contract_types) @app.route('/system/config/department/add', methods=['POST']) 
@login_required
def add_department():
    """Create a department from the JSON body ``{"name": ...}``. Admin only.

    Returns:
        ``('', 204)`` on success, or a JSON error with status 403/400/500.
    """
    return _create_named_config_item(Department)


# NOTE: this helper is deliberately defined *after* add_department, because the
# decorator stack of add_department starts directly before this point.
def _create_named_config_item(model):
    """Insert a ``model(name=...)`` row named by the request's JSON body.

    Args:
        model: Model class accepting a ``name`` keyword (Department,
            CompanyEntity or ContractType).

    Returns:
        ``('', 204)`` on success; JSON error + 403 for non-admins, 400 for a
        missing/blank name or non-object body, 500 when the insert fails.
    """
    if not current_user.is_admin:
        return jsonify({'error': '无权访问'}), 403

    data = request.get_json(silent=True)
    name = str(data.get('name') or '').strip() if isinstance(data, dict) else ''
    if not name:
        return jsonify({'error': '名称不能为空'}), 400

    try:
        db.session.add(model(name=name))
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        app.logger.error(f'Config item creation failed: {str(e)}')
        return jsonify({'error': str(e)}), 500
    return '', 204


@app.route('/system/config/entity/add', methods=['POST'])
@login_required
def add_entity():
    """Create a company entity from the JSON body ``{"name": ...}``. Admin only."""
    return _create_named_config_item(CompanyEntity)


@app.route('/system/config/type/add', methods=['POST'])
@login_required
def add_contract_type():
    """Create a contract type from the JSON body ``{"name": ...}``. Admin only."""
    return _create_named_config_item(ContractType)


@app.route('/system/config/<type>/delete/<int:id>', methods=['POST'])
@login_required
def delete_config_item(type, id):
    """Delete a department, company entity or contract type. Admin only.

    Args:
        type: One of ``'department'``, ``'entity'``, ``'type'``; anything else
            yields 404. (The name shadows the builtin but is fixed by the URL
            rule's variable name.)
        id: Primary key of the item; 404 if it does not exist.

    Returns:
        ``('', 204)`` on success, or a JSON error with status 403/500.
    """
    if not current_user.is_admin:
        return jsonify({'error': '无权访问'}), 403

    models_by_kind = {
        'department': Department,
        'entity': CompanyEntity,
        'type': ContractType,
    }
    model = models_by_kind.get(type)
    if model is None:
        abort(404)

    item = model.query.get_or_404(id)
    try:
        db.session.delete(item)
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        app.logger.error(f'Config item deletion failed: {str(e)}')
        return jsonify({'error': str(e)}), 500
    return '', 204


@app.route('/system/config/scheduler', methods=['POST'])
@login_required
def save_scheduler_config():
    """Persist the reminder schedule and reschedule the background job. Admin only.

    Expects a JSON object with optional keys ``frequency`` (default
    ``'daily'``), ``weekdays`` (default ``[]``), ``month_day`` (default 1),
    ``hour`` (default 9) and ``minute`` (default 0).

    Returns:
        ``{'status': 'success'}`` on success, or a JSON error with status
        403 (not admin), 400 (bad body / config row missing) or 500.
    """
    if not current_user.is_admin:
        return jsonify({'error': '无权访问'}), 403

    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'error': '请求数据格式错误'}), 400

    # A request already runs inside an application context, so no extra
    # app.app_context() is needed for the database work below.
    config = SystemConfig.query.first()
    if not config:
        return jsonify({'error': '系统配置未初始化'}), 400

    try:
        config.scheduler_frequency = data.get('frequency', 'daily')
        config.scheduler_weekdays = data.get('weekdays', [])
        config.scheduler_month_day = data.get('month_day', 1)
        config.scheduler_hour = data.get('hour', 9)
        config.scheduler_minute = data.get('minute', 0)

        db.session.commit()

        # Apply the new schedule to the running job.
        scheduler.reschedule_job(
            'daily_contract_check',
            trigger=create_scheduler_trigger()
        )

        return jsonify({'status': 'success'})
    except Exception as e:
        db.session.rollback()
        return jsonify({'error': str(e)}), 500


def update_scheduler_job(config):
    """(Re)register the ``daily_contract_check`` job according to ``config``.

    The cron trigger is built *before* the job is touched and the job is
    installed with ``replace_existing=True``; therefore a missing job no
    longer raises, and an invalid or unknown schedule leaves the currently
    registered job in place instead of removing it.

    Args:
        config: Object exposing ``scheduler_frequency`` (``'daily'``,
            ``'weekly'`` or ``'monthly'``), ``scheduler_weekdays``,
            ``scheduler_month_day``, ``scheduler_hour`` and
            ``scheduler_minute``.
    """
    frequency = config.scheduler_frequency
    cron_fields = {
        'hour': config.scheduler_hour,
        'minute': config.scheduler_minute,
        'timezone': 'Asia/Shanghai',
    }

    if frequency == 'weekly':
        cron_fields['day_of_week'] = ','.join(map(str, config.scheduler_weekdays))
    elif frequency == 'monthly':
        cron_fields['day'] = config.scheduler_month_day
    elif frequency != 'daily':
        app.logger.warning(f'Unknown scheduler frequency: {frequency!r}; job left unchanged')
        return

    scheduler.add_job(
        check_expiring_contracts,
        CronTrigger(**cron_fields),
        id='daily_contract_check',
        replace_existing=True
    )

    # Logged instead of printed: the emoji can fail to encode on some consoles.
    app.logger.info(f"⏰ 定时任务已更新: {config.scheduler_frequency} {config.scheduler_hour}:{config.scheduler_minute}")


@app.route('/api/contracts/expiring')
@login_required
def api_expiring_contracts():
    """Return active contracts ending within the next ``days`` days as JSON.

    Query args:
        days: Look-ahead window in days (integer, default 30).

    Returns:
        A JSON list ordered by end date; each item has ``id``, ``name``,
        ``contract_number``, ``end_date`` (``YYYY-MM-DD``), ``days_left`` and
        ``url``. Users without ``can_view`` get a JSON error with status 403.
    """
    if not current_user.can_view:
        return jsonify({'error': '无权访问'}), 403

    days = request.args.get('days', default=30, type=int)
    today = datetime.now().date()  # local time, not UTC
    window_end = today + timedelta(days=days)

    contracts = Contract.query.filter(
        Contract.end_date >= today,
        Contract.end_date <= window_end,
        Contract.is_active == True  # noqa: E712 - SQLAlchemy column expression
    ).order_by(Contract.end_date).all()

    result = []
    for c in contracts:
        result.append({
            'id': c.id,
            'name': c.name,
            'contract_number': c.contract_number,
            'end_date': c.end_date.strftime('%Y-%m-%d'),
            'days_left': (c.end_date - today).days,
            'url': url_for('view_contract', contract_id=c.id)
        })

    return jsonify(result)


@app.errorhandler(404)
def page_not_found(e):
    """Render the custom 404 page."""
    return render_template('404.html'), 404


@app.errorhandler(403)
def forbidden(e):
    """Render the custom 403 page."""
    return render_template('403.html'), 403


@app.errorhandler(500)
def internal_server_error(e):
    """Roll back the current database session and render the custom 500 page."""
    db.session.rollback()
    return render_template('500.html'), 500


@app.route('/uploads/icons/<filename>')
def uploaded_icon(filename):
    """Serve an uploaded icon file from ``<instance>/uploads/icons``.

    NOTE(review): this resolves against ``app.instance_path``, while
    ``UPLOAD_FOLDER`` is built from the module-level ``instance_path``; the two
    can differ in a PyInstaller build - confirm where icons are written.
    """
    icon_dir = os.path.join(app.instance_path, 'uploads', 'icons')
    return send_from_directory(icon_dir, filename)


@app.context_processor
def inject_config():
    """Expose ``app.config['SITE_CONFIG']`` to every template as ``config``.

    The value is ``None`` when ``SITE_CONFIG`` has not been set.
    NOTE(review): the original comment suggests this was meant to come from
    the database - confirm where ``SITE_CONFIG`` is populated.
    """
    config = current_app.config.get('SITE_CONFIG')
    return dict(config=config)
@app.context_processor
def inject_now():
    """Expose ``now_timestamp`` (current Unix time, whole seconds) to templates.

    Used as a cache-busting value so browsers do not keep a stale icon.
    ``datetime.now().timestamp()`` is used instead of the deprecated
    ``datetime.utcnow()``, whose naive result is interpreted as local time by
    ``timestamp()`` and therefore yields an epoch shifted by the UTC offset.
    """
    return {'now_timestamp': int(datetime.now().timestamp())}


@app.route('/profile', methods=['GET', 'POST'])  # personal profile
@login_required
def profile():
    """Show the current user's profile (GET) or update it (POST).

    The password is only changed when a non-empty ``password`` field is
    submitted.
    """
    if request.method == 'POST':
        form = request.form
        try:
            current_user.name = form['name']
            current_user.department = form['department']
            current_user.email = form['email']

            # .get(): a form without the password field means "keep the password".
            new_password = form.get('password')
            if new_password:
                current_user.password = generate_password_hash(new_password)

            db.session.commit()
            flash('个人资料更新成功', 'success')
        except Exception as e:
            db.session.rollback()
            flash(f'更新失败: {str(e)}', 'danger')

    return render_template('profile.html', user=current_user)


@app.route('/contract/delete/permanent/<int:contract_id>', methods=['POST'])
@login_required
def delete_contract_permanent(contract_id):
    """Permanently delete a contract with its attachments, counterparties and versions.

    Database rows are deleted and committed first; attachment files are only
    removed after the commit succeeded, so a failed commit (rolled back) does
    not leave attachment records whose files are already gone.

    Args:
        contract_id: Primary key of the contract; 404 if it does not exist.

    Returns:
        A redirect to the contract list on success, otherwise back to the
        contract's detail page.
    """
    contract = Contract.query.get_or_404(contract_id)

    if not current_user.can_delete:
        flash('您没有删除合同的权限', 'danger')
        return redirect(url_for('view_contract', contract_id=contract_id))

    try:
        # Remember the file paths before the ORM objects are deleted.
        attachment_paths = []
        for attachment in contract.attachments:
            if attachment.filepath:
                attachment_paths.append(attachment.filepath)
            db.session.delete(attachment)

        # Counterparties of the contract.
        for cp in contract.counterparties:
            db.session.delete(cp)

        # Stored versions of the contract.
        for version in contract.versions:
            db.session.delete(version)

        db.session.delete(contract)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        flash(f'删除合同失败: {str(e)}', 'danger')
        app.logger.error(f'Permanent contract deletion failed: {str(e)}')
        return redirect(url_for('view_contract', contract_id=contract_id))

    # Best effort: a leftover file must not turn a completed deletion into an error.
    for path in attachment_paths:
        try:
            os.remove(path)
        except FileNotFoundError:
            pass
        except OSError as e:
            app.logger.warning(f'Attachment file removal failed: {str(e)}')

    flash('合同已彻底删除', 'success')
    return redirect(url_for('contract_list'))


@app.route('/api/contracts/uncollected')
@login_required
def api_uncollected_contracts():
    """Return contracts without a ``collected_date`` as JSON, latest end date first.

    Returns:
        A JSON list; each item has ``id``, ``name``, ``contract_number``,
        ``end_date`` (``YYYY-MM-DD`` or ``null``) and ``url``. Users without
        ``can_view`` get a JSON error with status 403.
    """
    if not current_user.can_view:
        return jsonify({'error': '无权访问'}), 403

    contracts = Contract.query.filter(
        Contract.collected_date.is_(None)
    ).order_by(Contract.end_date.desc()).all()

    result = []
    for c in contracts:
        result.append({
            'id': c.id,
            'name': c.name,
            'contract_number': c.contract_number,
            'end_date': c.end_date.strftime('%Y-%m-%d') if c.end_date else None,
            'url': url_for('view_contract', contract_id=c.id)
        })

    return jsonify(result)


# Set the number prefix of a contract type
@app.route('/system/config/type/set_prefix/<int:type_id>', methods=['POST'])
@login_required
def set_contract_type_prefix(type_id):
    """Set ``ContractType.prefix`` from the JSON body ``{"prefix": ...}``. Admin only.

    A missing, null or non-object body is treated as an empty prefix.

    Args:
        type_id: Primary key of the contract type; 404 if it does not exist.

    Returns:
        ``{"status": "ok"}`` on success, or a JSON error with status 403/500.
    """
    if not current_user.is_admin:
        return jsonify({'error': '无权访问'}), 403

    data = request.get_json(silent=True)
    raw_prefix = data.get('prefix') if isinstance(data, dict) else None
    prefix = str(raw_prefix or '').strip()

    ct = ContractType.query.get_or_404(type_id)
    try:
        ct.prefix = prefix
        db.session.commit()
    except SQLAlchemyError as e:
        db.session.rollback()
        app.logger.error(f'Contract type prefix update failed: {str(e)}')
        return jsonify({'error': str(e)}), 500
    return jsonify({"status": "ok"})


# NOTE(review): this view has no @app.route decorator, so it is not reachable
# through a URL; download_file appears to be the active download endpoint.
# It also looks files up by the original file name in UPLOAD_FOLDER, a
# module-level name defined outside this section - confirm before reviving it.
@login_required
def download_attachment(attachment_id):
    """Send the attachment named ``attachment.filename`` from ``UPLOAD_FOLDER``, or 404."""
    attachment = ContractAttachment.query.get_or_404(attachment_id)
    file_path = os.path.join(UPLOAD_FOLDER, attachment.filename)
    if not os.path.exists(file_path):
        abort(404)
    return send_from_directory(UPLOAD_FOLDER, attachment.filename, as_attachment=True)


@app.route('/favicon.ico')
def favicon():
    """Serve ``static/favicon.ico`` with the icon MIME type."""
    return send_from_directory(
        os.path.join(app.root_path, 'static'),
        'favicon.ico',
        mimetype='image/vnd.microsoft.icon'
    )


# Application start-up
if __name__ == '__main__':
    # The database lives under the module-level instance_path (the one used to
    # build SQLALCHEMY_DATABASE_URI). app.instance_path can point elsewhere in
    # a PyInstaller build, which made this check miss an existing database.
    if not os.path.exists(os.path.join(instance_path, 'contracts.db')):
        init_db()

    with app.app_context():
        load_mail_config()

    app.run(debug=False, port=8082)