"""
Excel data loader for daily/weekly/monthly report generation.

Contains both legacy order-specific loaders and enhanced generic loaders.
"""
import csv
import io
import numbers
import os
import re
import warnings
from datetime import datetime, timedelta

import pandas as pd
# =====================================================================
# LEGACY SECTION — Order-specific loaders (kept for backward compatibility)
# =====================================================================
# Maps the Chinese column headers found in the order workbook to the
# snake_case field names used after loading. _clean_dataframe() renames
# only the headers that are actually present in a sheet.
# NOTE(review): '4月交付' / '5月预测' are tied to specific months (April / May);
# presumably these keys need updating as the workbook rolls forward — confirm.
FIELD_MAP = {
    '序号': 'seq',
    '目的国家': 'country',
    '合同号': 'contract_no',
    '用户名称/公司': 'customer',
    '意向车型及数量': 'product_info',
    '订单总数量': 'order_qty',
    '负责人': 'owner',
    '当前状态': 'status',
    '拟定合同时间': 'contract_date',
    '跟单天数': 'tracking_days',
    '定金支付时间': 'deposit_date',
    '订金认领时间': 'deposit_claim_date',
    '订单生成时间': 'order_gen_date',
    '价格评审时间': 'price_review_date',
    '合同评审时间': 'contract_review_date',
    '合同提交盖章申请时间': 'seal_apply_date',
    '合同盖章时间': 'seal_date',
    '车辆下线入库状态': 'inventory_status',
    '尾款支付时间': 'final_pay_date',
    '尾款认领时间': 'final_claim_date',
    '智慧关务信息维护': 'customs_date',
    '许可证办理时间': 'license_date',
    '车辆发运时间': 'ship_date',
    '预计开票时间': 'invoice_date',
    '今日进度更新': 'progress_update',
    '是否更新': 'is_updated',
    '支持需求': 'support_request',
    '4月交付': 'deliver_apr',
    '5月预测': 'forecast_may',
}

# Status codes as single letters; _normalize_status() extracts exactly these
# letters from the raw status text.
STATUS_ORDER = ['A', 'B', 'C', 'D', 'E', 'F']

# Chinese display label for each status code.
STATUS_LABELS = {
    'A': '合同拟定中',
    'B': '已锁定合同待付订金',
    'C': '已付订金待生产',
    'D': '已生产待付尾款',
    'E': '已付尾款待发运',
    'F': '已发运',
}
- def _normalize_status(val):
- """Extract status code A-F from status string."""
- if pd.isna(val):
- return None
- s = str(val).strip()
- m = re.match(r'^([A-F])', s)
- if m:
- return m.group(1)
- return None
- def _parse_date(val):
- """Parse various date formats."""
- if pd.isna(val):
- return None
- if isinstance(val, datetime):
- return val
- for fmt in ('%Y-%m-%d', '%Y/%m/%d', '%Y年%m月%d日'):
- try:
- return datetime.strptime(str(val).strip(), fmt)
- except ValueError:
- continue
- return None
- def _sheet_name_for_date(date: datetime) -> str:
- """Convert datetime to expected sheet name."""
- return date.strftime('%Y年%m月%d日')
def load_workbook_metadata(filepath: str) -> dict:
    """Return the sheet names of a workbook and the date span they cover.

    Sheets whose name parses as ``YYYY年MM月DD日`` are treated as daily sheets;
    all other sheets are listed but ignored for the date range.

    Args:
        filepath: Path to the Excel workbook.

    Returns:
        A dict with:
          - ``'sheets'``: every sheet name, in workbook order;
          - ``'date_range'``: ``(earliest, latest)`` datetimes, or
            ``(None, None)`` when no sheet name is a date;
          - ``'total_days'``: number of date-named sheets.
    """
    # Only the names are needed, so release the file handle immediately
    # instead of leaving the workbook open until garbage collection.
    with pd.ExcelFile(filepath) as xl:
        sheets = list(xl.sheet_names)

    dates = []
    for name in sheets:
        try:
            dates.append(datetime.strptime(name, '%Y年%m月%d日'))
        except ValueError:
            # Not a daily sheet (e.g. a notes or summary tab).
            continue
    dates.sort()

    return {
        'sheets': sheets,
        'date_range': (dates[0], dates[-1]) if dates else (None, None),
        'total_days': len(dates),
    }
