metrics_calculator.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253
  1. """
  2. Metrics calculation engine for the universal data report generator.
  3. Provides generic metric computation, trend analysis, distribution, ranking, and insights.
  4. """
  5. import pandas as pd
  6. def _pct_change(curr, prev):
  7. """Calculate percentage change. Returns None if previous base is 0 or None."""
  8. if prev is None or prev == 0:
  9. return None
  10. return round((curr - prev) / prev * 100, 1)
  11. def _safe_div(a, b):
  12. return round(a / b, 1) if b else 0
  13. # ==============================================================================
  14. # HELPER FUNCTIONS
  15. # ==============================================================================
  16. def _fmt_pct(val):
  17. if val is None:
  18. return '—'
  19. sign = '+' if val >= 0 else ''
  20. return f'{sign}{val:.1f}%'
  21. def _fmt_chg_dir(val):
  22. if val is None:
  23. return ''
  24. return '增加' if val >= 0 else '减少'
  25. def avg(lst):
  26. return sum(lst) / len(lst) if lst else 0
  27. # ==============================================================================
  28. # GENERIC METRICS
  29. # ==============================================================================
  30. def calc_generic_metrics(df: pd.DataFrame, config) -> dict:
  31. metrics = {}
  32. for metric_def in config.metrics:
  33. col = metric_def.column
  34. if col not in df.columns:
  35. metrics[metric_def.name] = 0
  36. continue
  37. series = df[col].dropna()
  38. agg = metric_def.aggregation
  39. if not pd.api.types.is_numeric_dtype(series):
  40. coerced = pd.to_numeric(series, errors='coerce').dropna()
  41. if len(coerced) > 0:
  42. series = coerced
  43. if agg == 'sum':
  44. val = int(series.sum()) if pd.api.types.is_numeric_dtype(series) else len(series)
  45. elif agg == 'count':
  46. val = int(series.count())
  47. elif agg == 'avg':
  48. val = round(float(series.mean()), 1) if pd.api.types.is_numeric_dtype(series) else 0
  49. elif agg == 'max':
  50. val = round(float(series.max()), 1) if pd.api.types.is_numeric_dtype(series) else 0
  51. elif agg == 'min':
  52. val = round(float(series.min()), 1) if pd.api.types.is_numeric_dtype(series) else 0
  53. elif agg == 'distinct_count':
  54. val = int(series.nunique())
  55. else:
  56. val = len(series)
  57. metrics[metric_def.name] = val
  58. metrics[f'{metric_def.name}_label'] = metric_def.label
  59. metrics[f'{metric_def.name}_unit'] = metric_def.unit
  60. if hasattr(config, 'comparison') and config.comparison:
  61. pass
  62. return metrics
  63. def calc_generic_trend(df: pd.DataFrame, time_col: str, metric_col: str,
  64. aggregation: str = 'sum') -> dict:
  65. if time_col not in df.columns or metric_col not in df.columns:
  66. return {}
  67. if aggregation == 'sum':
  68. trend = df.groupby(time_col)[metric_col].sum().sort_index()
  69. elif aggregation == 'count':
  70. trend = df.groupby(time_col)[metric_col].count().sort_index()
  71. else:
  72. trend = df.groupby(time_col)[metric_col].mean().sort_index()
  73. dates = []
  74. for d in trend.index:
  75. try:
  76. dates.append(pd.Timestamp(d).strftime('%m/%d'))
  77. except Exception:
  78. dates.append(str(d))
  79. return {
  80. 'dates': dates,
  81. 'values': [int(v) if aggregation != 'avg' else round(float(v), 1) for v in trend.values],
  82. }
  83. def calc_generic_distribution(df: pd.DataFrame, cat_col: str, metric_col: str = None,
  84. aggregation: str = 'sum', top_n: int = 10) -> dict:
  85. if cat_col not in df.columns:
  86. return {}
  87. if metric_col and metric_col in df.columns:
  88. if aggregation == 'sum':
  89. dist = df.groupby(cat_col)[metric_col].sum().sort_values(ascending=False).head(top_n)
  90. elif aggregation == 'count':
  91. dist = df.groupby(cat_col)[metric_col].count().sort_values(ascending=False).head(top_n)
  92. else:
  93. dist = df.groupby(cat_col)[metric_col].mean().sort_values(ascending=False).head(top_n)
  94. else:
  95. dist = df[cat_col].value_counts().head(top_n)
  96. total = sum(dist.values)
  97. return {
  98. 'categories': [str(k) for k in dist.index],
  99. 'values': [int(v) for v in dist.values],
  100. 'percentages': [round(v / total * 100, 1) if total else 0 for v in dist.values],
  101. }
  102. def calc_generic_ranking(df: pd.DataFrame, rank_col: str, metric_col: str,
  103. aggregation: str = 'sum', top_n: int = 15) -> list[dict]:
  104. if rank_col not in df.columns or metric_col not in df.columns:
  105. return []
  106. if aggregation == 'sum':
  107. ranked = df.groupby(rank_col)[metric_col].sum().sort_values(ascending=False).head(top_n)
  108. elif aggregation == 'count':
  109. ranked = df.groupby(rank_col)[metric_col].count().sort_values(ascending=False).head(top_n)
  110. else:
  111. ranked = df.groupby(rank_col)[metric_col].mean().sort_values(ascending=False).head(top_n)
  112. return [{'name': str(k), 'value': int(v), 'rank': i + 1}
  113. for i, (k, v) in enumerate(ranked.items())]
  114. def generate_generic_insights(data_profile: dict, metrics: dict) -> list[dict]:
  115. items = []
  116. num_cols = data_profile.get('numeric_columns', [])
  117. cat_cols = data_profile.get('category_columns', [])
  118. time_cols = data_profile.get('time_columns', [])
  119. q = data_profile.get('data_quality', {})
  120. score = q.get('score', 100)
  121. if metrics:
  122. metric_details = []
  123. for k, v in metrics.items():
  124. if isinstance(v, (int, float)):
  125. metric_details.append(f'{k}: {v:,.0f}')
  126. if metric_details:
  127. items.append({
  128. 'title': '核心指标总览',
  129. 'content': f'本期关键指标:{";".join(metric_details[:6])}。'
  130. f'综合来看,业务运行态势可通过这些核心数据进行量化评估,'
  131. f'建议结合业务目标与实际值的差距进行针对性分析。',
  132. })
  133. if num_cols:
  134. for nc in num_cols[:2]:
  135. ns = nc.get('numeric_stats', {}) or {}
  136. col_name = nc.get('inferred_label', nc['column_name'])
  137. stats_parts = []
  138. if 'sum' in ns and ns['sum']:
  139. stats_parts.append(f'总量 {ns["sum"]:,.0f}')
  140. if 'mean' in ns and ns['mean']:
  141. stats_parts.append(f'均值 {ns["mean"]:,.1f}')
  142. if 'max' in ns and ns['max']:
  143. stats_parts.append(f'峰值 {ns["max"]:,.0f}')
  144. if 'min' in ns and ns['min']:
  145. stats_parts.append(f'最低 {ns["min"]:,.0f}')
  146. if stats_parts:
  147. items.append({
  148. 'title': f'{col_name}数据特征',
  149. 'content': f'指标"{col_name}"的统计特征:{",".join(stats_parts)}。'
  150. f'标准差 {ns.get("std", "N/A")},数据波动幅度'
  151. f'{"较大" if isinstance(ns.get("std"), (int,float)) and ns["std"] > ns.get("mean", 1) * 0.5 else "适中"}。',
  152. })
  153. if cat_cols:
  154. for cc in cat_cols[:2]:
  155. uc = cc.get('unique_count', 0)
  156. items.append({
  157. 'title': f'{cc.get("inferred_label", cc["column_name"])}维度分析',
  158. 'content': f'数据覆盖 {uc} 个不同的{cc.get("inferred_label", cc["column_name"])}类别,'
  159. f'丰富的分类维度支持多角度交叉分析。'
  160. f'建议重点关注主要类别的集中度与分布均衡性,'
  161. f'识别高价值类别与低效类别之间的差异特征。',
  162. })
  163. if time_cols:
  164. tc = time_cols[0]
  165. items.append({
  166. 'title': '时间维度覆盖',
  167. 'content': f'数据包含时间列"{tc.get("inferred_label", tc["column_name"])}",'
  168. f'支持按时间维度进行趋势分析。通过对时间序列数据的分解,'
  169. f'可识别周期性波动、趋势变化及异常时间节点,为预测与规划提供依据。',
  170. })
  171. items.append({
  172. 'title': '数据质量评估',
  173. 'content': f'数据质量评分 {score}/100,'
  174. f'{"数据完整可靠," if score >= 90 else "数据质量良好,建议关注缺失值" if score >= 80 else "数据需重点关注质量控制"}'
  175. f'缺失率 {q.get("null_rate", 0)*100:.1f}%。'
  176. f'本报告中的分析与图表均基于现有数据进行自动化生成,确保数据准确性。',
  177. })
  178. high_null = q.get('high_null_columns', [])
  179. if high_null:
  180. items.append({
  181. 'title': '数据完整性说明',
  182. 'content': f'以下列缺失值比例较高:{", ".join(high_null[:5])}。'
  183. f'在分析涉及这些列时已进行空值排除处理,'
  184. f'建议后续数据录入环节关注这些字段的完整填写,以提升分析精度。',
  185. })
  186. total_rows = data_profile.get('total_rows', 0)
  187. if total_rows:
  188. items.append({
  189. 'title': '数据规模概述',
  190. 'content': f'本期报告基于 {total_rows} 条数据记录进行分析,'
  191. f'样本量{"充足,统计结果具有较好的代表性" if total_rows >= 100 else "适中,统计结果可作为参考" if total_rows >= 30 else "有限,分析结果仅供参考"}。',
  192. })
  193. return items
  194. if __name__ == '__main__':
  195. import sys
  196. if len(sys.argv) > 1:
  197. from data_loader import load_generic_excel
  198. from data_profiler import profile_dataframe
  199. from agent_analyzer import analyze_and_recommend
  200. fp = sys.argv[1]
  201. df = load_generic_excel(fp)
  202. print(f"Loaded: {len(df)} rows x {len(df.columns)} cols")
  203. profile = profile_dataframe(df)
  204. recs = analyze_and_recommend(profile)
  205. print(f"Suggested {len(recs['suggested_metrics'])} metrics, "
  206. f"{len(recs['suggested_pages'])} pages")