This commit is contained in:
27942
2026-01-13 18:29:01 +08:00
parent 47f54a168e
commit 497e25d4f7
7 changed files with 1709 additions and 0 deletions

View File

@@ -0,0 +1,319 @@
"""
检查数据库数据一致性的 Django 管理命令
检查数据冲突、外键完整性、软删除一致性等
"""
from django.core.management.base import BaseCommand
from django.db import connection
from django.apps import apps
from django.db.models import Q
import json
class Command(BaseCommand):
help = '检查数据库数据一致性'
def add_arguments(self, parser):
parser.add_argument(
'--app',
type=str,
help='指定要检查的应用名称',
)
parser.add_argument(
'--fix',
action='store_true',
help='尝试修复发现的问题',
)
def handle(self, *args, **options):
self.stdout.write(self.style.SUCCESS('开始检查数据库数据一致性...\n'))
app_name = options.get('app')
fix_mode = options.get('fix', False)
issues = []
# 检查各个应用的数据一致性
if app_name:
apps_to_check = [apps.get_app_config(app_name)]
else:
apps_to_check = [
apps.get_app_config('User'),
apps.get_app_config('finance'),
apps.get_app_config('business')
]
# 1. 检查外键完整性
self.stdout.write(self.style.WARNING('\n1. 检查外键完整性...'))
fk_issues = self._check_foreign_keys()
issues.extend(fk_issues)
# 2. 检查软删除一致性
self.stdout.write(self.style.WARNING('\n2. 检查软删除一致性...'))
soft_delete_issues = self._check_soft_delete_consistency()
issues.extend(soft_delete_issues)
# 3. 检查审批记录关联的业务记录
self.stdout.write(self.style.WARNING('\n3. 检查审批记录关联的业务记录...'))
approval_issues = self._check_approval_relations()
issues.extend(approval_issues)
# 4. 检查用户和团队关系
self.stdout.write(self.style.WARNING('\n4. 检查用户和团队关系...'))
team_issues = self._check_user_team_relations()
issues.extend(team_issues)
# 5. 检查部门数据
self.stdout.write(self.style.WARNING('\n5. 检查部门数据...'))
dept_issues = self._check_department_data()
issues.extend(dept_issues)
# 输出结果
self.stdout.write('\n' + '=' * 80)
self.stdout.write(self.style.SUCCESS('\n检查完成!\n'))
if issues:
self.stdout.write(self.style.ERROR(f'发现 {len(issues)} 个数据一致性问题:\n'))
for i, issue in enumerate(issues, 1):
self.stdout.write(f'{i}. [{issue["type"]}] {issue["message"]}')
if 'details' in issue:
for detail in issue['details']:
self.stdout.write(f' - {detail}')
if 'count' in issue:
self.stdout.write(f' 影响记录数: {issue["count"]}')
else:
self.stdout.write(self.style.SUCCESS('✅ 数据一致性检查通过!'))
# 返回 None 而不是整数,避免 Django 的 execute 方法报错
return None
def _check_foreign_keys(self):
"""检查外键完整性"""
issues = []
with connection.cursor() as cursor:
# 检查 ProjectRegistration 的外键
cursor.execute("""
SELECT pr.id, pr.user_id
FROM business_projectregistration pr
LEFT JOIN business_prefiling pf ON pr.user_id = pf.id
WHERE pf.id IS NULL AND pr.is_deleted = 0
""")
orphaned_projects = cursor.fetchall()
if orphaned_projects:
issues.append({
'type': 'orphaned_foreign_key',
'model': 'ProjectRegistration',
'field': 'user',
'count': len(orphaned_projects),
'message': f'发现 {len(orphaned_projects)} 个 ProjectRegistration 记录的外键指向不存在的 PreFiling',
'details': [f'ID: {row[0]}, user_id: {row[1]}' for row in orphaned_projects[:10]]
})
self.stdout.write(self.style.ERROR(f' ❌ ProjectRegistration: {len(orphaned_projects)} 个孤立记录'))
else:
self.stdout.write(self.style.SUCCESS(' ✅ ProjectRegistration 外键完整'))
# 检查 Case 的外键
cursor.execute("""
SELECT c.id, c.user_id
FROM business_case c
LEFT JOIN business_prefiling pf ON c.user_id = pf.id
WHERE pf.id IS NULL AND c.is_deleted = 0
""")
orphaned_cases = cursor.fetchall()
if orphaned_cases:
issues.append({
'type': 'orphaned_foreign_key',
'model': 'Case',
'field': 'user',
'count': len(orphaned_cases),
'message': f'发现 {len(orphaned_cases)} 个 Case 记录的外键指向不存在的 PreFiling',
'details': [f'ID: {row[0]}, user_id: {row[1]}' for row in orphaned_cases[:10]]
})
self.stdout.write(self.style.ERROR(f' ❌ Case: {len(orphaned_cases)} 个孤立记录'))
else:
self.stdout.write(self.style.SUCCESS(' ✅ Case 外键完整'))
# 检查 Bid 的外键
cursor.execute("""
SELECT b.id, b.user_id
FROM business_bid b
LEFT JOIN business_prefiling pf ON b.user_id = pf.id
WHERE pf.id IS NULL AND b.is_deleted = 0
""")
orphaned_bids = cursor.fetchall()
if orphaned_bids:
issues.append({
'type': 'orphaned_foreign_key',
'model': 'Bid',
'field': 'user',
'count': len(orphaned_bids),
'message': f'发现 {len(orphaned_bids)} 个 Bid 记录的外键指向不存在的 PreFiling',
'details': [f'ID: {row[0]}, user_id: {row[1]}' for row in orphaned_bids[:10]]
})
self.stdout.write(self.style.ERROR(f' ❌ Bid: {len(orphaned_bids)} 个孤立记录'))
else:
self.stdout.write(self.style.SUCCESS(' ✅ Bid 外键完整'))
return issues
def _check_soft_delete_consistency(self):
"""检查软删除一致性"""
issues = []
# 检查审批记录和业务记录的软删除一致性
from User.models import Approval
# 检查待办
from business.models import Schedule
approvals = Approval.objects.filter(type="待办", is_deleted=False)
for approval in approvals:
try:
schedule = Schedule.objects.get(id=int(approval.user_id), is_deleted=False)
except (Schedule.DoesNotExist, ValueError):
# 检查是否是因为业务记录被软删除
try:
schedule = Schedule.objects.get(id=int(approval.user_id))
if schedule.is_deleted:
issues.append({
'type': 'soft_delete_inconsistency',
'model': 'Approval',
'id': approval.id,
'message': f'审批记录 ID={approval.id} 关联的待办 ID={approval.user_id} 已被软删除,但审批记录未标记为删除'
})
except Schedule.DoesNotExist:
issues.append({
'type': 'orphaned_approval',
'model': 'Approval',
'id': approval.id,
'message': f'审批记录 ID={approval.id} 关联的待办 ID={approval.user_id} 不存在'
})
if not issues:
self.stdout.write(self.style.SUCCESS(' ✅ 软删除一致性检查通过'))
else:
self.stdout.write(self.style.ERROR(f' ❌ 发现 {len(issues)} 个软删除不一致问题'))
return issues
def _check_approval_relations(self):
"""检查审批记录关联的业务记录"""
issues = []
from User.models import Approval
# 检查各种类型的审批记录
approval_types = [
"入职财务登记",
"离职财务登记",
"收入确认",
"付款申请",
"报销申请",
"工资/奖金变更",
"立项登记",
"投标登记",
"案件管理",
"申请用印",
"待办"
]
for approval_type in approval_types:
approvals = Approval.objects.filter(type=approval_type, is_deleted=False)
invalid_count = 0
for approval in approvals:
try:
user_id = int(approval.user_id)
except (ValueError, TypeError):
issues.append({
'type': 'invalid_user_id',
'model': 'Approval',
'id': approval.id,
'type': approval_type,
'message': f'审批记录 ID={approval.id} 的 user_id={approval.user_id} 不是有效的整数'
})
invalid_count += 1
continue
# 根据类型检查关联的业务记录
if approval_type == "入职财务登记" or approval_type == "离职财务登记":
from User.models import User
if not User.objects.filter(id=user_id, is_deleted=False).exists():
issues.append({
'type': 'orphaned_approval',
'model': 'Approval',
'id': approval.id,
'type': approval_type,
'message': f'审批记录 ID={approval.id} 关联的用户 ID={user_id} 不存在或已删除'
})
invalid_count += 1
if invalid_count == 0:
self.stdout.write(self.style.SUCCESS(f'{approval_type}: 所有关联记录有效'))
else:
self.stdout.write(self.style.ERROR(f'{approval_type}: {invalid_count} 个无效关联'))
return issues
def _check_user_team_relations(self):
"""检查用户和团队关系"""
issues = []
from User.models import User, Team
# 检查用户的 team 字段是否指向存在的团队
users = User.objects.filter(is_deleted=False).exclude(team='')
invalid_users = []
for user in users:
if user.team:
if not Team.objects.filter(name=user.team, is_deleted=False).exists():
invalid_users.append({
'id': user.id,
'username': user.username,
'team': user.team
})
if invalid_users:
issues.append({
'type': 'invalid_team_reference',
'model': 'User',
'count': len(invalid_users),
'message': f'发现 {len(invalid_users)} 个用户的 team 字段指向不存在的团队',
'details': [f'用户 ID={u["id"]}, 用户名={u["username"]}, 团队={u["team"]}' for u in invalid_users[:10]]
})
self.stdout.write(self.style.ERROR(f'{len(invalid_users)} 个用户的团队引用无效'))
else:
self.stdout.write(self.style.SUCCESS(' ✅ 用户团队关系完整'))
return issues
def _check_department_data(self):
"""检查部门数据"""
issues = []
from User.models import Department, User
# 检查是否有财务部
finance_dept = Department.objects.filter(username__icontains='财务部', is_deleted=False).first()
if not finance_dept:
issues.append({
'type': 'missing_department',
'message': '数据库中不存在"财务部"部门,财务审核功能可能无法正常工作'
})
self.stdout.write(self.style.WARNING(' ⚠️ 未找到"财务部"部门'))
else:
self.stdout.write(self.style.SUCCESS(f' ✅ 找到财务部: ID={finance_dept.id}, 名称={finance_dept.username}'))
# 检查是否有用户属于财务部
if finance_dept:
finance_users = User.objects.filter(department=finance_dept, is_deleted=False)
if finance_users.exists():
self.stdout.write(self.style.SUCCESS(f' ✅ 财务部有 {finance_users.count()} 个用户'))
else:
issues.append({
'type': 'no_finance_users',
'message': '财务部部门存在,但没有用户属于该部门,财务审核功能可能无法正常工作'
})
self.stdout.write(self.style.WARNING(' ⚠️ 财务部没有用户'))
return issues