def load_daily(filepath: str, date: datetime) -> pd.DataFrame:
    """Read the sheet named after *date* and return it in cleaned legacy form."""
    raw = pd.read_excel(filepath, sheet_name=_sheet_name_for_date(date))
    return _clean_dataframe(raw)
def load_date_range(filepath: str, start: datetime, end: datetime) -> pd.DataFrame:
    """Load and concatenate the daily sheets in the inclusive range [start, end].

    Each day is looked up by its ``YYYY年MM月DD日`` sheet name; days without a
    sheet are skipped. Every loaded row is tagged with a ``_data_date`` column
    holding the day it came from.

    Args:
        filepath: Path to the Excel workbook.
        start: First day to load (inclusive).
        end: Last day to load (inclusive).

    Returns:
        The concatenated frame after legacy cleaning.

    Raises:
        ValueError: If no sheet exists for any day in the range.
    """
    frames = []
    # Open the workbook once and parse sheets from the same handle; the
    # context manager guarantees the file is closed afterwards.
    with pd.ExcelFile(filepath) as xl:
        available = set(xl.sheet_names)
        day = start
        while day <= end:
            sheet = _sheet_name_for_date(day)
            if sheet in available:
                df = xl.parse(sheet)
                df['_data_date'] = day
                frames.append(df)
            day += timedelta(days=1)

    if not frames:
        raise ValueError(f"No data found between {start.date()} and {end.date()}")

    combined = pd.concat(frames, ignore_index=True)
    return _clean_dataframe(combined)
def load_weekly(filepath: str, year: int, week_num: int, week_start_day=0) -> tuple:
    """Load one week of daily sheets plus the week before it.

    Week 1 is the Monday-to-Sunday week containing January 4 of *year*
    (the ISO-8601 definition); week *week_num* is counted from there. The
    requested week is clamped to the dates actually present in the workbook.

    Args:
        filepath: Path to the Excel workbook.
        year: Year used to anchor the week numbering.
        week_num: 1-based week number.
        week_start_day: 0=Monday ... 6=Sunday. NOTE: accepted for backward
            compatibility but currently not applied; weeks always start on
            Monday.

    Returns:
        ``(current_week_df, prev_week_df)``. ``prev_week_df`` is an empty
        frame with the same columns when the previous week is not covered by
        the workbook.

    Raises:
        ValueError: If the workbook has no date-named sheets, or no sheet
            falls inside the requested week.
    """
    meta = load_workbook_metadata(filepath)
    first_date, last_date = meta['date_range']
    if first_date is None:
        raise ValueError("No valid date sheets found")

    jan4 = datetime(year, 1, 4)
    week1_monday = jan4 - timedelta(days=jan4.weekday())
    target_monday = week1_monday + timedelta(weeks=week_num - 1)
    target_sunday = target_monday + timedelta(days=6)

    # Clamp to the data that exists so partial first/last weeks still load.
    start = max(target_monday, first_date)
    end = min(target_sunday, last_date)
    current = load_date_range(filepath, start, end)

    # The comparison period is the same (possibly clamped) span, 7 days earlier.
    prev_start = start - timedelta(days=7)
    prev_end = end - timedelta(days=7)
    empty_previous = pd.DataFrame(columns=current.columns)
    if prev_start < first_date:
        return current, empty_previous

    try:
        previous = load_date_range(filepath, prev_start, prev_end)
    except ValueError:
        # No sheets in the previous span (gap in the workbook): fall back to
        # an empty frame, matching how load_monthly treats missing periods.
        previous = empty_previous
    return current, previous
def _month_bounds(year: int, month: int) -> tuple:
    """Return (first_day, last_day) datetimes of the given calendar month."""
    first_day = datetime(year, month, 1)
    if month == 12:
        next_month_start = datetime(year + 1, 1, 1)
    else:
        next_month_start = datetime(year, month + 1, 1)
    return first_day, next_month_start - timedelta(days=1)


