| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253 |
- """
- Metrics calculation engine for the universal data report generator.
- Provides generic metric computation, trend analysis, distribution, ranking, and insights.
- """
- import pandas as pd
- def _pct_change(curr, prev):
- """Calculate percentage change. Returns None if previous base is 0 or None."""
- if prev is None or prev == 0:
- return None
- return round((curr - prev) / prev * 100, 1)
- def _safe_div(a, b):
- return round(a / b, 1) if b else 0
- # ==============================================================================
- # HELPER FUNCTIONS
- # ==============================================================================
- def _fmt_pct(val):
- if val is None:
- return '—'
- sign = '+' if val >= 0 else ''
- return f'{sign}{val:.1f}%'
- def _fmt_chg_dir(val):
- if val is None:
- return ''
- return '增加' if val >= 0 else '减少'
def avg(lst):
    """Return the arithmetic mean of ``lst``, or 0 when ``lst`` is empty or falsy."""
    if not lst:
        return 0
    total = sum(lst)
    return total / len(lst)
- # ==============================================================================
- # GENERIC METRICS
- # ==============================================================================
def calc_generic_metrics(df: pd.DataFrame, config) -> dict:
    """Compute every metric declared in ``config.metrics`` over ``df``.

    Each metric definition is read through its ``name``, ``column``,
    ``aggregation``, ``label`` and ``unit`` attributes. For every metric the
    result holds three keys: ``<name>`` (the value), ``<name>_label`` and
    ``<name>_unit``.

    Supported aggregations are ``sum``, ``count``, ``avg``, ``max``, ``min``
    and ``distinct_count``; any other aggregation falls back to the number of
    non-null values. Null values are dropped first, and non-numeric columns
    are coerced with ``pd.to_numeric`` when at least one value converts.

    A value of 0 is reported whenever there is nothing to aggregate: the
    column is missing, the column is not numeric, or (for avg/max/min) the
    column has no non-null values. The last case would otherwise yield NaN.

    Args:
        df: Source data.
        config: Object exposing a ``metrics`` iterable of metric definitions.

    Returns:
        Dict mapping metric keys to values, labels and units.
    """
    metrics = {}
    for metric_def in config.metrics:
        name = metric_def.name
        col = metric_def.column
        # Label/unit are emitted on every path so consumers can look them up
        # even when the column is absent from the data.
        metrics[f'{name}_label'] = metric_def.label
        metrics[f'{name}_unit'] = metric_def.unit

        if col not in df.columns:
            metrics[name] = 0
            continue

        series = df[col].dropna()
        if not pd.api.types.is_numeric_dtype(series):
            coerced = pd.to_numeric(series, errors='coerce').dropna()
            # Only switch to the coerced values when something converted;
            # otherwise keep the raw series so count-style metrics still work.
            if len(coerced) > 0:
                series = coerced
        is_numeric = pd.api.types.is_numeric_dtype(series)

        agg = metric_def.aggregation
        if agg == 'sum':
            val = int(series.sum()) if is_numeric else len(series)
        elif agg == 'count':
            val = int(series.count())
        elif agg in ('avg', 'max', 'min'):
            if is_numeric and len(series) > 0:
                if agg == 'avg':
                    raw = series.mean()
                elif agg == 'max':
                    raw = series.max()
                else:
                    raw = series.min()
                val = round(float(raw), 1)
            else:
                # mean/max/min of an empty series is NaN; report 0 instead.
                val = 0
        elif agg == 'distinct_count':
            val = int(series.nunique())
        else:
            val = len(series)
        metrics[name] = val

    # NOTE: config.comparison is not consumed here; no comparison metrics
    # are produced by this function.
    return metrics