View File

@@ -0,0 +1,340 @@
"""
检查数据库结构和模型类是否一致的 Django 管理命令
"""
from django.core.management.base import BaseCommand
from django.db import connection
from django.apps import apps
from django.core.management.color import no_style
from django.db import models
import sys
class Command(BaseCommand):
help = '检查数据库结构和模型类是否一致'
def add_arguments(self, parser):
parser.add_argument(
'--app',
type=str,
help='指定要检查的应用名称(如 User, finance, business',
)
parser.add_argument(
'--model',
type=str,
help='指定要检查的模型名称',
)
parser.add_argument(
'--fix',
action='store_true',
help='尝试修复不一致的问题(生成迁移文件)',
)
def handle(self, *args, **options):
self.stdout.write(self.style.SUCCESS('开始检查数据库结构和模型类...\n'))
app_name = options.get('app')
model_name = options.get('model')
fix_mode = options.get('fix', False)
# 获取所有应用
if app_name:
apps_to_check = [apps.get_app_config(app_name)]
else:
apps_to_check = [apps.get_app_config('User'),
apps.get_app_config('finance'),
apps.get_app_config('business')]
issues = []
warnings = []
with connection.cursor() as cursor:
# 获取数据库名(只获取一次,提高性能)
cursor.execute("SELECT DATABASE()")
db_name = cursor.fetchone()[0]
if not db_name:
self.stdout.write(self.style.ERROR('无法获取数据库名,检查终止'))
return None
for app_config in apps_to_check:
self.stdout.write(self.style.WARNING(f'\n检查应用: {app_config.name}'))
self.stdout.write('=' * 80)
# 获取应用中的所有模型
models_to_check = app_config.get_models()
if model_name:
models_to_check = [m for m in models_to_check if m.__name__ == model_name]
for model in models_to_check:
self.stdout.write(f'\n检查模型: {model.__name__}')
self.stdout.write('-' * 80)
# 获取数据库表名
db_table = model._meta.db_table
# 检查表是否存在
table_exists = self._check_table_exists(cursor, db_name, db_table)
if not table_exists:
issue = {
'type': 'missing_table',
'app': app_config.name,
'model': model.__name__,
'table': db_table,
'message': f'{db_table} 不存在于数据库中'
}
issues.append(issue)
self.stdout.write(self.style.ERROR(f' ❌ 表 {db_table} 不存在'))
continue
self.stdout.write(self.style.SUCCESS(f' ✅ 表 {db_table} 存在'))
# 获取数据库表的字段信息
db_fields = self._get_db_fields(cursor, db_name, db_table)
# 获取模型类的字段信息
model_fields = self._get_model_fields(model)
# 检查字段
field_issues = self._compare_fields(model, db_table, model_fields, db_fields)
issues.extend(field_issues)
# 检查多对多关系表
m2m_issues = self._check_m2m_tables(cursor, db_name, model)
issues.extend(m2m_issues)
# 输出结果
self.stdout.write('\n' + '=' * 80)
self.stdout.write(self.style.SUCCESS('\n检查完成!\n'))
if issues:
self.stdout.write(self.style.ERROR(f'发现 {len(issues)} 个问题:\n'))
for i, issue in enumerate(issues, 1):
self.stdout.write(f'{i}. [{issue["type"]}] {issue["message"]}')
if 'details' in issue:
for detail in issue['details']:
self.stdout.write(f' - {detail}')
else:
self.stdout.write(self.style.SUCCESS('✅ 数据库结构和模型类完全一致!'))
if warnings:
self.stdout.write(self.style.WARNING(f'\n警告 ({len(warnings)} 个)\n'))
for i, warning in enumerate(warnings, 1):
self.stdout.write(f'{i}. {warning}')
# 如果发现问题且开启了修复模式
if issues and fix_mode:
self.stdout.write(self.style.WARNING('\n尝试生成迁移文件...'))
from django.core.management import call_command
try:
call_command('makemigrations', verbosity=0)
self.stdout.write(self.style.SUCCESS('迁移文件已生成,请运行 python manage.py migrate 应用迁移'))
except Exception as e:
self.stdout.write(self.style.ERROR(f'生成迁移文件失败: {str(e)}'))
# 返回 None 而不是整数,避免 Django 的 execute 方法报错
return None
def _check_table_exists(self, cursor, db_name, table_name):
"""检查表是否存在"""
try:
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.tables
WHERE table_schema = %s
AND table_name = %s
""", [db_name, table_name])
return cursor.fetchone()[0] > 0
except Exception as e:
# 静默处理错误,避免输出过多
return False
def _get_db_fields(self, cursor, db_name, table_name):
"""获取数据库表的字段信息"""
try:
cursor.execute("""
SELECT
COLUMN_NAME,
DATA_TYPE,
CHARACTER_MAXIMUM_LENGTH,
IS_NULLABLE,
COLUMN_DEFAULT,
COLUMN_TYPE
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = %s
AND TABLE_NAME = %s
ORDER BY ORDINAL_POSITION
""", [db_name, table_name])
fields = {}
for row in cursor.fetchall():
fields[row[0]] = {
'type': row[1],
'max_length': row[2],
'nullable': row[3] == 'YES',
'default': row[4],
'column_type': row[5]
}
return fields
except Exception as e:
self.stdout.write(self.style.ERROR(f' ❌ 获取表 {table_name} 的字段信息失败: {str(e)}'))
return {}
def _get_model_fields(self, model):
"""获取模型类的字段信息"""
fields = {}
# 使用 get_fields(include_parents=False) 只获取当前模型的字段
# 使用 get_fields() 会包含反向关系,需要过滤
for field in model._meta.get_fields(include_parents=False):
# 跳过多对多关系(单独处理)
if isinstance(field, models.ManyToManyField):
continue
# 跳过反向关系related_name 定义的字段,如 approver_teams
# auto_created=True 表示是 Django 自动创建的反向关系
if getattr(field, 'auto_created', False):
continue
# 跳过反向外键关系(如 Department.user_set
# 如果字段没有 column 属性,说明不是数据库字段
if not hasattr(field, 'column'):
continue
# 只处理有 column 属性的字段(数据库字段)
field_name = field.column
fields[field_name] = {
'field': field,
'name': field.name,
'type': type(field).__name__,
'null': getattr(field, 'null', False),
'blank': getattr(field, 'blank', False),
'default': getattr(field, 'default', models.NOT_PROVIDED),
'max_length': getattr(field, 'max_length', None),
}
return fields
def _compare_fields(self, model, table_name, model_fields, db_fields):
"""比较模型字段和数据库字段"""
issues = []
# 检查模型中的字段是否在数据库中存在
for field_name, field_info in model_fields.items():
if field_name not in db_fields:
issue = {
'type': 'missing_field',
'app': model._meta.app_label,
'model': model.__name__,
'table': table_name,
'field': field_name,
'message': f'模型字段 {field_name} 在数据库表 {table_name} 中不存在'
}
issues.append(issue)
self.stdout.write(self.style.ERROR(f' ❌ 字段 {field_name} 在数据库中不存在'))
else:
# 检查字段类型是否匹配
db_field = db_fields[field_name]
field_type_issue = self._check_field_type(field_info, db_field, field_name)
if field_type_issue:
issues.append(field_type_issue)
self.stdout.write(self.style.WARNING(f' ⚠️ 字段 {field_name} 类型可能不匹配'))
else:
self.stdout.write(self.style.SUCCESS(f' ✅ 字段 {field_name} 匹配'))
# 检查数据库中是否有模型中没有的字段(可能是遗留字段)
model_field_names = set(model_fields.keys())
db_field_names = set(db_fields.keys())
extra_db_fields = db_field_names - model_field_names
# 排除 id 字段Django 自动添加)
extra_db_fields.discard('id')
if extra_db_fields:
issue = {
'type': 'extra_field',
'app': model._meta.app_label,
'model': model.__name__,
'table': table_name,
'fields': list(extra_db_fields),
'message': f'数据库表 {table_name} 中有模型中没有的字段: {", ".join(extra_db_fields)}'
}
issues.append(issue)
self.stdout.write(self.style.WARNING(f' ⚠️ 数据库中有额外字段: {", ".join(extra_db_fields)}'))
return issues
def _check_field_type(self, model_field_info, db_field_info, field_name):
"""检查字段类型是否匹配"""
field = model_field_info['field']
field_type = model_field_info['type']
db_type = db_field_info['type'].upper()
# 类型映射
type_mapping = {
'CharField': 'VARCHAR',
'TextField': 'TEXT',
'IntegerField': 'INT',
'BigIntegerField': 'BIGINT',
'BooleanField': 'TINYINT',
'DateField': 'DATE',
'DateTimeField': 'DATETIME',
'ForeignKey': 'BIGINT', # 外键在数据库中通常是 BIGINT
}
expected_db_type = type_mapping.get(field_type, None)
if expected_db_type and db_type not in expected_db_type:
# 特殊处理BooleanField 在 MySQL 中可能是 TINYINT(1)
if field_type == 'BooleanField' and 'TINYINT' in db_type:
return None
# 特殊处理CharField 和 VARCHAR
if field_type == 'CharField' and 'VARCHAR' in db_type:
return None
# 特殊处理TextField 和 TEXT/LONGTEXT
if field_type == 'TextField' and 'TEXT' in db_type:
return None
return {
'type': 'type_mismatch',
'app': field.model._meta.app_label,
'model': field.model.__name__,
'field': field_name,
'expected': expected_db_type,
'actual': db_type,
'message': f'字段 {field_name} 类型不匹配: 模型期望 {expected_db_type}, 数据库是 {db_type}'
}
return None
def _check_m2m_tables(self, cursor, db_name, model):
"""检查多对多关系表"""
issues = []
for field in model._meta.get_fields():
if isinstance(field, models.ManyToManyField):
# 获取多对多关系表名
try:
m2m_table = field.remote_field.through._meta.db_table
except AttributeError:
# 如果是自动创建的多对多关系,跳过
continue
# 检查表是否存在
if not self._check_table_exists(cursor, db_name, m2m_table):
issue = {
'type': 'missing_m2m_table',
'app': model._meta.app_label,
'model': model.__name__,
'table': m2m_table,
'field': field.name,
'message': f'多对多关系表 {m2m_table} 不存在'
}
issues.append(issue)
self.stdout.write(self.style.ERROR(f' ❌ 多对多关系表 {m2m_table} 不存在'))
else:
self.stdout.write(self.style.SUCCESS(f' ✅ 多对多关系表 {m2m_table} 存在'))
return issues

View File

@@ -0,0 +1,198 @@
"""
简化的数据库和模型对比检查命令
直接使用 Django 的 inspectdb 逻辑来检查
"""
from django.core.management.base import BaseCommand
from django.db import connection
from django.apps import apps
from django.db import models
from django.core.management.sql import sql_flush
import sys
class Command(BaseCommand):
help = '检查数据库结构和模型类是否一致(简化版)'
def add_arguments(self, parser):
parser.add_argument(
'--app',
type=str,
help='指定要检查的应用名称',
)
parser.add_argument(
'--model',
type=str,
help='指定要检查的模型名称',
)
parser.add_argument(
'--detail',
action='store_true',
help='显示详细信息',
)
def handle(self, *args, **options):
self.stdout.write(self.style.SUCCESS('开始检查数据库结构和模型类...\n'))
app_name = options.get('app')
model_name = options.get('model')
detail = options.get('detail', False)
# 获取所有应用
if app_name:
apps_to_check = [apps.get_app_config(app_name)]
else:
apps_to_check = [
apps.get_app_config('User'),
apps.get_app_config('finance'),
apps.get_app_config('business')
]
all_issues = []
all_warnings = []
with connection.cursor() as cursor:
# 获取数据库名
cursor.execute("SELECT DATABASE()")
db_name = cursor.fetchone()[0]
self.stdout.write(f'数据库: {db_name}\n')
for app_config in apps_to_check:
self.stdout.write(self.style.WARNING(f'\n检查应用: {app_config.name}'))
self.stdout.write('=' * 80)
models_to_check = app_config.get_models()
if model_name:
models_to_check = [m for m in models_to_check if m.__name__ == model_name]
for model in models_to_check:
issues, warnings = self._check_model(cursor, db_name, model, detail)
all_issues.extend(issues)
all_warnings.extend(warnings)
# 输出总结
self.stdout.write('\n' + '=' * 80)
self.stdout.write(self.style.SUCCESS('\n检查完成!\n'))
if all_issues:
self.stdout.write(self.style.ERROR(f'发现 {len(all_issues)} 个问题:\n'))
for i, issue in enumerate(all_issues, 1):
self.stdout.write(f'{i}. [{issue["type"]}] {issue["message"]}')
if detail and 'details' in issue:
for detail_item in issue['details']:
self.stdout.write(f' - {detail_item}')
else:
self.stdout.write(self.style.SUCCESS('✅ 数据库结构和模型类完全一致!'))
if all_warnings:
self.stdout.write(self.style.WARNING(f'\n警告 ({len(all_warnings)} 个)\n'))
for i, warning in enumerate(all_warnings, 1):
self.stdout.write(f'{i}. {warning}')
return len(all_issues)
def _check_model(self, cursor, db_name, model, detail=False):
"""检查单个模型"""
issues = []
warnings = []
db_table = model._meta.db_table
self.stdout.write(f'\n检查模型: {model.__name__} (表: {db_table})')
self.stdout.write('-' * 80)
# 检查表是否存在
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.tables
WHERE table_schema = %s AND table_name = %s
""", [db_name, db_table])
if cursor.fetchone()[0] == 0:
issues.append({
'type': 'missing_table',
'model': model.__name__,
'table': db_table,
'message': f'{db_table} 不存在于数据库中'
})
self.stdout.write(self.style.ERROR(f' ❌ 表 {db_table} 不存在'))
return issues, warnings
self.stdout.write(self.style.SUCCESS(f' ✅ 表 {db_table} 存在'))
# 获取数据库表的字段
cursor.execute("""
SELECT COLUMN_NAME, DATA_TYPE, IS_NULLABLE, COLUMN_DEFAULT
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = %s AND TABLE_NAME = %s
ORDER BY ORDINAL_POSITION
""", [db_name, db_table])
db_columns = {row[0]: {'type': row[1], 'nullable': row[2] == 'YES', 'default': row[3]}
for row in cursor.fetchall()}
# 获取模型字段(只获取数据库字段,排除反向关系)
model_fields = {}
for field in model._meta.get_fields(include_parents=False):
# 跳过多对多关系
if isinstance(field, models.ManyToManyField):
continue
# 跳过反向关系auto_created=True
if getattr(field, 'auto_created', False):
continue
# 只处理有 column 属性的字段
if hasattr(field, 'column'):
model_fields[field.column] = {
'name': field.name,
'type': type(field).__name__,
'null': getattr(field, 'null', False),
}
# 检查模型字段是否在数据库中存在
for field_name, field_info in model_fields.items():
if field_name not in db_columns:
issues.append({
'type': 'missing_field',
'model': model.__name__,
'table': db_table,
'field': field_name,
'message': f'模型字段 {field_info["name"]} ({field_name}) 在数据库表 {db_table} 中不存在'
})
self.stdout.write(self.style.ERROR(f' ❌ 字段 {field_info["name"]} ({field_name}) 在数据库中不存在'))
else:
if detail:
self.stdout.write(self.style.SUCCESS(f' ✅ 字段 {field_info["name"]} ({field_name}) 存在'))
# 检查数据库中是否有模型中没有的字段(排除 id
db_field_names = set(db_columns.keys())
model_field_names = set(model_fields.keys())
extra_fields = db_field_names - model_field_names - {'id'}
if extra_fields:
warnings.append(f'数据库表 {db_table} 中有模型中没有的字段: {", ".join(extra_fields)}')
self.stdout.write(self.style.WARNING(f' ⚠️ 数据库中有额外字段: {", ".join(extra_fields)}'))
# 检查多对多关系表
for field in model._meta.get_fields(include_parents=False):
if isinstance(field, models.ManyToManyField):
m2m_table = field.remote_field.through._meta.db_table
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.tables
WHERE table_schema = %s AND table_name = %s
""", [db_name, m2m_table])
if cursor.fetchone()[0] == 0:
issues.append({
'type': 'missing_m2m_table',
'model': model.__name__,
'table': m2m_table,
'field': field.name,
'message': f'多对多关系表 {m2m_table} 不存在'
})
self.stdout.write(self.style.ERROR(f' ❌ 多对多关系表 {m2m_table} 不存在'))
else:
if detail:
self.stdout.write(self.style.SUCCESS(f' ✅ 多对多关系表 {m2m_table} 存在'))
return issues, warnings

View File

@@ -0,0 +1,90 @@
# Generated manually to add missing approvers_order fields
# This migration adds approvers_order fields if they don't exist in the database
from django.db import migrations, models
def add_approvers_order_if_not_exists(apps, schema_editor):
"""使用 SQL 检查并添加字段(如果不存在)"""
db_alias = schema_editor.connection.alias
# 检查并添加 ProjectRegistration.approvers_order
with schema_editor.connection.cursor() as cursor:
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'business_projectregistration'
AND COLUMN_NAME = 'approvers_order'
""")
if cursor.fetchone()[0] == 0:
cursor.execute("ALTER TABLE business_projectregistration ADD COLUMN approvers_order TEXT NULL")
# 检查并添加 Bid.approvers_order
with schema_editor.connection.cursor() as cursor:
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'business_bid'
AND COLUMN_NAME = 'approvers_order'
""")
if cursor.fetchone()[0] == 0:
cursor.execute("ALTER TABLE business_bid ADD COLUMN approvers_order TEXT NULL")
# 检查并添加 Case.approvers_order
with schema_editor.connection.cursor() as cursor:
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'business_case'
AND COLUMN_NAME = 'approvers_order'
""")
if cursor.fetchone()[0] == 0:
cursor.execute("ALTER TABLE business_case ADD COLUMN approvers_order TEXT NULL")
# 检查并添加 SealApplication.approvers_order
with schema_editor.connection.cursor() as cursor:
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'business_sealapplication'
AND COLUMN_NAME = 'approvers_order'
""")
if cursor.fetchone()[0] == 0:
cursor.execute("ALTER TABLE business_sealapplication ADD COLUMN approvers_order TEXT NULL")
def reverse_add_approvers_order(apps, schema_editor):
"""回滚操作:删除字段"""
db_alias = schema_editor.connection.alias
with schema_editor.connection.cursor() as cursor:
try:
cursor.execute("ALTER TABLE business_projectregistration DROP COLUMN approvers_order")
except:
pass
try:
cursor.execute("ALTER TABLE business_bid DROP COLUMN approvers_order")
except:
pass
try:
cursor.execute("ALTER TABLE business_case DROP COLUMN approvers_order")
except:
pass
try:
cursor.execute("ALTER TABLE business_sealapplication DROP COLUMN approvers_order")
except:
pass
class Migration(migrations.Migration):
dependencies = [
('business', '0001_initial'),
]
operations = [
migrations.RunPython(add_approvers_order_if_not_exists, reverse_add_approvers_order),
]

View File

@@ -0,0 +1,90 @@
# Generated manually to add missing approvers_order fields
# This migration adds approvers_order fields if they don't exist in the database
from django.db import migrations, models
def add_approvers_order_if_not_exists(apps, schema_editor):
"""使用 SQL 检查并添加字段(如果不存在)"""
db_alias = schema_editor.connection.alias
# 检查并添加 Income.approvers_order
with schema_editor.connection.cursor() as cursor:
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'finance_income'
AND COLUMN_NAME = 'approvers_order'
""")
if cursor.fetchone()[0] == 0:
cursor.execute("ALTER TABLE finance_income ADD COLUMN approvers_order TEXT NULL")
# 检查并添加 Payment.approvers_order
with schema_editor.connection.cursor() as cursor:
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'finance_payment'
AND COLUMN_NAME = 'approvers_order'
""")
if cursor.fetchone()[0] == 0:
cursor.execute("ALTER TABLE finance_payment ADD COLUMN approvers_order TEXT NULL")
# 检查并添加 Reimbursement.approvers_order
with schema_editor.connection.cursor() as cursor:
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'finance_reimbursement'
AND COLUMN_NAME = 'approvers_order'
""")
if cursor.fetchone()[0] == 0:
cursor.execute("ALTER TABLE finance_reimbursement ADD COLUMN approvers_order TEXT NULL")
# 检查并添加 BonusChange.approvers_order
with schema_editor.connection.cursor() as cursor:
cursor.execute("""
SELECT COUNT(*)
FROM information_schema.COLUMNS
WHERE TABLE_SCHEMA = DATABASE()
AND TABLE_NAME = 'finance_bonuschange'
AND COLUMN_NAME = 'approvers_order'
""")
if cursor.fetchone()[0] == 0:
cursor.execute("ALTER TABLE finance_bonuschange ADD COLUMN approvers_order TEXT NULL")
def reverse_add_approvers_order(apps, schema_editor):
"""回滚操作:删除字段"""
db_alias = schema_editor.connection.alias
with schema_editor.connection.cursor() as cursor:
try:
cursor.execute("ALTER TABLE finance_income DROP COLUMN approvers_order")
except:
pass
try:
cursor.execute("ALTER TABLE finance_payment DROP COLUMN approvers_order")
except:
pass
try:
cursor.execute("ALTER TABLE finance_reimbursement DROP COLUMN approvers_order")
except:
pass
try:
cursor.execute("ALTER TABLE finance_bonuschange DROP COLUMN approvers_order")
except:
pass
class Migration(migrations.Migration):
dependencies = [
('finance', '0001_initial'),
]
operations = [
migrations.RunPython(add_approvers_order_if_not_exists, reverse_add_approvers_order),
]