def _load_range_or_empty(filepath: str, start: datetime, end: datetime, columns) -> pd.DataFrame:
    """Load [start, end]; return an empty frame with *columns* if nothing exists."""
    try:
        return load_date_range(filepath, start, end)
    except ValueError:
        return pd.DataFrame(columns=columns)


def load_monthly(filepath: str, year: int, month: int) -> tuple:
    """Load one calendar month plus its two comparison periods.

    Args:
        filepath: Path to the Excel workbook.
        year: Calendar year of the requested month.
        month: Calendar month, 1-12.

    Returns:
        ``(current_month_df, prev_month_df, yoy_month_df)`` where the
        previous month and the same month one year earlier fall back to an
        empty frame (same columns as the current month) when the workbook
        holds no sheets for them.

    Raises:
        ValueError: If the requested month itself has no sheets.
    """
    start, end = _month_bounds(year, month)
    current = load_date_range(filepath, start, end)

    # Previous calendar month, wrapping January back to December.
    if month == 1:
        prev_start, prev_end = _month_bounds(year - 1, 12)
    else:
        prev_start, prev_end = _month_bounds(year, month - 1)
    previous = _load_range_or_empty(filepath, prev_start, prev_end, current.columns)

    # Same month one year earlier (year-over-year comparison).
    yoy_start, yoy_end = _month_bounds(year - 1, month)
    yoy = _load_range_or_empty(filepath, yoy_start, yoy_end, current.columns)

    return current, previous, yoy
