"""Metrics calculation engine for the universal data report generator.

Provides generic metric computation, trend analysis, distribution, ranking,
and insights.
"""

from typing import Optional

import pandas as pd


def _pct_change(curr, prev):
    """Return the percentage change from ``prev`` to ``curr``, 1 decimal.

    Returns None when the change cannot be computed: either value is None,
    or the previous base is 0.
    """
    if curr is None or prev is None or prev == 0:
        return None
    return round((curr - prev) / prev * 100, 1)


def _safe_div(a, b):
    """Return ``a / b`` rounded to 1 decimal, or 0 when ``b`` is falsy."""
    return round(a / b, 1) if b else 0


# ==============================================================================
# HELPER FUNCTIONS
# ==============================================================================

# Aggregations whose per-group result is reported as a whole number.
_INTEGER_AGGREGATIONS = ('sum', 'count')

# (stats key, text prefix, format spec) for the numeric-column insight.
_STAT_PARTS = (
    ('sum', '总量', ',.0f'),
    ('mean', '均值', ',.1f'),
    ('max', '峰值', ',.0f'),
    ('min', '最低', ',.0f'),
)


def _fmt_pct(val):
    """Format a percentage with an explicit sign; an em dash for None."""
    if val is None:
        return '—'
    sign = '+' if val >= 0 else ''
    return f'{sign}{val:.1f}%'


def _fmt_chg_dir(val):
    """Return the change-direction word for ``val``; '' for None."""
    if val is None:
        return ''
    return '增加' if val >= 0 else '减少'


def avg(lst):
    """Return the arithmetic mean of ``lst``, or 0 for an empty sequence."""
    return sum(lst) / len(lst) if lst else 0


def _aggregate_by_group(df, group_col, metric_col, aggregation):
    """Aggregate ``metric_col`` per ``group_col`` value.

    'sum' and 'count' map to the matching pandas reduction; every other
    aggregation name (including 'avg') falls back to the mean.
    """
    grouped = df.groupby(group_col)[metric_col]
    if aggregation == 'sum':
        return grouped.sum()
    if aggregation == 'count':
        return grouped.count()
    return grouped.mean()


def _to_plain_number(value, aggregation):
    """Convert one aggregated value into a plain Python number.

    Sums and counts become ints; means are rounded to 1 decimal instead of
    being truncated. A missing result (NaN, e.g. the mean of an all-null
    group) becomes 0 rather than crashing ``int()``.
    """
    if pd.isna(value):
        return 0
    if aggregation in _INTEGER_AGGREGATIONS:
        return int(value)
    return round(float(value), 1)


def _format_date_label(value):
    """Render a time-axis key as 'MM/DD', falling back to ``str(value)``."""
    # Plain numbers (a year such as 2023, a week index, ...) would be read by
    # pd.Timestamp as nanoseconds since the epoch and all collapse to '01/01'.
    if pd.api.types.is_number(value):
        return str(value)
    try:
        return pd.Timestamp(value).strftime('%m/%d')
    except Exception:
        # Deliberately best-effort: any unparseable key is shown verbatim.
        return str(value)


def _aggregate_series(raw, aggregation):
    """Reduce one column to a single metric value.

    Nulls are dropped first. A non-numeric column is coerced to numbers when
    at least one value parses; otherwise numeric aggregations degrade to a
    row count ('sum') or 0 ('avg', 'max', 'min').
    """
    series = raw.dropna()
    if not pd.api.types.is_numeric_dtype(series):
        coerced = pd.to_numeric(series, errors='coerce').dropna()
        if len(coerced) > 0:
            series = coerced
    numeric = pd.api.types.is_numeric_dtype(series)

    if aggregation == 'count':
        return int(series.count())
    if aggregation == 'distinct_count':
        return int(series.nunique())
    if aggregation == 'sum':
        return int(series.sum()) if numeric else len(series)
    if aggregation in ('avg', 'max', 'min'):
        # An empty numeric series would yield NaN; report 0 like the
        # non-numeric case does.
        if not numeric or series.empty:
            return 0
        reducers = {'avg': series.mean, 'max': series.max, 'min': series.min}
        return round(float(reducers[aggregation]()), 1)
    # Unknown aggregation: number of non-null rows.
    return len(series)


def _is_real_number(value):
    """Return True for an int/float that is neither a bool nor NaN."""
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return False
    return not pd.isna(value)


def _column_label(col):
    """Return a profiled column's display label, else its column name."""
    return col.get('inferred_label') or col.get('column_name', '')


# ==============================================================================
# GENERIC METRICS
# ==============================================================================