View File

@@ -0,0 +1,356 @@
# 数据库模型对比分析
## 一、模型类总结
### User 应用模型
1. **Department** - 部门表
- `username`: 部门名称
- `is_deleted`: 软删除标记
2. **Team** - 团队表(自定义表名:`team`
- `name`: 团队名称
- `team_type`: 团队类型personal/team
- `description`: 团队描述
- `approvers`: 审核人多对多User
- `is_deleted`: 软删除标记
3. **User** - 用户表
- `username`: 姓名
- `account`: 账号
- `password`: 密码
- `ethnicity`: 民族
- `card`: 身份证
- `mobilePhone`: 手机号
- `position`: 岗位
- `team`: 所属团队CharField存储团队名称
- `Dateofjoining`: 入职时间
- `Confirmationtime`: 转正时间
- `Practicingcertificatetime`: 执业证时间
- `Dateofdeparture`: 离职时间
- `AcademicResume`: 学业简历
- `academic`: 学历
- `contract`: 合同
- `ApplicationForm`: 入职申请表
- `salary`: 工资
- `state`: 状态
- `token`: 令牌
- `is_deleted`: 软删除标记
- `department`: 归属部门(多对多)
- `role`: 角色(多对多)
- **注意**:没有 `approvers_order` 字段
4. **Approval** - 审批记录表
- `title`: 标题
- `content`: 内容
- `times`: 提交时间
- `completeTiem`: 完成时间
- `personincharge`: 负责人/审批部门
- `state`: 状态
- `type`: 类别
- `user_id`: 事件IDCharField
- `is_deleted`: 软删除标记
5. **OperationLog** - 操作日志表(自定义表名:`operation_log`
- `operator`: 操作人用户名
- `operator_id`: 操作人ID
- `operation_type`: 操作类型
- `module`: 模块
- `action`: 操作描述
- `target_type`: 目标类型
- `target_id`: 目标ID
- `target_name`: 目标名称
- `old_data`: 操作前的数据JSON
- `new_data`: 操作后的数据JSON
- `ip_address`: IP地址
- `user_agent`: 用户代理
- `request_path`: 请求路径
- `remark`: 备注
- `create_time`: 操作时间
### finance 应用模型
1. **Invoice** - 开票申请表
- `ContractNo`: 合同号
- `personincharge`: 负责人
- `amount`: 开票金额
- `type`: 开票类型
- `unit`: 开票单位全称
- `number`: 纳税人识别号
- `address_telephone`: 地址/电话
- `bank`: 银行卡
- `state`: 状态
- `username`: 谁提交的
- `times`: 提交时间
- `is_deleted`: 软删除标记
- **注意**:没有 `approvers_order` 字段
2. **Income** - 收入确认表
- `times`: 收款日期
- `ContractNo`: 合同号
- `CustomerID`: 客户名称
- `amount`: 收款金额
- `allocate`: 收入分配
- `submit`: 谁提交的
- `submit_tiem`: 提交时间
- `state`: 状态
- `approvers_order`: 审核人顺序JSON格式
- `is_deleted`: 软删除标记
3. **Accounts** - 调账申请表
- `times`: 收款日期
- `ContractNo`: 合同号
- `CustomerID`: 客户名称
- `amount`: 收款金额
- `situation`: 情况说明
- `submit`: 谁提交的
- `submit_tiem`: 提交时间
- `state`: 状态
- `is_deleted`: 软删除标记
- **注意**:没有 `approvers_order` 字段(调账申请直接抄送财务)
4. **Payment** - 付款申请表
- `reason`: 付款理由
- `amount`: 付款金额
- `times`: 付款日期
- `payee`: 收款人
- `bankcard`: 银行卡
- `BankName`: 开户行
- `applicant`: 申请人
- `submit_tiem`: 提交时间
- `state`: 状态
- `approvers_order`: 审核人顺序JSON格式
- `is_deleted`: 软删除标记
5. **Reimbursement** - 报销申请表
- `person`: 报销人
- `times`: 报销日期
- `reason`: 报销理由
- `amount`: 报销金额
- `FeeDescription`: 费用说明
- `submit_tiem`: 提交时间
- `state`: 状态
- `approvers_order`: 审核人顺序JSON格式
- `is_deleted`: 软删除标记
6. **BonusChange** - 工资/奖金变更表
- `username`: 用户名
- `type`: 类型
- `Instructions`: 调整说明
- `times`: 时间
- `state`: 状态
- `submitter`: 提交人
- `approvers_order`: 审核人顺序JSON格式
- `is_deleted`: 软删除标记
### business 应用模型
1. **PreFiling** - 预立案表
- `times`: 预立案时间
- `client_username`: 委托人信息
- `party_username`: 相对方信息
- `description`: 描述
- `Undertaker`: 承办人员
- `submit`: 谁提交的
- `is_deleted`: 软删除标记
2. **ProjectRegistration** - 立项登记表
- `user`: 外键PreFiling
- `type`: 项目类型
- `ContractNo`: 合同编号
- `times`: 立项日期时间
- `responsiblefor`: 负责人
- `charge`: 收费情况
- `contract`: 合同
- `state`: 状态
- `approvers_order`: 审核人顺序JSON格式
- `is_deleted`: 软删除标记
3. **Bid** - 投标登记表
- `user`: 外键PreFiling
- `BiddingUnit`: 招标单位
- `ProjectName`: 项目名称
- `times`: 申请日期
- `BiddingAnnouncement`: 上传招标公告
- `state`: 状态
- `approvers_order`: 审核人顺序JSON格式
- `is_deleted`: 软删除标记
4. **Case** - 案件表
- `user`: 外键PreFiling
- `times`: 立案时间
- `AgencyContract`: 代理合同
- `Contractreturn`: 合同返还
- `Closingapplication`: 结案申请
- `ChangeRequest`: 变更申请
- `paymentcollection`: 已收款
- `state`: 状态
- `approvers_order`: 审核人顺序JSON格式
- `is_deleted`: 软删除标记
5. **Invoice** - 发票表(注意:与 finance.Invoice 不同)
- `user`: 外键PreFiling
- `amount`: 金额
- `file`: 发票图片或PDF
- `is_deleted`: 软删除标记
6. **Caselog** - 案件日志表
- `user`: 外键PreFiling
- `content`: 内容
- `times`: 时间
- `username`: 提交人
- `file`: 文件
- `is_deleted`: 软删除标记
7. **SealApplication** - 申请用印表
- `Printingpurpose`: 用印用途
- `CaseNumber`: 案件编号
- `Reason`: 用印事由
- `seal_number`: 盖章份数
- `seal_type`: 盖章类型
- `file`: 上传用印文件
- `times`: 日期
- `state`: 状态
- `username`: 提交人
- `approvers_order`: 审核人顺序JSON格式
- `is_deleted`: 软删除标记
8. **Schedule** - 待办表
- `title`: 标题
- `tiems`: 开始时间
- `end_time`: 结束时间
- `remark`: 备注
- `state`: 状态
- `submit`: 提交人
- `approvers_order`: 审核人顺序JSON格式
- `is_deleted`: 软删除标记
9. **role** - 角色表
- `RoleName`: 角色名称
- `permissionId`: 权限
- `remark`: 备注
- `is_deleted`: 软删除标记
10. **permission** - 权限表
- `permission_name`: 权限名称
- `permission_logo`: 权限标识
- `parent`: 父级
- `is_deleted`: 软删除标记
11. **Warehousing** - 入库表
- `unit`: 单位
- `mark`: 入库标的
- `lawyer`: 主办律师
- `deadline`: 入库期限
- `contract`: 入库合同
- `times`: 日期
- `is_deleted`: 软删除标记
12. **RegisterPlatform** - 注册平台表
- `platform`: 注册平台
- `number`: 注册号码
- `password`: 密码
- `username`: 注册人员
- `is_deleted`: 软删除标记
13. **Announcement** - 公告表
- `title`: 标题
- `content`: 内容
- `times`: 提交时间
- `file`: 文件
- `username`: 提交人
- `state`: 状态
- `is_deleted`: 软删除标记
14. **LawyerFlie** - 律师文件表
- `title`: 标题
- `remark`: 备注
- `file`: 文件
- `times`: 时间
- `is_deleted`: 软删除标记
## 二、approvers_order 字段分布
### ✅ 有 approvers_order 字段的模型
1. `Income.approvers_order` - 收入确认
2. `Payment.approvers_order` - 付款申请
3. `Reimbursement.approvers_order` - 报销申请
4. `BonusChange.approvers_order` - 工资/奖金变更
5. `ProjectRegistration.approvers_order` - 立项登记
6. `Bid.approvers_order` - 投标登记
7. `Case.approvers_order` - 案件管理
8. `SealApplication.approvers_order` - 申请用印
9. `Schedule.approvers_order` - 待办
### ❌ 没有 approvers_order 字段的模型
1. `User` - 用户表(入职登记和离职登记使用,审核人列表存储在 Approval.content 中)
2. `Invoice` (finance) - 开票申请(已废弃审核流程)
3. `Accounts` - 调账申请(直接抄送财务,不需要审核人)
## 三、潜在冲突点
### 1. Invoice 模型名称冲突
**问题**`finance.Invoice``business.Invoice` 都叫 Invoice
**状态**:✅ 不冲突
- 它们在不同的应用中
- 数据库表名不同:`finance_invoice``business_invoice`
- 代码中通过 `from finance.models import Invoice``from business.models import Invoice` 区分
### 2. User 模型没有 approvers_order 字段
**问题**:入职登记和离职登记使用 User 模型,但没有 approvers_order 字段
**解决方案**
- 审核人列表存储在 `Approval.content` 字段中
- `get_approvers_from_record` 函数会从 `Approval.content` 解析审核人列表
- 这是设计上的考虑,因为 User 不是业务记录
### 3. 外键关系
**需要注意的外键**
- `ProjectRegistration.user``PreFiling.id`
- `Bid.user``PreFiling.id`
- `Case.user``PreFiling.id`
- `business.Invoice.user``PreFiling.id`
- `Caselog.user``PreFiling.id`
**注意**:这些外键在 Django 中定义,但数据库可能没有外键约束(需要检查)
## 四、检查命令使用
### 1. 检查数据库结构
```bash
python manage.py check_database_structure
```
### 2. 检查数据一致性
```bash
python manage.py check_data_consistency
```
### 3. 检查特定应用
```bash
python manage.py check_database_structure --app User
python manage.py check_data_consistency --app finance
```
### 4. 检查并修复
```bash
python manage.py check_database_structure --fix
python manage.py migrate
```
## 五、建议
1. **定期运行检查**:在开发过程中定期运行检查命令,确保数据库和模型一致
2. **迁移前检查**:在应用迁移前运行检查,确保没有冲突
3. **数据导入后检查**:在导入数据后运行一致性检查,确保数据完整
4. **生产环境检查**:在生产环境部署前运行完整检查

View File

@@ -0,0 +1,316 @@
# 数据库结构检查说明
## 一、检查工具
### 1. 数据库结构检查命令
**命令**`python manage.py check_database_structure`
**功能**:检查数据库表结构和模型类是否一致
**参数**
- `--app APP_NAME`: 指定要检查的应用(如 User, finance, business
- `--model MODEL_NAME`: 指定要检查的模型
- `--fix`: 尝试修复不一致的问题(生成迁移文件)
**示例**
```bash
# 检查所有应用
python manage.py check_database_structure
# 只检查 User 应用
python manage.py check_database_structure --app User
# 只检查 User 应用的 User 模型
python manage.py check_database_structure --app User --model User
# 检查并尝试修复
python manage.py check_database_structure --fix
```
### 2. 数据一致性检查命令
**命令**`python manage.py check_data_consistency`
**功能**:检查数据库数据一致性(外键完整性、软删除一致性等)
**参数**
- `--app APP_NAME`: 指定要检查的应用
- `--fix`: 尝试修复发现的问题
**示例**
```bash
# 检查所有数据一致性
python manage.py check_data_consistency
# 只检查 User 应用
python manage.py check_data_consistency --app User
```
## 二、检查内容
### 1. 数据库结构检查
#### 1.1 表存在性检查
- 检查模型对应的数据库表是否存在
- 检查多对多关系表是否存在
#### 1.2 字段检查
- 检查模型字段是否在数据库中存在
- 检查数据库字段是否在模型中存在(可能遗留字段)
- 检查字段类型是否匹配
- 检查字段约束null, default, max_length是否一致
#### 1.3 字段类型映射
| Django 模型字段 | MySQL 数据库类型 |
|----------------|-----------------|
| CharField | VARCHAR |
| TextField | TEXT/LONGTEXT |
| IntegerField | INT |
| BigIntegerField | BIGINT |
| BooleanField | TINYINT(1) |
| DateField | DATE |
| DateTimeField | DATETIME |
| ForeignKey | BIGINT |
### 2. 数据一致性检查
#### 2.1 外键完整性检查
- 检查 `ProjectRegistration.user` 外键是否指向存在的 `PreFiling`
- 检查 `Case.user` 外键是否指向存在的 `PreFiling`
- 检查 `Bid.user` 外键是否指向存在的 `PreFiling`
#### 2.2 软删除一致性检查
- 检查审批记录和业务记录的软删除状态是否一致
- 检查已软删除的业务记录是否还有未删除的审批记录
#### 2.3 审批记录关联检查
- 检查审批记录的 `user_id` 是否指向存在的业务记录
- 检查 `user_id` 是否为有效的整数
#### 2.4 用户团队关系检查
- 检查用户的 `team` 字段是否指向存在的团队
- 检查团队是否存在且未被删除
#### 2.5 部门数据检查
- 检查是否存在"财务部"部门
- 检查是否有用户属于财务部
## 三、常见问题
### 1. 表不存在
**问题**:模型定义了,但数据库中没有对应的表
**原因**
- 迁移文件未执行
- 迁移文件丢失
- 手动删除了表
**解决方法**
```bash
# 生成迁移文件
python manage.py makemigrations
# 应用迁移
python manage.py migrate
```
### 2. 字段不存在
**问题**:模型中有字段,但数据库表中没有
**原因**
- 新增字段后未执行迁移
- 迁移文件未正确生成
**解决方法**
```bash
# 生成迁移文件
python manage.py makemigrations
# 应用迁移
python manage.py migrate
```
### 3. 字段类型不匹配
**问题**:模型字段类型和数据库字段类型不一致
**原因**
- 手动修改了数据库字段类型
- 迁移文件执行错误
**解决方法**
1. 检查模型定义是否正确
2. 生成新的迁移文件
3. 应用迁移(可能需要数据迁移)
### 4. 外键孤立记录
**问题**:外键指向不存在的记录
**原因**
- 手动删除了被引用的记录
- 外键约束未启用
- 数据导入错误
**解决方法**
1. 检查并修复孤立记录
2. 考虑添加数据库外键约束
3. 清理无效数据
### 5. 软删除不一致
**问题**:业务记录已删除,但审批记录未删除
**原因**
- 软删除逻辑不完整
- 手动修改了数据
**解决方法**
1. 检查软删除逻辑
2. 同步软删除状态
3. 添加数据一致性检查
## 四、模型和数据库对应关系
### User 应用
| 模型 | 数据库表名 | 说明 |
|-----|-----------|------|
| User | user_user | 用户表 |
| Department | user_department | 部门表 |
| Team | team | 团队表(自定义表名) |
| Approval | user_approval | 审批记录表 |
| OperationLog | operation_log | 操作日志表(自定义表名) |
**多对多关系表**
- `user_user_department`: User 和 Department 的多对多关系
- `user_user_role`: User 和 role 的多对多关系
- `team_approvers`: Team 和 User 的多对多关系(审核人)
### finance 应用
| 模型 | 数据库表名 | 说明 |
|-----|-----------|------|
| Invoice | finance_invoice | 开票申请表 |
| Income | finance_income | 收入确认表 |
| Accounts | finance_accounts | 调账申请表 |
| Payment | finance_payment | 付款申请表 |
| Reimbursement | finance_reimbursement | 报销申请表 |
| BonusChange | finance_bonuschange | 工资/奖金变更表 |
### business 应用
| 模型 | 数据库表名 | 说明 |
|-----|-----------|------|
| PreFiling | business_prefiling | 预立案表 |
| ProjectRegistration | business_projectregistration | 立项登记表 |
| Bid | business_bid | 投标登记表 |
| Case | business_case | 案件表 |
| Invoice | business_invoice | 发票表(注意:与 finance.Invoice 不同) |
| Caselog | business_caselog | 案件日志表 |
| SealApplication | business_sealapplication | 申请用印表 |
| Schedule | business_schedule | 待办表 |
| role | business_role | 角色表 |
| permission | business_permission | 权限表 |
| Warehousing | business_warehousing | 入库表 |
| RegisterPlatform | business_registerplatform | 注册平台表 |
| Announcement | business_announcement | 公告表 |
| LawyerFlie | business_lawyerflie | 律师文件表 |
## 五、注意事项
### 1. Invoice 模型冲突
**问题**`finance.Invoice``business.Invoice` 是两个不同的模型
**说明**
- `finance.Invoice`: 财务开票申请(表名:`finance_invoice`
- `business.Invoice`: 案件发票(表名:`business_invoice`
**注意**:这两个模型不会冲突,因为它们在不同的应用中。
### 2. User 模型没有 approvers_order 字段
**问题**`User` 模型没有 `approvers_order` 字段,但入职登记和离职登记需要审核人列表
**说明**
- `User` 模型确实没有 `approvers_order` 字段
- 审核人列表存储在 `Approval.content` 字段中(通过解析获取)
- 这是设计上的考虑,因为 `User` 不是业务记录,而是用户信息
### 3. 多对多关系表
**注意**Django 会自动创建多对多关系表,表名格式为:`应用名_模型名_字段名`
例如:
- `user_user_department`: User 和 Department 的多对多
- `user_user_role`: User 和 role 的多对多
- `team_approvers`: Team 和 User 的多对多(因为 Team 定义了 `db_table='team'`
## 六、运行检查
### 1. 完整检查
```bash
# 检查数据库结构
python manage.py check_database_structure
# 检查数据一致性
python manage.py check_data_consistency
```
### 2. 检查特定应用
```bash
# 只检查 User 应用
python manage.py check_database_structure --app User
python manage.py check_data_consistency --app User
```
### 3. 检查并修复
```bash
# 检查并生成迁移文件
python manage.py check_database_structure --fix
# 应用迁移
python manage.py migrate
```
## 七、检查报告示例
```
开始检查数据库结构和模型类...
检查应用: User
================================================================================
检查模型: User
--------------------------------------------------------------------------------
✅ 表 user_user 存在
✅ 字段 username 匹配
✅ 字段 account 匹配
...
⚠️ 数据库中有额外字段: old_field_name
检查模型: Approval
--------------------------------------------------------------------------------
✅ 表 user_approval 存在
✅ 字段 title 匹配
...
检查应用: finance
================================================================================
...
检查完成!
发现 2 个问题:
1. [missing_field] 模型字段 new_field 在数据库表 user_user 中不存在
2. [extra_field] 数据库表 user_user 中有模型中没有的字段: old_field_name
```