def _clean_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Apply the legacy order-sheet clean-up and return the renamed frame.

    Steps: rename known Chinese headers via FIELD_MAP, derive ``status_code``
    from ``status``, coerce ``order_qty`` / ``tracking_days`` to numbers,
    parse the known date columns, and derive the boolean ``is_updated_flag``
    (True where ``is_updated`` is exactly '是' after stripping).
    """
    present = {src: dst for src, dst in FIELD_MAP.items() if src in df.columns}
    df = df.rename(columns=present)

    if 'status' in df.columns:
        df['status_code'] = df['status'].apply(_normalize_status)

    # Unparseable quantities / day counts become NaN rather than raising.
    for numeric_field in ('order_qty', 'tracking_days'):
        if numeric_field in df.columns:
            df[numeric_field] = pd.to_numeric(df[numeric_field], errors='coerce')

    # NOTE(review): 'deposit_claim_date' and 'final_claim_date' exist in
    # FIELD_MAP but are not parsed here — confirm whether that is intended.
    date_fields = (
        'contract_date', 'deposit_date', 'order_gen_date',
        'price_review_date', 'contract_review_date', 'seal_apply_date',
        'seal_date', 'final_pay_date', 'customs_date', 'license_date',
        'ship_date', 'invoice_date',
    )
    for date_field in date_fields:
        if date_field in df.columns:
            df[date_field] = df[date_field].apply(_parse_date)

    if 'is_updated' in df.columns:
        stripped = df['is_updated'].astype(str).str.strip()
        df['is_updated_flag'] = stripped == '是'

    return df
# =====================================================================
# GENERIC LOADING SECTION — Universal loaders for any Excel data
# =====================================================================
# Summary row keywords (Chinese and English) used to auto-detect and skip
# footer rows. _detect_and_skip_footer_rows() lower-cases each text cell and
# checks whether any keyword occurs in it as a substring, so short entries
# such as 'sum' also match longer words that contain them.
SUMMARY_KEYWORDS = [
    '合计', '总计', '小计', '汇总', '累计', '总和',
    'total', 'sum', 'subtotal', 'grand total', '合计:', '总计:',
    '平均', 'avg', 'average',
]

# Encodings tried in order by load_generic_csv() when none is given.
# NOTE(review): 'latin-1' can decode any byte sequence, so 'cp1252' after it
# is only reached when parsing (not decoding) fails — confirm the intended order.
CSV_ENCODINGS = ['utf-8', 'utf-8-sig', 'gbk', 'gb2312', 'gb18030', 'latin-1', 'cp1252']
def auto_detect_file_format(filepath: str) -> str:
    """Classify a file by its extension (case-insensitive).

    Returns the extension without the dot for ``.xlsx``, ``.xls``, ``.xlsm``,
    ``.xlsb`` and ``.csv``; every other path yields ``'unknown'``. The file
    itself is never opened.
    """
    suffix = os.path.splitext(filepath)[1].lower()
    recognised = ('.xlsx', '.xls', '.csv', '.xlsm', '.xlsb')
    return suffix[1:] if suffix in recognised else 'unknown'
def load_generic_csv(filepath: str, encoding=None, **kwargs) -> pd.DataFrame:
    """Read a CSV file, guessing the text encoding when none is given.

    Args:
        filepath: Path to the CSV file.
        encoding: Encoding to use. When falsy, each entry of CSV_ENCODINGS is
            tried in order and the first read that yields at least one column
            wins.
        **kwargs: Passed through to ``pandas.read_csv``.

    Raises:
        ValueError: If the explicit encoding cannot decode the file, or if
            no candidate encoding produced a usable frame.
    """
    if encoding:
        try:
            return pd.read_csv(filepath, encoding=encoding, **kwargs)
        except (UnicodeDecodeError, UnicodeError):
            raise ValueError(f"Failed to decode {filepath} with encoding {encoding}")

    last_error = None
    for candidate in CSV_ENCODINGS:
        try:
            frame = pd.read_csv(filepath, encoding=candidate, **kwargs)
        except (UnicodeDecodeError, UnicodeError, pd.errors.ParserError) as exc:
            last_error = exc
            continue
        if len(frame.columns) > 0:
            return frame

    raise ValueError(
        f"Unable to decode CSV file {filepath}. Tried encodings: "
        f"{CSV_ENCODINGS}. Last error: {last_error}"
    )
def _detect_and_skip_footer_rows(df_raw: pd.DataFrame) -> pd.DataFrame:
    """Remove summary/aggregation rows from the bottom of a frame.

    Rows are scanned from the last one upwards. A row counts as a summary
    row when any of its object-dtype cells, lower-cased, contains one of
    SUMMARY_KEYWORDS as a substring. Scanning stops at the first
    non-summary row, or once more than 20 rows have been collected.

    Args:
        df_raw: Frame to trim.

    Returns:
        The frame without the trailing summary rows, with a fresh 0..n-1
        index if anything was removed; otherwise *df_raw* unchanged.
    """
    if df_raw.empty:
        return df_raw

    # Work with column *positions* so duplicate column labels are handled.
    text_positions = [
        pos for pos, dtype in enumerate(df_raw.dtypes) if dtype == 'object'
    ]

    trailing = 0
    for row_pos in range(len(df_raw) - 1, -1, -1):
        cells = df_raw.iloc[row_pos, text_positions]
        is_summary = any(
            keyword in str(cell).strip().lower()
            for cell in cells
            for keyword in SUMMARY_KEYWORDS
        )
        if not is_summary:
            break
        trailing += 1
        if trailing > 20:
            # Safety cap: never strip an unbounded run of rows.
            break

    if trailing:
        # Slice by position: the previous drop(index=...) treated row numbers
        # as index labels, which is wrong for any non-default index.
        df_raw = df_raw.iloc[:len(df_raw) - trailing].reset_index(drop=True)
    return df_raw
- def _detect_empty_or_notes_rows(df_raw: pd.DataFrame) -> pd.DataFrame:
- """Remove leading empty rows and trailing fully-empty rows."""
- if df_raw.empty:
- return df_raw
- non_empty_rows = df_raw.notna().any(axis=1)
- first_valid = non_empty_rows.idxmax() if non_empty_rows.any() else 0
- last_valid = non_empty_rows[non_empty_rows].index[-1] if non_empty_rows.any() else len(df_raw)
- df_raw = df_raw.iloc[first_valid:last_valid + 1].reset_index(drop=True)
- return df_raw
# Fullwidth / CJK brackets mapped to their ASCII counterparts. Written with
# escapes so the table survives tools that normalise fullwidth characters.
_BRACKET_TRANSLATION = str.maketrans({
    '\uff08': '(',  # fullwidth left parenthesis
    '\uff09': ')',  # fullwidth right parenthesis
    '\u3010': '[',  # left black lenticular bracket
    '\u3011': ']',  # right black lenticular bracket
})


def normalize_column_names(col_name: str) -> str:
    """Normalize a single column name.

    Strips surrounding whitespace, converts fullwidth parentheses and
    lenticular brackets to ASCII ``()`` / ``[]``, and collapses every run of
    whitespace (including line breaks) into one space.

    Args:
        col_name: The raw column label.

    Returns:
        The normalized label. Non-string labels are returned unchanged.
    """
    if not isinstance(col_name, str):
        return col_name
    name = col_name.strip().translate(_BRACKET_TRANSLATION)
    # r'\s+' also covers '\n' and '\r', so header cells with embedded line
    # breaks become single-line labels.
    return re.sub(r'\s+', ' ', name)
- def _is_unnamed_column(col) -> bool:
- return str(col).strip().lower().startswith('unnamed')
def _dedupe_columns(columns) -> list:
    """Turn raw header cells into unique, normalized column names.

    Blank, ``nan``/``none`` and ``Unnamed...`` labels are replaced by
    ``column_<position>`` (1-based). A repeated name gets a ``_2``, ``_3``, ...
    suffix; the suffix is advanced until the result is not already in use,
    so a generated name can never collide with a literal one
    (``['a', 'a', 'a_2']`` -> ``['a', 'a_2', 'a_2_2']``).

    Args:
        columns: Iterable of raw header values.

    Returns:
        A list of unique string column names, in input order.
    """
    occurrences = {}   # base name -> how many times it has been handed out
    used = set()       # every name already emitted
    result = []
    for idx, col in enumerate(columns):
        name = normalize_column_names(str(col).strip()) if col is not None else ''
        if not name or name.lower() in ('nan', 'none') or _is_unnamed_column(name):
            name = f'column_{idx + 1}'

        base = name
        count = occurrences.get(base, 0)
        candidate = f'{base}_{count + 1}' if count else base
        while candidate in used:
            count += 1
            candidate = f'{base}_{count + 1}'

        occurrences[base] = count + 1
        used.add(candidate)
        result.append(candidate)
    return result
- def _row_header_score(values) -> float:
- cells = [str(v).strip() for v in values if pd.notna(v) and str(v).strip()]
- if not cells:
- return 0
- non_numeric = 0
- unique = set()
- for cell in cells:
- unique.add(cell)
- try:
- float(cell.replace(',', '').replace('%', ''))
- is_numeric = True
- except Exception:
- is_numeric = False
- if not is_numeric:
- non_numeric += 1
- return len(cells) + non_numeric * 1.5 + len(unique) * 0.5
def _detect_header_row(raw_df: pd.DataFrame, max_scan_rows: int = 8) -> int:
    """Guess which of the first rows of a raw sheet is the header.

    Each of the first ``max_scan_rows`` rows gets its ``_row_header_score``
    plus a small bonus (0.2 per non-empty cell in the row right below it,
    capped at the column count). The earliest row with the highest total
    wins. An empty frame yields 0.
    """
    if raw_df.empty:
        return 0

    row_count = len(raw_df)
    width = len(raw_df.columns)
    winner, top_score = 0, -1
    for pos in range(min(max_scan_rows, row_count)):
        candidate = _row_header_score(raw_df.iloc[pos])
        # A header normally has data directly underneath it.
        filled_below = raw_df.iloc[pos + 1].notna().sum() if pos + 1 < row_count else 0
        candidate += min(filled_below, width) * 0.2
        if candidate > top_score:
            winner, top_score = pos, candidate
    return winner
def _dataframe_from_detected_header(raw_df: pd.DataFrame, header_row='auto') -> pd.DataFrame:
    """Promote one row of a header-less sheet to column names.

    Fully empty rows and columns are dropped first. ``header_row`` may be
    ``'auto'`` (detect), ``None`` (use the first remaining row) or anything
    ``int()`` accepts; the resulting position is clamped to the frame.
    The rows below the header become the data, re-indexed from 0.

    NOTE(review): an explicit ``header_row`` is applied *after* empty rows
    were removed, so it counts remaining rows, not sheet rows — confirm this
    is what callers expect.
    """
    trimmed = raw_df.dropna(how='all').reset_index(drop=True)
    trimmed = trimmed.dropna(axis=1, how='all')
    if trimmed.empty:
        return trimmed

    if header_row is None:
        position = 0
    elif header_row == 'auto':
        position = _detect_header_row(trimmed)
    else:
        position = int(header_row)
    last_position = len(trimmed) - 1
    position = max(0, min(position, last_position))

    body = trimmed.iloc[position + 1:].copy().reset_index(drop=True)
    body.columns = _dedupe_columns(trimmed.iloc[position].tolist())
    return body
def _clean_generic_dataframe(df: pd.DataFrame, skip_summary_rows=True) -> pd.DataFrame:
    """Apply the universal clean-up to a loaded frame.

    Steps, in order:
      1. Drop fully empty rows and columns.
      2. Drop 'Unnamed...' columns (unless every column is unnamed).
      3. Normalize column names (whitespace, brackets).
      4. Optionally strip trailing summary/total rows.
      5. Trim leading/trailing empty rows.
      6. For each object-dtype column, convert it to datetime if more than
         70% of all rows parse as dates, otherwise to numeric if more than
         70% parse as numbers; otherwise leave it untouched.

    Args:
        df: Frame to clean.
        skip_summary_rows: Whether to run summary-footer detection.

    Returns:
        The cleaned frame (an empty input is returned unchanged).
    """
    if df.empty:
        return df

    df = df.dropna(how='all').reset_index(drop=True)
    df = df.dropna(axis=1, how='all')

    unnamed_mask = df.columns.map(_is_unnamed_column)
    if unnamed_mask.any() and (~unnamed_mask).any():
        df = df.loc[:, ~unnamed_mask]

    df = df.rename(columns=normalize_column_names)

    if skip_summary_rows:
        df = _detect_and_skip_footer_rows(df)
    df = _detect_empty_or_notes_rows(df)

    threshold = len(df) * 0.7
    for col in df.columns:
        column = df[col]
        if column.dtype != 'object':
            continue

        # pd.to_datetime reads bare numbers as offsets from the Unix epoch,
        # which would turn every numeric object column (the normal case for
        # sheets read with header=None) into 1970 timestamps. Hide plain
        # numbers from the date probe so they fall through to the numeric one.
        is_number = column.map(lambda v: isinstance(v, numbers.Number)).astype(bool)
        date_source = column.where(~is_number)

        parsed = None
        try:
            try:
                parsed = pd.to_datetime(date_source, errors='coerce', format='mixed')
            except TypeError:
                # pandas builds that reject format='mixed' with a TypeError
                parsed = pd.to_datetime(date_source, errors='coerce')
        except Exception:
            # Type inference is best-effort: an unparseable column stays as is.
            parsed = None
        if parsed is not None and parsed.notna().sum() > threshold:
            df[col] = parsed
            continue

        try:
            numeric = pd.to_numeric(column, errors='coerce')
        except Exception:
            continue
        if numeric.notna().sum() > threshold:
            df[col] = numeric

    return df
def load_generic_excel(filepath: str, sheet_name=0, skip_summary_rows=True,
                       encoding=None, dtype_backend=None, header_row='auto') -> pd.DataFrame:
    """Load one Excel sheet or a CSV file and return it cleaned.

    Args:
        filepath: Path to the data file (.xlsx, .xls, or .csv).
        sheet_name: Sheet name or index for Excel files; ignored for CSV.
        skip_summary_rows: Auto-detect and remove summary/total footer rows.
        encoding: CSV encoding (auto-detected when None).
        dtype_backend: Optional pandas dtype backend ('numpy_nullable', 'pyarrow').
        header_row: Excel header row index or 'auto'. CSV keeps its native header.

    Raises:
        ValueError: If an .xls file cannot be read (wrapping the reader error).
        Exception: Reader errors for every other non-CSV format propagate
            unchanged.
    """
    reader_options = {'dtype_backend': dtype_backend} if dtype_backend else {}
    file_format = auto_detect_file_format(filepath)

    if file_format == 'csv':
        frame = load_generic_csv(filepath, encoding=encoding, **reader_options)
        return _clean_generic_dataframe(frame, skip_summary_rows=skip_summary_rows)

    # Everything that is not CSV is handed to the Excel reader without a
    # header so the header row can be detected afterwards.
    try:
        sheet = pd.read_excel(filepath, sheet_name=sheet_name, header=None, **reader_options)
        frame = _dataframe_from_detected_header(sheet, header_row=header_row)
    except Exception as e:
        if file_format != 'xls':
            raise
        raise ValueError(
            f"Failed to read .xls file. Try converting to .xlsx format. "
            f"Error: {e}"
        )
    return _clean_generic_dataframe(frame, skip_summary_rows=skip_summary_rows)
def load_generic_all_sheets(filepath: str, skip_summary_rows=True, header_row='auto') -> pd.DataFrame:
    """Load every sheet of a workbook and merge them into one cleaned frame.

    For workbooks with more than one sheet a ``_source_sheet`` column records
    which sheet each row came from. A single-sheet workbook (and a CSV file)
    is loaded like ``load_generic_excel`` and gets no ``_source_sheet`` column.

    Summary/total footers are stripped per sheet, before the sheet name is
    attached, so footers of earlier sheets do not leak into the merged data
    and a sheet whose *name* contains a summary keyword does not lose rows.

    Args:
        filepath: Path to the workbook (or CSV file).
        skip_summary_rows: Auto-detect and remove summary/total footer rows.
        header_row: Header row index or 'auto', applied to every sheet.

    Returns:
        The merged, cleaned frame.

    Raises:
        ValueError: If no sheet could be read.
    """
    fmt = auto_detect_file_format(filepath)
    if fmt == 'csv':
        return load_generic_excel(filepath, skip_summary_rows=skip_summary_rows)

    frames = []
    # One handle for all sheets; closed deterministically on exit.
    with pd.ExcelFile(filepath) as xl:
        sheet_names = list(xl.sheet_names)

        if len(sheet_names) == 1:
            raw_df = xl.parse(sheet_names[0], header=None)
            df = _dataframe_from_detected_header(raw_df, header_row=header_row)
            return _clean_generic_dataframe(df, skip_summary_rows=skip_summary_rows)

        for sheet in sheet_names:
            try:
                raw_df = xl.parse(sheet, header=None)
                df = _dataframe_from_detected_header(raw_df, header_row=header_row)
                if skip_summary_rows:
                    df = _detect_and_skip_footer_rows(df)
                df['_source_sheet'] = sheet
            except Exception as exc:
                # Best effort: one unreadable sheet must not abort the load,
                # but the skip should be visible to the caller.
                warnings.warn(f"Skipping sheet {sheet!r} in {filepath}: {exc}")
                continue
            frames.append(df)

    if not frames:
        raise ValueError(f"No valid sheets found in {filepath}")

    combined = pd.concat(frames, ignore_index=True)
    # Footers were already handled per sheet.
    cleaned = _clean_generic_dataframe(combined, skip_summary_rows=False)

    # Type inference may turn date-like sheet names into timestamps. Every
    # row carries a sheet name, so no row is dropped as empty and the
    # original labels can be restored position by position.
    if '_source_sheet' in cleaned.columns and len(cleaned) == len(combined):
        cleaned['_source_sheet'] = combined['_source_sheet'].to_numpy()
    return cleaned
def auto_detect_date_column(df: pd.DataFrame) -> str:
    """Auto-detect the primary date/time column in a DataFrame.

    Columns are checked in order. A column qualifies when its name contains
    a date keyword and more than half of all rows parse as dates. Numeric
    columns, and bare numbers inside object columns, are never treated as
    dates: ``pd.to_datetime`` would read them as offsets from the Unix epoch,
    so a column such as '月销量' holding counts would otherwise be reported
    as the date column.

    Args:
        df: Frame to inspect.

    Returns:
        The label of the first qualifying column, or ``''`` if there is none.
    """
    date_keywords = ['日期', '时间', 'date', 'time', '年', '月', '日']
    half = len(df) * 0.5
    for col in df.columns:
        col_str = str(col).lower().strip()
        if not any(kw in col_str for kw in date_keywords):
            continue

        series = df[col]
        if pd.api.types.is_numeric_dtype(series):
            continue
        if series.dtype == object:
            is_number = series.map(lambda v: isinstance(v, numbers.Number)).astype(bool)
            series = series.where(~is_number)

        parsed = pd.to_datetime(series, errors='coerce')
        if parsed.notna().sum() > half:
            return col
    return ''
def auto_parse_single_sheet(filepath: str, sheet_name=0) -> pd.DataFrame:
    """Load and clean a single sheet (shortcut for load_generic_excel).

    Only *filepath* and *sheet_name* are forwarded; every other option of
    load_generic_excel keeps its default.
    """
    return load_generic_excel(filepath, sheet_name=sheet_name)
def load_generic_file_info(filepath: str) -> dict:
    """Return file metadata without loading the full data.

    Useful for quick inspection before deciding how to load.

    Args:
        filepath: Path to the data file.

    Returns:
        A dict that always has ``'filepath'``, ``'format'`` and
        ``'file_size_mb'`` (rounded to 2 decimals). In addition:
          - CSV: ``'delimiter'``, and when sniffing succeeds ``'has_header'``
            and ``'approx_rows'`` (newline count within the first 8192
            characters only, not the whole file);
          - other formats: ``'sheet_names'`` and ``'sheet_count'``, or
            ``'error'`` with the reader's message if the workbook cannot be
            opened.

    Raises:
        OSError: If the file size cannot be read (e.g. the file is missing).
    """
    info = {'filepath': filepath, 'format': auto_detect_file_format(filepath)}

    if info['format'] == 'csv':
        try:
            # errors='replace' keeps non-UTF-8 files (e.g. GBK) sniffable:
            # delimiters and line breaks are ASCII and survive the replacement.
            with open(filepath, 'r', encoding='utf-8-sig', errors='replace') as f:
                sample = f.read(8192)
            sniffer = csv.Sniffer()
            dialect = sniffer.sniff(sample[:4096])
            info['delimiter'] = dialect.delimiter
            info['has_header'] = sniffer.has_header(sample)
            info['approx_rows'] = sample.count('\n')
        except Exception:
            # Inspection is best-effort: fall back to the common default.
            info['delimiter'] = ','
    else:
        try:
            # Close the workbook as soon as the sheet names are known.
            with pd.ExcelFile(filepath) as xl:
                sheet_names = list(xl.sheet_names)
            info['sheet_names'] = sheet_names
            info['sheet_count'] = len(sheet_names)
        except Exception as e:
            info['error'] = str(e)

    info['file_size_mb'] = round(os.path.getsize(filepath) / (1024 * 1024), 2)
    return info
if __name__ == '__main__':
    # Command-line smoke test: pass a data file path as the first argument
    # to print its format, size, sheets and the result of a generic load.
    import sys

    if len(sys.argv) > 1:
        fp = sys.argv[1]
        fmt = auto_detect_file_format(fp)
        print(f"File: {fp}")
        print(f"Format: {fmt}")
        file_info = load_generic_file_info(fp)
        print(f"Size: {file_info.get('file_size_mb', '?')} MB")
        # 'sheet_names' is only present for workbooks that could be opened.
        if 'sheet_names' in file_info:
            # Only the first five sheet names are shown.
            print(f"Sheets ({file_info['sheet_count']}): {file_info['sheet_names'][:5]}...")
        df = load_generic_excel(fp)
        date_col = auto_detect_date_column(df)
        print(f"Generic load: {len(df)} rows x {len(df.columns)} cols, "
              f"date column: {date_col}")
        print(f"Columns: {list(df.columns)}")