def calc_generic_trend(df: pd.DataFrame, time_col: str, metric_col: str,
                       aggregation: str = 'sum') -> dict:
    """Aggregate ``metric_col`` per ``time_col`` value, ordered by time.

    ``'sum'`` and ``'count'`` produce integer values. Any other aggregation
    is computed as a mean and reported rounded to one decimal. Periods whose
    mean is undefined (all metric values null) are left out, because they
    have no value to plot.

    Period labels are formatted as ``MM/DD`` when the key can be read as a
    timestamp. Numeric keys (for example a month number) and keys that
    cannot be parsed are rendered with ``str()``.

    Args:
        df: Source data.
        time_col: Column holding the period key.
        metric_col: Column to aggregate.
        aggregation: ``'sum'``, ``'count'``, or anything else for a mean.

    Returns:
        ``{'dates': [...], 'values': [...]}``, or ``{}`` when either column
        is missing from ``df``.
    """
    if time_col not in df.columns or metric_col not in df.columns:
        return {}

    grouped = df.groupby(time_col)[metric_col]
    uses_mean = aggregation not in ('sum', 'count')
    if aggregation == 'sum':
        trend = grouped.sum()
    elif aggregation == 'count':
        trend = grouped.count()
    else:
        # A group with only null metric values has a NaN mean; drop it so the
        # output never carries NaN.
        trend = grouped.mean().dropna()
    trend = trend.sort_index()

    dates = []
    for d in trend.index:
        # pd.Timestamp reads bare numbers as nanoseconds since the epoch,
        # which would label every numeric period '01/01'.
        if pd.api.types.is_number(d):
            dates.append(str(d))
            continue
        try:
            dates.append(pd.Timestamp(d).strftime('%m/%d'))
        except Exception:
            # Deliberate best-effort: any key that cannot be interpreted as a
            # timestamp is shown verbatim.
            dates.append(str(d))

    if uses_mean:
        values = [round(float(v), 1) for v in trend.values]
    else:
        values = [int(v) for v in trend.values]
    return {'dates': dates, 'values': values}
def calc_generic_distribution(df: pd.DataFrame, cat_col: str, metric_col: str = None,
                              aggregation: str = 'sum', top_n: int = 10) -> dict:
    """Return the top ``top_n`` categories of ``cat_col`` with values and shares.

    When ``metric_col`` names an existing column, it is aggregated per
    category (``'sum'``, ``'count'``, or a mean for any other aggregation).
    Otherwise the rows of each category are counted. Categories are ordered
    by value, largest first.

    Sum and count values are integers. Mean values are rounded to one
    decimal, and categories whose mean is undefined (all metric values null)
    are omitted. Percentages are each category's share of the total over the
    returned categories only, rounded to one decimal; they are 0 when that
    total is zero.

    Args:
        df: Source data.
        cat_col: Category column to group by.
        metric_col: Optional column to aggregate.
        aggregation: ``'sum'``, ``'count'``, or anything else for a mean.
        top_n: Maximum number of categories returned.

    Returns:
        ``{'categories': [...], 'values': [...], 'percentages': [...]}``, or
        ``{}`` when ``cat_col`` is missing from ``df``.
    """
    if cat_col not in df.columns:
        return {}

    uses_mean = False
    if metric_col and metric_col in df.columns:
        grouped = df.groupby(cat_col)[metric_col]
        if aggregation == 'sum':
            dist = grouped.sum()
        elif aggregation == 'count':
            dist = grouped.count()
        else:
            uses_mean = True
            # Drop NaN means so they cannot crash the conversion below or
            # turn the percentage total into NaN.
            dist = grouped.mean().dropna()
        dist = dist.sort_values(ascending=False).head(top_n)
    else:
        dist = df[cat_col].value_counts().head(top_n)

    raw_values = list(dist.values)
    total = sum(raw_values)
    if uses_mean:
        values = [round(float(v), 1) for v in raw_values]
    else:
        values = [int(v) for v in raw_values]
    return {
        'categories': [str(k) for k in dist.index],
        'values': values,
        'percentages': [round(float(v / total * 100), 1) if total else 0
                        for v in raw_values],
    }
