""" Excel data loader for daily/weekly/monthly report generation. """ import pandas as pd from datetime import datetime, timedelta import re import warnings # Field mapping: Excel column name -> internal field name FIELD_MAP = { '序号': 'seq', '目的国家': 'country', '合同号': 'contract_no', '用户名称/公司': 'customer', '意向车型及数量': 'product_info', '订单总数量': 'order_qty', '负责人': 'owner', '当前状态': 'status', '拟定合同时间': 'contract_date', '跟单天数': 'tracking_days', '定金支付时间': 'deposit_date', '订金认领时间': 'deposit_claim_date', '订单生成时间': 'order_gen_date', '价格评审时间': 'price_review_date', '合同评审时间': 'contract_review_date', '合同提交盖章申请时间': 'seal_apply_date', '合同盖章时间': 'seal_date', '车辆下线入库状态': 'inventory_status', '尾款支付时间': 'final_pay_date', '尾款认领时间': 'final_claim_date', '智慧关务信息维护': 'customs_date', '许可证办理时间': 'license_date', '车辆发运时间': 'ship_date', '预计开票时间': 'invoice_date', '今日进度更新': 'progress_update', '是否更新': 'is_updated', '支持需求': 'support_request', '4月交付': 'deliver_apr', '5月预测': 'forecast_may', } STATUS_ORDER = ['A', 'B', 'C', 'D', 'E', 'F'] STATUS_LABELS = { 'A': '合同拟定中', 'B': '已锁定合同待付订金', 'C': '已付订金待生产', 'D': '已生产待付尾款', 'E': '已付尾款待发运', 'F': '已发运', } def _normalize_status(val): """Extract status code A-F from status string.""" if pd.isna(val): return None s = str(val).strip() # Match pattern like "A(合同拟定中)" or "A" m = re.match(r'^([A-F])', s) if m: return m.group(1) return None def _parse_date(val): """Parse various date formats.""" if pd.isna(val): return None if isinstance(val, datetime): return val for fmt in ('%Y-%m-%d', '%Y/%m/%d', '%Y年%m月%d日'): try: return datetime.strptime(str(val).strip(), fmt) except ValueError: continue return None def _sheet_name_for_date(date: datetime) -> str: """Convert datetime to expected sheet name.""" return date.strftime('%Y年%m月%d日') def load_workbook_metadata(filepath: str) -> dict: """Return workbook metadata: sheet names, date range.""" xl = pd.ExcelFile(filepath) sheets = xl.sheet_names dates = [] for s in sheets: try: d = datetime.strptime(s, '%Y年%m月%d日') dates.append(d) except ValueError: continue dates.sort() return { 'sheets': sheets, 'date_range': (dates[0], dates[-1]) if dates else (None, None), 'total_days': len(dates), } def load_daily(filepath: str, date: datetime) -> pd.DataFrame: """Load single-day order data.""" sheet = _sheet_name_for_date(date) df = pd.read_excel(filepath, sheet_name=sheet) return _clean_dataframe(df) def load_date_range(filepath: str, start: datetime, end: datetime) -> pd.DataFrame: """Load and concatenate data across a date range [start, end].""" xl = pd.ExcelFile(filepath) frames = [] current = start while current <= end: sheet = _sheet_name_for_date(current) if sheet in xl.sheet_names: df = pd.read_excel(filepath, sheet_name=sheet) df['_data_date'] = current frames.append(df) current += timedelta(days=1) if not frames: raise ValueError(f"No data found between {start.date()} and {end.date()}") combined = pd.concat(frames, ignore_index=True) return _clean_dataframe(combined) def load_weekly(filepath: str, year: int, week_num: int, week_start_day=0) -> tuple: """ Load data for a specific week. Returns (current_week_df, prev_week_df). week_start_day: 0=Monday, 6=Sunday """ # Find the first day of the given week # Simplified: assume data starts from a known reference meta = load_workbook_metadata(filepath) first_date, last_date = meta['date_range'] if first_date is None: raise ValueError("No valid date sheets found") # Find the Monday of the target week (using ISO week definition) # Jan 4 is always in week 1 jan4 = datetime(year, 1, 4) # Adjust to Monday jan4_monday = jan4 - timedelta(days=jan4.weekday()) target_monday = jan4_monday + timedelta(weeks=week_num - 1) target_sunday = target_monday + timedelta(days=6) # Clamp to available data range start = max(target_monday, first_date) end = min(target_sunday, last_date) current = load_date_range(filepath, start, end) # Previous week prev_start = start - timedelta(days=7) prev_end = end - timedelta(days=7) if prev_start >= first_date: previous = load_date_range(filepath, prev_start, prev_end) else: previous = pd.DataFrame(columns=current.columns) return current, previous def load_monthly(filepath: str, year: int, month: int) -> tuple: """ Load data for a specific month. Returns (current_month_df, prev_month_df, yoy_month_df). """ start = datetime(year, month, 1) # Last day of month if month == 12: end = datetime(year + 1, 1, 1) - timedelta(days=1) else: end = datetime(year, month + 1, 1) - timedelta(days=1) current = load_date_range(filepath, start, end) # Previous month if month == 1: prev_start = datetime(year - 1, 12, 1) prev_end = datetime(year, 1, 1) - timedelta(days=1) else: prev_start = datetime(year, month - 1, 1) prev_end = datetime(year, month, 1) - timedelta(days=1) try: previous = load_date_range(filepath, prev_start, prev_end) except ValueError: previous = pd.DataFrame(columns=current.columns) # YoY (same month last year) yoy_start = datetime(year - 1, month, 1) if month == 12: yoy_end = datetime(year, 1, 1) - timedelta(days=1) else: yoy_end = datetime(year - 1, month + 1, 1) - timedelta(days=1) try: yoy = load_date_range(filepath, yoy_start, yoy_end) except ValueError: yoy = pd.DataFrame(columns=current.columns) return current, previous, yoy def _clean_dataframe(df: pd.DataFrame) -> pd.DataFrame: """Rename columns, parse dates, clean statuses.""" # Rename known columns rename_map = {k: v for k, v in FIELD_MAP.items() if k in df.columns} df = df.rename(columns=rename_map) # Normalize status if 'status' in df.columns: df['status_code'] = df['status'].apply(_normalize_status) # Parse numeric fields if 'order_qty' in df.columns: df['order_qty'] = pd.to_numeric(df['order_qty'], errors='coerce') # Parse date fields date_fields = ['contract_date', 'deposit_date', 'order_gen_date', 'price_review_date', 'contract_review_date', 'seal_apply_date', 'seal_date', 'final_pay_date', 'customs_date', 'license_date', 'ship_date', 'invoice_date'] for field in date_fields: if field in df.columns: df[field] = df[field].apply(_parse_date) # Tracking days if 'tracking_days' in df.columns: df['tracking_days'] = pd.to_numeric(df['tracking_days'], errors='coerce') # Boolean updated if 'is_updated' in df.columns: df['is_updated_flag'] = df['is_updated'].astype(str).str.strip() == '是' return df if __name__ == '__main__': import sys if len(sys.argv) > 1: fp = sys.argv[1] meta = load_workbook_metadata(fp) print(f"Sheets: {meta['sheets'][:5]}...") print(f"Date range: {meta['date_range'][0]} ~ {meta['date_range'][1]}") print(f"Total days: {meta['total_days']}")