""" Universal data profiling engine: auto-detect schema, statistical features, and semantic inference from arbitrary Excel data. Enhanced with content-based value analysis, distribution shape analysis, derived metric detection, and multi-dimensional quality scoring. """ import pandas as pd import numpy as np from datetime import datetime, date from collections import Counter import re import math from report_config import ColumnProfile, ColumnRole # ===================================================================== # DATE PATTERNS — expanded for broader format coverage # ===================================================================== DATE_PATTERNS = [ re.compile(r'^\d{4}年\d{1,2}月\d{1,2}日$'), re.compile(r'^\d{4}-\d{2}-\d{2}$'), re.compile(r'^\d{4}/\d{1,2}/\d{1,2}$'), re.compile(r'^\d{4}\.\d{1,2}\.\d{1,2}$'), re.compile(r'^\d{4}年\d{1,2}月$'), re.compile(r'^\d{4}-\d{2}$'), re.compile(r'^\d{2}-\d{2}$'), re.compile(r'^\d{2}/\d{2}$'), re.compile(r'^\d{8}$'), # YYYYMMDD ] TIME_KEYWORDS = [ '日期', '时间', 'date', 'time', '年', '月', '日', '周', '期', '季度', 'period', 'month', 'year', 'quarter', 'week', 'day', 'timestamp', '月份', '年份', '周期', '时段', 'time', 'datetime', 'date', 'created', 'updated', 'modified', '发生', '录入', '创建', ] NUMERIC_KEYWORDS = [ '金额', '数量', '台数', '件数', '元', '价格', '收入', '支出', '利润', '成本', '费用', '销量', '销售额', '总数', '合计', 'amount', 'price', 'qty', 'quantity', 'revenue', 'cost', 'sales', 'volume', 'value', 'total', 'sum', 'count', '单数', '笔数', '人数', '天数', '比率', '占比', '比例', '率', '预算', 'budget', '花费', 'spend', 'fee', '金额', '单价', 'unit', '得分', 'score', 'rating', '评分', ] CATEGORY_KEYWORDS = [ '国家', '区域', '地区', '城市', '省份', '状态', '类型', '类别', '分类', '部门', '组', '等级', '级别', '品牌', '渠道', 'country', 'region', 'city', 'status', 'type', 'category', 'department', 'group', 'level', 'brand', 'channel', '负责人', 'owner', 'manager', '产品', 'product', '阶段', '供应商', 'supplier', '客户', 'customer', '行业', 'industry', '性别', 'gender', '职位', 'title', '角色', 'role', '标签', 'tag', '科目', 'account', '方向', 'direction', '方式', 'method', '意向', 'intent', 'intention', 'priority', '优先级', ] ID_KEYWORDS = [ 'id', '编号', '序号', '代码', 'code', 'no', '编码', '合同号', '订单号', '工单号', '流水号', '单号', '标识', 'key', 'uuid', 'guid', 'sn', '序列号', '身份证', 'phone', '手机', '邮箱', 'email', '电话', 'tel', 'mobile', ] TEXT_KEYWORDS = [ '备注', '描述', '说明', '详情', '内容', '意见', '建议', '进度更新', 'note', 'description', 'detail', 'remark', 'comment', 'memo', '地址', 'address', '介绍', '摘要', 'summary', '附注', '反馈', 'feedback', '理由', 'reason', '原因', 'cause', ] RATE_KEYWORDS = [ '率', 'ratio', 'rate', '占比', '比例', 'percentage', 'pct', 'percent', 'conversion', '转化率', '完成率', '增长率', ] # Patterns for value-based content detection PHONE_PATTERN = re.compile(r'^[\+]?[\d\-\(\)\s]{6,20}$') EMAIL_PATTERN = re.compile(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$') URL_PATTERN = re.compile(r'^https?://', re.IGNORECASE) YEAR_PATTERN = re.compile(r'^\d{4}$') def _parse_date(val): if pd.isna(val): return None if isinstance(val, (datetime, date)): return val if isinstance(val, (int, float)) and not math.isnan(val): try: return pd.Timestamp(val).to_pydatetime() except (ValueError, OverflowError): pass s = str(val).strip() for pattern in DATE_PATTERNS: if pattern.match(s): for fmt in ('%Y年%m月%d日', '%Y-%m-%d', '%Y/%m/%d', '%Y.%m.%d', '%Y年%m月', '%Y-%m', '%m-%d', '%m/%d', '%Y%m%d'): try: return datetime.strptime(s, fmt) except ValueError: continue return None # ===================================================================== # VALUE-BASED CONTENT ANALYSIS # ===================================================================== def _analyze_value_patterns(series: pd.Series, sample_count: int = 100) -> dict: """Analyze actual data values to detect patterns and content types.""" non_null = series.dropna().astype(str).head(sample_count) if len(non_null) == 0: return {} patterns = {} # Check if values look like percentages # Only flag as percentage if: ends with % OR is a decimal fraction (0.0-1.0) pct_like = sum(1 for v in non_null if v.endswith('%') or (v.replace('.', '', 1).lstrip('-').isdigit() and 0 < float(v) <= 1 and not v.isdigit())) patterns['pct_ratio'] = pct_like / len(non_null) # Check for yes/no or true/false patterns yn_vals = {'是', '否', 'yes', 'no', 'true', 'false', 'y', 'n', 't', 'f', '有', '无', '0', '1'} yn_like = sum(1 for v in non_null if v.lower() in yn_vals) patterns['binary_ratio'] = yn_like / len(non_null) # Check for ordinal/categorical short text short_text = sum(1 for v in non_null if len(v) <= 20) patterns['short_text_ratio'] = short_text / len(non_null) # Check for phone-like patterns phone_like = sum(1 for v in non_null if PHONE_PATTERN.match(v)) patterns['phone_ratio'] = phone_like / len(non_null) # Check for email-like patterns email_like = sum(1 for v in non_null if EMAIL_PATTERN.match(v)) patterns['email_ratio'] = email_like / len(non_null) # Check for URL-like patterns url_like = sum(1 for v in non_null if URL_PATTERN.match(v)) patterns['url_ratio'] = url_like / len(non_null) # Check for pure digit strings (possible IDs) digit_only = sum(1 for v in non_null if v.isdigit() and len(v) >= 6) patterns['digit_id_ratio'] = digit_only / len(non_null) # Check for year-like values year_like = sum(1 for v in non_null if YEAR_PATTERN.match(v)) patterns['year_ratio'] = year_like / len(non_null) # Detect ordinal levels ordinal_sets = [ {'高', '中', '低', 'A', 'B', 'C', '甲', '乙', '丙'}, {'一级', '二级', '三级', '四级', 'level 1', 'level 2', 'level 3'}, {'critical', 'major', 'minor', 'high', 'medium', 'low'}, ] for oset in ordinal_sets: ord_like = sum(1 for v in non_null if v in oset) if ord_like / len(non_null) > 0.3: patterns['ordinal'] = True break else: patterns['ordinal'] = False # Average text length patterns['avg_text_len'] = round(non_null.str.len().mean(), 1) # Unique ratio unique_ratio = series.nunique() / max(len(non_null), 1) patterns['unique_ratio'] = round(unique_ratio, 4) return patterns def _infer_role_from_values(value_patterns: dict, col_name: str, dtype_str: str, unique_count: int, total_rows: int) -> str: """Infer column role based on value content analysis results.""" up = value_patterns # High ratio of email patterns if up.get('email_ratio', 0) > 0.5: return 'id' # High ratio of phone patterns if up.get('phone_ratio', 0) > 0.5: return 'id' # High ratio of URL patterns if up.get('url_ratio', 0) > 0.5: return 'text' # Mostly binary values (yes/no) if up.get('binary_ratio', 0) > 0.6: return 'category' # Mostly percentage values if up.get('pct_ratio', 0) > 0.5: return 'numeric' # High ratio of digit-only long strings (likely IDs) if up.get('digit_id_ratio', 0) > 0.5 and unique_count > total_rows * 0.5: return 'id' # Ordinal level detected if up.get('ordinal', False): return 'category' return None # No clear signal from values # ===================================================================== # SEMANTIC KEYWORD-BASED ROLE INFERENCE # ===================================================================== def _infer_column_role(col_name: str, dtype_str: str, sample_values: list, null_rate: float, unique_count: int, total_rows: int, value_patterns: dict = None) -> ColumnRole: col_lower = col_name.lower().strip() # 1) Value-based inference first (stronger signal) if value_patterns: value_role = _infer_role_from_values(value_patterns, col_name, dtype_str, unique_count, total_rows) if value_role: return ColumnRole(value_role) # 2) Keyword-based inference (expanded) if any(kw in col_lower for kw in ID_KEYWORDS): return ColumnRole.ID if any(kw in col_lower for kw in TIME_KEYWORDS): return ColumnRole.TIME if any(kw in col_lower for kw in NUMERIC_KEYWORDS): return ColumnRole.NUMERIC if any(kw in col_lower for kw in CATEGORY_KEYWORDS): return ColumnRole.CATEGORY if any(kw in col_lower for kw in TEXT_KEYWORDS): return ColumnRole.TEXT # 3) dtype-based fallback if 'int' in dtype_str or 'float' in dtype_str: if unique_count <= 15 and total_rows > 20: return ColumnRole.CATEGORY return ColumnRole.NUMERIC if 'bool' in dtype_str: return ColumnRole.CATEGORY if 'datetime' in dtype_str: return ColumnRole.TIME # 4) Cardinality-based inference if total_rows > 0: cardinality_ratio = unique_count / total_rows if cardinality_ratio > 0.8 and unique_count > 20: return ColumnRole.TEXT if cardinality_ratio < 0.3 and unique_count <= 30: return ColumnRole.CATEGORY return ColumnRole.TEXT def _infer_metric_label(col_name: str, role: ColumnRole, value_patterns: dict = None) -> str: col_lower = col_name.lower().strip() # If values are percentage-like, mark as '比率' if value_patterns and value_patterns.get('pct_ratio', 0) > 0.5: for kw in ['率', '转化', '占比', '比例']: if kw in col_lower: return col_name return col_name + '(占比)' label_map = { '金额': '金额', '销售额': '销售额', '收入': '收入', '利润': '利润', '数量': '数量', '台数': '台数', '件数': '件数', '订单数': '订单数', '成本': '成本', '费用': '费用', '销量': '销量', '占比': '占比', '天数': '天数', '人数': '人数', '比率': '比率', '转化率': '转化率', '增长率': '增长率', '完成率': '完成率', '单价': '单价', '价格': '价格', '得分': '得分', '评分': '评分', } for kw, label in label_map.items(): if kw in col_lower: return label # Check for rate-related keywords if any(kw in col_lower for kw in RATE_KEYWORDS): return '比率' if role == ColumnRole.NUMERIC: return col_name elif role == ColumnRole.TIME: return '日期' elif role == ColumnRole.CATEGORY: return col_name return col_name def _infer_unit(col_name: str, value_patterns: dict = None) -> str: col_lower = col_name.lower().strip() # If values are percentage-like if value_patterns and value_patterns.get('pct_ratio', 0) > 0.5: return '%' unit_map = { '金额': '元', '销售额': '元', '收入': '元', '利润': '元', '成本': '元', '费用': '元', '台数': '台', '件数': '件', '数量': '', '人数': '人', '天数': '天', '占比': '%', '比率': '%', '比例': '%', '率': '%', '转化率': '%', '增长率': '%', '完成率': '%', '单价': '元', '价格': '元', '得分': '分', '评分': '分', } for kw, unit in unit_map.items(): if kw in col_lower: return unit return '' # ===================================================================== # DISTRIBUTION SHAPE ANALYSIS # ===================================================================== def _calc_distribution_shape(series: pd.Series) -> dict: """Compute skewness, kurtosis and distribution type for numeric series.""" try: s = series.dropna() if len(s) < 4: return {} skew = round(float(s.skew()), 3) kurt = round(float(s.kurtosis()), 3) # Determine distribution type abs_skew = abs(skew) if abs_skew < 0.5: skew_type = '近似对称' elif abs_skew < 1.0: skew_type = '轻度偏态' else: skew_type = '显著偏态' if skew > 0.5: skew_dir = '右偏(长尾在右侧,大部分值偏小)' elif skew < -0.5: skew_dir = '左偏(长尾在左侧,大部分值偏大)' else: skew_dir = '基本对称' # Concentration analysis cv = float(s.std()) / float(s.mean()) if float(s.mean()) != 0 else 0 if cv < 0.3: concentration = '高度集中' elif cv < 0.7: concentration = '中度集中' elif cv < 1.2: concentration = '适度分散' else: concentration = '高度分散' return { 'skewness': skew, 'kurtosis': kurt, 'skew_type': skew_type, 'skew_direction': skew_dir, 'cv': round(cv, 3), 'concentration': concentration, } except Exception: return {} # ===================================================================== # DERIVED METRIC DETECTION # ===================================================================== def _detect_derived_relations(df: pd.DataFrame, numeric_cols: list) -> list[dict]: """ Detect potential derived relationships among numeric columns. E.g., A - B = C, A + B = C, A / B = C (approx.) """ relations = [] num_names = [c['column_name'] for c in numeric_cols] if len(num_names) < 3: return relations sample = df[num_names].dropna().head(500) for i, a_name in enumerate(num_names): for j, b_name in enumerate(num_names): if j <= i: continue a = sample[a_name] b = sample[b_name] # Check subtraction: a - b ≈ c or b - a ≈ c for diff_name in num_names: if diff_name in (a_name, b_name): continue d = sample[diff_name] diff_ab = (a - b - d).abs().mean() diff_ba = (b - a - d).abs().mean() threshold = max(d.mean(), 1) * 0.1 if diff_ab < threshold: relations.append({ 'type': 'subtraction', 'expression': f'{a_name} - {b_name} ≈ {diff_name}', 'accuracy': round(float(1 - diff_ab / max(float(d.mean()), 1)), 3), 'formula': f'{diff_name} = {a_name} - {b_name}', }) break elif diff_ba < threshold: relations.append({ 'type': 'subtraction', 'expression': f'{b_name} - {a_name} ≈ {diff_name}', 'accuracy': round(float(1 - diff_ba / max(float(d.mean()), 1)), 3), 'formula': f'{diff_name} = {b_name} - {a_name}', }) break # Check addition: a + b ≈ c for sum_name in num_names: if sum_name in (a_name, b_name): continue s = sample[sum_name] sum_ab = (a + b - s).abs().mean() threshold = max(s.mean(), 1) * 0.1 if sum_ab < threshold: relations.append({ 'type': 'addition', 'expression': f'{a_name} + {b_name} ≈ {sum_name}', 'accuracy': round(float(1 - sum_ab / max(float(s.mean()), 1)), 3), 'formula': f'{sum_name} = {a_name} + {b_name}', }) break # Also check for ratio relations if len(num_names) >= 2: for i, a_name in enumerate(num_names): for j, b_name in enumerate(num_names): if j <= i: continue a = sample[a_name] b = sample[b_name] ratio = (a / b.replace(0, np.nan)).dropna() if len(ratio) > 0: ratio_std = float(ratio.std()) ratio_mean = float(ratio.mean()) if ratio_mean > 0 and ratio_std / ratio_mean < 0.1: # Consistent ratio found relations.append({ 'type': 'ratio', 'expression': f'{a_name} / {b_name} ≈ {ratio_mean:.3f}', 'accuracy': round(float(1 - ratio_std / ratio_mean), 3), 'formula': f'{a_name} = {b_name} × {ratio_mean:.2f}', }) return relations # ===================================================================== # MAIN PROFILING FUNCTION # ===================================================================== def profile_dataframe(df: pd.DataFrame) -> dict: total_rows = len(df) columns = [] for col in df.columns: series = df[col] dtype_str = str(series.dtype) null_count = int(series.isna().sum()) null_rate = round(null_count / total_rows, 4) if total_rows else 0.0 non_null = series.dropna() unique_count = int(non_null.nunique()) sample_values = non_null.head(5).tolist() sample_values = [str(v) for v in sample_values] # Enhanced: Value pattern analysis value_patterns = _analyze_value_patterns(series) # Enhanced: Distribution shape analysis for numeric columns distribution_shape = None numeric_stats = None if pd.api.types.is_numeric_dtype(series) and not pd.api.types.is_bool_dtype(series): try: numeric_stats = { 'mean': round(float(series.mean()), 2) if not pd.isna(series.mean()) else 0, 'median': round(float(series.median()), 2) if not pd.isna(series.median()) else 0, 'min': round(float(series.min()), 2) if not pd.isna(series.min()) else 0, 'max': round(float(series.max()), 2) if not pd.isna(series.max()) else 0, 'std': round(float(series.std()), 2) if not pd.isna(series.std()) else 0, 'sum': round(float(series.sum()), 2) if not pd.isna(series.sum()) else 0, 'p25': round(float(series.quantile(0.25)), 2) if not pd.isna(series.quantile(0.25)) else 0, 'p75': round(float(series.quantile(0.75)), 2) if not pd.isna(series.quantile(0.75)) else 0, } distribution_shape = _calc_distribution_shape(series) except Exception: numeric_stats = None # Enhanced role inference with value patterns role = _infer_column_role(col, dtype_str, sample_values, null_rate, unique_count, total_rows, value_patterns) label = _infer_metric_label(col, role, value_patterns) unit = _infer_unit(col, value_patterns) # Enhanced: detect if column is a high-cardinality ID is_high_cardinality_id = (role == ColumnRole.TEXT and unique_count / max(total_rows, 1) > 0.8 and unique_count > 20) if is_high_cardinality_id: role = ColumnRole.ID columns.append(ColumnProfile( column_name=col, dtype=dtype_str, role=role, null_count=null_count, null_rate=null_rate, unique_count=unique_count, sample_values=sample_values, numeric_stats=numeric_stats, inferred_label=label, )) # Append extra metadata not in ColumnProfile columns[-1]._unit = unit columns[-1]._distribution_shape = distribution_shape columns[-1]._value_patterns = value_patterns time_cols = [c for c in columns if c.role == ColumnRole.TIME] numeric_cols = [c for c in columns if c.role == ColumnRole.NUMERIC] category_cols = [c for c in columns if c.role == ColumnRole.CATEGORY] text_cols = [c for c in columns if c.role == ColumnRole.TEXT] id_cols = [c for c in columns if c.role == ColumnRole.ID] # Date range inference date_range = (None, None) time_granularity = 'unknown' if time_cols: series = df[time_cols[0].column_name].dropna() parsed = series.apply(_parse_date).dropna() if len(parsed) > 0: date_range = (parsed.min(), parsed.max()) if len(parsed) >= 2: diff = (parsed.max() - parsed.min()).days if diff <= 1: time_granularity = 'daily' elif diff <= 7: time_granularity = 'weekly' elif diff <= 31: time_granularity = 'monthly' elif diff <= 92: time_granularity = 'quarterly' else: time_granularity = 'yearly' # Enhanced: Detect derived relations among numeric columns derived_relations = _detect_derived_relations(df, [c.__dict__ for c in numeric_cols]) # Enhanced: Multi-dimensional quality scoring quality_score, quality_details = _calc_quality_score( df, columns, numeric_cols, date_range ) # Outlier detection (Enhanced: with CV-based filtering) outlier_columns = [] for c in numeric_cols: ns = c.numeric_stats if ns and ns.get('std', 0) > 0 and ns.get('mean', 0) > 0: cv = ns['std'] / ns['mean'] if cv > 3: outlier_columns.append(c.column_name) return { 'total_rows': total_rows, 'total_columns': len(columns), 'columns': [c.__dict__ for c in columns], 'time_columns': [c.__dict__ for c in time_cols], 'numeric_columns': [c.__dict__ for c in numeric_cols], 'category_columns': [c.__dict__ for c in category_cols], 'text_columns': [c.__dict__ for c in text_cols], 'id_columns': [c.__dict__ for c in id_cols], 'date_range': ( date_range[0].strftime('%Y-%m-%d') if date_range[0] else None, date_range[1].strftime('%Y-%m-%d') if date_range[1] else None, ), 'time_granularity': time_granularity, 'data_quality': { 'score': quality_score, 'details': quality_details, 'high_null_columns': [c.column_name for c in columns if c.null_rate > 0.3], 'outlier_columns': outlier_columns, }, 'derived_relations': derived_relations, 'column_stats': [{ 'column_name': col.column_name, 'role': col.role.value, 'dtype': col.dtype, 'null_rate': col.null_rate, 'unique_count': col.unique_count, 'distribution_shape': getattr(col, '_distribution_shape', None), 'inferred_label': col.inferred_label, 'unit': getattr(col, '_unit', ''), 'numeric_stats': col.numeric_stats, } for col in columns], } # ===================================================================== # ENHANCED QUALITY SCORING # ===================================================================== def _calc_quality_score(df: pd.DataFrame, columns: list, numeric_cols: list, date_range: tuple) -> tuple: """Multi-dimensional quality scoring (0-100).""" score = 100 details = {} # 1) Completeness (30%) — null rates avg_null_rate = np.mean([c.null_rate for c in columns]) if columns else 0 completeness_penalty = min(30, avg_null_rate * 100 * 2) completeness = max(0, 30 - completeness_penalty) details['completeness'] = round(completeness, 1) # 2) Uniqueness (20%) — presence of ID columns or unique identifiers id_ratio = len([c for c in columns if c.role == ColumnRole.ID]) / max(len(columns), 1) uniqueness = min(20, 10 + id_ratio * 10) details['uniqueness'] = round(uniqueness, 1) # 3) Numeric health (25%) — outliers, zeros, negative values numeric_health = 25 for c in numeric_cols: series = df[c.column_name].dropna() if len(series) == 0: continue # Check for negative values in non-negative expected columns if c.inferred_label in ('台数', '数量', '金额', '人数'): neg_ratio = (series < 0).sum() / len(series) if neg_ratio > 0.05: numeric_health -= 5 # Check for excessive zeros zero_ratio = (series == 0).sum() / len(series) if zero_ratio > 0.5: numeric_health -= 3 details['numeric_health'] = max(0, numeric_health) # 4) Temporal consistency (15%) — if time columns exist, check date ordering temporal = 15 if date_range[0] and date_range[1]: if date_range[0] <= date_range[1]: temporal = 15 else: temporal = 5 details['temporal_consistency'] = temporal # 5) Completeness of categorical data (10%) cat_health = 10 for c in columns: if c.role == ColumnRole.CATEGORY and c.null_rate > 0.2: cat_health -= 2 details['categorical_health'] = max(0, cat_health) score = completeness + uniqueness + numeric_health + temporal + cat_health score = max(0, min(100, round(score))) return score, details # ===================================================================== # HELPER FUNCTIONS (enhanced) # ===================================================================== def profile_category_distribution(df: pd.DataFrame, col_name: str, top_n: int = 15) -> dict: if col_name not in df.columns: return {} counts = df[col_name].value_counts().head(top_n).to_dict() total = df[col_name].notna().sum() # Calculate concentration (Herfindahl index) pcts = [v / total for v in counts.values()] if total else [] hhi = sum(p * p for p in pcts) if pcts else 0 return { 'total_categories': df[col_name].nunique(), 'top_items': {str(k): {'count': int(v), 'pct': round(v / total * 100, 1) if total else 0} for k, v in counts.items()}, 'concentration_hhi': round(hhi, 4), 'concentration_label': '高度集中' if hhi > 0.5 else '中度集中' if hhi > 0.2 else '分散', } def profile_numeric_series(df: pd.DataFrame, col_name: str) -> dict: if col_name not in df.columns: return {} series = df[col_name].dropna() if len(series) == 0: return {} shape = _calc_distribution_shape(series) result = { 'count': len(series), 'sum': round(float(series.sum()), 2), 'mean': round(float(series.mean()), 2), 'median': round(float(series.median()), 2), 'min': round(float(series.min()), 2), 'max': round(float(series.max()), 2), 'std': round(float(series.std()), 2), } if shape: result.update(shape) return result def detect_data_issues(df: pd.DataFrame) -> list[dict]: issues = [] for col in df.columns: null_rate = df[col].isna().mean() if null_rate > 0.5: issues.append({ 'column': col, 'type': 'high_missing', 'severity': 'major', 'message': f'列"{col}"缺失率{null_rate:.1%},建议排除或补全', }) elif null_rate > 0.1: issues.append({ 'column': col, 'type': 'moderate_missing', 'severity': 'minor', 'message': f'列"{col}"缺失率{null_rate:.1%}', }) if pd.api.types.is_numeric_dtype(df[col]): series = df[col].dropna() if len(series) > 0: q1, q3 = series.quantile(0.25), series.quantile(0.75) iqr = q3 - q1 lower, upper = q1 - 3 * iqr, q3 + 3 * iqr outlier_count = ((series < lower) | (series > upper)).sum() if outlier_count > len(series) * 0.1: issues.append({ 'column': col, 'type': 'outliers', 'severity': 'major', 'message': f'列"{col}"存在{outlier_count}个异常值({outlier_count/len(series):.1%})', }) # Check for negative values neg_count = (series < 0).sum() if neg_count > 0: issues.append({ 'column': col, 'type': 'negative_values', 'severity': 'minor', 'message': f'列"{col}"存在{neg_count}个负值', }) # Check for constant columns if df[col].nunique() <= 1 and null_rate < 1.0: issues.append({ 'column': col, 'type': 'constant_column', 'severity': 'minor', 'message': f'列"{col}"为常量列(仅1个唯一值),对分析无贡献', }) return issues def generate_summary_text(profile: dict) -> str: lines = [] lines.append(f"共 {profile['total_rows']:,} 行 × {profile['total_columns']} 列") num_cols = profile.get('numeric_columns', []) cat_cols = profile.get('category_columns', []) time_cols = profile.get('time_columns', []) lines.append(f"数值列: {len(num_cols)} 个 | 分类列: {len(cat_cols)} 个 | 时间列: {len(time_cols)} 个") dr = profile.get('date_range', (None, None)) if dr[0]: lines.append(f"时间范围: {dr[0]} ~ {dr[1]}") lines.append(f"时间粒度: {profile.get('time_granularity', 'unknown')}") q = profile.get('data_quality', {}) lines.append(f"数据质量评分: {q.get('score', 0)}/100") if q.get('details'): det = q['details'] lines.append(f" 完整性: {det.get('completeness', 0)}/30 | " f"数值健康: {det.get('numeric_health', 0)}/25 | " f"时间一致性: {det.get('temporal_consistency', 0)}/15") if q.get('high_null_columns'): lines.append(f"高缺失列: {', '.join(q['high_null_columns'])}") # Enhanced: derived relations derived = profile.get('derived_relations', []) if derived: lines.append(f"检测到 {len(derived)} 个数值关系:") for rel in derived[:5]: lines.append(f" {rel['formula']} (置信度: {rel['accuracy']:.0%})") # Distribution shape summary for numeric columns shape_cols = [] for nc in num_cols[:3]: shape = nc.get('distribution_shape') if shape: shape_cols.append(f"{nc.get('inferred_label', nc['column_name'])}[{shape.get('concentration', 'N/A')}]") if shape_cols: lines.append(f"分布特征: {' | '.join(shape_cols)}") return '\n'.join(lines) if __name__ == '__main__': import sys if len(sys.argv) > 1: fp = sys.argv[1] try: df = pd.read_excel(fp) except Exception: df = pd.read_excel(fp, sheet_name=0) profile = profile_dataframe(df) print(generate_summary_text(profile)) issues = detect_data_issues(df) if issues: print(f"\n数据问题 ({len(issues)}):") for iss in issues: print(f" [{iss['severity']}] {iss['message']}")