def calc_generic_metrics(df: pd.DataFrame, config) -> dict:
    """Compute every metric declared in ``config.metrics``.

    Each metric definition is read through its ``name``, ``column``,
    ``aggregation``, ``label`` and ``unit`` attributes.

    Returns:
        A dict holding, for each metric, ``<name>`` (the value, 0 when the
        column is absent from ``df``), ``<name>_label`` and ``<name>_unit``.
    """
    metrics = {}
    for metric_def in config.metrics:
        name = metric_def.name
        if metric_def.column in df.columns:
            metrics[name] = _aggregate_series(
                df[metric_def.column], metric_def.aggregation
            )
        else:
            metrics[name] = 0
        # Label and unit are always present so consumers can look them up
        # even when the source column is missing.
        metrics[f'{name}_label'] = metric_def.label
        metrics[f'{name}_unit'] = metric_def.unit
    return metrics


def calc_generic_trend(df: pd.DataFrame, time_col: str, metric_col: str,
                       aggregation: str = 'sum') -> dict:
    """Aggregate ``metric_col`` per ``time_col`` value, ordered by time key.

    Returns:
        ``{'dates': [...], 'values': [...]}``, or ``{}`` when either column
        is missing. Dates are 'MM/DD' strings where the key parses as a
        timestamp and the key's text otherwise. Values are ints for
        'sum'/'count' and 1-decimal floats for the mean.
    """
    if time_col not in df.columns or metric_col not in df.columns:
        return {}
    trend = _aggregate_by_group(df, time_col, metric_col, aggregation).sort_index()
    return {
        'dates': [_format_date_label(key) for key in trend.index],
        'values': [_to_plain_number(v, aggregation) for v in trend.values],
    }


def calc_generic_distribution(df: pd.DataFrame, cat_col: str,
                              metric_col: Optional[str] = None,
                              aggregation: str = 'sum',
                              top_n: int = 10) -> dict:
    """Return the ``top_n`` categories of ``cat_col`` with their shares.

    With a usable ``metric_col`` the categories are ranked by the aggregated
    metric; otherwise by row count.

    Returns:
        ``{'categories', 'values', 'percentages'}`` as parallel lists, or
        ``{}`` when ``cat_col`` is missing. Percentages are relative to the
        total of the returned categories (not of the whole frame).
    """
    if cat_col not in df.columns:
        return {}

    if metric_col and metric_col in df.columns:
        dist = (
            _aggregate_by_group(df, cat_col, metric_col, aggregation)
            .sort_values(ascending=False)
            .head(top_n)
        )
        value_kind = aggregation
    else:
        dist = df[cat_col].value_counts().head(top_n)
        value_kind = 'count'

    # Missing aggregates count as 0 so one NaN cannot poison the total.
    raw = [0.0 if pd.isna(v) else float(v) for v in dist.values]
    total = sum(raw)
    return {
        'categories': [str(k) for k in dist.index],
        'values': [_to_plain_number(v, value_kind) for v in dist.values],
        'percentages': [round(v / total * 100, 1) if total else 0 for v in raw],
    }


def calc_generic_ranking(df: pd.DataFrame, rank_col: str, metric_col: str,
                         aggregation: str = 'sum',
                         top_n: int = 15) -> list[dict]:
    """Rank ``rank_col`` values by the aggregated ``metric_col``, descending.

    Returns:
        Up to ``top_n`` dicts ``{'name', 'value', 'rank'}`` with 1-based
        ranks, or ``[]`` when either column is missing.
    """
    if rank_col not in df.columns or metric_col not in df.columns:
        return []
    ranked = (
        _aggregate_by_group(df, rank_col, metric_col, aggregation)
        .sort_values(ascending=False)
        .head(top_n)
    )
    return [
        {'name': str(key), 'value': _to_plain_number(value, aggregation), 'rank': position}
        for position, (key, value) in enumerate(ranked.items(), start=1)
    ]


