||
- import uuid
- import sys
- import json
- import atexit
- import threading
- from flask import Flask, render_template, request, redirect, url_for, flash, send_from_directory, jsonify, abort, current_app
- from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
- from werkzeug.security import generate_password_hash, check_password_hash
- from werkzeug.utils import secure_filename
- from apscheduler.schedulers.background import BackgroundScheduler
- from apscheduler.triggers.interval import IntervalTrigger
- from sqlalchemy import func
- from sqlalchemy.exc import SQLAlchemyError
- from models import db, SystemConfig, User, Department, CompanyEntity, ContractType, Contract, Counterparty, ContractAttachment, ContractVersion
- from flask import url_for, render_template_string
- from flask_mail import Mail, Message
- from datetime import datetime, timedelta
- from apscheduler.triggers.cron import CronTrigger
- import pytz
- from apscheduler.triggers.cron import CronTrigger
- from pytz import timezone
- from flask import send_file, make_response, abort
- import os
- from urllib.parse import quote
- # -----------------------------
- # 路径处理(PyInstaller 兼容)
- # -----------------------------
- if getattr(sys, 'frozen', False):
- # PyInstaller 打包后
- base_path = sys._MEIPASS
- instance_path = os.path.join(os.path.dirname(sys.executable), 'instance')
- else:
- base_path = os.path.abspath(".")
- instance_path = os.path.join(base_path, 'instance')
- os.makedirs(instance_path, exist_ok=True)
- # -----------------------------
- # Flask 初始化
- # -----------------------------
- app = Flask(__name__,
- template_folder=os.path.join(base_path, 'templates'),
- static_folder=os.path.join(base_path, 'static'))
- app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///' + os.path.join(instance_path, 'contracts.db')
- app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
- app.config['SECRET_KEY'] = 'dev-secret-key'
- app.config['UPLOAD_FOLDER'] = os.path.join(instance_path, 'uploads')
- app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB upload limit
- # ✅ 允许 APScheduler 在无请求上下文下生成完整 URL(解决 url_for 报错)
- app.config['SERVER_NAME'] = '127.0.0.1:8082'
- app.config['PREFERRED_URL_SCHEME'] = 'http'
- # 创建上传目录
- os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
- os.makedirs(os.path.join(app.config['UPLOAD_FOLDER'], 'icons'), exist_ok=True)
- os.makedirs(os.path.join(app.config['UPLOAD_FOLDER'], 'contracts'), exist_ok=True)
- # SQLAlchemy 初始化
- db.init_app(app)
- # Flask-Login
- login_manager = LoginManager(app)
- login_manager.login_view = 'login'
- # -----------------------------
- # 用户加载
- # -----------------------------
- @login_manager.user_loader
- def load_user(user_id):
- return db.session.get(User, int(user_id))
- # -----------------------------
- # 数据库初始化
- # -----------------------------
- def init_db():
- with app.app_context():
- db.create_all()
- if not SystemConfig.query.first():
- print("数据库已创建,请访问 /initial_setup 初始化系统")
- def generate_contract_number():
- """生成新合同编号,格式 HT-YYYY-XXXX"""
- from sqlalchemy import func
- year = datetime.now().year
- prefix = f"HT-{year}-"
- # 查询今年已有的合同数量,用作流水号
- last_contract = Contract.query.filter(Contract.contract_number.like(f"{prefix}%")) \
- .order_by(Contract.contract_number.desc()).first()
- if last_contract and last_contract.contract_number:
- try:
- last_number = int(last_contract.contract_number.split('-')[-1])
- except ValueError:
- last_number = 0
- else:
- last_number = 0
- new_number = last_number + 1
- return f"{prefix}{str(new_number).zfill(4)}"
- # -----------------------------
- # 合同到期检查(已优化)
- # -----------------------------
- def generate_contract_number_by_type(type_id):
- """根据合同类型生成编号,优先使用 prefix"""
- ctype = ContractType.query.get(type_id)
- if not ctype:
- return "HT-XXXX"
- # 优先使用 prefix,其次使用 code,兜底用 HT+ID
- type_code = ctype.prefix or ctype.code or f"HT{ctype.id}"
- year = datetime.now().year
- prefix = f"{type_code}-{year}-"
- # 查询该类型已有合同数量,获取流水号
- last_contract = Contract.query.filter(
- Contract.type_id == type_id,
- Contract.contract_number.like(f"{prefix}%")
- ).order_by(Contract.contract_number.desc()).first()
- if last_contract and last_contract.contract_number:
- try:
- last_number = int(last_contract.contract_number.split('-')[-1])
- except ValueError:
- last_number = 0
- else:
- last_number = 0
- new_number = last_number + 1
- return f"{prefix}{str(new_number).zfill(4)}"
- # 先创建 Mail 对象,不传 app
- mail = Mail()
- def load_mail_config():
- """
- 从 SystemConfig 加载邮件配置并初始化 Mail(自动处理 TLS/SSL)
- """
- try:
- config = SystemConfig.query.first()
- except Exception as e:
- app.logger.error(f"读取 SystemConfig 时出错: {e}")
- config = None
- if not config:
- print("⚠️ 未找到邮箱配置(SystemConfig 为空),邮件功能未启用")
- return
- try:
- mail_port = int(getattr(config, 'mail_port', 0))
- except Exception:
- mail_port = 0
- mail_server = getattr(config, 'mail_server', None)
- mail_username = getattr(config, 'mail_username', None)
- mail_password = getattr(config, 'mail_password', None)
- # 自动判断 TLS/SSL
- mail_use_tls = False
- mail_use_ssl = False
- if mail_port == 465:
- # SSL端口
- mail_use_ssl = True
- mail_use_tls = False
- elif mail_port == 587:
- # STARTTLS端口
- mail_use_tls = True
- mail_use_ssl = False
- else:
- # 普通SMTP端口
- mail_use_tls = False
- mail_use_ssl = False
- app.config.update(
- MAIL_SERVER=mail_server,
- MAIL_PORT=mail_port,
- MAIL_USERNAME=mail_username,
- MAIL_PASSWORD=mail_password,
- MAIL_USE_TLS=mail_use_tls,
- MAIL_USE_SSL=mail_use_ssl
- )
- try:
- mail.init_app(app)
- print(f"✅ 邮件模块已初始化,SERVER={mail_server} PORT={mail_port} TLS={mail_use_tls} SSL={mail_use_ssl}")
- if mail_use_tls and mail_use_ssl:
- print("⚠️ 警告:TLS和SSL同时启用,可能导致邮件发送失败")
- except Exception as e:
- print(f"❌ 邮件模块初始化失败: {e}")
- def check_expiring_contracts():
- with app.app_context():
- today = datetime.now().date()
- in_30_days = today + timedelta(days=30)
- # 查询合同
- expiring_contracts = Contract.query.filter(
- Contract.end_date <= in_30_days,
- Contract.end_date >= today,
- Contract.is_active == True
- ).all()
- expired_contracts = Contract.query.filter(
- Contract.end_date < today,
- Contract.is_active == True
- ).all()
- # 构建合同链接
- def get_contract_url(contract_id):
- try:
- return url_for('view_contract', contract_id=contract_id, _external=True)
- except RuntimeError:
- return f"http://127.0.0.1:8082/view_contract/{contract_id}"
- # 处理合同数据,计算剩余天数或逾期天数
- def build_contract_data(contract, expired=False):
- delta_days = (contract.end_date - today).days
- return {
- 'name': contract.name,
- 'contract_number': getattr(contract, 'contract_number', ''), # 改这里
- 'end_date': contract.end_date.strftime('%Y-%m-%d'),
- 'days_left': max(delta_days, 0),
- 'days_overdue': abs(min(delta_days, 0)),
- 'url': get_contract_url(contract.id)
- }
- expiring_data = [build_contract_data(c) for c in expiring_contracts]
- expired_data = [build_contract_data(c, expired=True) for c in expired_contracts]
- if not expiring_data and not expired_data:
- print("[定时任务] 无到期或即将到期合同。")
- return
- print(f"[定时任务] 已找到 {len(expiring_contracts)} 个即将到期合同,{len(expired_contracts)} 个已到期合同。")
- # 渲染邮件 HTML
- html_content = render_template_string("""
- <div style="font-family: Arial, sans-serif; max-width: 900px; margin: auto; padding: 20px;">
- <h2 style="color: #007BFF;">合同到期提醒 - 汇总</h2>
- <p>管理员您好,系统在 {{ now_date }} 检测到以下合同的到期情况:</p>
- {% if expiring %}
- <h3 style="color:#1a73e8;">📅 即将到期(30天内)</h3>
- <table style="width:100%; border-collapse: collapse; margin-bottom:18px;">
- <thead>
- <tr style="background:#f8f9fa;">
- <th style="padding:8px; border:1px solid #e3e6ea; text-align:left;">合同名称</th>
- <th style="padding:8px; border:1px solid #e3e6ea; text-align:left;">合同编号</th>
- <th style="padding:8px; border:1px solid #e3e6ea; text-align:left;">到期日期</th>
- <th style="padding:8px; border:1px solid #e3e6ea; text-align:left;">剩余天数</th>
- <th style="padding:8px; border:1px solid #e3e6ea;">操作</th>
- </tr>
- </thead>
- <tbody>
- {% for c in expiring %}
- <tr>
- <td style="padding:8px; border:1px solid #e3e6ea;">{{ c.name }}</td>
- <td style="padding:8px; border:1px solid #e3e6ea;">{{ c.contract_number }}</td>
- <td style="padding:8px; border:1px solid #e3e6ea;">{{ c.end_date }}</td>
- <td style="padding:8px; border:1px solid #e3e6ea;">{{ c.days_left }} 天</td>
- <td style="padding:8px; border:1px solid #e3e6ea;"><a href="{{ c.url }}" style="color:#1a73e8;">查看</a></td>
- </tr>
- {% endfor %}
- </tbody>
- </table>
- {% endif %}
- {% if expired %}
- <h3 style="color:#d93025;">⚠️ 已到期(请尽快处理)</h3>
- <table style="width:100%; border-collapse: collapse; margin-bottom:18px;">
- <thead>
- <tr style="background:#f8f9fa;">
- <th style="padding:8px; border:1px solid #e3e6ea; text-align:left;">合同名称</th>
- <th style="padding:8px; border:1px solid #e3e6ea; text-align:left;">合同编号</th>
- <th style="padding:8px; border:1px solid #e3e6ea; text-align:left;">到期日期</th>
- <th style="padding:8px; border:1px solid #e3e6ea;">逾期天数</th>
- <th style="padding:8px; border:1px solid #e3e6ea;">操作</th>
- </tr>
- </thead>
- <tbody>
- {% for c in expired %}
- <tr>
- <td style="padding:8px; border:1px solid #e3e6ea;">{{ c.name }}</td>
- <td style="padding:8px; border:1px solid #e3e6ea;">{{ c.contract_number }}</td>
- <td style="padding:8px; border:1px solid #e3e6ea;">{{ c.end_date }}</td>
- <td style="padding:8px; border:1px solid #e3e6ea;">{{ c.days_overdue }} 天</td>
- <td style="padding:8px; border:1px solid #e3e6ea;"><a href="{{ c.url }}" style="color:#d93025;">查看</a></td>
- </tr>
- {% endfor %}
- </tbody>
- </table>
- {% endif %}
- <p style="color:#666; font-size:13px;">此邮件由合同管理系统自动发送。如有问题请登录系统查看或联系系统管理员。<br>{{ now_full }}</p>
- </div>
- """, expiring=expiring_data, expired=expired_data,
- now_date=today.strftime("%Y-%m-%d"),
- now_full=datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
- # 邮件发送
- try:
- subject = f"【合同到期提醒】{today.strftime('%Y-%m-%d')} - 即将到期 {len(expiring_data)} / 已到期 {len(expired_data)}"
- sender = app.config.get('MAIL_USERNAME') or f"no-reply@{app.config.get('MAIL_SERVER') or 'local'}"
- # 从数据库获取管理员邮箱
- admin_users = User.query.filter_by(is_admin=True).all()
- recipients = [u.email for u in admin_users if u.email]
- if not recipients:
- print("⚠️ 没有找到管理员邮箱,邮件发送被跳过")
- return
- msg = Message(subject=subject,
- recipients=recipients,
- html=html_content,
- sender=sender)
- def _send():
- try:
- with app.app_context():
- mail.send(msg)
- print(
- f"✅ 已发送到期提醒邮件给 {recipients}(即将到期: {len(expiring_data)},已到期: {len(expired_data)})")
- except Exception as e:
- print(f"❌ 邮件发送失败: {e}")
- t = threading.Thread(target=_send, daemon=True)
- t.start()
- except Exception as e:
- print(f"❌ 构造或发送邮件时发生异常: {e}")
- app.logger.error(f"check_expiring_contracts error: {e}")
- # -----------------------------
- # APScheduler 调度(根据配置动态设置)
- # -----------------------------
- scheduler = BackgroundScheduler(timezone=pytz.timezone('Asia/Shanghai'))
- # -----------------------------
- # APScheduler 调度(根据配置动态设置)
- # -----------------------------
- scheduler = BackgroundScheduler(timezone=pytz.timezone('Asia/Shanghai'))
- def get_scheduler_config():
- """从系统配置获取定时任务设置"""
- # 确保在应用上下文中执行数据库查询
- with app.app_context():
- config = SystemConfig.query.first()
- if not config:
- # 默认值
- return {
- 'frequency': 'daily',
- 'weekdays': [],
- 'month_day': 1,
- 'hour': 9,
- 'minute': 0
- }
- return {
- 'frequency': config.scheduler_frequency or 'daily',
- 'weekdays': config.scheduler_weekdays or [],
- 'month_day': config.scheduler_month_day or 1,
- 'hour': config.scheduler_hour or 9,
- 'minute': config.scheduler_minute or 0
- }
- def create_scheduler_trigger():
- """根据配置创建触发器"""
- config = get_scheduler_config()
- if config['frequency'] == 'daily':
- # 每天执行
- return CronTrigger(
- hour=config['hour'],
- minute=config['minute'],
- timezone='Asia/Shanghai'
- )
- elif config['frequency'] == 'weekly':
- # 每周执行
- return CronTrigger(
- day_of_week=','.join(map(str, config['weekdays'])),
- hour=config['hour'],
- minute=config['minute'],
- timezone='Asia/Shanghai'
- )
- elif config['frequency'] == 'monthly':
- # 每月执行
- return CronTrigger(
- day=config['month_day'],
- hour=config['hour'],
- minute=config['minute'],
- timezone='Asia/Shanghai'
- )
- else:
- # 默认每天执行
- return CronTrigger(
- hour=9,
- minute=0,
- timezone='Asia/Shanghai'
- )
- # 添加定时任务
- scheduler.add_job(
- check_expiring_contracts,
- create_scheduler_trigger(),
- id='daily_contract_check',
- replace_existing=True
- )
- # 开始调度
- if not hasattr(app, 'scheduler_started'):
- try:
- scheduler.start()
- app.scheduler_started = True
- # 打印定时任务配置
- config = get_scheduler_config()
- if config['frequency'] == 'daily':
- print(f"⏰ Scheduler 已启动(每天 {config['hour']}:{config['minute']} 检查合同到期)")
- elif config['frequency'] == 'weekly':
- weekdays = ', '.join(
- [['周一', '周二', '周三', '周四', '周五', '周六', '周日'][int(d)] for d in config['weekdays']])
- print(f"⏰ Scheduler 已启动(每周 {weekdays} {config['hour']}:{config['minute']} 检查合同到期)")
- elif config['frequency'] == 'monthly':
- print(f"⏰ Scheduler 已启动(每月 {config['month_day']}号 {config['hour']}:{config['minute']} 检查合同到期)")
- except Exception as e:
- print(f"⚠️ Scheduler 启动时出错: {e}")
- # 程序退出时关闭调度器
- atexit.register(lambda: scheduler.shutdown(wait=False))
- # -----------------------------
- # 路由
- # -----------------------------
- @app.route('/')
- def index():
- if not SystemConfig.query.first():
- return redirect(url_for('initial_setup'))
- return redirect(url_for('login'))
- @app.route('/initial_setup', methods=['GET', 'POST'])
- def initial_setup():
- if SystemConfig.query.first():
- return redirect(url_for('index'))
- if request.method == 'POST':
- config = SystemConfig(
- site_name=request.form['site_name'],
- mail_server=request.form['mail_server'],
- mail_port=int(request.form['mail_port']),
- mail_username=request.form['mail_username'],
- mail_password=request.form['mail_password'],
- mail_use_tls='mail_use_tls' in request.form
- )
- db.session.add(config)
- admin = User(
- username=request.form['admin_username'],
- password=generate_password_hash(request.form['admin_password']),
- name=request.form['admin_name'],
- department=request.form['admin_department'],
- email=request.form['admin_email'],
- is_admin=True,
- can_create=True,
- can_view=True,
- can_edit=True,
- can_delete=True
- )
- db.session.add(admin)
- departments = ['行政部', '财务部', '法务部', '市场部']
- for dept in departments:
- db.session.add(Department(name=dept))
- entities = ['总公司', '分公司A', '分公司B']
- for entity in entities:
- db.session.add(CompanyEntity(name=entity))
- contract_types = ['采购合同', '销售合同', '服务合同', '租赁合同']
- for ct in contract_types:
- db.session.add(ContractType(name=ct))
- db.session.commit()
- flash('系统初始化成功,请使用管理员账号登录', 'success')
- return redirect(url_for('login'))
- return render_template('initial_setup.html')
- # -----------------------------
- # 登录/登出
- # -----------------------------
- @app.route('/login', methods=['GET', 'POST'])
- def login():
- if current_user.is_authenticated:
- return redirect(url_for('dashboard'))
- if request.method == 'POST':
- username = request.form['username']
- password = request.form['password']
- user = User.query.filter_by(username=username).first()
- if user and check_password_hash(user.password, password):
- login_user(user)
- next_page = request.args.get('next')
- return redirect(next_page or url_for('dashboard'))
- else:
- flash('用户名或密码错误', 'danger')
- return render_template('login.html')
- @app.route('/logout')
- @login_required
- def logout():
- logout_user()
- return redirect(url_for('login'))
- # -----------------------------
- # 仪表盘
- # -----------------------------
- @app.route('/dashboard')
- @login_required
- def dashboard():
- # 使用东八区时间
- tz = timezone('Asia/Shanghai')
- today = datetime.now(tz).date()
- # 使用多个单独的查询来统计,避免复杂的SQL CASE语句
- total = Contract.query.count()
- active = Contract.query.filter(Contract.is_active == True).count()
- expiring = Contract.query.filter(
- Contract.end_date.between(today, today + timedelta(days=30)),
- Contract.is_active == True
- ).count()
- expired_already = Contract.query.filter(
- Contract.end_date < today,
- Contract.is_active == True
- ).count()
- uncollected = Contract.query.filter(
- Contract.collected_date.is_(None),
- Contract.is_active == True
- ).count()
- terminated = Contract.query.filter(Contract.is_active == False).count()
- stats = {
- 'total': total,
- 'active': active,
- 'expiring': expiring,
- 'expired_already': expired_already,
- 'uncollected': uncollected,
- 'terminated': terminated
- }
- # 即将到期合同(30天内)- 只包含激活状态的合同
- expiring_soon = Contract.query.filter(
- Contract.end_date.between(today, today + timedelta(days=30)),
- Contract.is_active == True
- ).order_by(Contract.end_date).limit(5).all()
- # 已到期合同 - 只包含激活状态的合同
- expired_contracts = Contract.query.filter(
- Contract.end_date < today,
- Contract.is_active == True
- ).order_by(Contract.end_date.desc()).limit(5).all()
- # 已终止合同 - 状态为不激活
- terminated_contracts = Contract.query.filter(
- Contract.is_active == False
- ).order_by(Contract.end_date.desc()).limit(5).all()
- # 最近添加的合同
- recent_contracts = Contract.query.order_by(Contract.created_at.desc()).limit(5).all()
- # 待收回合同 - 只包含激活状态的合同
- uncollected_contracts = Contract.query.filter(
- Contract.collected_date.is_(None),
- Contract.is_active == True
- ).order_by(Contract.end_date.desc()).limit(5).all()
- # 公司证件类合同 - 只包含激活状态的合同
- company_docs = Contract.query.join(ContractType).filter(
- ContractType.name == "公司证件",
- Contract.is_active == True
- ).order_by(Contract.end_date.desc()).limit(5).all()
- # 公用模板合同 - 只包含激活状态的合同
- public_templates = Contract.query.join(ContractType).filter(
- ContractType.name == "公用模板",
- Contract.is_active == True
- ).order_by(Contract.created_at.desc()).limit(5).all()
- return render_template(
- 'dashboard.html',
- stats=stats,
- expiring_soon=expiring_soon,
- expired_contracts=expired_contracts,
- terminated_contracts=terminated_contracts,
- recent_contracts=recent_contracts,
- uncollected_contracts=uncollected_contracts,
- company_docs=company_docs,
- public_templates=public_templates,
- today=today
- )
- @app.route('/contracts')
- @login_required # 添加登录要求
- def contract_list():
- if not current_user.can_view:
- flash('您没有查看合同的权限', 'danger')
- return redirect(url_for('dashboard'))
- # 获取 type 参数
- type_filter = request.args.get('type')
- today = datetime.now().date()
- if type_filter == 'all':
- # 全部合同
- contracts = Contract.query.order_by(Contract.end_date.desc()).all()
- elif type_filter == 'active':
- # 有效合同
- contracts = Contract.query.filter_by(is_active=True).order_by(Contract.end_date.desc()).all()
- elif type_filter == 'expiring':
- # 即将到期合同(30天内)
- end_date_limit = today + timedelta(days=30)
- contracts = Contract.query.filter(
- Contract.end_date >= today,
- Contract.end_date <= end_date_limit,
- Contract.is_active == True
- ).order_by(Contract.end_date.asc()).all()
- elif type_filter == 'expired':
- # 已到期合同
- contracts = Contract.query.filter(
- Contract.end_date < today,
- Contract.is_active == True
- ).order_by(Contract.end_date.desc()).all()
- elif type_filter == 'uncollected':
- # 待收回合同
- contracts = Contract.query.filter(
- Contract.collected_date.is_(None),
- Contract.is_active == True
- ).order_by(Contract.end_date.desc()).all()
- elif type_filter == 'terminated':
- # 已终止合同
- contracts = Contract.query.filter_by(is_active=False).order_by(Contract.end_date.desc()).all()
- elif type_filter == 'recent':
- # 最近添加的合同
- contracts = Contract.query.order_by(Contract.created_at.desc()).limit(10).all()
- elif type_filter == 'company_doc':
- # 公司证件
- contracts = Contract.query.join(ContractType).filter(
- ContractType.name == '公司证件',
- Contract.is_active == True
- ).order_by(Contract.end_date.desc()).all()
- elif type_filter == 'template':
- # 公用模板
- contracts = Contract.query.join(ContractType).filter(
- ContractType.name == '公用模板',
- Contract.is_active == True
- ).order_by(Contract.created_at.desc()).all()
- else:
- # 默认:全部合同
- contracts = Contract.query.order_by(Contract.end_date.desc()).all()
- # 准备合同数据
- contracts_data = []
- for contract in contracts:
- attachments = [
- {
- 'id': att.id,
- 'filename': att.filename,
- 'url': url_for('view_attachment', attachment_id=att.id),
- 'download_url': url_for('download_file', attachment_id=att.id)
- } for att in contract.attachments
- ]
- contracts_data.append({
- 'contract': contract,
- 'attachments': attachments
- })
- return render_template('contract_list.html', contracts=contracts_data)
- # -----------------------------
- # 创建合同
- # -----------------------------
- @app.route('/contract/create', methods=['GET', 'POST'])
- @login_required
- def create_contract():
- if not current_user.can_create:
- flash('您没有创建合同的权限', 'danger')
- return redirect(url_for('contract_list'))
- contract_types = ContractType.query.all()
- company_entities = CompanyEntity.query.all()
- default_number = ""
- if contract_types:
- # 默认使用第一个合同类型生成编号
- default_number = generate_contract_number_by_type(contract_types[0].id)
- if request.method == 'POST':
- try:
- contract_number = request.form['contract_number'] or default_number
- contract = Contract(
- contract_number=contract_number,
- name=request.form['name'],
- type_id=request.form['type_id'],
- company_entity_id=request.form['company_entity_id'],
- signer_id=current_user.id,
- start_date=datetime.strptime(request.form['start_date'], '%Y-%m-%d').date(),
- end_date=datetime.strptime(request.form['end_date'], '%Y-%m-%d').date(),
- remind_before=int(request.form['remind_before']) if request.form['remind_before'] else None,
- notes=request.form['notes'],
- creator_id=current_user.id,
- signing_method=request.form.get('signing_method'),
- collected_date=datetime.strptime(request.form['collected_date'], '%Y-%m-%d').date()
- if request.form.get('collected_date') else None,
- storage_box=request.form.get('storage_box')
- )
- db.session.add(contract)
- db.session.flush() # 获取合同ID以便添加附件
- # 保存合同方
- counterparties = request.form.getlist('counterparty_name')
- for cp in counterparties:
- if cp.strip():
- db.session.add(Counterparty(name=cp.strip(), contract_id=contract.id))
- # 保存初始版本
- save_version(contract, current_user.id)
- # ------------------------------
- # 处理附件上传 - 使用UUID生成文件名
- files = request.files.getlist('new_attachments') # 注意字段名改为 new_attachments
- upload_folder = os.path.join(app.config['UPLOAD_FOLDER'], 'contracts')
- os.makedirs(upload_folder, exist_ok=True)
- # 文件类型白名单
- ALLOWED_EXTENSIONS = {'pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp',
- 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx'}
- MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
- for file in files:
- if file.filename == '':
- continue
- # 验证文件扩展名
- filename = file.filename
- ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
- if ext not in ALLOWED_EXTENSIONS:
- flash(f'文件 {filename} 类型不被允许', 'warning')
- continue
- # 验证文件大小
- file.seek(0, os.SEEK_END)
- file_size = file.tell()
- file.seek(0)
- if file_size > MAX_FILE_SIZE:
- flash(f'文件 {filename} 超过5MB大小限制', 'warning')
- continue
- # 使用UUID生成唯一文件名
- unique_id = uuid.uuid4().hex
- safe_filename = secure_filename(f"{unique_id}_{filename}")
- filepath = os.path.join(upload_folder, safe_filename)
- try:
- file.save(filepath)
- except Exception as e:
- flash(f'保存文件 {filename} 失败: {str(e)}', 'danger')
- continue
- # 创建附件记录
- attachment = ContractAttachment(
- contract_id=contract.id,
- filename=filename,
- filepath=filepath
- )
- db.session.add(attachment)
- db.session.commit()
- flash('合同创建成功', 'success')
- return redirect(url_for('view_contract', contract_id=contract.id))
- except Exception as e:
- db.session.rollback()
- flash(f'创建合同失败: {str(e)}', 'danger')
- app.logger.error(f'Contract creation failed: {str(e)}')
- return render_template('contract_form.html',
- contract_types=contract_types,
- company_entities=company_entities,
- default_number=default_number)
- @app.route('/contract/generate_number/<int:type_id>')
- @login_required
- def generate_number(type_id):
- """根据合同类型生成编号(线程安全)"""
- ctype = ContractType.query.get(type_id)
- if not ctype:
- return jsonify({"number": "HT-XXXX"})
- type_code = ctype.prefix or ctype.code or f"HT{ctype.id}"
- year = datetime.now().year
- prefix = f"{type_code}-{year}-"
- try:
- # 事务锁定,避免并发重复
- last_number = db.session.query(
- func.max(func.substr(Contract.contract_number, -4, 4).cast(db.Integer))
- ).filter(
- Contract.contract_number.like(f"{prefix}%")
- ).scalar() or 0
- new_number = last_number + 1
- return jsonify({"number": f"{prefix}{str(new_number).zfill(4)}"})
- except SQLAlchemyError as e:
- db.session.rollback()
- return jsonify({"number": f"{prefix}0001"})
- @app.route('/contract/<int:contract_id>')
- @login_required
- def view_contract(contract_id):
- contract = Contract.query.get_or_404(contract_id)
- if not current_user.can_view:
- flash('您没有查看合同的权限', 'danger')
- return redirect(url_for('contract_list'))
- # 获取合同方
- counterparties = Counterparty.query.filter_by(contract_id=contract_id).all()
- # 获取附件,并为前端预览添加 url/download_url
- attachments = ContractAttachment.query.filter_by(contract_id=contract_id).all()
- for att in attachments:
- att.url = url_for('view_attachment', attachment_id=att.id) # 浏览器直接预览
- att.download_url = url_for('download_file', attachment_id=att.id) # 下载用
- return render_template(
- 'contract_view.html',
- contract=contract,
- counterparties=counterparties,
- attachments=attachments
- )
- @app.route('/attachments/<int:attachment_id>')
- @login_required
- def view_attachment(attachment_id):
- attachment = ContractAttachment.query.get_or_404(attachment_id)
- filename = os.path.basename(attachment.filepath)
- upload_folder = os.path.join(app.config['UPLOAD_FOLDER'], 'contracts')
- full_path = os.path.join(upload_folder, filename)
- if not os.path.exists(full_path):
- abort(404)
- ext = os.path.splitext(filename)[1].lower()
- if ext in ['.pdf', '.jpg', '.jpeg', '.png', '.gif']:
- # 直接浏览器预览
- return send_from_directory(upload_folder, filename)
- else:
- # 其他文件下载
- return send_from_directory(upload_folder, filename, as_attachment=True)
- # -----------------------------
- # 编辑合同
- # -----------------------------
- @app.route('/contract/edit/<int:contract_id>', methods=['GET', 'POST'])
- @login_required
- def edit_contract(contract_id):
- contract = Contract.query.get_or_404(contract_id)
- if not current_user.can_edit:
- flash('您没有编辑合同的权限', 'danger')
- return redirect(url_for('view_contract', contract_id=contract.id))
- if request.method == 'POST':
- try:
- # 保存历史版本
- save_version(contract, current_user.id)
- # 更新合同字段
- contract.contract_number = request.form['contract_number']
- contract.name = request.form['name']
- contract.type_id = request.form['type_id']
- contract.company_entity_id = request.form['company_entity_id']
- contract.start_date = datetime.strptime(request.form['start_date'], '%Y-%m-%d').date()
- contract.end_date = datetime.strptime(request.form['end_date'], '%Y-%m-%d').date()
- contract.remind_before = int(request.form['remind_before']) if request.form['remind_before'] else None
- contract.notes = request.form['notes']
- contract.signing_method = request.form.get('signing_method')
- contract.collected_date = datetime.strptime(request.form['collected_date'],
- '%Y-%m-%d').date() if request.form.get(
- 'collected_date') else None
- contract.storage_box = request.form.get('storage_box')
- # 更新合同方
- Counterparty.query.filter_by(contract_id=contract.id).delete()
- counterparties = request.form.getlist('counterparty_name')
- for cp in counterparties:
- if cp.strip():
- db.session.add(Counterparty(name=cp.strip(), contract_id=contract.id))
- # 删除选中的旧附件
- delete_ids = request.form.getlist('delete_attachment')
- for att_id in delete_ids:
- attachment = ContractAttachment.query.get(att_id)
- if attachment and attachment.contract_id == contract.id:
- try:
- if os.path.exists(attachment.filepath):
- os.remove(attachment.filepath)
- except Exception as e:
- app.logger.error(f"删除附件文件失败: {attachment.filepath}, 错误: {str(e)}")
- db.session.delete(attachment)
- # 上传新附件 - 关键修复点
- files = request.files.getlist('new_attachments')
- upload_folder = os.path.join(app.config['UPLOAD_FOLDER'], 'contracts')
- os.makedirs(upload_folder, exist_ok=True)
- # 文件类型白名单
- ALLOWED_EXTENSIONS = {'pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp',
- 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx'}
- MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
- for file in files:
- if file.filename == '':
- continue
- # 验证文件扩展名
- filename = file.filename
- ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
- if ext not in ALLOWED_EXTENSIONS:
- flash(f'文件 {filename} 类型不被允许', 'warning')
- continue
- # 验证文件大小
- file.seek(0, os.SEEK_END)
- file_size = file.tell()
- file.seek(0)
- if file_size > MAX_FILE_SIZE:
- flash(f'文件 {filename} 超过5MB大小限制', 'warning')
- continue
- # 使用UUID生成唯一文件名
- unique_id = uuid.uuid4().hex
- safe_filename = secure_filename(f"{unique_id}_{filename}")
- filepath = os.path.join(upload_folder, safe_filename)
- try:
- file.save(filepath)
- except Exception as e:
- flash(f'保存文件 {filename} 失败: {str(e)}', 'danger')
- continue
- # 创建附件记录
- new_attachment = ContractAttachment(
- contract_id=contract.id,
- filename=filename, # 原始文件名
- filepath=filepath # 服务器存储路径
- )
- db.session.add(new_attachment)
- db.session.commit()
- flash('合同更新成功', 'success')
- return redirect(url_for('view_contract', contract_id=contract.id))
- except Exception as e:
- db.session.rollback()
- flash(f'更新合同失败: {str(e)}', 'danger')
- app.logger.error(f'Contract update failed: {str(e)}')
- # 渲染编辑页面
- contract_types = ContractType.query.all()
- company_entities = CompanyEntity.query.all()
- counterparties = Counterparty.query.filter_by(contract_id=contract.id).all()
- attachments = ContractAttachment.query.filter_by(contract_id=contract.id).all()
- return render_template(
- 'contract_form.html',
- contract=contract,
- contract_types=contract_types,
- company_entities=company_entities,
- counterparties=counterparties,
- attachments=attachments
- )
- def save_version(contract, user_id):
- """Save current contract state as a version"""
- data = {
- 'contract_number': contract.contract_number,
- 'name': contract.name,
- 'type_id': contract.type_id,
- 'company_entity_id': contract.company_entity_id,
- 'start_date': str(contract.start_date),
- 'end_date': str(contract.end_date),
- 'remind_before': contract.remind_before,
- 'notes': contract.notes,
- 'counterparties': [cp.name for cp in contract.counterparties],
- # 新增字段
- 'signing_method': contract.signing_method,
- 'collected_date': str(contract.collected_date) if contract.collected_date else None,
- 'storage_box': contract.storage_box,
- 'is_active': contract.is_active # 添加合同状态
- }
- max_version = db.session.query(db.func.max(ContractVersion.version)).filter_by(
- contract_id=contract.id).scalar() or 0
- version = ContractVersion(
- contract_id=contract.id,
- version=max_version + 1,
- data=json.dumps(data, ensure_ascii=False, indent=2),
- modified_by=user_id
- )
- db.session.add(version)
- @app.route('/contract/delete/<int:contract_id>', methods=['POST'])
- @login_required
- def delete_contract(contract_id):
- contract = Contract.query.get_or_404(contract_id)
- if not current_user.can_delete:
- flash('您没有删除合同的权限', 'danger')
- return redirect(url_for('view_contract', contract_id=contract_id))
- try:
- contract.is_active = False
- db.session.commit()
- flash('合同已标记为终止', 'success')
- except Exception as e:
- db.session.rollback()
- flash(f'终止合同失败: {str(e)}', 'danger')
- app.logger.error(f'Contract termination failed: {str(e)}')
- return redirect(url_for('contract_list'))
- @app.route('/contract/renew/<int:contract_id>', methods=['GET', 'POST'])
- @login_required
- def renew_contract(contract_id):
- original_contract = Contract.query.get_or_404(contract_id)
- if not current_user.can_create:
- flash('您没有创建合同的权限', 'danger')
- return redirect(url_for('view_contract', contract_id=contract_id))
- if request.method == 'POST':
- try:
- # 自动生成续签合同编号
- existing_renewals = Contract.query.filter(
- Contract.contract_number.like(f"{original_contract.contract_number}-续签%")
- ).all()
- renewal_number = len(existing_renewals) + 1
- new_contract_number = f"{original_contract.contract_number}-续签{renewal_number:02d}"
- contract = Contract(
- contract_number=new_contract_number,
- name=request.form['name'],
- type_id=request.form['type_id'],
- company_entity_id=request.form['company_entity_id'],
- signer_id=current_user.id,
- start_date=datetime.strptime(request.form['start_date'], '%Y-%m-%d').date(),
- end_date=datetime.strptime(request.form['end_date'], '%Y-%m-%d').date(),
- remind_before=int(request.form['remind_before']) if request.form['remind_before'] else None,
- notes=request.form['notes'],
- creator_id=current_user.id,
- original_contract_id=original_contract.id,
- # 新增字段(可以默认继承原合同)
- signing_method=original_contract.signing_method,
- collected_date=original_contract.collected_date,
- storage_box=original_contract.storage_box
- )
- db.session.add(contract)
- db.session.commit()
- # 复制合同方
- counterparties = Counterparty.query.filter_by(contract_id=original_contract.id).all()
- for cp in counterparties:
- db.session.add(Counterparty(name=cp.name, contract_id=contract.id))
- # 保存初始版本
- save_version(contract, current_user.id)
- # 处理附件上传 - 使用UUID生成文件名
- files = request.files.getlist('new_attachments') # 注意字段名改为 new_attachments
- upload_folder = os.path.join(app.config['UPLOAD_FOLDER'], 'contracts')
- os.makedirs(upload_folder, exist_ok=True)
- # 文件类型白名单
- ALLOWED_EXTENSIONS = {'pdf', 'jpg', 'jpeg', 'png', 'gif', 'bmp', 'tiff', 'webp',
- 'doc', 'docx', 'xls', 'xlsx', 'ppt', 'pptx'}
- MAX_FILE_SIZE = 5 * 1024 * 1024 # 5MB
- for file in files:
- if file.filename == '':
- continue
- # 验证文件扩展名
- filename = file.filename
- ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
- if ext not in ALLOWED_EXTENSIONS:
- flash(f'文件 {filename} 类型不被允许', 'warning')
- continue
- # 验证文件大小
- file.seek(0, os.SEEK_END)
- file_size = file.tell()
- file.seek(0)
- if file_size > MAX_FILE_SIZE:
- flash(f'文件 {filename} 超过5MB大小限制', 'warning')
- continue
- # 使用UUID生成唯一文件名
- unique_id = uuid.uuid4().hex
- safe_filename = secure_filename(f"{unique_id}_{filename}")
- filepath = os.path.join(upload_folder, safe_filename)
- try:
- file.save(filepath)
- except Exception as e:
- flash(f'保存文件 {filename} 失败: {str(e)}', 'danger')
- continue
- # 创建附件记录
- attachment = ContractAttachment(
- contract_id=contract.id,
- filename=filename,
- filepath=filepath
- )
- db.session.add(attachment)
- db.session.commit()
- flash('合同续签成功', 'success')
- return redirect(url_for('view_contract', contract_id=contract.id))
- except Exception as e:
- db.session.rollback()
- flash(f'续签合同失败: {str(e)}', 'danger')
- app.logger.error(f'Contract renewal failed: {str(e)}')
- # GET 请求预填充
- contract_types = ContractType.query.all()
- company_entities = CompanyEntity.query.all()
- counterparties = Counterparty.query.filter_by(contract_id=contract_id).all()
- new_start_date = original_contract.end_date + timedelta(days=1)
- new_end_date = new_start_date + timedelta(days=365)
- return render_template('contract_form.html',
- contract=original_contract,
- contract_types=contract_types,
- company_entities=company_entities,
- counterparties=counterparties,
- renew=True,
- new_start_date=new_start_date.strftime('%Y-%m-%d'),
- new_end_date=new_end_date.strftime('%Y-%m-%d'))
- @app.route('/contract/<int:contract_id>/versions')
- @login_required
- def contract_versions(contract_id):
- contract = Contract.query.get_or_404(contract_id)
- # 相关合同 IDs
- contract_ids = set([contract.id])
- if contract.original_contract_id:
- contract_ids.add(contract.original_contract_id)
- # 加上续签合同
- renewals = Contract.query.filter(Contract.original_contract_id == contract.id).all()
- contract_ids.update([c.id for c in renewals])
- # 查询所有版本
- versions = ContractVersion.query.filter(
- ContractVersion.contract_id.in_(contract_ids)
- ).join(Contract).order_by(
- Contract.contract_number.asc(),
- ContractVersion.version.desc()
- ).all()
- # 生成 parsed_versions
- parsed_versions = []
- for v in versions:
- try:
- data_dict = json.loads(v.data)
- except Exception as e:
- data_dict = {"error": f"解析失败: {str(e)}"}
- parsed_versions.append({
- 'version_obj': v,
- 'data': data_dict,
- 'belongs_to': v.contract
- })
- # ====== 调试输出 ======
- print("=== 调试: parsed_versions 内容 ===")
- for idx, pv in enumerate(parsed_versions):
- print(f"{idx+1}. 合同编号: {pv['belongs_to'].contract_number}, 版本: {pv['version_obj'].version}, 修改人: {pv['version_obj'].modifier.name}")
- print(f" 数据: {pv['data']}")
- print("=== 调试结束 ===")
- contract_types = {t.id: t.name for t in ContractType.query.all()}
- company_entities = {e.id: e.name for e in CompanyEntity.query.all()}
- return render_template(
- 'contract_versions.html',
- contract=contract,
- parsed_versions=parsed_versions,
- contract_types=contract_types,
- company_entities=company_entities
- )
- @app.route('/contract/attachment/delete/<int:attachment_id>', methods=['POST'])
- @login_required
- def delete_attachment(attachment_id):
- attachment = ContractAttachment.query.get_or_404(attachment_id)
- contract_id = attachment.contract_id
- if not current_user.can_edit:
- flash('您没有编辑合同的权限', 'danger')
- return redirect(url_for('view_contract', contract_id=contract_id))
- try:
- # Delete file from filesystem
- if os.path.exists(attachment.filepath):
- os.remove(attachment.filepath)
- # Delete record from database
- db.session.delete(attachment)
- db.session.commit()
- flash('附件已删除', 'success')
- except Exception as e:
- db.session.rollback()
- flash(f'删除附件失败: {str(e)}', 'danger')
- app.logger.error(f'Attachment deletion failed: {str(e)}')
- return redirect(url_for('view_contract', contract_id=contract_id))
- # -----------------------------
- # 下载附件
- # -----------------------------
- @app.route('/download/<int:attachment_id>')
- @login_required
- def download_file(attachment_id):
- # 获取附件对象
- attachment = ContractAttachment.query.get_or_404(attachment_id)
- # 构建文件路径
- filename = os.path.basename(attachment.filepath)
- upload_folder = os.path.join(app.config['UPLOAD_FOLDER'], 'contracts')
- file_path = os.path.join(upload_folder, filename)
- # 检查文件是否存在
- if not os.path.exists(file_path):
- abort(404)
- # 使用 download_name 参数设置用户友好的文件名
- return send_file(
- file_path,
- as_attachment=True,
- download_name=attachment.filename
- )
- @app.route('/users')
- @login_required
- def user_list():
- if not current_user.is_admin:
- flash('您没有权限访问此页面', 'danger')
- return redirect(url_for('dashboard'))
- users = User.query.order_by(User.is_admin.desc(), User.username).all()
- return render_template('user_list.html', users=users)
- @app.route('/user/create', methods=['GET', 'POST'])
- @login_required
- def create_user():
- if not current_user.is_admin:
- flash('您没有权限访问此页面', 'danger')
- return redirect(url_for('dashboard'))
- if request.method == 'POST':
- try:
- user = User(
- username=request.form['username'],
- password=generate_password_hash(request.form['password']),
- name=request.form['name'],
- department=request.form['department'],
- email=request.form['email'],
- is_admin='is_admin' in request.form,
- can_create='can_create' in request.form,
- can_view='can_view' in request.form,
- can_edit='can_edit' in request.form,
- can_delete='can_delete' in request.form
- )
- db.session.add(user)
- db.session.commit()
- flash('用户创建成功', 'success')
- return redirect(url_for('user_list'))
- except Exception as e:
- db.session.rollback()
- flash(f'创建用户失败: {str(e)}', 'danger')
- app.logger.error(f'User creation failed: {str(e)}')
- return render_template('user_form.html')
- @app.route('/user/edit/<int:user_id>', methods=['GET', 'POST'])
- @login_required
- def edit_user(user_id):
- if not current_user.is_admin:
- flash('您没有权限访问此页面', 'danger')
- return redirect(url_for('dashboard'))
- user = User.query.get_or_404(user_id)
- if request.method == 'POST':
- try:
- user.username = request.form['username']
- if request.form['password']:
- user.password = generate_password_hash(request.form['password'])
- user.name = request.form['name']
- user.department = request.form['department']
- user.email = request.form['email']
- user.is_admin = 'is_admin' in request.form
- user.can_create = 'can_create' in request.form
- user.can_view = 'can_view' in request.form
- user.can_edit = 'can_edit' in request.form
- user.can_delete = 'can_delete' in request.form
- db.session.commit()
- flash('用户信息更新成功', 'success')
- return redirect(url_for('user_list'))
- except Exception as e:
- db.session.rollback()
- flash(f'更新用户信息失败: {str(e)}', 'danger')
- app.logger.error(f'User update failed: {str(e)}')
- return render_template('user_form.html', user=user)
- @app.route('/user/delete/<int:user_id>', methods=['POST'])
- @login_required
- def delete_user(user_id):
- if not current_user.is_admin:
- flash('您没有权限访问此页面', 'danger')
- return redirect(url_for('dashboard'))
- if current_user.id == user_id:
- flash('不能删除当前登录的用户', 'danger')
- return redirect(url_for('user_list'))
- user = User.query.get_or_404(user_id)
- try:
- db.session.delete(user)
- db.session.commit()
- flash('用户删除成功', 'success')
- except Exception as e:
- db.session.rollback()
- flash(f'删除用户失败: {str(e)}', 'danger')
- app.logger.error(f'User deletion failed: {str(e)}')
- return redirect(url_for('user_list'))
- @app.route('/system/config', methods=['GET', 'POST'])
- @login_required
- def system_config():
- if not current_user.is_admin:
- flash('您没有权限访问此页面', 'danger')
- return redirect(url_for('dashboard'))
- config = SystemConfig.query.first()
- if request.method == 'POST':
- config.site_name = request.form['site_name']
- config.mail_server = request.form['mail_server']
- config.mail_port = int(request.form['mail_port'])
- config.mail_username = request.form['mail_username']
- config.mail_password = request.form['mail_password']
- config.mail_use_tls = 'mail_use_tls' in request.form
- db.session.commit()
- flash('系统配置更新成功', 'success')
- return redirect(url_for('system_config'))
- departments = Department.query.all()
- entities = CompanyEntity.query.all()
- contract_types = ContractType.query.all()
- return render_template('system_config.html', config=config,
- departments=departments,
- entities=entities,
- contract_types=contract_types)
- @app.route('/system/config/department/add', methods=['POST'])
- @login_required
- def add_department():
- data = request.get_json()
- dept = Department(name=data['name'])
- db.session.add(dept)
- db.session.commit()
- return '', 204
- @app.route('/system/config/entity/add', methods=['POST'])
- @login_required
- def add_entity():
- data = request.get_json()
- entity = CompanyEntity(name=data['name'])
- db.session.add(entity)
- db.session.commit()
- return '', 204
- @app.route('/system/config/type/add', methods=['POST'])
- @login_required
- def add_contract_type():
- data = request.get_json()
- ct = ContractType(name=data['name'])
- db.session.add(ct)
- db.session.commit()
- return '', 204
- @app.route('/system/config/<string:type>/delete/<int:id>', methods=['POST'])
- @login_required
- def delete_config_item(type, id):
- model = {'department': Department, 'entity': CompanyEntity, 'type': ContractType}[type]
- item = model.query.get_or_404(id)
- db.session.delete(item)
- db.session.commit()
- return '', 204
- @app.route('/system/config/scheduler', methods=['POST'])
- @login_required
- def save_scheduler_config():
- if not current_user.is_admin:
- return jsonify({'error': '无权访问'}), 403
- data = request.get_json()
- # 确保在应用上下文中执行数据库操作
- with app.app_context():
- config = SystemConfig.query.first()
- if not config:
- return jsonify({'error': '系统配置未初始化'}), 400
- try:
- config.scheduler_frequency = data.get('frequency', 'daily')
- config.scheduler_weekdays = data.get('weekdays', [])
- config.scheduler_month_day = data.get('month_day', 1)
- config.scheduler_hour = data.get('hour', 9)
- config.scheduler_minute = data.get('minute', 0)
- db.session.commit()
- # 更新定时任务
- scheduler.reschedule_job(
- 'daily_contract_check',
- trigger=create_scheduler_trigger()
- )
- return jsonify({'status': 'success'})
- except Exception as e:
- db.session.rollback()
- return jsonify({'error': str(e)}), 500
- def update_scheduler_job(config):
- """根据配置更新定时任务"""
- scheduler.remove_job('daily_contract_check')
- if config.scheduler_frequency == 'daily':
- # 每天执行
- scheduler.add_job(
- check_expiring_contracts,
- CronTrigger(
- hour=config.scheduler_hour,
- minute=config.scheduler_minute,
- timezone='Asia/Shanghai'
- ),
- id='daily_contract_check'
- )
- elif config.scheduler_frequency == 'weekly':
- # 每周执行
- scheduler.add_job(
- check_expiring_contracts,
- CronTrigger(
- day_of_week=','.join(map(str, config.scheduler_weekdays)),
- hour=config.scheduler_hour,
- minute=config.scheduler_minute,
- timezone='Asia/Shanghai'
- ),
- id='daily_contract_check'
- )
- elif config.scheduler_frequency == 'monthly':
- # 每月执行
- scheduler.add_job(
- check_expiring_contracts,
- CronTrigger(
- day=config.scheduler_month_day,
- hour=config.scheduler_hour,
- minute=config.scheduler_minute,
- timezone='Asia/Shanghai'
- ),
- id='daily_contract_check'
- )
- print(f"⏰ 定时任务已更新: {config.scheduler_frequency} {config.scheduler_hour}:{config.scheduler_minute}")
- @app.route('/api/contracts/expiring')
- @login_required
- def api_expiring_contracts():
- if not current_user.can_view:
- return jsonify({'error': '无权访问'}), 403
- days = request.args.get('days', default=30, type=int)
- today = datetime.now().date() # 改用本地时间
- end_date = today + timedelta(days=days)
- contracts = Contract.query.filter(
- Contract.end_date <= end_date,
- Contract.end_date >= today,
- Contract.is_active == True
- ).order_by(Contract.end_date).all()
- result = [{
- 'id': c.id,
- 'name': c.name,
- 'contract_number': c.contract_number,
- 'end_date': c.end_date.strftime('%Y-%m-%d'),
- 'days_left': (c.end_date - today).days,
- 'url': url_for('view_contract', contract_id=c.id)
- } for c in contracts]
- return jsonify(result)
- @app.errorhandler(404)
- def page_not_found(e):
- return render_template('404.html'), 404
- @app.errorhandler(403)
- def forbidden(e):
- return render_template('403.html'), 403
- @app.errorhandler(500)
- def internal_server_error(e):
- db.session.rollback()
- return render_template('500.html'), 500
- @app.route('/uploads/icons/<filename>')
- def uploaded_icon(filename):
- icon_dir = os.path.join(app.instance_path, 'uploads', 'icons')
- return send_from_directory(icon_dir, filename)
- @app.context_processor
- def inject_config():
- # 从数据库获取配置,假设你有 Config 模型
- config = current_app.config.get('SITE_CONFIG') # 或者从数据库取
- return dict(config=config)
- @app.context_processor
- def inject_now():
- # 返回当前时间戳,避免浏览器缓存旧图标
- return {'now_timestamp': int(datetime.utcnow().timestamp())}
- @app.route('/profile', methods=['GET', 'POST'])#个人资料
- @login_required
- def profile():
- if request.method == 'POST':
- try:
- current_user.name = request.form['name']
- current_user.department = request.form['department']
- current_user.email = request.form['email']
- if request.form['password']:
- current_user.password = generate_password_hash(request.form['password'])
- db.session.commit()
- flash('个人资料更新成功', 'success')
- except Exception as e:
- db.session.rollback()
- flash(f'更新失败: {str(e)}', 'danger')
- return render_template('profile.html', user=current_user)
- @app.route('/contract/delete/permanent/<int:contract_id>', methods=['POST'])
- @login_required
- def delete_contract_permanent(contract_id):
- contract = Contract.query.get_or_404(contract_id)
- if not current_user.can_delete:
- flash('您没有删除合同的权限', 'danger')
- return redirect(url_for('view_contract', contract_id=contract_id))
- try:
- # 删除附件文件
- for attachment in contract.attachments:
- if os.path.exists(attachment.filepath):
- os.remove(attachment.filepath)
- db.session.delete(attachment)
- # 删除合同方
- for cp in contract.counterparties:
- db.session.delete(cp)
- # 删除合同版本
- for version in contract.versions:
- db.session.delete(version)
- db.session.delete(contract)
- db.session.commit()
- flash('合同已彻底删除', 'success')
- return redirect(url_for('contract_list'))
- except Exception as e:
- db.session.rollback()
- flash(f'删除合同失败: {str(e)}', 'danger')
- app.logger.error(f'Permanent contract deletion failed: {str(e)}')
- return redirect(url_for('view_contract', contract_id=contract_id))
- @app.route('/api/contracts/uncollected')
- @login_required
- def api_uncollected_contracts():
- if not current_user.can_view:
- return jsonify({'error': '无权访问'}), 403
- contracts = Contract.query.filter(
- Contract.collected_date.is_(None)
- ).order_by(Contract.end_date.desc()).all()
- result = [{
- 'id': c.id,
- 'name': c.name,
- 'contract_number': c.contract_number,
- 'end_date': c.end_date.strftime('%Y-%m-%d') if c.end_date else None,
- 'url': url_for('view_contract', contract_id=c.id)
- } for c in contracts]
- return jsonify(result)
- # 设置合同类型前缀
- @app.route('/system/config/type/set_prefix/<int:type_id>', methods=['POST'])
- @login_required
- def set_contract_type_prefix(type_id):
- from flask import request, jsonify
- data = request.get_json()
- prefix = data.get('prefix', '').strip()
- ct = ContractType.query.get_or_404(type_id)
- ct.prefix = prefix
- db.session.commit()
- return jsonify({"status": "ok"})
- @login_required
- def download_attachment(attachment_id):
- attachment = ContractAttachment.query.get_or_404(attachment_id)
- file_path = os.path.join(UPLOAD_FOLDER, attachment.filename)
- if os.path.exists(file_path):
- return send_from_directory(UPLOAD_FOLDER, attachment.filename, as_attachment=True)
- else:
- abort(404)
- @app.route('/favicon.ico')
- def favicon():
- return send_from_directory(
- os.path.join(app.root_path, 'static'),
- 'favicon.ico',
- mimetype='image/vnd.microsoft.icon'
- )
- # 在应用启动时调用
- if __name__ == '__main__':
- if not os.path.exists(os.path.join(app.instance_path, 'contracts.db')):
- init_db()
- with app.app_context():
- load_mail_config()
- app.run(debug=False, port=8082)
|