def calc_generic_ranking(df: pd.DataFrame, rank_col: str, metric_col: str,
                         aggregation: str = 'sum', top_n: int = 15) -> list[dict]:
    """Rank the values of ``rank_col`` by an aggregate of ``metric_col``.

    ``metric_col`` is aggregated per ``rank_col`` value (``'sum'``,
    ``'count'``, or a mean for any other aggregation) and the ``top_n``
    largest results are returned in descending order.

    Sum and count values are integers. Mean values are rounded to one
    decimal, and entries whose mean is undefined (all metric values null)
    are omitted.

    Args:
        df: Source data.
        rank_col: Column whose values are ranked.
        metric_col: Column to aggregate.
        aggregation: ``'sum'``, ``'count'``, or anything else for a mean.
        top_n: Maximum number of entries returned.

    Returns:
        List of ``{'name', 'value', 'rank'}`` dicts with 1-based ranks, or an
        empty list when either column is missing from ``df``.
    """
    if rank_col not in df.columns or metric_col not in df.columns:
        return []

    grouped = df.groupby(rank_col)[metric_col]
    uses_mean = aggregation not in ('sum', 'count')
    if aggregation == 'sum':
        ranked = grouped.sum()
    elif aggregation == 'count':
        ranked = grouped.count()
    else:
        # NaN means cannot be ranked and would crash int conversion.
        ranked = grouped.mean().dropna()
    ranked = ranked.sort_values(ascending=False).head(top_n)

    rows = []
    for position, (key, raw) in enumerate(ranked.items(), start=1):
        value = round(float(raw), 1) if uses_mean else int(raw)
        rows.append({'name': str(key), 'value': value, 'rank': position})
    return rows
def _column_label(col: dict) -> str:
    """Return a column's inferred label, falling back to its raw column name."""
    return col.get('inferred_label') or col.get('column_name', '')