def generate_generic_insights(data_profile: dict, metrics: dict) -> list[dict]:
    """Build the narrative insight items for a report.

    Args:
        data_profile: Profile dict; the keys read here are
            ``numeric_columns``, ``category_columns``, ``time_columns``,
            ``data_quality`` and ``total_rows``. Missing or None entries are
            treated as empty.
        metrics: Metric name -> value mapping; only real numbers are listed.

    Returns:
        A list of ``{'title', 'content'}`` dicts. The data-quality item is
        always present; the others appear only when their input exists.
    """
    items = []
    num_cols = data_profile.get('numeric_columns') or []
    cat_cols = data_profile.get('category_columns') or []
    time_cols = data_profile.get('time_columns') or []
    quality = data_profile.get('data_quality') or {}
    score = quality.get('score')
    if score is None:
        score = 100

    # --- Key metrics overview (first six numeric metrics) ---
    metric_details = [
        f'{key}: {value:,.0f}'
        for key, value in (metrics or {}).items()
        if _is_real_number(value)
    ]
    if metric_details:
        summary = ';'.join(metric_details[:6])
        items.append({
            'title': '核心指标总览',
            'content': f'本期关键指标:{summary}。'
                       f'综合来看,业务运行态势可通过这些核心数据进行量化评估,'
                       f'建议结合业务目标与实际值的差距进行针对性分析。',
        })

    # --- Statistical profile of the first two numeric columns ---
    for col in num_cols[:2]:
        stats = col.get('numeric_stats') or {}
        label = _column_label(col)
        parts = []
        for key, prefix, spec in _STAT_PARTS:
            value = stats.get(key)
            if value:  # falsy values (None, 0) are left out of the text
                parts.append(f'{prefix} {value:{spec}}')
        if not parts:
            continue

        std = stats.get('std')
        mean = stats.get('mean')
        if _is_real_number(std):
            std_text = std
            # Compare against |mean| so a negative mean does not make every
            # column look volatile; fall back to 1 when the mean is unknown.
            baseline = abs(mean) if _is_real_number(mean) else 1
            volatility = '较大' if std > baseline * 0.5 else '适中'
        else:
            std_text = 'N/A'
            volatility = '适中'

        joined = ','.join(parts)
        items.append({
            'title': f'{label}数据特征',
            'content': f'指标"{label}"的统计特征:{joined}。'
                       f'标准差 {std_text},数据波动幅度'
                       f'{volatility}。',
        })

    # --- Category coverage of the first two category columns ---
    for col in cat_cols[:2]:
        label = _column_label(col)
        unique_count = col.get('unique_count', 0)
        items.append({
            'title': f'{label}维度分析',
            'content': f'数据覆盖 {unique_count} 个不同的{label}类别,'
                       f'丰富的分类维度支持多角度交叉分析。'
                       f'建议重点关注主要类别的集中度与分布均衡性,'
                       f'识别高价值类别与低效类别之间的差异特征。',
        })

    # --- Time dimension (first time column only) ---
    if time_cols:
        time_label = _column_label(time_cols[0])
        items.append({
            'title': '时间维度覆盖',
            'content': f'数据包含时间列"{time_label}",'
                       f'支持按时间维度进行趋势分析。通过对时间序列数据的分解,'
                       f'可识别周期性波动、趋势变化及异常时间节点,为预测与规划提供依据。',
        })

    # --- Data quality (always emitted) ---
    # Every verdict ends with a separator so it does not run into the
    # null-rate clause that follows.
    if score >= 90:
        verdict = '数据完整可靠,'
    elif score >= 80:
        verdict = '数据质量良好,建议关注缺失值,'
    else:
        verdict = '数据需重点关注质量控制,'
    null_rate = quality.get('null_rate') or 0
    items.append({
        'title': '数据质量评估',
        'content': f'数据质量评分 {score}/100,'
                   f'{verdict}'
                   f'缺失率 {null_rate * 100:.1f}%。'
                   f'本报告中的分析与图表均基于现有数据进行自动化生成,确保数据准确性。',
    })

    # --- Columns with many nulls (first five) ---
    high_null = quality.get('high_null_columns') or []
    if high_null:
        listed = ', '.join(str(name) for name in high_null[:5])
        items.append({
            'title': '数据完整性说明',
            'content': f'以下列缺失值比例较高:{listed}。'
                       f'在分析涉及这些列时已进行空值排除处理,'
                       f'建议后续数据录入环节关注这些字段的完整填写,以提升分析精度。',
        })

    # --- Dataset size ---
    total_rows = data_profile.get('total_rows') or 0
    if total_rows:
        if total_rows >= 100:
            sample_note = '充足,统计结果具有较好的代表性'
        elif total_rows >= 30:
            sample_note = '适中,统计结果可作为参考'
        else:
            sample_note = '有限,分析结果仅供参考'
        items.append({
            'title': '数据规模概述',
            'content': f'本期报告基于 {total_rows} 条数据记录进行分析,'
                       f'样本量{sample_note}。',
        })

    return items


if __name__ == '__main__':
    import sys

    # Smoke run: load a workbook, profile it and print the recommendation
    # counts. Does nothing when no file path is given.
    if len(sys.argv) > 1:
        from data_loader import load_generic_excel
        from data_profiler import profile_dataframe
        from agent_analyzer import analyze_and_recommend

        fp = sys.argv[1]
        df = load_generic_excel(fp)
        print(f"Loaded: {len(df)} rows x {len(df.columns)} cols")
        profile = profile_dataframe(df)
        recs = analyze_and_recommend(profile)
        print(f"Suggested {len(recs['suggested_metrics'])} metrics, "
              f"{len(recs['suggested_pages'])} pages")