| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838 |
- """
- Universal data profiling engine: auto-detect schema, statistical features,
- and semantic inference from arbitrary Excel data.
- Enhanced with content-based value analysis, distribution shape analysis,
- derived metric detection, and multi-dimensional quality scoring.
- """
- import pandas as pd
- import numpy as np
- from datetime import datetime, date
- from collections import Counter
- import re
- import math
- from report_config import ColumnProfile, ColumnRole
- # =====================================================================
- # DATE PATTERNS — expanded for broader format coverage
- # =====================================================================
- DATE_PATTERNS = [
- re.compile(r'^\d{4}年\d{1,2}月\d{1,2}日$'),
- re.compile(r'^\d{4}-\d{2}-\d{2}$'),
- re.compile(r'^\d{4}/\d{1,2}/\d{1,2}$'),
- re.compile(r'^\d{4}\.\d{1,2}\.\d{1,2}$'),
- re.compile(r'^\d{4}年\d{1,2}月$'),
- re.compile(r'^\d{4}-\d{2}$'),
- re.compile(r'^\d{2}-\d{2}$'),
- re.compile(r'^\d{2}/\d{2}$'),
- re.compile(r'^\d{8}$'), # YYYYMMDD
- ]
- TIME_KEYWORDS = [
- '日期', '时间', 'date', 'time', '年', '月', '日', '周', '期', '季度',
- 'period', 'month', 'year', 'quarter', 'week', 'day', 'timestamp',
- '月份', '年份', '周期', '时段', 'time', 'datetime',
- 'date', 'created', 'updated', 'modified', '发生', '录入', '创建',
- ]
- NUMERIC_KEYWORDS = [
- '金额', '数量', '台数', '件数', '元', '价格', '收入', '支出',
- '利润', '成本', '费用', '销量', '销售额', '总数', '合计',
- 'amount', 'price', 'qty', 'quantity', 'revenue', 'cost',
- 'sales', 'volume', 'value', 'total', 'sum', 'count',
- '单数', '笔数', '人数', '天数', '比率', '占比', '比例', '率',
- '预算', 'budget', '花费', 'spend', 'fee', '金额', '单价',
- 'unit', '得分', 'score', 'rating', '评分',
- ]
- CATEGORY_KEYWORDS = [
- '国家', '区域', '地区', '城市', '省份', '状态', '类型', '类别',
- '分类', '部门', '组', '等级', '级别', '品牌', '渠道',
- 'country', 'region', 'city', 'status', 'type', 'category',
- 'department', 'group', 'level', 'brand', 'channel',
- '负责人', 'owner', 'manager', '产品', 'product', '阶段',
- '供应商', 'supplier', '客户', 'customer', '行业', 'industry',
- '性别', 'gender', '职位', 'title', '角色', 'role', '标签', 'tag',
- '科目', 'account', '方向', 'direction', '方式', 'method',
- '意向', 'intent', 'intention', 'priority', '优先级',
- ]
- ID_KEYWORDS = [
- 'id', '编号', '序号', '代码', 'code', 'no', '编码', '合同号',
- '订单号', '工单号', '流水号', '单号', '标识', 'key',
- 'uuid', 'guid', 'sn', '序列号', '身份证', 'phone', '手机',
- '邮箱', 'email', '电话', 'tel', 'mobile',
- ]
- TEXT_KEYWORDS = [
- '备注', '描述', '说明', '详情', '内容', '意见', '建议', '进度更新',
- 'note', 'description', 'detail', 'remark', 'comment', 'memo',
- '地址', 'address', '介绍', '摘要', 'summary', '附注',
- '反馈', 'feedback', '理由', 'reason', '原因', 'cause',
- ]
- RATE_KEYWORDS = [
- '率', 'ratio', 'rate', '占比', '比例', 'percentage', 'pct',
- 'percent', 'conversion', '转化率', '完成率', '增长率',
- ]
- # Patterns for value-based content detection
- PHONE_PATTERN = re.compile(r'^[\+]?[\d\-\(\)\s]{6,20}$')
- EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$')
- URL_PATTERN = re.compile(r'^https?://', re.IGNORECASE)
- YEAR_PATTERN = re.compile(r'^\d{4}$')
- def _parse_date(val):
- if pd.isna(val):
- return None
- if isinstance(val, (datetime, date)):
- return val
- if isinstance(val, (int, float)) and not math.isnan(val):
- try:
- return pd.Timestamp(val).to_pydatetime()
- except (ValueError, OverflowError):
- pass
- s = str(val).strip()
- for pattern in DATE_PATTERNS:
- if pattern.match(s):
- for fmt in ('%Y年%m月%d日', '%Y-%m-%d', '%Y/%m/%d',
- '%Y.%m.%d', '%Y年%m月', '%Y-%m',
- '%m-%d', '%m/%d', '%Y%m%d'):
- try:
- return datetime.strptime(s, fmt)
- except ValueError:
- continue
- return None
- # =====================================================================
- # VALUE-BASED CONTENT ANALYSIS
- # =====================================================================
- def _analyze_value_patterns(series: pd.Series, sample_count: int = 100) -> dict:
- """Analyze actual data values to detect patterns and content types."""
- non_null = series.dropna().astype(str).head(sample_count)
- if len(non_null) == 0:
- return {}
- patterns = {}
- # Check if values look like percentages
- # Only flag as percentage if: ends with % OR is a decimal fraction (0.0-1.0)
- pct_like = sum(1 for v in non_null if v.endswith('%') or
- (v.replace('.', '', 1).lstrip('-').isdigit() and
- 0 < float(v) <= 1 and not v.isdigit()))
- patterns['pct_ratio'] = pct_like / len(non_null)
- # Check for yes/no or true/false patterns
- yn_vals = {'是', '否', 'yes', 'no', 'true', 'false', 'y', 'n', 't', 'f',
- '有', '无', '0', '1'}
- yn_like = sum(1 for v in non_null if v.lower() in yn_vals)
- patterns['binary_ratio'] = yn_like / len(non_null)
- # Check for ordinal/categorical short text
- short_text = sum(1 for v in non_null if len(v) <= 20)
- patterns['short_text_ratio'] = short_text / len(non_null)
- # Check for phone-like patterns
- phone_like = sum(1 for v in non_null if PHONE_PATTERN.match(v))
- patterns['phone_ratio'] = phone_like / len(non_null)
- # Check for email-like patterns
- email_like = sum(1 for v in non_null if EMAIL_PATTERN.match(v))
- patterns['email_ratio'] = email_like / len(non_null)
- # Check for URL-like patterns
- url_like = sum(1 for v in non_null if URL_PATTERN.match(v))
- patterns['url_ratio'] = url_like / len(non_null)
- # Check for pure digit strings (possible IDs)
- digit_only = sum(1 for v in non_null if v.isdigit() and len(v) >= 6)
- patterns['digit_id_ratio'] = digit_only / len(non_null)
- # Check for year-like values
- year_like = sum(1 for v in non_null if YEAR_PATTERN.match(v))
- patterns['year_ratio'] = year_like / len(non_null)
- # Detect ordinal levels
- ordinal_sets = [
- {'高', '中', '低', 'A', 'B', 'C', '甲', '乙', '丙'},
- {'一级', '二级', '三级', '四级', 'level 1', 'level 2', 'level 3'},
- {'critical', 'major', 'minor', 'high', 'medium', 'low'},
- ]
- for oset in ordinal_sets:
- ord_like = sum(1 for v in non_null if v in oset)
- if ord_like / len(non_null) > 0.3:
- patterns['ordinal'] = True
- break
- else:
- patterns['ordinal'] = False
- # Average text length
- patterns['avg_text_len'] = round(non_null.str.len().mean(), 1)
- # Unique ratio
- unique_ratio = series.nunique() / max(len(non_null), 1)
- patterns['unique_ratio'] = round(unique_ratio, 4)
- return patterns
- def _infer_role_from_values(value_patterns: dict, col_name: str,
- dtype_str: str, unique_count: int, total_rows: int) -> str:
- """Infer column role based on value content analysis results."""
- up = value_patterns
- # High ratio of email patterns
- if up.get('email_ratio', 0) > 0.5:
- return 'id'
- # High ratio of phone patterns
- if up.get('phone_ratio', 0) > 0.5:
- return 'id'
- # High ratio of URL patterns
- if up.get('url_ratio', 0) > 0.5:
- return 'text'
- # Mostly binary values (yes/no)
- if up.get('binary_ratio', 0) > 0.6:
- return 'category'
- # Mostly percentage values
- if up.get('pct_ratio', 0) > 0.5:
- return 'numeric'
- # High ratio of digit-only long strings (likely IDs)
- if up.get('digit_id_ratio', 0) > 0.5 and unique_count > total_rows * 0.5:
- return 'id'
- # Ordinal level detected
- if up.get('ordinal', False):
- return 'category'
- return None # No clear signal from values
- # =====================================================================
- # SEMANTIC KEYWORD-BASED ROLE INFERENCE
- # =====================================================================
- def _infer_column_role(col_name: str, dtype_str: str, sample_values: list,
- null_rate: float, unique_count: int, total_rows: int,
- value_patterns: dict = None) -> ColumnRole:
- col_lower = col_name.lower().strip()
- # 1) Value-based inference first (stronger signal)
- if value_patterns:
- value_role = _infer_role_from_values(value_patterns, col_name,
- dtype_str, unique_count, total_rows)
- if value_role:
- return ColumnRole(value_role)
- # 2) Keyword-based inference (expanded)
- if any(kw in col_lower for kw in ID_KEYWORDS):
- return ColumnRole.ID
- if any(kw in col_lower for kw in TIME_KEYWORDS):
- return ColumnRole.TIME
- if any(kw in col_lower for kw in NUMERIC_KEYWORDS):
- return ColumnRole.NUMERIC
- if any(kw in col_lower for kw in CATEGORY_KEYWORDS):
- return ColumnRole.CATEGORY
- if any(kw in col_lower for kw in TEXT_KEYWORDS):
- return ColumnRole.TEXT
- # 3) dtype-based fallback
- if 'int' in dtype_str or 'float' in dtype_str:
- if unique_count <= 15 and total_rows > 20:
- return ColumnRole.CATEGORY
- return ColumnRole.NUMERIC
- if 'bool' in dtype_str:
- return ColumnRole.CATEGORY
- if 'datetime' in dtype_str:
- return ColumnRole.TIME
- # 4) Cardinality-based inference
- if total_rows > 0:
- cardinality_ratio = unique_count / total_rows
- if cardinality_ratio > 0.8 and unique_count > 20:
- return ColumnRole.TEXT
- if cardinality_ratio < 0.3 and unique_count <= 30:
- return ColumnRole.CATEGORY
- return ColumnRole.TEXT
- def _infer_metric_label(col_name: str, role: ColumnRole, value_patterns: dict = None) -> str:
- col_lower = col_name.lower().strip()
- # If values are percentage-like, mark as '比率'
- if value_patterns and value_patterns.get('pct_ratio', 0) > 0.5:
- for kw in ['率', '转化', '占比', '比例']:
- if kw in col_lower:
- return col_name
- return col_name + '(占比)'
- label_map = {
- '金额': '金额', '销售额': '销售额', '收入': '收入', '利润': '利润',
- '数量': '数量', '台数': '台数', '件数': '件数', '订单数': '订单数',
- '成本': '成本', '费用': '费用', '销量': '销量', '占比': '占比',
- '天数': '天数', '人数': '人数', '比率': '比率', '转化率': '转化率',
- '增长率': '增长率', '完成率': '完成率', '单价': '单价',
- '价格': '价格', '得分': '得分', '评分': '评分',
- }
- for kw, label in label_map.items():
- if kw in col_lower:
- return label
- # Check for rate-related keywords
- if any(kw in col_lower for kw in RATE_KEYWORDS):
- return '比率'
- if role == ColumnRole.NUMERIC:
- return col_name
- elif role == ColumnRole.TIME:
- return '日期'
- elif role == ColumnRole.CATEGORY:
- return col_name
- return col_name
- def _infer_unit(col_name: str, value_patterns: dict = None) -> str:
- col_lower = col_name.lower().strip()
- # If values are percentage-like
- if value_patterns and value_patterns.get('pct_ratio', 0) > 0.5:
- return '%'
- unit_map = {
- '金额': '元', '销售额': '元', '收入': '元', '利润': '元',
- '成本': '元', '费用': '元', '台数': '台', '件数': '件',
- '数量': '', '人数': '人', '天数': '天', '占比': '%',
- '比率': '%', '比例': '%', '率': '%', '转化率': '%',
- '增长率': '%', '完成率': '%', '单价': '元', '价格': '元',
- '得分': '分', '评分': '分',
- }
- for kw, unit in unit_map.items():
- if kw in col_lower:
- return unit
- return ''
- # =====================================================================
- # DISTRIBUTION SHAPE ANALYSIS
- # =====================================================================
- def _calc_distribution_shape(series: pd.Series) -> dict:
- """Compute skewness, kurtosis and distribution type for numeric series."""
- try:
- s = series.dropna()
- if len(s) < 4:
- return {}
- skew = round(float(s.skew()), 3)
- kurt = round(float(s.kurtosis()), 3)
- # Determine distribution type
- abs_skew = abs(skew)
- if abs_skew < 0.5:
- skew_type = '近似对称'
- elif abs_skew < 1.0:
- skew_type = '轻度偏态'
- else:
- skew_type = '显著偏态'
- if skew > 0.5:
- skew_dir = '右偏(长尾在右侧,大部分值偏小)'
- elif skew < -0.5:
- skew_dir = '左偏(长尾在左侧,大部分值偏大)'
- else:
- skew_dir = '基本对称'
- # Concentration analysis
- cv = float(s.std()) / float(s.mean()) if float(s.mean()) != 0 else 0
- if cv < 0.3:
- concentration = '高度集中'
- elif cv < 0.7:
- concentration = '中度集中'
- elif cv < 1.2:
- concentration = '适度分散'
- else:
- concentration = '高度分散'
- return {
- 'skewness': skew,
- 'kurtosis': kurt,
- 'skew_type': skew_type,
- 'skew_direction': skew_dir,
- 'cv': round(cv, 3),
- 'concentration': concentration,
- }
- except Exception:
- return {}
- # =====================================================================
- # DERIVED METRIC DETECTION
- # =====================================================================
- def _detect_derived_relations(df: pd.DataFrame, numeric_cols: list) -> list[dict]:
- """
- Detect potential derived relationships among numeric columns.
- E.g., A - B = C, A + B = C, A / B = C (approx.)
- """
- relations = []
- num_names = [c['column_name'] for c in numeric_cols]
- if len(num_names) < 3:
- return relations
- sample = df[num_names].dropna().head(500)
- for i, a_name in enumerate(num_names):
- for j, b_name in enumerate(num_names):
- if j <= i:
- continue
- a = sample[a_name]
- b = sample[b_name]
- # Check subtraction: a - b ≈ c or b - a ≈ c
- for diff_name in num_names:
- if diff_name in (a_name, b_name):
- continue
- d = sample[diff_name]
- diff_ab = (a - b - d).abs().mean()
- diff_ba = (b - a - d).abs().mean()
- threshold = max(d.mean(), 1) * 0.1
- if diff_ab < threshold:
- relations.append({
- 'type': 'subtraction',
- 'expression': f'{a_name} - {b_name} ≈ {diff_name}',
- 'accuracy': round(float(1 - diff_ab / max(float(d.mean()), 1)), 3),
- 'formula': f'{diff_name} = {a_name} - {b_name}',
- })
- break
- elif diff_ba < threshold:
- relations.append({
- 'type': 'subtraction',
- 'expression': f'{b_name} - {a_name} ≈ {diff_name}',
- 'accuracy': round(float(1 - diff_ba / max(float(d.mean()), 1)), 3),
- 'formula': f'{diff_name} = {b_name} - {a_name}',
- })
- break
- # Check addition: a + b ≈ c
- for sum_name in num_names:
- if sum_name in (a_name, b_name):
- continue
- s = sample[sum_name]
- sum_ab = (a + b - s).abs().mean()
- threshold = max(s.mean(), 1) * 0.1
- if sum_ab < threshold:
- relations.append({
- 'type': 'addition',
- 'expression': f'{a_name} + {b_name} ≈ {sum_name}',
- 'accuracy': round(float(1 - sum_ab / max(float(s.mean()), 1)), 3),
- 'formula': f'{sum_name} = {a_name} + {b_name}',
- })
- break
- # Also check for ratio relations
- if len(num_names) >= 2:
- for i, a_name in enumerate(num_names):
- for j, b_name in enumerate(num_names):
- if j <= i:
- continue
- a = sample[a_name]
- b = sample[b_name]
- ratio = (a / b.replace(0, np.nan)).dropna()
- if len(ratio) > 0:
- ratio_std = float(ratio.std())
- ratio_mean = float(ratio.mean())
- if ratio_mean > 0 and ratio_std / ratio_mean < 0.1:
- # Consistent ratio found
- relations.append({
- 'type': 'ratio',
- 'expression': f'{a_name} / {b_name} ≈ {ratio_mean:.3f}',
- 'accuracy': round(float(1 - ratio_std / ratio_mean), 3),
- 'formula': f'{a_name} = {b_name} × {ratio_mean:.2f}',
- })
- return relations
- # =====================================================================
- # MAIN PROFILING FUNCTION
- # =====================================================================
- def profile_dataframe(df: pd.DataFrame) -> dict:
- total_rows = len(df)
- columns = []
- for col in df.columns:
- series = df[col]
- dtype_str = str(series.dtype)
- null_count = int(series.isna().sum())
- null_rate = round(null_count / total_rows, 4) if total_rows else 0.0
- non_null = series.dropna()
- unique_count = int(non_null.nunique())
- sample_values = non_null.head(5).tolist()
- sample_values = [str(v) for v in sample_values]
- # Enhanced: Value pattern analysis
- value_patterns = _analyze_value_patterns(series)
- # Enhanced: Distribution shape analysis for numeric columns
- distribution_shape = None
- numeric_stats = None
- if pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(series):
- try:
- numeric_stats = {
- 'mean': round(float(series.mean()), 2) if not pd.isna(series.mean()) else 0,
- 'median': round(float(series.median()), 2) if not pd.isna(series.median()) else 0,
- 'min': round(float(series.min()), 2) if not pd.isna(series.min()) else 0,
- 'max': round(float(series.max()), 2) if not pd.isna(series.max()) else 0,
- 'std': round(float(series.std()), 2) if not pd.isna(series.std()) else 0,
- 'sum': round(float(series.sum()), 2) if not pd.isna(series.sum()) else 0,
- 'p25': round(float(series.quantile(0.25)), 2) if not pd.isna(series.quantile(0.25)) else 0,
- 'p75': round(float(series.quantile(0.75)), 2) if not pd.isna(series.quantile(0.75)) else 0,
- }
- distribution_shape = _calc_distribution_shape(series)
- except Exception:
- numeric_stats = None
- # Enhanced role inference with value patterns
- role = _infer_column_role(col, dtype_str, sample_values, null_rate,
- unique_count, total_rows, value_patterns)
- label = _infer_metric_label(col, role, value_patterns)
- unit = _infer_unit(col, value_patterns)
- # Enhanced: detect if column is a high-cardinality ID
- is_high_cardinality_id = (role == ColumnRole.TEXT and
- unique_count / max(total_rows, 1) > 0.8 and
- unique_count > 20)
- if is_high_cardinality_id:
- role = ColumnRole.ID
- columns.append(ColumnProfile(
- column_name=col,
- dtype=dtype_str,
- role=role,
- null_count=null_count,
- null_rate=null_rate,
- unique_count=unique_count,
- sample_values=sample_values,
- numeric_stats=numeric_stats,
- inferred_label=label,
- ))
- # Append extra metadata not in ColumnProfile
- columns[-1]._unit = unit
- columns[-1]._distribution_shape = distribution_shape
- columns[-1]._value_patterns = value_patterns
- time_cols = [c for c in columns if c.role == ColumnRole.TIME]
- numeric_cols = [c for c in columns if c.role == ColumnRole.NUMERIC]
- category_cols = [c for c in columns if c.role == ColumnRole.CATEGORY]
- text_cols = [c for c in columns if c.role == ColumnRole.TEXT]
- id_cols = [c for c in columns if c.role == ColumnRole.ID]
- # Date range inference
- date_range = (None, None)
- time_granularity = 'unknown'
- if time_cols:
- series = df[time_cols[0].column_name].dropna()
- parsed = series.apply(_parse_date).dropna()
- if len(parsed) > 0:
- date_range = (parsed.min(), parsed.max())
- if len(parsed) >= 2:
- diff = (parsed.max() - parsed.min()).days
- if diff <= 1:
- time_granularity = 'daily'
- elif diff <= 7:
- time_granularity = 'weekly'
- elif diff <= 31:
- time_granularity = 'monthly'
- elif diff <= 92:
- time_granularity = 'quarterly'
- else:
- time_granularity = 'yearly'
- # Enhanced: Detect derived relations among numeric columns
- derived_relations = _detect_derived_relations(df, [c.__dict__ for c in numeric_cols])
- # Enhanced: Multi-dimensional quality scoring
- quality_score, quality_details = _calc_quality_score(
- df, columns, numeric_cols, date_range
- )
- # Outlier detection (Enhanced: with CV-based filtering)
- outlier_columns = []
- for c in numeric_cols:
- ns = c.numeric_stats
- if ns and ns.get('std', 0) > 0 and ns.get('mean', 0) > 0:
- cv = ns['std'] / ns['mean']
- if cv > 3:
- outlier_columns.append(c.column_name)
- return {
- 'total_rows': total_rows,
- 'total_columns': len(columns),
- 'columns': [c.__dict__ for c in columns],
- 'time_columns': [c.__dict__ for c in time_cols],
- 'numeric_columns': [c.__dict__ for c in numeric_cols],
- 'category_columns': [c.__dict__ for c in category_cols],
- 'text_columns': [c.__dict__ for c in text_cols],
- 'id_columns': [c.__dict__ for c in id_cols],
- 'date_range': (
- date_range[0].strftime('%Y-%m-%d') if date_range[0] else None,
- date_range[1].strftime('%Y-%m-%d') if date_range[1] else None,
- ),
- 'time_granularity': time_granularity,
- 'data_quality': {
- 'score': quality_score,
- 'details': quality_details,
- 'high_null_columns': [c.column_name for c in columns if c.null_rate > 0.3],
- 'outlier_columns': outlier_columns,
- },
- 'derived_relations': derived_relations,
- 'column_stats': [{
- 'column_name': col.column_name,
- 'role': col.role.value,
- 'dtype': col.dtype,
- 'null_rate': col.null_rate,
- 'unique_count': col.unique_count,
- 'distribution_shape': getattr(col, '_distribution_shape', None),
- 'inferred_label': col.inferred_label,
- 'unit': getattr(col, '_unit', ''),
- 'numeric_stats': col.numeric_stats,
- } for col in columns],
- }
- # =====================================================================
- # ENHANCED QUALITY SCORING
- # =====================================================================
- def _calc_quality_score(df: pd.DataFrame, columns: list,
- numeric_cols: list, date_range: tuple) -> tuple:
- """Multi-dimensional quality scoring (0-100)."""
- score = 100
- details = {}
- # 1) Completeness (30%) — null rates
- avg_null_rate = np.mean([c.null_rate for c in columns]) if columns else 0
- completeness_penalty = min(30, avg_null_rate * 100 * 2)
- completeness = max(0, 30 - completeness_penalty)
- details['completeness'] = round(completeness, 1)
- # 2) Uniqueness (20%) — presence of ID columns or unique identifiers
- id_ratio = len([c for c in columns if c.role == ColumnRole.ID]) / max(len(columns), 1)
- uniqueness = min(20, 10 + id_ratio * 10)
- details['uniqueness'] = round(uniqueness, 1)
- # 3) Numeric health (25%) — outliers, zeros, negative values
- numeric_health = 25
- for c in numeric_cols:
- series = df[c.column_name].dropna()
- if len(series) == 0:
- continue
- # Check for negative values in non-negative expected columns
- if c.inferred_label in ('台数', '数量', '金额', '人数'):
- neg_ratio = (series < 0).sum() / len(series)
- if neg_ratio > 0.05:
- numeric_health -= 5
- # Check for excessive zeros
- zero_ratio = (series == 0).sum() / len(series)
- if zero_ratio > 0.5:
- numeric_health -= 3
- details['numeric_health'] = max(0, numeric_health)
- # 4) Temporal consistency (15%) — if time columns exist, check date ordering
- temporal = 15
- if date_range[0] and date_range[1]:
- if date_range[0] <= date_range[1]:
- temporal = 15
- else:
- temporal = 5
- details['temporal_consistency'] = temporal
- # 5) Completeness of categorical data (10%)
- cat_health = 10
- for c in columns:
- if c.role == ColumnRole.CATEGORY and c.null_rate > 0.2:
- cat_health -= 2
- details['categorical_health'] = max(0, cat_health)
- score = completeness + uniqueness + numeric_health + temporal + cat_health
- score = max(0, min(100, round(score)))
- return score, details
- # =====================================================================
- # HELPER FUNCTIONS (enhanced)
- # =====================================================================
- def profile_category_distribution(df: pd.DataFrame, col_name: str, top_n: int = 15) -> dict:
- if col_name not in df.columns:
- return {}
- counts = df[col_name].value_counts().head(top_n).to_dict()
- total = df[col_name].notna().sum()
- # Calculate concentration (Herfindahl index)
- pcts = [v / total for v in counts.values()] if total else []
- hhi = sum(p * p for p in pcts) if pcts else 0
- return {
- 'total_categories': df[col_name].nunique(),
- 'top_items': {str(k): {'count': int(v), 'pct': round(v / total * 100, 1) if total else 0}
- for k, v in counts.items()},
- 'concentration_hhi': round(hhi, 4),
- 'concentration_label': '高度集中' if hhi > 0.5 else '中度集中' if hhi > 0.2 else '分散',
- }
- def profile_numeric_series(df: pd.DataFrame, col_name: str) -> dict:
- if col_name not in df.columns:
- return {}
- series = df[col_name].dropna()
- if len(series) == 0:
- return {}
- shape = _calc_distribution_shape(series)
- result = {
- 'count': len(series),
- 'sum': round(float(series.sum()), 2),
- 'mean': round(float(series.mean()), 2),
- 'median': round(float(series.median()), 2),
- 'min': round(float(series.min()), 2),
- 'max': round(float(series.max()), 2),
- 'std': round(float(series.std()), 2),
- }
- if shape:
- result.update(shape)
- return result
- def detect_data_issues(df: pd.DataFrame) -> list[dict]:
- issues = []
- for col in df.columns:
- null_rate = df[col].isna().mean()
- if null_rate > 0.5:
- issues.append({
- 'column': col,
- 'type': 'high_missing',
- 'severity': 'major',
- 'message': f'列"{col}"缺失率{null_rate:.1%},建议排除或补全',
- })
- elif null_rate > 0.1:
- issues.append({
- 'column': col,
- 'type': 'moderate_missing',
- 'severity': 'minor',
- 'message': f'列"{col}"缺失率{null_rate:.1%}',
- })
- if pd.api.types.is_numeric_dtype(df[col]):
- series = df[col].dropna()
- if len(series) > 0:
- q1, q3 = series.quantile(0.25), series.quantile(0.75)
- iqr = q3 - q1
- lower, upper = q1 - 3 * iqr, q3 + 3 * iqr
- outlier_count = ((series < lower) | (series > upper)).sum()
- if outlier_count > len(series) * 0.1:
- issues.append({
- 'column': col,
- 'type': 'outliers',
- 'severity': 'major',
- 'message': f'列"{col}"存在{outlier_count}个异常值({outlier_count/len(series):.1%})',
- })
- # Check for negative values
- neg_count = (series < 0).sum()
- if neg_count > 0:
- issues.append({
- 'column': col,
- 'type': 'negative_values',
- 'severity': 'minor',
- 'message': f'列"{col}"存在{neg_count}个负值',
- })
- # Check for constant columns
- if df[col].nunique() <= 1 and null_rate < 1.0:
- issues.append({
- 'column': col,
- 'type': 'constant_column',
- 'severity': 'minor',
- 'message': f'列"{col}"为常量列(仅1个唯一值),对分析无贡献',
- })
- return issues
- def generate_summary_text(profile: dict) -> str:
- lines = []
- lines.append(f"共 {profile['total_rows']:,} 行 × {profile['total_columns']} 列")
- num_cols = profile.get('numeric_columns', [])
- cat_cols = profile.get('category_columns', [])
- time_cols = profile.get('time_columns', [])
- lines.append(f"数值列: {len(num_cols)} 个 | 分类列: {len(cat_cols)} 个 | 时间列: {len(time_cols)} 个")
- dr = profile.get('date_range', (None, None))
- if dr[0]:
- lines.append(f"时间范围: {dr[0]} ~ {dr[1]}")
- lines.append(f"时间粒度: {profile.get('time_granularity', 'unknown')}")
- q = profile.get('data_quality', {})
- lines.append(f"数据质量评分: {q.get('score', 0)}/100")
- if q.get('details'):
- det = q['details']
- lines.append(f" 完整性: {det.get('completeness', 0)}/30 | "
- f"数值健康: {det.get('numeric_health', 0)}/25 | "
- f"时间一致性: {det.get('temporal_consistency', 0)}/15")
- if q.get('high_null_columns'):
- lines.append(f"高缺失列: {', '.join(q['high_null_columns'])}")
- # Enhanced: derived relations
- derived = profile.get('derived_relations', [])
- if derived:
- lines.append(f"检测到 {len(derived)} 个数值关系:")
- for rel in derived[:5]:
- lines.append(f" {rel['formula']} (置信度: {rel['accuracy']:.0%})")
- # Distribution shape summary for numeric columns
- shape_cols = []
- for nc in num_cols[:3]:
- shape = nc.get('distribution_shape')
- if shape:
- shape_cols.append(f"{nc.get('inferred_label', nc['column_name'])}[{shape.get('concentration', 'N/A')}]")
- if shape_cols:
- lines.append(f"分布特征: {' | '.join(shape_cols)}")
- return '\n'.join(lines)
- if __name__ == '__main__':
- import sys
- if len(sys.argv) > 1:
- fp = sys.argv[1]
- try:
- df = pd.read_excel(fp)
- except Exception:
- df = pd.read_excel(fp, sheet_name=0)
- profile = profile_dataframe(df)
- print(generate_summary_text(profile))
- issues = detect_data_issues(df)
- if issues:
- print(f"\n数据问题 ({len(issues)}):")
- for iss in issues:
- print(f" [{iss['severity']}] {iss['message']}")
|