""" Excel data loader for daily/weekly/monthly report generation. Contains both legacy order-specific loaders and enhanced generic loaders. """ import pandas as pd from datetime import datetime, timedelta import re import warnings import os import io import csv # ===================================================================== # LEGACY SECTION — Order-specific loaders (kept for backward compat) # ===================================================================== FIELD_MAP = { '序号': 'seq', '目的国家': 'country', '合同号': 'contract_no', '用户名称/公司': 'customer', '意向车型及数量': 'product_info', '订单总数量': 'order_qty', '负责人': 'owner', '当前状态': 'status', '拟定合同时间': 'contract_date', '跟单天数': 'tracking_days', '定金支付时间': 'deposit_date', '订金认领时间': 'deposit_claim_date', '订单生成时间': 'order_gen_date', '价格评审时间': 'price_review_date', '合同评审时间': 'contract_review_date', '合同提交盖章申请时间': 'seal_apply_date', '合同盖章时间': 'seal_date', '车辆下线入库状态': 'inventory_status', '尾款支付时间': 'final_pay_date', '尾款认领时间': 'final_claim_date', '智慧关务信息维护': 'customs_date', '许可证办理时间': 'license_date', '车辆发运时间': 'ship_date', '预计开票时间': 'invoice_date', '今日进度更新': 'progress_update', '是否更新': 'is_updated', '支持需求': 'support_request', '4月交付': 'deliver_apr', '5月预测': 'forecast_may', } STATUS_ORDER = ['A', 'B', 'C', 'D', 'E', 'F'] STATUS_LABELS = { 'A': '合同拟定中', 'B': '已锁定合同待付订金', 'C': '已付订金待生产', 'D': '已生产待付尾款', 'E': '已付尾款待发运', 'F': '已发运', } def _normalize_status(val): """Extract status code A-F from status string.""" if pd.isna(val): return None s = str(val).strip() m = re.match(r'^([A-F])', s) if m: return m.group(1) return None def _parse_date(val): """Parse various date formats.""" if pd.isna(val): return None if isinstance(val, datetime): return val for fmt in ('%Y-%m-%d', '%Y/%m/%d', '%Y年%m月%d日'): try: return datetime.strptime(str(val).strip(), fmt) except ValueError: continue return None def _sheet_name_for_date(date: datetime) -> str: """Convert datetime to expected sheet name.""" return date.strftime('%Y年%m月%d日') def load_workbook_metadata(filepath: str) -> dict: """Return workbook metadata: sheet names, date range.""" xl = pd.ExcelFile(filepath) sheets = xl.sheet_names dates = [] for s in sheets: try: d = datetime.strptime(s, '%Y年%m月%d日') dates.append(d) except ValueError: continue dates.sort() return { 'sheets': sheets, 'date_range': (dates[0], dates[-1]) if dates else (None, None), 'total_days': len(dates), } def load_daily(filepath: str, date: datetime) -> pd.DataFrame: """Load single-day order data.""" sheet = _sheet_name_for_date(date) df = pd.read_excel(filepath, sheet_name=sheet) return _clean_dataframe(df) def load_date_range(filepath: str, start: datetime, end: datetime) -> pd.DataFrame: """Load and concatenate data across a date range [start, end].""" xl = pd.ExcelFile(filepath) frames = [] current = start while current <= end: sheet = _sheet_name_for_date(current) if sheet in xl.sheet_names: df = pd.read_excel(filepath, sheet_name=sheet) df['_data_date'] = current frames.append(df) current += timedelta(days=1) if not frames: raise ValueError(f"No data found between {start.date()} and {end.date()}") combined = pd.concat(frames, ignore_index=True) return _clean_dataframe(combined) def load_weekly(filepath: str, year: int, week_num: int, week_start_day=0) -> tuple: """ Load data for a specific week. Returns (current_week_df, prev_week_df). week_start_day: 0=Monday, 6=Sunday """ meta = load_workbook_metadata(filepath) first_date, last_date = meta['date_range'] if first_date is None: raise ValueError("No valid date sheets found") jan4 = datetime(year, 1, 4) jan4_monday = jan4 - timedelta(days=jan4.weekday()) target_monday = jan4_monday + timedelta(weeks=week_num - 1) target_sunday = target_monday + timedelta(days=6) start = max(target_monday, first_date) end = min(target_sunday, last_date) current = load_date_range(filepath, start, end) prev_start = start - timedelta(days=7) prev_end = end - timedelta(days=7) if prev_start >= first_date: previous = load_date_range(filepath, prev_start, prev_end) else: previous = pd.DataFrame(columns=current.columns) return current, previous def load_monthly(filepath: str, year: int, month: int) -> tuple: """ Load data for a specific month. Returns (current_month_df, prev_month_df, yoy_month_df). """ start = datetime(year, month, 1) if month == 12: end = datetime(year + 1, 1, 1) - timedelta(days=1) else: end = datetime(year, month + 1, 1) - timedelta(days=1) current = load_date_range(filepath, start, end) if month == 1: prev_start = datetime(year - 1, 12, 1) prev_end = datetime(year, 1, 1) - timedelta(days=1) else: prev_start = datetime(year, month - 1, 1) prev_end = datetime(year, month, 1) - timedelta(days=1) try: previous = load_date_range(filepath, prev_start, prev_end) except ValueError: previous = pd.DataFrame(columns=current.columns) yoy_start = datetime(year - 1, month, 1) if month == 12: yoy_end = datetime(year, 1, 1) - timedelta(days=1) else: yoy_end = datetime(year - 1, month + 1, 1) - timedelta(days=1) try: yoy = load_date_range(filepath, yoy_start, yoy_end) except ValueError: yoy = pd.DataFrame(columns=current.columns) return current, previous, yoy def _clean_dataframe(df: pd.DataFrame) -> pd.DataFrame: """Rename columns, parse dates, clean statuses (legacy).""" rename_map = {k: v for k, v in FIELD_MAP.items() if k in df.columns} df = df.rename(columns=rename_map) if 'status' in df.columns: df['status_code'] = df['status'].apply(_normalize_status) if 'order_qty' in df.columns: df['order_qty'] = pd.to_numeric(df['order_qty'], errors='coerce') date_fields = ['contract_date', 'deposit_date', 'order_gen_date', 'price_review_date', 'contract_review_date', 'seal_apply_date', 'seal_date', 'final_pay_date', 'customs_date', 'license_date', 'ship_date', 'invoice_date'] for field in date_fields: if field in df.columns: df[field] = df[field].apply(_parse_date) if 'tracking_days' in df.columns: df['tracking_days'] = pd.to_numeric(df['tracking_days'], errors='coerce') if 'is_updated' in df.columns: df['is_updated_flag'] = df['is_updated'].astype(str).str.strip() == '是' return df # ===================================================================== # GENERIC LOADING SECTION — Universal loaders for any Excel data # ===================================================================== # Summary row keywords (Chinese and English) to auto-detect and skip SUMMARY_KEYWORDS = [ '合计', '总计', '小计', '汇总', '累计', '总和', 'total', 'sum', 'subtotal', 'grand total', '合计:', '总计:', '平均', 'avg', 'average', ] # Encoding priority list for CSV detection CSV_ENCODINGS = ['utf-8', 'utf-8-sig', 'gbk', 'gb2312', 'gb18030', 'latin-1', 'cp1252'] def auto_detect_file_format(filepath: str) -> str: """Auto-detect file format: xlsx, xls, csv, or unknown.""" ext = os.path.splitext(filepath)[1].lower() if ext in ('.xlsx', '.xls'): return ext[1:] if ext == '.csv': return 'csv' if ext in ('.xlsm', '.xlsb'): return ext[1:] return 'unknown' def load_generic_csv(filepath: str, encoding=None, **kwargs) -> pd.DataFrame: """ Load a CSV file with auto-encoding detection. Tries common encodings until one succeeds. """ if encoding: try: return pd.read_csv(filepath, encoding=encoding, **kwargs) except (UnicodeDecodeError, UnicodeError): raise ValueError(f"Failed to decode {filepath} with encoding {encoding}") last_error = None for enc in CSV_ENCODINGS: try: df = pd.read_csv(filepath, encoding=enc, **kwargs) if len(df.columns) > 0: return df except (UnicodeDecodeError, UnicodeError, pd.errors.ParserError) as e: last_error = e continue raise ValueError( f"Unable to decode CSV file {filepath}. Tried encodings: " f"{CSV_ENCODINGS}. Last error: {last_error}" ) def _detect_and_skip_footer_rows(df_raw: pd.DataFrame) -> pd.DataFrame: """Detect and remove summary/aggregation rows at the end of the data.""" if df_raw.empty: return df_raw rows_to_drop = [] text_cols = [c for c in df_raw.columns if df_raw[c].dtype == 'object'] for idx in range(len(df_raw) - 1, -1, -1): row = df_raw.iloc[idx] is_summary = False for col in text_cols: val = str(row.get(col, '')).strip().lower() if any(kw in val for kw in SUMMARY_KEYWORDS): is_summary = True break if is_summary: rows_to_drop.append(idx) else: break if len(rows_to_drop) > 20: break if rows_to_drop: df_raw = df_raw.drop(index=rows_to_drop) df_raw = df_raw.reset_index(drop=True) return df_raw def _detect_empty_or_notes_rows(df_raw: pd.DataFrame) -> pd.DataFrame: """Remove leading empty rows and trailing fully-empty rows.""" if df_raw.empty: return df_raw non_empty_rows = df_raw.notna().any(axis=1) first_valid = non_empty_rows.idxmax() if non_empty_rows.any() else 0 last_valid = non_empty_rows[non_empty_rows].index[-1] if non_empty_rows.any() else len(df_raw) df_raw = df_raw.iloc[first_valid:last_valid + 1].reset_index(drop=True) return df_raw def normalize_column_names(col_name: str) -> str: """ Normalize a single column name: strip whitespace, unify brackets, remove special chars. """ if not isinstance(col_name, str): return col_name name = col_name.strip() name = name.replace('(', '(').replace(')', ')') name = name.replace('【', '[').replace('】', ']') name = name.replace('\n', ' ').replace('\r', ' ') name = re.sub(r'\s+', ' ', name) return name def _is_unnamed_column(col) -> bool: return str(col).strip().lower().startswith('unnamed') def _dedupe_columns(columns) -> list: seen = {} result = [] for idx, col in enumerate(columns): name = normalize_column_names(str(col).strip()) if col is not None else '' if not name or name.lower() in ('nan', 'none') or _is_unnamed_column(name): name = f'column_{idx + 1}' base = name count = seen.get(base, 0) if count: name = f'{base}_{count + 1}' seen[base] = count + 1 result.append(name) return result def _row_header_score(values) -> float: cells = [str(v).strip() for v in values if pd.notna(v) and str(v).strip()] if not cells: return 0 non_numeric = 0 unique = set() for cell in cells: unique.add(cell) try: float(cell.replace(',', '').replace('%', '')) is_numeric = True except Exception: is_numeric = False if not is_numeric: non_numeric += 1 return len(cells) + non_numeric * 1.5 + len(unique) * 0.5 def _detect_header_row(raw_df: pd.DataFrame, max_scan_rows: int = 8) -> int: if raw_df.empty: return 0 scan_rows = min(max_scan_rows, len(raw_df)) best_idx = 0 best_score = -1 for idx in range(scan_rows): row = raw_df.iloc[idx] score = _row_header_score(row) next_non_empty = 0 if idx + 1 < len(raw_df): next_non_empty = raw_df.iloc[idx + 1].notna().sum() # Prefer rows with many non-numeric labels and data underneath. score += min(next_non_empty, len(raw_df.columns)) * 0.2 if score > best_score: best_score = score best_idx = idx return best_idx def _dataframe_from_detected_header(raw_df: pd.DataFrame, header_row='auto') -> pd.DataFrame: raw_df = raw_df.dropna(how='all').reset_index(drop=True) raw_df = raw_df.dropna(axis=1, how='all') if raw_df.empty: return raw_df if header_row == 'auto': header_idx = _detect_header_row(raw_df) elif header_row is None: header_idx = 0 else: header_idx = int(header_row) header_idx = max(0, min(header_idx, len(raw_df) - 1)) columns = _dedupe_columns(raw_df.iloc[header_idx].tolist()) df = raw_df.iloc[header_idx + 1:].copy().reset_index(drop=True) df.columns = columns return df def _clean_generic_dataframe(df: pd.DataFrame, skip_summary_rows=True) -> pd.DataFrame: """ Universal DataFrame cleaning: - Remove fully empty rows/columns - Drop 'Unnamed' columns - Normalize column names (strip whitespace, unify brackets) - Auto-detect and remove summary/total rows - Try to parse date columns - Try to parse numeric columns """ if df.empty: return df df = df.dropna(how='all').reset_index(drop=True) df = df.dropna(axis=1, how='all') unnamed_mask = df.columns.map(_is_unnamed_column) if unnamed_mask.any() and (~unnamed_mask).any(): df = df.loc[:, ~unnamed_mask] df = df.rename(columns=normalize_column_names) if skip_summary_rows: df = _detect_and_skip_footer_rows(df) df = _detect_empty_or_notes_rows(df) for col in df.columns: if df[col].dtype == 'object': # Try numeric first to avoid small integers being mis-parsed as dates try: numeric = pd.to_numeric(df[col], errors='coerce') if numeric.notna().sum() > len(df) * 0.7: df[col] = numeric continue except Exception: pass # Then try datetime try: try: parsed = pd.to_datetime(df[col], errors='coerce', format='mixed') except TypeError: parsed = pd.to_datetime(df[col], errors='coerce') if parsed.notna().sum() > len(df) * 0.7: df[col] = parsed continue except Exception: pass return df def load_generic_excel(filepath: str, sheet_name=0, skip_summary_rows=True, encoding=None, dtype_backend=None, header_row='auto') -> pd.DataFrame: """ Load any Excel/CSV file into a cleaned DataFrame. Args: filepath: Path to the data file (.xlsx, .xls, or .csv) sheet_name: Sheet name or index (for Excel). Ignored for CSV. skip_summary_rows: Auto-detect and remove summary/total footer rows encoding: File encoding (auto-detected for CSV if None) dtype_backend: Optional pandas dtype backend ('numpy_nullable', 'pyarrow') header_row: Excel header row index or 'auto'. CSV keeps its native header. """ fmt = auto_detect_file_format(filepath) kwargs = {} if dtype_backend: kwargs['dtype_backend'] = dtype_backend if fmt == 'csv': df = load_generic_csv(filepath, encoding=encoding, **kwargs) else: try: raw_df = pd.read_excel(filepath, sheet_name=sheet_name, header=None, **kwargs) df = _dataframe_from_detected_header(raw_df, header_row=header_row) except Exception as e: if fmt == 'xls': raise ValueError( f"Failed to read .xls file. Try converting to .xlsx format. " f"Error: {e}" ) raise return _clean_generic_dataframe(df, skip_summary_rows=skip_summary_rows) def load_generic_all_sheets(filepath: str, skip_summary_rows=True, header_row='auto') -> pd.DataFrame: """ Load all sheets from an Excel file, merge into a single DataFrame. Adds '_source_sheet' column to track the source sheet. """ fmt = auto_detect_file_format(filepath) if fmt == 'csv': return load_generic_excel(filepath, skip_summary_rows=skip_summary_rows) xl = pd.ExcelFile(filepath) if len(xl.sheet_names) == 1: raw_df = pd.read_excel(filepath, sheet_name=xl.sheet_names[0], header=None) df = _dataframe_from_detected_header(raw_df, header_row=header_row) return _clean_generic_dataframe(df, skip_summary_rows=skip_summary_rows) frames = [] for sheet in xl.sheet_names: try: raw_df = pd.read_excel(filepath, sheet_name=sheet, header=None) df = _dataframe_from_detected_header(raw_df, header_row=header_row) df['_source_sheet'] = sheet frames.append(df) except Exception: continue if not frames: raise ValueError(f"No valid sheets found in {filepath}") combined = pd.concat(frames, ignore_index=True) return _clean_generic_dataframe(combined, skip_summary_rows=skip_summary_rows) def auto_detect_date_column(df: pd.DataFrame) -> str: """Auto-detect the primary date/time column in a DataFrame.""" date_keywords = ['日期', '时间', 'date', 'time', '年', '月', '日'] for col in df.columns: col_str = str(col).lower().strip() if any(kw in col_str for kw in date_keywords): parsed = pd.to_datetime(df[col], errors='coerce') if parsed.notna().sum() > len(df) * 0.5: return col return '' def auto_parse_single_sheet(filepath: str, sheet_name=0) -> pd.DataFrame: """Load and clean a single sheet (shortcut for load_generic_excel).""" return load_generic_excel(filepath, sheet_name=sheet_name) def load_generic_file_info(filepath: str) -> dict: """ Return file metadata without full data loading. Useful for quick inspection before deciding how to load. """ info = {'filepath': filepath, 'format': auto_detect_file_format(filepath)} if info['format'] == 'csv': try: with open(filepath, 'r', encoding='utf-8-sig') as f: sample = f.read(8192) dialect = csv.Sniffer().sniff(sample[:4096]) info['delimiter'] = dialect.delimiter info['has_header'] = csv.Sniffer().has_header(sample) info['approx_rows'] = sample.count('\n') except Exception: info['delimiter'] = ',' else: try: xl = pd.ExcelFile(filepath) info['sheet_names'] = xl.sheet_names info['sheet_count'] = len(xl.sheet_names) except Exception as e: info['error'] = str(e) info['file_size_mb'] = round(os.path.getsize(filepath) / (1024 * 1024), 2) return info if __name__ == '__main__': import sys if len(sys.argv) > 1: fp = sys.argv[1] fmt = auto_detect_file_format(fp) print(f"File: {fp}") print(f"Format: {fmt}") file_info = load_generic_file_info(fp) print(f"Size: {file_info.get('file_size_mb', '?')} MB") if 'sheet_names' in file_info: print(f"Sheets ({file_info['sheet_count']}): {file_info['sheet_names'][:5]}...") df = load_generic_excel(fp) date_col = auto_detect_date_column(df) print(f"Generic load: {len(df)} rows x {len(df.columns)} cols, " f"date column: {date_col}") print(f"Columns: {list(df.columns)}")