def generate_generic_insights(data_profile: dict, metrics: dict) -> list[dict]:
    """Build narrative insight cards from a data profile and computed metrics.

    Cards are emitted in a fixed order, each only when its input is present:
    a metric overview (up to six numeric metrics), statistics for the first
    two numeric columns, a note for the first two category columns, a note
    for the first time column, a data-quality card (always emitted), a
    completeness note for high-null columns, and a data-volume card.

    Missing or ``None`` profile entries are tolerated: a missing quality
    score defaults to 100, a missing null rate to 0, and a column without
    ``inferred_label`` falls back to ``column_name``. Statistics equal to
    zero are shown; only absent (``None``) statistics are skipped.

    Args:
        data_profile: Profile dict read through the keys ``numeric_columns``,
            ``category_columns``, ``time_columns``, ``data_quality`` and
            ``total_rows``.
        metrics: Metric dict; only int/float values are listed.

    Returns:
        List of ``{'title': str, 'content': str}`` dicts.
    """
    items = []
    num_cols = data_profile.get('numeric_columns') or []
    cat_cols = data_profile.get('category_columns') or []
    time_cols = data_profile.get('time_columns') or []
    q = data_profile.get('data_quality') or {}
    score = q.get('score')
    if score is None:
        score = 100

    if metrics:
        metric_details = [f'{k}: {v:,.0f}' for k, v in metrics.items()
                          if isinstance(v, (int, float))]
        if metric_details:
            joined_metrics = ";".join(metric_details[:6])
            items.append({
                'title': '核心指标总览',
                'content': f'本期关键指标:{joined_metrics}。'
                           f'综合来看,业务运行态势可通过这些核心数据进行量化评估,'
                           f'建议结合业务目标与实际值的差距进行针对性分析。',
            })

    stat_templates = (
        ('sum', '总量 {:,.0f}'),
        ('mean', '均值 {:,.1f}'),
        ('max', '峰值 {:,.0f}'),
        ('min', '最低 {:,.0f}'),
    )
    for nc in num_cols[:2]:
        ns = nc.get('numeric_stats', {}) or {}
        col_name = _column_label(nc)
        # Test against None rather than truthiness so a real 0 is reported.
        stats_parts = [template.format(ns[key]) for key, template in stat_templates
                       if ns.get(key) is not None]
        if not stats_parts:
            continue
        std = ns.get('std')
        mean = ns.get('mean')
        if mean is None:
            mean = 1
        # Spread is flagged as large when std exceeds half of the mean.
        is_volatile = isinstance(std, (int, float)) and std > mean * 0.5
        std_text = std if std is not None else 'N/A'
        volatility = '较大' if is_volatile else '适中'
        joined_stats = ",".join(stats_parts)
        items.append({
            'title': f'{col_name}数据特征',
            'content': f'指标"{col_name}"的统计特征:{joined_stats}。'
                       f'标准差 {std_text},数据波动幅度'
                       f'{volatility}。',
        })

    for cc in cat_cols[:2]:
        uc = cc.get('unique_count', 0)
        cat_label = _column_label(cc)
        items.append({
            'title': f'{cat_label}维度分析',
            'content': f'数据覆盖 {uc} 个不同的{cat_label}类别,'
                       f'丰富的分类维度支持多角度交叉分析。'
                       f'建议重点关注主要类别的集中度与分布均衡性,'
                       f'识别高价值类别与低效类别之间的差异特征。',
        })

    if time_cols:
        time_label = _column_label(time_cols[0])
        items.append({
            'title': '时间维度覆盖',
            'content': f'数据包含时间列"{time_label}",'
                       f'支持按时间维度进行趋势分析。通过对时间序列数据的分解,'
                       f'可识别周期性波动、趋势变化及异常时间节点,为预测与规划提供依据。',
        })

    # Every branch ends with a separator so the null-rate clause that
    # follows never runs into the preceding sentence.
    if score >= 90:
        quality_note = '数据完整可靠,'
    elif score >= 80:
        quality_note = '数据质量良好,建议关注缺失值,'
    else:
        quality_note = '数据需重点关注质量控制,'
    null_pct = (q.get('null_rate') or 0) * 100
    items.append({
        'title': '数据质量评估',
        'content': f'数据质量评分 {score}/100,'
                   f'{quality_note}'
                   f'缺失率 {null_pct:.1f}%。'
                   f'本报告中的分析与图表均基于现有数据进行自动化生成,确保数据准确性。',
    })

    high_null = q.get('high_null_columns') or []
    if high_null:
        # Column names may not be strings (e.g. integer headers).
        high_null_text = ", ".join(str(c) for c in high_null[:5])
        items.append({
            'title': '数据完整性说明',
            'content': f'以下列缺失值比例较高:{high_null_text}。'
                       f'在分析涉及这些列时已进行空值排除处理,'
                       f'建议后续数据录入环节关注这些字段的完整填写,以提升分析精度。',
        })

    total_rows = data_profile.get('total_rows', 0)
    if total_rows:
        if total_rows >= 100:
            sample_note = '充足,统计结果具有较好的代表性'
        elif total_rows >= 30:
            sample_note = '适中,统计结果可作为参考'
        else:
            sample_note = '有限,分析结果仅供参考'
        items.append({
            'title': '数据规模概述',
            'content': f'本期报告基于 {total_rows} 条数据记录进行分析,'
                       f'样本量{sample_note}。',
        })
    return items
if __name__ == '__main__':
    # Manual smoke run: load the workbook given as the first CLI argument,
    # profile it, and print how many metrics and pages were suggested.
    # Does nothing when no argument is supplied.
    import sys

    cli_args = sys.argv[1:]
    if cli_args:
        # Project-local modules are imported lazily so that importing this
        # file as a library does not require them.
        from data_loader import load_generic_excel
        from data_profiler import profile_dataframe
        from agent_analyzer import analyze_and_recommend

        workbook_path = cli_args[0]
        frame = load_generic_excel(workbook_path)
        print(f"Loaded: {len(frame)} rows x {len(frame.columns)} cols")

        frame_profile = profile_dataframe(frame)
        suggestions = analyze_and_recommend(frame_profile)
        metric_count = len(suggestions['suggested_metrics'])
        page_count = len(suggestions['suggested_pages'])
        print(f"Suggested {metric_count} metrics, "
              f"{page_count} pages")
|