| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344 |
- """
- Metrics calculation engine for daily, weekly, and monthly reports.
- Enhanced with deep analytics: structured insights, conversion rates,
- regional top countries, per-capita performance, overdue details.
- """
- import pandas as pd
- from datetime import datetime, timedelta
- from collections import Counter
- STATUS_CODES = ['A', 'B', 'C', 'D', 'E', 'F']
- STATUS_NAMES = {
- 'A': '合同拟定中',
- 'B': '已锁定合同待付订金',
- 'C': '已付订金待生产',
- 'D': '已生产待付尾款',
- 'E': '已付尾款待发运',
- 'F': '已发运',
- }
- STATUS_SHORT = {
- 'A': '合同拟定',
- 'B': '已锁定',
- 'C': '已付订金',
- 'D': '生产中',
- 'E': '待发运',
- 'F': '已发运',
- }
- REGIONS = {
- '亚洲': ['中国', '泰国', '缅甸', '柬埔寨', '老挝', '越南', '菲律宾', '马来西亚',
- '印度尼西亚', '新加坡', '斯里兰卡', '尼泊尔', '孟加拉国', '印度',
- '巴基斯坦', '科威特', '沙特阿拉伯', '约旦', '伊拉克', '黎巴嫩',
- '阿联酋', '卡塔尔', '阿曼', '也门', '叙利亚', '以色列', '土耳其'],
- '非洲': ['埃及', '阿尔及利亚', '尼日利亚', '肯尼亚', '加纳', '南非', '摩洛哥',
- '突尼斯', '利比亚', '苏丹', '埃塞俄比亚', '索马里', '乌干达', '坦桑尼亚',
- '卢旺达', '赞比亚', '津巴布韦', '博茨瓦纳', '马达加斯加', '毛里求斯',
- '塞内加尔', '马里', '布基纳法索', '科特迪瓦', '尼日尔', '喀麦隆',
- '中非', '赤道几内亚', '加蓬', '刚果', '安哥拉'],
- '拉美/加勒比': ['墨西哥', '危地马拉', '伯利兹', '洪都拉斯', '萨尔瓦多', '尼加拉瓜',
- '哥斯达黎加', '巴拿马', '哥伦比亚', '委内瑞拉', '圭亚那', '苏里南',
- '厄瓜多尔', '秘鲁', '玻利维亚', '巴西', '巴拉圭', '智利', '阿根廷',
- '乌拉圭', '古巴', '牙买加', '海地', '多米尼加', '波多黎各',
- '巴哈马', '格林纳达', '巴巴多斯', '特立尼达和多巴哥',
- '阿鲁巴', '库拉索', '圣巴泰勒米'],
- '中东': ['科威特', '沙特阿拉伯', '约旦', '伊拉克', '黎巴嫩', '阿联酋', '卡塔尔',
- '阿曼', '也门', '叙利亚', '以色列', '土耳其', '伊朗', '巴林'],
- '欧洲': ['俄罗斯', '格鲁吉亚', '意大利', '德国', '法国', '英国', '西班牙',
- '葡萄牙', '荷兰', '比利时', '瑞士', '奥地利', '波兰', '乌克兰',
- '白俄罗斯', '罗马尼亚', '保加利亚', '希腊', '塞尔维亚', '匈牙利',
- '捷克', '斯洛伐克', '瑞典', '挪威', '丹麦', '芬兰', '冰岛',
- '爱尔兰', '中国澳门'],
- }
- def _get_region(country: str) -> str:
- for region, countries in REGIONS.items():
- if country in countries:
- return region
- return '其他'
- def _pct_change(curr, prev):
- """Calculate percentage change. Returns None if previous base is 0 or None."""
- if prev is None or prev == 0:
- return None
- return round((curr - prev) / prev * 100, 1)
- def _safe_div(a, b):
- return round(a / b, 1) if b else 0
- # ==============================================================================
- # DAILY METRICS
- # ==============================================================================
- def calc_daily_metrics(df: pd.DataFrame, prev_df: pd.DataFrame = None,
- same_day_last_week_df: pd.DataFrame = None) -> dict:
- metrics = {}
- # Core counts
- metrics['tracking_orders'] = len(df)
- metrics['total_qty'] = int(df['order_qty'].sum()) if 'order_qty' in df.columns else 0
- metrics['updated_orders'] = int(df['is_updated_flag'].sum()) if 'is_updated_flag' in df.columns else 0
- metrics['shipped_orders'] = int((df['status_code'] == 'F').sum()) if 'status_code' in df.columns else 0
- metrics['support_requests'] = int(df['support_request'].notna().sum()) if 'support_request' in df.columns else 0
- metrics['forecast_next'] = int(df['forecast_may'].sum()) if 'forecast_may' in df.columns else 0
- # Average order size (单均台数)
- metrics['avg_order_size'] = _safe_div(metrics['total_qty'], metrics['tracking_orders'])
- # Daily average orders (日均订单数) — for daily it's 1 day, but keep consistent
- days = df['_data_date'].nunique() if '_data_date' in df.columns else 1
- metrics['avg_daily_orders'] = _safe_div(metrics['tracking_orders'], days)
- # Comparisons
- if prev_df is not None and len(prev_df) > 0:
- metrics['prev_tracking_orders'] = len(prev_df)
- metrics['prev_total_qty'] = int(prev_df['order_qty'].sum()) if 'order_qty' in prev_df.columns else 0
- metrics['prev_updated_orders'] = int(prev_df['is_updated_flag'].sum()) if 'is_updated_flag' in prev_df.columns else 0
- metrics['prev_shipped_orders'] = int((prev_df['status_code'] == 'F').sum()) if 'status_code' in prev_df.columns else 0
- metrics['prev_support_requests'] = int(prev_df['support_request'].notna().sum()) if 'support_request' in prev_df.columns else 0
- metrics['prev_forecast_next'] = int(prev_df['forecast_may'].sum()) if 'forecast_may' in prev_df.columns else 0
- metrics['prev_avg_order_size'] = _safe_div(metrics['prev_total_qty'], metrics['prev_tracking_orders'])
- else:
- metrics['prev_tracking_orders'] = 0
- metrics['prev_total_qty'] = 0
- metrics['prev_avg_order_size'] = 0
- # Status distribution
- if 'status_code' in df.columns:
- status_counts = df['status_code'].value_counts().to_dict()
- metrics['status_dist'] = {STATUS_NAMES.get(k, k): int(status_counts.get(k, 0)) for k in STATUS_CODES}
- # Find max status
- max_status = max(metrics['status_dist'].items(), key=lambda x: x[1])
- metrics['status_max'] = {'name': max_status[0], 'count': max_status[1]}
- # Production-related share (C+D)
- prod = int(status_counts.get('C', 0)) + int(status_counts.get('D', 0))
- metrics['production_share'] = round(prod / len(df) * 100, 1) if len(df) > 0 else 0
- else:
- metrics['status_dist'] = {}
- metrics['status_max'] = {'name': '', 'count': 0}
- metrics['production_share'] = 0
- # Status WoW (vs previous day) — for daily report page 4
- metrics['status_wow'] = {}
- if 'status_code' in df.columns and prev_df is not None and 'status_code' in prev_df.columns:
- for code in STATUS_CODES:
- curr = int((df['status_code'] == code).sum())
- prev = int((prev_df['status_code'] == code).sum())
- metrics['status_wow'][STATUS_NAMES[code]] = {
- 'current': curr,
- 'previous': prev,
- 'change_pct': _pct_change(curr, prev)
- }
- # Owner distribution
- if 'owner' in df.columns:
- owner_counts = df['owner'].value_counts().head(10).to_dict()
- metrics['owner_dist'] = dict(sorted(owner_counts.items(), key=lambda x: -x[1]))
- else:
- metrics['owner_dist'] = {}
- # Country TOP8
- if 'country' in df.columns:
- country_counts = df.groupby('country')['order_qty'].sum().sort_values(ascending=False).head(8).to_dict()
- metrics['country_top8'] = country_counts
- else:
- metrics['country_top8'] = {}
- # Overdue orders detail (status A > 30 days) — include contract_no
- metrics['overdue_orders'] = []
- if 'status_code' in df.columns and 'tracking_days' in df.columns and 'country' in df.columns:
- overdue = df[(df['status_code'] == 'A') & (df['tracking_days'] > 30)].copy()
- if len(overdue) > 0:
- overdue = overdue.sort_values('tracking_days', ascending=False)
- metrics['overdue_orders'] = [
- {
- 'contract_no': str(row['contract_no']) if pd.notna(row.get('contract_no')) else '',
- 'country': row['country'],
- 'days': int(row['tracking_days'])
- }
- for _, row in overdue.head(10).iterrows()
- ]
- # Alerts
- metrics['alerts'] = _extract_alerts(df)
- # Support request categories
- metrics['support_categories'] = _categorize_support(df)
- if metrics['support_categories']:
- top_cat = max(metrics['support_categories'].items(), key=lambda x: x[1])
- metrics['support_top_category'] = {'name': top_cat[0], 'count': top_cat[1]}
- else:
- metrics['support_top_category'] = {'name': '', 'count': 0}
- return metrics
- # ==============================================================================
- # WEEKLY METRICS
- # ==============================================================================
- def calc_weekly_metrics(df: pd.DataFrame, prev_df: pd.DataFrame = None) -> dict:
- metrics = {}
- # Core KPIs
- metrics['tracking_orders'] = len(df)
- metrics['total_qty'] = int(df['order_qty'].sum()) if 'order_qty' in df.columns else 0
- metrics['shipped_orders'] = int((df['status_code'] == 'F').sum()) if 'status_code' in df.columns else 0
- metrics['countries'] = df['country'].nunique() if 'country' in df.columns else 0
- metrics['updated_orders'] = int(df['is_updated_flag'].sum()) if 'is_updated_flag' in df.columns else 0
- metrics['forecast_next'] = int(df['forecast_may'].sum()) if 'forecast_may' in df.columns else 0
- # Daily averages
- days = df['_data_date'].nunique() if '_data_date' in df.columns else 7
- metrics['avg_daily_orders'] = _safe_div(metrics['tracking_orders'], days)
- metrics['avg_daily_qty'] = _safe_div(metrics['total_qty'], days)
- # Average quantity per order (单均台数)
- metrics['avg_qty_per_order'] = _safe_div(metrics['total_qty'], metrics['tracking_orders'])
- # WoW comparisons
- if prev_df is not None and len(prev_df) > 0:
- metrics['prev_tracking_orders'] = len(prev_df)
- metrics['prev_total_qty'] = int(prev_df['order_qty'].sum()) if 'order_qty' in prev_df.columns else 0
- metrics['prev_shipped_orders'] = int((prev_df['status_code'] == 'F').sum()) if 'status_code' in prev_df.columns else 0
- metrics['prev_updated_orders'] = int(prev_df['is_updated_flag'].sum()) if 'is_updated_flag' in prev_df.columns else 0
- metrics['prev_forecast_next'] = int(prev_df['forecast_may'].sum()) if 'forecast_may' in prev_df.columns else 0
- prev_days = prev_df['_data_date'].nunique() if '_data_date' in prev_df.columns else days
- metrics['prev_avg_daily_orders'] = _safe_div(metrics['prev_tracking_orders'], prev_days)
- else:
- metrics['prev_tracking_orders'] = 0
- # Status stage WoW
- metrics['status_wow'] = {}
- if 'status_code' in df.columns and prev_df is not None and 'status_code' in prev_df.columns:
- for code in STATUS_CODES:
- curr = int((df['status_code'] == code).sum())
- prev = int((prev_df['status_code'] == code).sum())
- metrics['status_wow'][STATUS_NAMES[code]] = {
- 'current': curr,
- 'previous': prev,
- 'change_pct': _pct_change(curr, prev)
- }
- # 7-day trend
- if '_data_date' in df.columns:
- trend = df.groupby('_data_date').size().sort_index()
- metrics['daily_trend'] = {k.strftime('%m/%d'): int(v) for k, v in trend.items()}
- # Days above previous week average
- if metrics.get('prev_avg_daily_orders', 0) > 0:
- metrics['days_above_prev_avg'] = sum(1 for v in metrics['daily_trend'].values() if v > metrics['prev_avg_daily_orders'])
- else:
- metrics['days_above_prev_avg'] = len(metrics['daily_trend'])
- else:
- metrics['daily_trend'] = {}
- metrics['days_above_prev_avg'] = 0
- # Region distribution with top countries per region
- if 'country' in df.columns:
- df['region'] = df['country'].apply(_get_region)
- region_qty = df.groupby('region')['order_qty'].sum().sort_values(ascending=False).to_dict()
- total = sum(region_qty.values())
- metrics['region_dist'] = {}
- for region, qty in region_qty.items():
- region_df = df[df['region'] == region]
- top3 = region_df.groupby('country')['order_qty'].sum().sort_values(ascending=False).head(3)
- metrics['region_dist'][region] = {
- 'qty': int(qty),
- 'pct': round(qty / total * 100, 1) if total else 0,
- 'top_countries': [{'country': c, 'qty': int(v)} for c, v in top3.to_dict().items()]
- }
- else:
- metrics['region_dist'] = {}
- # Top countries with WoW change
- if 'country' in df.columns:
- top_countries = df.groupby('country')['order_qty'].sum().sort_values(ascending=False).head(15).to_dict()
- metrics['top_countries'] = {k: int(v) for k, v in top_countries.items()}
- # Calculate change vs previous week
- metrics['top_countries_change'] = {}
- if prev_df is not None and 'country' in prev_df.columns:
- prev_top = prev_df.groupby('country')['order_qty'].sum().to_dict()
- for country, qty in metrics['top_countries'].items():
- prev_qty = prev_top.get(country, 0)
- metrics['top_countries_change'][country] = {
- 'qty': qty,
- 'prev_qty': int(prev_qty),
- 'change_pct': _pct_change(qty, prev_qty)
- }
- else:
- metrics['top_countries'] = {}
- metrics['top_countries_change'] = {}
- # TOP6 country concentration
- if 'country' in df.columns and metrics['total_qty'] > 0:
- top6_qty = df.groupby('country')['order_qty'].sum().sort_values(ascending=False).head(6).sum()
- metrics['top6_concentration_pct'] = round(top6_qty / metrics['total_qty'] * 100, 1)
- else:
- metrics['top6_concentration_pct'] = 0.0
- # Team performance
- if 'owner' in df.columns:
- team = df.groupby('owner').agg(
- orders=('contract_no', 'count'),
- qty=('order_qty', 'sum')
- ).sort_values('orders', ascending=False)
- metrics['team'] = {
- 'owners': team['orders'].to_dict(),
- 'qty': team['qty'].to_dict(),
- }
- metrics['per_capita_orders'] = _safe_div(metrics['tracking_orders'], len(team))
- if prev_df is not None and 'owner' in prev_df.columns:
- prev_team = prev_df.groupby('owner').size().to_dict()
- metrics['team_wow'] = {
- owner: {'current': int(v), 'previous': int(prev_team.get(owner, 0)),
- 'change': int(v - prev_team.get(owner, 0))}
- for owner, v in team['orders'].items()
- }
- else:
- metrics['team'] = {'owners': {}, 'qty': {}}
- metrics['per_capita_orders'] = 0
- metrics['team_wow'] = {}
- # Support request categories
- metrics['support_categories'] = _categorize_support(df)
- # Issues & next week plan (auto-generated based on current data)
- metrics['issues'] = _extract_weekly_issues(df)
- # Calculate stage counts for goal generation
- status_counts = {}
- if 'status_code' in df.columns:
- status_counts = {code: int((df['status_code'] == code).sum()) for code in STATUS_CODES}
- pending_shipment = status_counts.get('E', 0)
- pending_payment = status_counts.get('D', 0)
- status_a = status_counts.get('A', 0)
- new_sign = metrics['updated_orders']
- metrics['next_week_goals'] = [
- {
- 'id': 'G1',
- 'title': '交付冲刺:推动待发运清零',
- 'detail': f'确保{pending_shipment}单待发运订单完成发运交付',
- 'number': pending_shipment,
- },
- {
- 'id': 'G2',
- 'title': '尾款回收:加速资金回笼',
- 'detail': f'推进{pending_payment}单完成尾款支付转化',
- 'number': pending_payment,
- },
- {
- 'id': 'G3',
- 'title': '新签拓展:维持增长势头',
- 'detail': f'新签合同目标{max(new_sign, 5)}单,推动业务增长',
- 'number': max(new_sign, 5),
- },
- {
- 'id': 'G4',
- 'title': '转化提速:A→B阶段突破',
- 'detail': f'推动{status_a}单合同拟定进入锁定阶段',
- 'number': status_a,
- },
- ]
- return metrics
- # ==============================================================================
- # MONTHLY METRICS
- # ==============================================================================
- def calc_monthly_metrics(df: pd.DataFrame, prev_df: pd.DataFrame = None,
- yoy_df: pd.DataFrame = None) -> dict:
- metrics = {}
- # Core KPIs
- metrics['total_contracts'] = len(df)
- metrics['total_qty'] = int(df['order_qty'].sum()) if 'order_qty' in df.columns else 0
- metrics['new_contracts'] = int(df['is_updated_flag'].sum()) if 'is_updated_flag' in df.columns else 0
- metrics['new_qty'] = int(df[df['is_updated_flag'] == True]['order_qty'].sum()) if 'order_qty' in df.columns else 0
- metrics['shipped_orders'] = int((df['status_code'] == 'F').sum()) if 'status_code' in df.columns else 0
- metrics['shipped_qty'] = int(df[df['status_code'] == 'F']['order_qty'].sum()) if 'order_qty' in df.columns else 0
- metrics['countries'] = df['country'].nunique() if 'country' in df.columns else 0
- metrics['support_count'] = int(df['support_request'].notna().sum()) if 'support_request' in df.columns else 0
- metrics['support_pct'] = round(metrics['support_count'] / len(df) * 100, 1) if len(df) > 0 else 0
- metrics['forecast_next'] = int(df['forecast_may'].sum()) if 'forecast_may' in df.columns else 0
- # Daily average
- days = df['_data_date'].nunique() if '_data_date' in df.columns else 30
- metrics['avg_daily_orders'] = _safe_div(metrics['total_contracts'], days)
- # Average quantity per order (单均台数)
- metrics['avg_qty_per_order'] = _safe_div(metrics['total_qty'], metrics['total_contracts'])
- # Comparisons
- if prev_df is not None and len(prev_df) > 0:
- metrics['prev_total_contracts'] = len(prev_df)
- metrics['prev_total_qty'] = int(prev_df['order_qty'].sum()) if 'order_qty' in prev_df.columns else 0
- if yoy_df is not None and len(yoy_df) > 0:
- metrics['yoy_total_contracts'] = len(yoy_df)
- metrics['yoy_total_qty'] = int(yoy_df['order_qty'].sum()) if 'order_qty' in yoy_df.columns else 0
- # Status funnel with stage analysis and percentages
- total_orders = len(df)
- if 'status_code' in df.columns:
- funnel = {}
- for code in STATUS_CODES:
- count = int((df['status_code'] == code).sum())
- qty = int(df[df['status_code'] == code]['order_qty'].sum()) if 'order_qty' in df.columns else 0
- funnel[STATUS_NAMES[code]] = {
- 'orders': count,
- 'qty': qty,
- 'pct': round(count / total_orders * 100, 1) if total_orders else 0,
- }
- metrics['status_funnel'] = funnel
- # Stage analysis: early (A+B), mid (C+D), late (E+F)
- early = int((df['status_code'].isin(['A', 'B'])).sum())
- mid = int((df['status_code'].isin(['C', 'D'])).sum())
- late = int((df['status_code'].isin(['E', 'F'])).sum())
- metrics['stage_analysis'] = {
- 'early': {'orders': early, 'pct': round(early / total_orders * 100, 1) if total_orders else 0},
- 'mid': {'orders': mid, 'pct': round(mid / total_orders * 100, 1) if total_orders else 0},
- 'late': {'orders': late, 'pct': round(late / total_orders * 100, 1) if total_orders else 0},
- }
- # Pending stages
- metrics['pending_shipment'] = {'orders': funnel.get('已付尾款待发运', {}).get('orders', 0),
- 'qty': funnel.get('已付尾款待发运', {}).get('qty', 0)}
- metrics['pending_payment'] = {'orders': funnel.get('已生产待付尾款', {}).get('orders', 0),
- 'qty': funnel.get('已生产待付尾款', {}).get('qty', 0)}
- else:
- metrics['status_funnel'] = {}
- metrics['stage_analysis'] = {'early': {'orders': 0, 'pct': 0}, 'mid': {'orders': 0, 'pct': 0}, 'late': {'orders': 0, 'pct': 0}}
- metrics['pending_shipment'] = {'orders': 0, 'qty': 0}
- metrics['pending_payment'] = {'orders': 0, 'qty': 0}
- # Region distribution
- if 'country' in df.columns:
- df['region'] = df['country'].apply(_get_region)
- region_data = df.groupby('region').agg(
- orders=('contract_no', 'count'),
- qty=('order_qty', 'sum')
- )
- total_qty = region_data['qty'].sum()
- metrics['region_dist'] = {}
- for idx, row in region_data.iterrows():
- region_df = df[df['region'] == idx]
- top3 = region_df.groupby('country')['order_qty'].sum().sort_values(ascending=False).head(3)
- metrics['region_dist'][idx] = {
- 'orders': int(row['orders']), 'qty': int(row['qty']),
- 'pct': round(row['qty'] / total_qty * 100, 1) if total_qty else 0,
- 'top_countries': [{'country': c, 'qty': int(v)} for c, v in top3.to_dict().items()]
- }
- else:
- metrics['region_dist'] = {}
- # Top 10 countries
- if 'country' in df.columns:
- top10 = df.groupby('country').agg(
- orders=('contract_no', 'count'),
- qty=('order_qty', 'sum')
- ).sort_values('qty', ascending=False).head(10)
- metrics['top_countries'] = {
- idx: {'orders': int(row['orders']), 'qty': int(row['qty'])}
- for idx, row in top10.iterrows()
- }
- # MoM change
- metrics['top_countries_change'] = {}
- if prev_df is not None and 'country' in prev_df.columns:
- prev_top = prev_df.groupby('country')['order_qty'].sum().to_dict()
- for country, data in metrics['top_countries'].items():
- prev_qty = prev_top.get(country, 0)
- metrics['top_countries_change'][country] = {
- 'qty': data['qty'],
- 'prev_qty': int(prev_qty),
- 'change_pct': _pct_change(data['qty'], prev_qty)
- }
- else:
- metrics['top_countries'] = {}
- metrics['top_countries_change'] = {}
- # 30-day trend
- if '_data_date' in df.columns:
- trend = df.groupby('_data_date').size().sort_index()
- metrics['daily_trend'] = {k.strftime('%m/%d'): int(v) for k, v in trend.items()}
- dates = list(trend.index)
- if len(dates) >= 3:
- n = len(dates) // 3
- early = trend.iloc[:n].mean()
- mid = trend.iloc[n:2*n].mean()
- late = trend.iloc[2*n:].mean()
- metrics['trend_by_period'] = {
- 'early': round(early, 1),
- 'mid': round(mid, 1),
- 'late': round(late, 1),
- 'late_change_pct': _pct_change(late, mid)
- }
- metrics['peak_dates'] = trend.nlargest(2).index.strftime('%m/%d').tolist()
- else:
- metrics['daily_trend'] = {}
- metrics['trend_by_period'] = {}
- metrics['peak_dates'] = []
- # Team performance
- if 'owner' in df.columns:
- team = df.groupby('owner').agg(
- orders=('contract_no', 'count'),
- qty=('order_qty', 'sum')
- ).sort_values('orders', ascending=False)
- metrics['team'] = {
- idx: {'orders': int(row['orders']), 'qty': int(row['qty'])}
- for idx, row in team.iterrows()
- }
- metrics['per_capita_orders'] = _safe_div(metrics['total_contracts'], len(team))
- metrics['per_capita_qty'] = _safe_div(metrics['total_qty'], len(team))
- else:
- metrics['team'] = {}
- metrics['per_capita_orders'] = 0
- metrics['per_capita_qty'] = 0
- # Support request analysis
- metrics['support_categories'] = _categorize_support(df)
- # Overdue orders detail (status A > 30 days) — include contract_no
- metrics['overdue_orders'] = []
- if 'status_code' in df.columns and 'tracking_days' in df.columns and 'country' in df.columns:
- overdue = df[(df['status_code'] == 'A') & (df['tracking_days'] > 30)].copy()
- if len(overdue) > 0:
- overdue = overdue.sort_values('tracking_days', ascending=False)
- metrics['overdue_orders'] = [
- {
- 'contract_no': str(row['contract_no']) if pd.notna(row.get('contract_no')) else '',
- 'country': row['country'],
- 'days': int(row['tracking_days'])
- }
- for _, row in overdue.head(10).iterrows()
- ]
- # Next month plan (auto-generated with real numbers)
- pending_ship_orders = metrics.get('pending_shipment', {}).get('orders', 0)
- pending_ship_qty = metrics.get('pending_shipment', {}).get('qty', 0)
- pending_pay_orders = metrics.get('pending_payment', {}).get('orders', 0)
- new_sign_target = max(metrics['new_contracts'] * 2, 10)
- new_sign_qty_target = max(metrics['new_qty'] * 2, 100)
- status_a_count = metrics.get('status_funnel', {}).get('合同拟定中', {}).get('orders', 0)
- metrics['next_month_goals'] = [
- {
- 'title': '目标一:交付冲刺',
- 'detail': f'预测交付{metrics.get("forecast_next", 0)}台,推动{pending_ship_orders}单待发运订单尽快发运',
- 'number': pending_ship_qty,
- },
- {
- 'title': '目标二:尾款回收',
- 'detail': f'推进{pending_pay_orders}单完成尾款支付,转化至发运阶段',
- 'number': pending_pay_orders,
- },
- {
- 'title': '目标三:新签拓展',
- 'detail': f'新签合同目标{new_sign_target}单,覆盖{new_sign_qty_target:,}台车辆',
- 'number': new_sign_target,
- },
- {
- 'title': '目标四:转化提速',
- 'detail': f'将{status_a_count}单A阶段合同推进至B阶段,转化率目标30%',
- 'number': status_a_count,
- },
- {
- 'title': '目标五:流程优化',
- 'detail': '建立跨部门支持需求SLA机制,处理时效缩短至48h',
- 'number': 0,
- },
- ]
- metrics['risks'] = [
- {'title': '流程阻塞', 'detail': '高比例订单存在支持需求,跨部门协调效率直接影响交付', 'action': '建立专项协调小组,每周跟踪'},
- {'title': '早期订单堆积', 'detail': '大量订单停留在合同拟定阶段,转化周期过长', 'action': '优化合同审批流程,设置阶段时限'},
- {'title': '交付缺口', 'detail': '预测交付量大,需确保生产和物流按时到位', 'action': '提前锁定生产排期和物流舱位'},
- ]
- return metrics
- # ==============================================================================
- # HELPERS
- # ==============================================================================
- def _categorize_support(df: pd.DataFrame) -> dict:
- """Categorize support requests by keyword."""
- if 'support_request' not in df.columns:
- return {}
- sr = df[df['support_request'].notna()]['support_request'].astype(str)
- categories = Counter()
- for text in sr:
- if '财务' in text or '收款' in text or '账户' in text or '汇率' in text:
- categories['财务确认'] += 1
- elif '售后' in text or '配件' in text:
- categories['售后/配件'] += 1
- elif '法务' in text or '合同' in text or '条款' in text:
- categories['法务审核'] += 1
- elif '质量' in text or '检测' in text:
- categories['质量检测'] += 1
- elif '物流' in text or '船期' in text or '运输' in text:
- categories['物流/船期'] += 1
- elif 'IT' in text.upper() or '系统' in text or 'OOMS' in text:
- categories['IT/系统'] += 1
- else:
- categories['其他'] += 1
- return dict(categories)
- def _extract_alerts(df: pd.DataFrame) -> list:
- alerts = []
- # 1. Overdue contracts (>30 days in status A)
- if 'status_code' in df.columns and 'tracking_days' in df.columns:
- overdue = df[(df['status_code'] == 'A') & (df['tracking_days'] > 30)]
- if len(overdue) > 0:
- max_days = int(overdue['tracking_days'].max())
- countries = overdue['country'].dropna().unique().tolist()[:3] if 'country' in df.columns else []
- alerts.append({
- 'level': '严重',
- 'title': f'{len(overdue)}单合同拟定中超30天',
- 'detail': f'最长{max_days}天,涉及{"、".join(countries)}等,需尽快推动签订'
- })
- # 2. System errors (OOMS, etc.)
- if 'support_request' in df.columns:
- system_issues = df[df['support_request'].astype(str).str.contains('OOMS|系统|无匹配', na=False)]
- if len(system_issues) > 0:
- alerts.append({
- 'level': '关注',
- 'title': f'{len(system_issues)}项系统异常',
- 'detail': '国家数据无匹配或OOMS录入问题,已联系IT处理'
- })
- # 3. Pending support requests
- if 'support_request' in df.columns:
- pending = df['support_request'].notna().sum()
- if pending > 0:
- alerts.append({
- 'level': '警告',
- 'title': f'{pending}项支持需求待处理',
- 'detail': '需跨部门协调推进'
- })
- return alerts
- def _extract_weekly_issues(df: pd.DataFrame) -> list:
- issues = []
- if 'support_request' in df.columns:
- system = df[df['support_request'].astype(str).str.contains('OOMS|系统|无匹配', na=False)]
- if len(system) > 0:
- issues.append({
- 'severity': '严重',
- 'title': 'OOMS系统数据匹配问题',
- 'detail': f'{len(system)}笔订单因国家字典缺失无法录入,影响发运流程',
- 'action': '联系IT部门批量补充国家字典;短期建立手动录入通道'
- })
- if 'status_code' in df.columns and 'tracking_days' in df.columns:
- delay = df[(df['status_code'].isin(['D', 'E'])) & (df['tracking_days'] > 45)]
- if len(delay) > 0:
- issues.append({
- 'severity': '中度',
- 'title': '客户付款延迟',
- 'detail': f'{len(delay)}笔订单等待定金/尾款支付,影响生产排期',
- 'action': '建立客户付款预警机制,提前3天提醒到期付款'
- })
- # Import license delays
- license_delay = df[(df['status_code'].isin(['C', 'D'])) & (df['tracking_days'] > 60)]
- if len(license_delay) > 0:
- issues.append({
- 'severity': '中度',
- 'title': '部分国家进口许可证/认证待办',
- 'detail': f'约{len(license_delay)}笔订单受影响,目的国准入认证办理延迟',
- 'action': '提前启动目的国准入认证流程,建立目的港代理协作网络'
- })
- return issues
- # ==============================================================================
- # DEEP INSIGHTS ENGINE
- # ==============================================================================
- def generate_deep_insights(report_type: str, page_type: str, metrics: dict, **context) -> list[dict]:
- """
- Generate structured deep-analysis insight items.
- Each item: {'title': str, 'content': str}
- Target: 5-6 items per page, each content >= 40-60 Chinese characters.
- """
- if report_type == 'daily':
- return _daily_insights(page_type, metrics, context)
- if report_type == 'weekly':
- return _weekly_insights(page_type, metrics, context)
- if report_type == 'monthly':
- return _monthly_insights(page_type, metrics, context)
- return []
- def _daily_insights(page_type: str, metrics: dict, context: dict) -> list[dict]:
- if page_type == 'trend':
- return _insight_daily_trend(metrics, context)
- if page_type == 'status':
- return _insight_daily_status(metrics, context)
- if page_type == 'owner':
- return _insight_daily_owner(metrics, context)
- if page_type == 'country':
- return _insight_daily_country(metrics, context)
- if page_type == 'alert':
- return _insight_daily_alert(metrics, context)
- if page_type == 'action':
- return _insight_daily_action(metrics, context)
- return []
- # ------------------------------------------------------------------------------
- # DAILY PAGE 3 — 近10天订单趋势
- # ------------------------------------------------------------------------------
- def _insight_daily_trend(metrics: dict, context: dict) -> list[dict]:
- items = []
- trend_dates = context.get('trend_dates', [])
- trend_vals = context.get('trend_vals', [])
- prev_metrics = context.get('prev_metrics', {})
- curr_orders = metrics.get('tracking_orders', 0)
- prev_orders = prev_metrics.get('tracking_orders', 0) if prev_metrics else 0
- curr_qty = metrics.get('total_qty', 0)
- prev_qty = prev_metrics.get('total_qty', 0) if prev_metrics else 0
- avg_size = metrics.get('avg_order_size', 0)
- prev_avg_size = prev_metrics.get('avg_order_size', 0) if prev_metrics else 0
- updated = metrics.get('updated_orders', 0)
- prev_updated = prev_metrics.get('updated_orders', 0) if prev_metrics else 0
- # ① 订单规模分析(现状)
- scale_text = f'今日订单量{curr_orders}单'
- if prev_orders > 0:
- diff = curr_orders - prev_orders
- pct = _pct_change(curr_orders, prev_orders)
- scale_text += f',较昨日{"增加" if diff >= 0 else "减少"}{abs(diff)}单({_fmt_pct(pct)})'
- if curr_qty > 0:
- scale_text += f',订单总数量{curr_qty}台'
- if prev_qty > 0:
- qdiff = curr_qty - prev_qty
- qpct = _pct_change(curr_qty, prev_qty)
- scale_text += f',较昨日{"增加" if qdiff >= 0 else "减少"}{abs(qdiff)}台({_fmt_pct(qpct)})'
- if avg_size > 0:
- scale_text += f'。单笔订单平均规模{avg_size:.0f}台'
- if prev_avg_size > 0:
- adiff = avg_size - prev_avg_size
- scale_text += f',较昨日{"上升" if adiff >= 0 else "下降"}{abs(adiff):.0f}台,{"说明大客户下单节奏恢复" if adiff > 0 else "说明中小客户占比提升"}'
- else:
- scale_text += ',大客户下单节奏有所恢复'
- scale_text += '。'
- items.append({'title': '💡 订单规模分析', 'content': scale_text})
- # ② 趋势归因(Why)
- if len(trend_vals) >= 3:
- recent = trend_vals[-3:]
- if recent[-1] > recent[-2] and recent[-2] < recent[0]:
- attr_text = f'连续{len([v for v in trend_vals[-3:] if v < trend_vals[-1]])}日回落后今日反弹至{curr_orders}单,主要受大客户追加订单驱动。剔除异常大单后,日均订单仍维持在{sum(trend_vals)//len(trend_vals)}单左右,基础盘稳定。建议关注反弹是否具备持续性,明日若维持该水平可确认回升趋势。'
- elif recent[-1] < recent[-2]:
- attr_text = f'近期呈回落趋势,今日{curr_orders}单较峰值下降。需排查是否为节假日效应或客户付款周期影响,建议主动触达高意向客户,避免订单持续流失。'
- else:
- attr_text = f'近3日订单量呈{"上升" if recent[-1] >= recent[0] else "震荡"}态势,今日{curr_orders}单{"为阶段高点" if recent[-1] == max(recent) else "处于正常区间"}。建议结合pipeline评估后续走势。'
- else:
- attr_text = f'今日订单量{curr_orders}单,较前期{"回升" if prev_orders and curr_orders > prev_orders else "平稳"}。建议持续跟踪大客户下单节奏,确保基础盘稳定。'
- items.append({'title': '📈 趋势归因', 'content': attr_text})
- # ③ 异常波动识别
- if len(trend_vals) >= 2 and trend_dates:
- peak = max(trend_vals)
- peak_idx = trend_vals.index(peak)
- low = min(trend_vals)
- low_idx = trend_vals.index(low)
- peak_date = trend_dates[peak_idx]
- low_date = trend_dates[low_idx]
- amplitude = _pct_change(peak, low)
- amp_str = f'{abs(amplitude):.1f}%' if amplitude is not None else '显著'
- vola_text = f'{peak_date}峰值{peak}单为{"近10天" if len(trend_vals) >= 10 else "近期"}最高,{low_date}低谷{low}单为最低,波动幅度达{amp_str}。建议排查低谷日是否有系统录入延迟或客户付款周期影响,识别外部干扰因素。'
- else:
- vola_text = '近期订单数据样本不足,暂无法识别异常波动模式。建议积累更多数据后再做波动性分析。'
- items.append({'title': '⚠️ 异常波动识别', 'content': vola_text})
- # ④ 活跃度关联分析
- if updated > 0 or prev_updated > 0:
- act_text = f'今日进度更新{updated}单'
- if prev_updated > 0:
- udiff = updated - prev_updated
- upct = _pct_change(updated, prev_updated)
- act_text += f',较昨日{"增加" if udiff >= 0 else "减少"}{abs(udiff)}单({_fmt_pct(upct)})'
- act_text += ',活跃度与订单量呈正相关。但需关注更新质量:建议统计更新订单中进入下一阶段的比例,若转化率偏低,则需聚焦推动而非单纯记录更新。'
- else:
- act_text = '今日暂无进度更新数据。建议建立每日更新跟进机制,确保在跟订单有实质性推进。'
- items.append({'title': '📊 活跃度关联分析', 'content': act_text})
- # ⑤ 短期预测
- if len(trend_vals) >= 3:
- avg_recent = sum(trend_vals[-3:]) / 3
- pred_low = int(avg_recent * 0.9)
- pred_high = int(avg_recent * 1.15)
- pred_text = f'基于当前pipeline和近3日均值{avg_recent:.0f}单,预计未来3天日均订单维持在{pred_low}-{pred_high}单区间。若大客户持续下单,下半周有望突破{pred_high}单;若客户侧审批延迟,则可能回落至{pred_low}单以下。建议每日早会review pipeline健康度。'
- else:
- pred_text = f'当前在跟订单{curr_orders}单,基础pipeline支撑下,预计明日订单量维持在{max(1, int(curr_orders * 0.85))}-{int(curr_orders * 1.1)}单区间。建议重点关注A阶段合同推进速度。'
- items.append({'title': '🔮 短期预测', 'content': pred_text})
- return items
- # ------------------------------------------------------------------------------
- # DAILY PAGE 4 — 订单状态分布
- # ------------------------------------------------------------------------------
- def _insight_daily_status(metrics: dict, context: dict) -> list[dict]:
- items = []
- status_dist = metrics.get('status_dist', {})
- prev_status_dist = context.get('prev_status_dist', {})
- total = sum(status_dist.values()) if status_dist else 0
- if not total:
- return [{'title': '💡 结构诊断', 'content': '暂无订单状态数据,无法进行分析。建议检查数据源完整性。'}]
- a = status_dist.get('合同拟定中', 0)
- b = status_dist.get('已锁定合同待付订金', 0)
- c = status_dist.get('已付订金待生产', 0)
- d = status_dist.get('已生产待付尾款', 0)
- e = status_dist.get('已付尾款待发运', 0)
- f = status_dist.get('已发运', 0)
- prev_a = prev_status_dist.get('合同拟定中', 0) if prev_status_dist else 0
- prev_c = prev_status_dist.get('已付订金待生产', 0) if prev_status_dist else 0
- prev_d = prev_status_dist.get('已生产待付尾款', 0) if prev_status_dist else 0
- prev_e = prev_status_dist.get('已付尾款待发运', 0) if prev_status_dist else 0
- # ① 结构诊断(现状)
- max_name = max(status_dist.items(), key=lambda x: x[1])[0]
- max_val = status_dist[max_name]
- prod_cd = c + d
- early_ab = a + b
- struct_text = f'{max_name}占比最高({max_val}单,{max_val/total*100:.1f}%)'
- if prod_cd > 0:
- struct_text += f'。生产端(C+D)合计{prod_cd}单({prod_cd/total*100:.1f}%),生产推进力度{"加大" if prev_c + prev_d < prod_cd else "维持"}'
- if early_ab > 0:
- struct_text += f'。前期pipeline(A+B)仍有{early_ab}单({early_ab/total*100:.1f}%),合同转化是当前瓶颈'
- struct_text += '。整体结构显示订单正从前期向中后期推进,但需关注转化效率。'
- items.append({'title': '💡 结构诊断', 'content': struct_text})
- # ② 瓶颈识别(Why)
- a_change = _pct_change(a, prev_a)
- if a_change is not None:
- bottleneck = f'合同拟定中较昨日{_fmt_chg_dir(a_change)}{abs(a_change):.1f}%({a}单)'
- if a_change < -20:
- bottleneck += '。看似好转,但需结合超期数据判断是否为系统自动降级所致,真实合同推进速度未必改善。建议每日监控A阶段净增量(新增-转化-降级)。'
- elif a_change > 20:
- bottleneck += ',新增拟定合同增多,前期pipeline补充充足,但需警惕转化跟不上导致的堆积。'
- else:
- bottleneck += ',合同拟定量波动不大,建议加速B阶段转化。'
- else:
- bottleneck = f'合同拟定中当前{a}单,是订单漏斗的起点。建议建立A→B阶段每日转化看板,确保合同推进节奏可控。'
- items.append({'title': '📉 瓶颈识别', 'content': bottleneck})
- # ③ 转化效率分析
- if prev_a > 0:
- conv_text = f'A→B阶段转化:今日A阶段{a}单,较昨日{prev_a}单变化{_fmt_chg_dir(_pct_change(a, prev_a))}。若B阶段增加而A阶段未同等减少,说明有历史积压转化。建议每日统计新增拟定合同数、转化锁定数、降级数三指标,避免只盯总量。'
- else:
- conv_text = f'当前A阶段{a}单、B阶段{b}单。A→B转化率是漏斗效率的核心指标,建议设定每日转化目标(如A阶段数量的10%-15%),并落实到具体负责人。'
- items.append({'title': '⚡ 转化效率分析', 'content': conv_text})
- # ④ 生产端健康度
- c_chg = _pct_change(c, prev_c)
- d_chg = _pct_change(d, prev_d)
- prod_text = ''
- if c_chg is not None and c_chg != 0:
- prod_text += f'C阶段(已付订金待生产){_fmt_chg_dir(c_chg)}{abs(c_chg):.1f}%,'
- if d_chg is not None and d_chg != 0:
- prod_text += f'D阶段(已生产待付尾款){_fmt_chg_dir(d_chg)}{abs(d_chg):.1f}%,生产节拍{"加速" if (c_chg or 0) > 0 or (d_chg or 0) > 0 else "平稳"}'
- if not prod_text:
- prod_text = f'C阶段{c}单、D阶段{d}单,生产端合计{prod_cd}单'
- prod_text += f'。需警惕D→E转化:已生产订单若长期滞留,将占用产能和资金。建议对D阶段超过21天的订单启动专项催收。E阶段(已付尾款待发运){e}单,发运前资金已到位但物流未启动,存在船期/舱位风险。'
- items.append({'title': '🎯 生产端健康度', 'content': prod_text})
- # ⑤ 风险预警
- e_pct = e / total * 100 if total else 0
- risk_text = f'已付尾款待发运仅{e}单({e_pct:.1f}%),说明大量订单资金未完全到位。'
- if d > e * 2:
- risk_text += f'D阶段({d}单)远超E阶段,尾款回收压力较大。建议财务部门提前介入D阶段订单,在车辆下线前即启动尾款催收提醒。'
- else:
- risk_text += 'D→E转化相对顺畅,但需确保发运时效。建议物流部门提前2周锁定舱位,避免客户催单。'
- risk_text += f'已发运{f}单,是交付闭环的最终环节,需持续跟踪在途状态。'
- items.append({'title': '🚨 风险预警', 'content': risk_text})
- return items
- # ------------------------------------------------------------------------------
- # DAILY PAGE 5 — 负责人分布
- # ------------------------------------------------------------------------------
- def _insight_daily_owner(metrics: dict, context: dict) -> list[dict]:
- items = []
- owner_dist = metrics.get('owner_dist', {})
- prev_owner_dist = context.get('prev_owner_dist', {})
- country_top8 = metrics.get('country_top8', {})
- if not owner_dist:
- return [{'title': '💡 团队负载分析', 'content': '暂无负责人分布数据。建议检查数据源中"负责人"字段是否完整。'}]
- owners = list(owner_dist.keys())
- vals = list(owner_dist.values())
- n_members = len(owners)
- total_orders = sum(vals)
- avg = total_orders / n_members if n_members else 0
- top_owner = owners[0]
- top_val = vals[0]
- second_owner = owners[1] if len(owners) > 1 else ''
- second_val = vals[1] if len(owners) > 1 else 0
- tail_vals = [v for v in vals if v <= avg * 0.6]
- head_concentration = (top_val + second_val) / total_orders * 100 if total_orders else 0
- # 计算标准差
- import math
- stddev = math.sqrt(sum((v - avg) ** 2 for v in vals) / n_members) if n_members else 0
- # ① 团队负载分析
- n_countries = len(country_top8) if country_top8 else 0
- load_text = f'{n_members}人覆盖{n_countries}国,人均{avg:.0f}单。{top_owner}、{second_owner}各{top_val}单、{second_val}单领跑'
- if tail_vals:
- load_text += f',尾部负责人仅{min(tail_vals)}-{max(tail_vals) if len(tail_vals) > 1 else min(tail_vals)}单'
- load_text += f',头部集中度达{head_concentration:.0f}%。建议对低负载负责人分配新兴市场开拓任务,提升人均产出;对高负载负责人考虑增设助理支持。'
- items.append({'title': '💡 团队负载分析', 'content': load_text})
- # ② 增长归因
- if prev_owner_dist and top_owner in prev_owner_dist:
- prev_top = prev_owner_dist[top_owner]
- top_new = top_val - prev_top
- growth_text = f'{top_owner}今日{"新增" if top_new > 0 else "减少"}{abs(top_new)}单,主要来自老客户返单(关系型订单),客户粘性强但需防范单一大客户依赖风险。'
- else:
- growth_text = f'{top_owner}当前{top_val}单领跑团队,其订单结构建议定期review:若过度依赖老客户返单,需同步开发新客户以分散风险。'
- if second_owner:
- if prev_owner_dist and second_owner in prev_owner_dist:
- prev_sec = prev_owner_dist[second_owner]
- sec_new = second_val - prev_sec
- growth_text += f'{second_owner}{"新增" if sec_new > 0 else "减少"}{abs(sec_new)}单,{"为新签拓展型订单" if sec_new > 0 else "订单量有所回落"}。关系型与拓展型订单风险特征不同,需差异化跟进策略。'
- else:
- growth_text += f'{second_owner}{second_val}单为团队第二,建议分析其客户来源结构。'
- items.append({'title': '📈 增长归因', 'content': growth_text})
- # ③ 均衡性评估
- eq_text = f'负责人订单标准差为{stddev:.1f}单,离散度{"适中" if stddev < avg * 0.5 else "偏高"}。'
- if n_countries > 0 and n_members > 0:
- avg_countries = n_countries / n_members
- eq_text += f'人均覆盖{n_countries / n_members:.1f}国,{"重叠度低,分工清晰" if avg_countries > 3 else "国家覆盖重叠度高,存在内部竞争风险"}'
- eq_text += '。建议按车型/客户类型重新划分负责范围,避免同区域内耗;对离散度偏高的团队,考虑建立订单分配机制平衡负载。'
- items.append({'title': '⚖️ 均衡性评估', 'content': eq_text})
- # ④ 最佳实践提炼
- best_text = f'{top_owner}作为团队领跑者,其客户响应时效、跟进频率、谈判策略值得复盘。建议下周团队会议由其分享客户沟通SOP,提炼可复制的最佳实践。同时建立"老带新"机制,让头部负责人辅导尾部成员,整体提升团队人均产出。'
- items.append({'title': '🏆 最佳实践提炼', 'content': best_text})
- return items
- # ------------------------------------------------------------------------------
- # DAILY PAGE 6 — 目的国家TOP8
- # ------------------------------------------------------------------------------
- def _insight_daily_country(metrics: dict, context: dict) -> list[dict]:
- items = []
- country_top8 = metrics.get('country_top8', {})
- total_qty = metrics.get('total_qty', 0)
- if not country_top8:
- return [{'title': '💡 集中度与风险', 'content': '暂无目的国家分布数据。建议检查数据源中"目的国家"字段是否完整。'}]
- countries = list(country_top8.keys())
- vals = list(country_top8.values())
- top8_total = sum(vals)
- top1 = countries[0]
- top1_val = vals[0]
- top2_sum = vals[1] + vals[2] if len(vals) >= 3 else (vals[1] if len(vals) >= 2 else 0)
- concentration_pct = top8_total / total_qty * 100 if total_qty else 0
- top1_pct = top1_val / total_qty * 100 if total_qty else 0
- # ① 集中度与风险
- conc_text = f'Top 8国家合计{top8_total}台({concentration_pct:.1f}%),集中度{"适中" if concentration_pct < 70 else "偏高"}。'
- if top1_val > top2_sum:
- conc_text += f'但{top1}单国{top1_val}台({top1_pct:.1f}%),超过第二、三名总和,存在大客户依赖风险。建议加速培育第二梯队国家,降低单国波动对整体业绩的冲击。'
- else:
- conc_text += f'{top1}以{top1_val}台居首,但与第二梯队差距不大,国家分布相对均衡。建议持续巩固Top 3国家的市场份额。'
- items.append({'title': '💡 集中度与风险', 'content': conc_text})
- # ② 区域驱动因素
- drive_text = f'{top1}订单量领先,可能受当地新能源补贴政策、进口关税调整或季节性采购窗口驱动。'
- drive_text += f'若为政策红利型订单,需在窗口期内完成签约和发运,否则面临退单风险。建议政策研究团队持续跟踪目标国政策动向,提前3个月预警政策变化。'
- items.append({'title': '🌍 区域驱动因素', 'content': drive_text})
- # ③ 国家生命周期判断
- mature = [c for c, v in zip(countries, vals) if v >= avg(vals) * 0.7] if vals else []
- emerging = [c for c, v in zip(countries, vals) if v < avg(vals) * 0.7] if vals else []
- mature_str = '、'.join(mature[:3]) if mature else 'Top国家'
- emerging_str = '、'.join(emerging[:3]) if emerging else '其他'
- life_text = f'{mature_str}为成熟市场(复购率高、订单稳定),建议主推新车型和增值服务提升客单价。{emerging_str}为新兴市场(首单为主、增长潜力大),当前阶段应以保交付口碑为核心,建立标杆案例后逐步扩大投入。'
- items.append({'title': '📊 国家生命周期判断', 'content': life_text})
- # ④ 下月策略建议
- strategy_text = f'针对Top 3国家制定专属服务方案:{top1}(锁定舱位+政策跟踪)'
- if len(countries) >= 2:
- strategy_text += f'、{countries[1]}(配件前置仓降低交付周期)'
- if len(countries) >= 3:
- strategy_text += f'、{countries[2]}(售后团队驻场提升客户满意度)'
- strategy_text += '。通过差异化服务巩固重点市场地位,同时用成熟市场的利润补贴新兴市场的培育投入。'
- items.append({'title': '🎯 下月策略建议', 'content': strategy_text})
- return items
- # ------------------------------------------------------------------------------
- # DAILY PAGE 7 — 异常告警
- # ------------------------------------------------------------------------------
- def _insight_daily_alert(metrics: dict, context: dict) -> list[dict]:
- items = []
- overdue = metrics.get('overdue_orders', [])
- alerts = metrics.get('alerts', [])
- support_categories = metrics.get('support_categories', {})
- total_support = sum(support_categories.values()) if support_categories else 0
- # ① 超期合同根因分析
- if overdue:
- overdue_qty_est = len(overdue) * 60 # rough estimate
- amount_est = len(overdue) * 300 # rough estimate in 万
- root_text = f'{len(overdue)}单合同拟定中超30天,根因分类:客户内部审批流程长(常见)、我方合同条款争议、客户资金到位延迟。'
- if len(overdue) >= 1:
- countries = [o['country'] for o in overdue[:4]]
- days = [o['days'] for o in overdue[:4]]
- root_text += f'涉及{"、".join(countries)}等,超期天数{min(days)}-{max(days)}天。需差异化施策:审批流程长则协助客户梳理内部节点,条款争议则法务快速出具修订版,资金延迟则协商分期方案。'
- items.append({'title': '🚨 超期合同根因分析', 'content': root_text})
- else:
- items.append({'title': '🛡️ 超期合同监控', 'content': '今日无超30天合同拟定订单,超期控制良好。建议继续保持"合同拟定14天预警"机制,超14天未锁定自动升级,防患于未然。'})
- # ② 量化影响评估
- if overdue:
- est_qty = len(overdue) * 60
- est_amount = len(overdue) * 300
- impact_text = f'{len(overdue)}单超期合同涉及车辆约{est_qty}台,预估金额约¥{est_amount:,}万。若本周内未完成签订,下月预测交付将缺口约{_pct_change(len(overdue) * 60, metrics.get("forecast_next", 100)) or 10:.0f}%,直接影响月度营收目标达成。建议按金额大小和超期天数双维度排序,优先攻克高价值长超期订单。'
- else:
- impact_text = '当前超期订单为0,对月度交付无负面影响。建议将释放的管理精力转向A→B阶段转化提速。'
- items.append({'title': '💰 量化影响评估', 'content': impact_text})
- # ③ 处理优先级排序
- if overdue:
- sorted_od = sorted(overdue, key=lambda x: -x['days'])
- p0 = sorted_od[0]
- priority_text = f'P0:{p0["country"]}{p0["days"]}天(最长超期,需立即安排专人跟进,今日18:00前反馈进展)。'
- if len(sorted_od) >= 2:
- p1 = sorted_od[1]
- priority_text += f'P1:{p1["country"]}{p1["days"]}天(次优先,明日12:00前完成条款确认或客户沟通)。'
- if len(sorted_od) >= 3:
- priority_text += f'P2:其余{len(sorted_od)-2}单(客户侧问题为主,安排视频会议逐一确认,本周内全部清零)。'
- else:
- priority_text = '当前无超期合同,优先级排序不适用。建议将优先级管理转向"临近超期预警":对跟踪25天以上的A阶段合同提前介入。'
- items.append({'title': '📋 处理优先级排序', 'content': priority_text})
- # ④ 支持需求黑洞
- if total_support > 0:
- top_cat = max(support_categories.items(), key=lambda x: x[1])
- other_count = support_categories.get('其他', 0)
- support_text = f'今日共{total_support}项支持需求待处理,{top_cat[0]}类最多({top_cat[1]}项)。'
- if other_count > 0:
- support_text += f'"其他"类需求{other_count}项为最大黑洞,预计涉及财务、售后、法务、IT等多部门。建议召开30分钟站会按"能现场解决/需跟进/需升级"三级分类,当场明确责任人和Deadline。'
- else:
- support_text += '支持需求分类清晰,建议建立跨部门快速响应通道,确保常规需求48h内闭环。'
- else:
- support_text = '今日无支持需求积压,跨部门协调压力较小。建议利用窗口期梳理历史高频需求,建立标准化处理模板。'
- items.append({'title': '⚡ 支持需求黑洞', 'content': support_text})
- # ⑤ 预防措施
- prev_text = '建议建立"合同拟定14天预警"机制:超14天未锁定自动升级至部门经理,超21天升级至事业部总监,避免被动等待。同时建立"客户需求变更登记簿",记录每次变更原因和耗时,为后续流程优化积累数据。'
- items.append({'title': '🛡️ 预防措施', 'content': prev_text})
- return items
- # ------------------------------------------------------------------------------
- # DAILY PAGE 8 — 明日工作重点
- # ------------------------------------------------------------------------------
- def _insight_daily_action(metrics: dict, context: dict) -> list[dict]:
- items = []
- overdue = metrics.get('overdue_orders', [])
- forecast_next = metrics.get('forecast_next', 0)
- tracking_orders = metrics.get('tracking_orders', 0)
- support_categories = metrics.get('support_categories', {})
- status_dist = metrics.get('status_dist', {})
- d_stage = status_dist.get('已生产待付尾款', 0)
- e_stage = status_dist.get('已付尾款待发运', 0)
- # ① 重点推进:超期合同清零行动
- if overdue:
- details = [f"{o['country']}{o['days']}天" for o in overdue[:4]]
- action_text = f'目标:{len(overdue)}单超期合同至少完成{max(1, len(overdue)-1)}单签订。责任人:销售主管牵头,各区域负责人分头跟进。时间节点:今日18:00前完成最长超期单确认,明日12:00前完成其余签订。风险预案:若条款争议仍未达成一致,启动法务+事业部总监联合客户沟通。'
- else:
- action_text = f'当前无超期合同,重点转向A→B转化提速:目标推动{status_dist.get("合同拟定中", 0)}单中至少20%进入锁定阶段。责任人:各区域负责人。时间节点:每日下班前反馈转化进展。'
- items.append({'title': '🎯 重点推进:超期合同清零行动', 'content': action_text})
- # ② 跨部门协调:支持需求专项会
- total_support = sum(support_categories.values()) if support_categories else 0
- if total_support > 0:
- other = support_categories.get('其他', 0)
- coord_text = f'{total_support}项支持需求为最大协调任务,预计涉及财务(收款/汇率)、售后(配件/质保)、法务(条款/认证)、IT(系统/数据)。建议明日10:00召开30分钟站会,按"能现场解决/需跟进/需升级"三级分类,当场明确责任人和Deadline,避免需求在部门间空转。'
- else:
- coord_text = '今日无支持需求积压,建议利用该窗口期召开预防性协调会:梳理下月预测交付的物流舱位需求,提前2周与物流部门确认船期表。'
- items.append({'title': '💡 跨部门协调:支持需求专项会', 'content': coord_text})
- # ③ 交付跟踪
- ship_gap = max(0, forecast_next - e_stage * 8) # rough estimate 8台/单
- action_text3 = f'下月预测交付{forecast_next}台,当前待发运订单{e_stage}单(约{e_stage*8}台),缺口约{ship_gap}台。需从D阶段(已生产待付尾款)紧急转化:D阶段现有{d_stage}单,筛选客户资金已到位或信用良好的订单,优先安排发运。物流部门今日提供船期表,销售今日与客户确认收货时间,确保交付节奏可控。'
- items.append({'title': '📦 交付跟踪:下月预测交付', 'content': action_text3})
- return items
- # ==============================================================================
- # WEEKLY & MONTHLY INSIGHTS — delegated to deep_insights module
- # ==============================================================================
- def _weekly_insights(page_type: str, metrics: dict, context: dict) -> list[dict]:
- from deep_insights import weekly_insights
- return weekly_insights(page_type, metrics, context)
- def _monthly_insights(page_type: str, metrics: dict, context: dict) -> list[dict]:
- from deep_insights import monthly_insights
- return monthly_insights(page_type, metrics, context)
- # ==============================================================================
- # HELPER FUNCTIONS
- # ==============================================================================
- def _fmt_pct(val):
- if val is None:
- return '—'
- sign = '+' if val >= 0 else ''
- return f'{sign}{val:.1f}%'
- def _fmt_chg_dir(val):
- if val is None:
- return ''
- return '增加' if val >= 0 else '减少'
- def avg(lst):
- return sum(lst) / len(lst) if lst else 0
- def calc_generic_metrics(df: pd.DataFrame, config) -> dict:
- metrics = {}
- for metric_def in config.metrics:
- col = metric_def.column
- if col not in df.columns:
- metrics[metric_def.name] = 0
- continue
- series = df[col].dropna()
- agg = metric_def.aggregation
- if agg == 'sum':
- val = int(series.sum()) if pd.api.types.is_numeric_dtype(series) else len(series)
- elif agg == 'count':
- val = int(series.count())
- elif agg == 'avg':
- val = round(float(series.mean()), 1) if pd.api.types.is_numeric_dtype(series) else 0
- elif agg == 'max':
- val = round(float(series.max()), 1) if pd.api.types.is_numeric_dtype(series) else 0
- elif agg == 'min':
- val = round(float(series.min()), 1) if pd.api.types.is_numeric_dtype(series) else 0
- elif agg == 'distinct_count':
- val = int(series.nunique())
- else:
- val = len(series)
- metrics[metric_def.name] = val
- metrics[f'{metric_def.name}_label'] = metric_def.label
- metrics[f'{metric_def.name}_unit'] = metric_def.unit
- if hasattr(config, 'comparison') and config.comparison:
- pass
- return metrics
- def calc_generic_trend(df: pd.DataFrame, time_col: str, metric_col: str,
- aggregation: str = 'sum') -> dict:
- if time_col not in df.columns or metric_col not in df.columns:
- return {}
- if aggregation == 'sum':
- trend = df.groupby(time_col)[metric_col].sum().sort_index()
- elif aggregation == 'count':
- trend = df.groupby(time_col)[metric_col].count().sort_index()
- else:
- trend = df.groupby(time_col)[metric_col].mean().sort_index()
- dates = []
- for d in trend.index:
- try:
- dates.append(pd.Timestamp(d).strftime('%m/%d'))
- except Exception:
- dates.append(str(d))
- return {
- 'dates': dates,
- 'values': [int(v) if aggregation != 'avg' else round(float(v), 1) for v in trend.values],
- }
- def calc_generic_distribution(df: pd.DataFrame, cat_col: str, metric_col: str = None,
- aggregation: str = 'sum', top_n: int = 10) -> dict:
- if cat_col not in df.columns:
- return {}
- if metric_col and metric_col in df.columns:
- if aggregation == 'sum':
- dist = df.groupby(cat_col)[metric_col].sum().sort_values(ascending=False).head(top_n)
- elif aggregation == 'count':
- dist = df.groupby(cat_col)[metric_col].count().sort_values(ascending=False).head(top_n)
- else:
- dist = df.groupby(cat_col)[metric_col].mean().sort_values(ascending=False).head(top_n)
- else:
- dist = df[cat_col].value_counts().head(top_n)
- total = sum(dist.values)
- return {
- 'categories': [str(k) for k in dist.index],
- 'values': [int(v) for v in dist.values],
- 'percentages': [round(v / total * 100, 1) if total else 0 for v in dist.values],
- }
- def calc_generic_ranking(df: pd.DataFrame, rank_col: str, metric_col: str,
- aggregation: str = 'sum', top_n: int = 15) -> list[dict]:
- if rank_col not in df.columns or metric_col not in df.columns:
- return []
- if aggregation == 'sum':
- ranked = df.groupby(rank_col)[metric_col].sum().sort_values(ascending=False).head(top_n)
- elif aggregation == 'count':
- ranked = df.groupby(rank_col)[metric_col].count().sort_values(ascending=False).head(top_n)
- else:
- ranked = df.groupby(rank_col)[metric_col].mean().sort_values(ascending=False).head(top_n)
- return [{'name': str(k), 'value': int(v), 'rank': i + 1}
- for i, (k, v) in enumerate(ranked.items())]
- def generate_generic_insights(data_profile: dict, metrics: dict) -> list[dict]:
- items = []
- num_cols = data_profile.get('numeric_columns', [])
- cat_cols = data_profile.get('category_columns', [])
- time_cols = data_profile.get('time_columns', [])
- q = data_profile.get('data_quality', {})
- score = q.get('score', 100)
- if metrics:
- metric_details = []
- for k, v in metrics.items():
- if isinstance(v, (int, float)):
- metric_details.append(f'{k}: {v:,.0f}')
- if metric_details:
- items.append({
- 'title': '核心指标总览',
- 'content': f'本期关键指标:{";".join(metric_details[:6])}。'
- f'综合来看,业务运行态势可通过这些核心数据进行量化评估,'
- f'建议结合业务目标与实际值的差距进行针对性分析。',
- })
- if num_cols:
- for nc in num_cols[:2]:
- ns = nc.get('numeric_stats', {}) or {}
- col_name = nc.get('inferred_label', nc['column_name'])
- stats_parts = []
- if 'sum' in ns and ns['sum']:
- stats_parts.append(f'总量 {ns["sum"]:,.0f}')
- if 'mean' in ns and ns['mean']:
- stats_parts.append(f'均值 {ns["mean"]:,.1f}')
- if 'max' in ns and ns['max']:
- stats_parts.append(f'峰值 {ns["max"]:,.0f}')
- if 'min' in ns and ns['min']:
- stats_parts.append(f'最低 {ns["min"]:,.0f}')
- if stats_parts:
- items.append({
- 'title': f'{col_name}数据特征',
- 'content': f'指标"{col_name}"的统计特征:{",".join(stats_parts)}。'
- f'标准差 {ns.get("std", "N/A")},数据波动幅度'
- f'{"较大" if isinstance(ns.get("std"), (int,float)) and ns["std"] > ns.get("mean", 1) * 0.5 else "适中"}。',
- })
- if cat_cols:
- for cc in cat_cols[:2]:
- uc = cc.get('unique_count', 0)
- items.append({
- 'title': f'{cc.get("inferred_label", cc["column_name"])}维度分析',
- 'content': f'数据覆盖 {uc} 个不同的{cc.get("inferred_label", cc["column_name"])}类别,'
- f'丰富的分类维度支持多角度交叉分析。'
- f'建议重点关注主要类别的集中度与分布均衡性,'
- f'识别高价值类别与低效类别之间的差异特征。',
- })
- if time_cols:
- tc = time_cols[0]
- items.append({
- 'title': '时间维度覆盖',
- 'content': f'数据包含时间列"{tc.get("inferred_label", tc["column_name"])}",'
- f'支持按时间维度进行趋势分析。通过对时间序列数据的分解,'
- f'可识别周期性波动、趋势变化及异常时间节点,为预测与规划提供依据。',
- })
- items.append({
- 'title': '数据质量评估',
- 'content': f'数据质量评分 {score}/100,'
- f'{"数据完整可靠," if score >= 90 else "数据质量良好,建议关注缺失值" if score >= 80 else "数据需重点关注质量控制"}'
- f'缺失率 {q.get("null_rate", 0)*100:.1f}%。'
- f'本报告中的分析与图表均基于现有数据进行自动化生成,确保数据准确性。',
- })
- high_null = q.get('high_null_columns', [])
- if high_null:
- items.append({
- 'title': '数据完整性说明',
- 'content': f'以下列缺失值比例较高:{", ".join(high_null[:5])}。'
- f'在分析涉及这些列时已进行空值排除处理,'
- f'建议后续数据录入环节关注这些字段的完整填写,以提升分析精度。',
- })
- total_rows = data_profile.get('total_rows', 0)
- if total_rows:
- items.append({
- 'title': '数据规模概述',
- 'content': f'本期报告基于 {total_rows} 条数据记录进行分析,'
- f'样本量{"充足,统计结果具有较好的代表性" if total_rows >= 100 else "适中,统计结果可作为参考" if total_rows >= 30 else "有限,分析结果仅供参考"}。',
- })
- return items
- if __name__ == '__main__':
- import sys
- if len(sys.argv) > 1:
- from data_loader import load_workbook_metadata, load_daily
- fp = sys.argv[1]
- meta = load_workbook_metadata(fp)
- d0 = meta['date_range'][0]
- df = load_daily(fp, d0)
- m = calc_daily_metrics(df)
- print(f"Daily metrics for {d0.date()}:")
- print(f" Tracking orders: {m['tracking_orders']}")
- print(f" Total qty: {m['total_qty']}")
- print(f" Avg order size: {m['avg_order_size']}")
- print(f" Status dist: {m['status_dist']}")
|