""" Universal Excel/CSV data loader for the data report generator. Supports auto-detection of file format, encoding, header rows, and data cleaning. """ import pandas as pd import re import os import csv # ===================================================================== # GENERIC LOADING — Universal loaders for any Excel/CSV data # ===================================================================== # Summary row keywords (Chinese and English) to auto-detect and skip SUMMARY_KEYWORDS = [ '合计', '总计', '小计', '汇总', '累计', '总和', 'total', 'sum', 'subtotal', 'grand total', '合计:', '总计:', '平均', 'avg', 'average', ] # Encoding priority list for CSV detection CSV_ENCODINGS = ['utf-8', 'utf-8-sig', 'gbk', 'gb2312', 'gb18030', 'latin-1', 'cp1252'] def auto_detect_file_format(filepath: str) -> str: """Auto-detect file format: xlsx, xls, csv, or unknown.""" ext = os.path.splitext(filepath)[1].lower() if ext in ('.xlsx', '.xls'): return ext[1:] if ext == '.csv': return 'csv' if ext in ('.xlsm', '.xlsb'): return ext[1:] return 'unknown' def load_generic_csv(filepath: str, encoding=None, **kwargs) -> pd.DataFrame: """ Load a CSV file with auto-encoding detection. Tries common encodings until one succeeds. """ if encoding: try: return pd.read_csv(filepath, encoding=encoding, **kwargs) except (UnicodeDecodeError, UnicodeError): raise ValueError(f"Failed to decode {filepath} with encoding {encoding}") last_error = None for enc in CSV_ENCODINGS: try: df = pd.read_csv(filepath, encoding=enc, **kwargs) if len(df.columns) > 0: return df except (UnicodeDecodeError, UnicodeError, pd.errors.ParserError) as e: last_error = e continue raise ValueError( f"Unable to decode CSV file {filepath}. Tried encodings: " f"{CSV_ENCODINGS}. Last error: {last_error}" ) def _detect_and_skip_footer_rows(df_raw: pd.DataFrame) -> pd.DataFrame: """Detect and remove summary/aggregation rows at the end of the data.""" if df_raw.empty: return df_raw rows_to_drop = [] text_cols = [c for c in df_raw.columns if df_raw[c].dtype == 'object'] for idx in range(len(df_raw) - 1, -1, -1): row = df_raw.iloc[idx] is_summary = False for col in text_cols: val = str(row.get(col, '')).strip().lower() if any(kw in val for kw in SUMMARY_KEYWORDS): is_summary = True break if is_summary: rows_to_drop.append(idx) else: break if len(rows_to_drop) > 20: break if rows_to_drop: df_raw = df_raw.drop(index=rows_to_drop) df_raw = df_raw.reset_index(drop=True) return df_raw def _detect_empty_or_notes_rows(df_raw: pd.DataFrame) -> pd.DataFrame: """Remove leading empty rows and trailing fully-empty rows.""" if df_raw.empty: return df_raw non_empty_rows = df_raw.notna().any(axis=1) first_valid = non_empty_rows.idxmax() if non_empty_rows.any() else 0 last_valid = non_empty_rows[non_empty_rows].index[-1] if non_empty_rows.any() else len(df_raw) df_raw = df_raw.iloc[first_valid:last_valid + 1].reset_index(drop=True) return df_raw def normalize_column_names(col_name: str) -> str: """ Normalize a single column name: strip whitespace, unify brackets, remove special chars. """ if not isinstance(col_name, str): return col_name name = col_name.strip() name = name.replace('(', '(').replace(')', ')') name = name.replace('【', '[').replace('】', ']') name = name.replace('\n', ' ').replace('\r', ' ') name = re.sub(r'\s+', ' ', name) return name def _is_unnamed_column(col) -> bool: return str(col).strip().lower().startswith('unnamed') def _dedupe_columns(columns) -> list: seen = {} result = [] for idx, col in enumerate(columns): name = normalize_column_names(str(col).strip()) if col is not None else '' if not name or name.lower() in ('nan', 'none') or _is_unnamed_column(name): name = f'column_{idx + 1}' base = name count = seen.get(base, 0) if count: name = f'{base}_{count + 1}' seen[base] = count + 1 result.append(name) return result def _row_header_score(values) -> float: cells = [str(v).strip() for v in values if pd.notna(v) and str(v).strip()] if not cells: return 0 non_numeric = 0 unique = set() for cell in cells: unique.add(cell) try: float(cell.replace(',', '').replace('%', '')) is_numeric = True except Exception: is_numeric = False if not is_numeric: non_numeric += 1 return len(cells) + non_numeric * 1.5 + len(unique) * 0.5 def _detect_header_row(raw_df: pd.DataFrame, max_scan_rows: int = 8) -> int: if raw_df.empty: return 0 scan_rows = min(max_scan_rows, len(raw_df)) best_idx = 0 best_score = -1 for idx in range(scan_rows): row = raw_df.iloc[idx] score = _row_header_score(row) next_non_empty = 0 if idx + 1 < len(raw_df): next_non_empty = raw_df.iloc[idx + 1].notna().sum() # Prefer rows with many non-numeric labels and data underneath. score += min(next_non_empty, len(raw_df.columns)) * 0.2 if score > best_score: best_score = score best_idx = idx return best_idx def _dataframe_from_detected_header(raw_df: pd.DataFrame, header_row='auto') -> pd.DataFrame: raw_df = raw_df.dropna(how='all').reset_index(drop=True) raw_df = raw_df.dropna(axis=1, how='all') if raw_df.empty: return raw_df if header_row == 'auto': header_idx = _detect_header_row(raw_df) elif header_row is None: header_idx = 0 else: header_idx = int(header_row) header_idx = max(0, min(header_idx, len(raw_df) - 1)) columns = _dedupe_columns(raw_df.iloc[header_idx].tolist()) df = raw_df.iloc[header_idx + 1:].copy().reset_index(drop=True) df.columns = columns return df def _clean_generic_dataframe(df: pd.DataFrame, skip_summary_rows=True) -> pd.DataFrame: """ Universal DataFrame cleaning: - Remove fully empty rows/columns - Drop 'Unnamed' columns - Normalize column names (strip whitespace, unify brackets) - Auto-detect and remove summary/total rows - Try to parse date columns - Try to parse numeric columns """ if df.empty: return df df = df.dropna(how='all').reset_index(drop=True) df = df.dropna(axis=1, how='all') unnamed_mask = df.columns.map(_is_unnamed_column) if unnamed_mask.any() and (~unnamed_mask).any(): df = df.loc[:, ~unnamed_mask] df = df.rename(columns=normalize_column_names) if skip_summary_rows: df = _detect_and_skip_footer_rows(df) df = _detect_empty_or_notes_rows(df) for col in df.columns: if df[col].dtype == 'object': # Try numeric first to avoid small integers being mis-parsed as dates try: numeric = pd.to_numeric(df[col], errors='coerce') if numeric.notna().sum() > len(df) * 0.7: df[col] = numeric continue except Exception: pass # Then try datetime try: try: parsed = pd.to_datetime(df[col], errors='coerce', format='mixed') except TypeError: parsed = pd.to_datetime(df[col], errors='coerce') if parsed.notna().sum() > len(df) * 0.7: df[col] = parsed continue except Exception: pass return df def load_generic_excel(filepath: str, sheet_name=0, skip_summary_rows=True, encoding=None, dtype_backend=None, header_row='auto') -> pd.DataFrame: """ Load any Excel/CSV file into a cleaned DataFrame. Args: filepath: Path to the data file (.xlsx, .xls, or .csv) sheet_name: Sheet name or index (for Excel). Ignored for CSV. skip_summary_rows: Auto-detect and remove summary/total footer rows encoding: File encoding (auto-detected for CSV if None) dtype_backend: Optional pandas dtype backend ('numpy_nullable', 'pyarrow') header_row: Excel header row index or 'auto'. CSV keeps its native header. """ fmt = auto_detect_file_format(filepath) kwargs = {} if dtype_backend: kwargs['dtype_backend'] = dtype_backend if fmt == 'csv': df = load_generic_csv(filepath, encoding=encoding, **kwargs) else: try: raw_df = pd.read_excel(filepath, sheet_name=sheet_name, header=None, **kwargs) df = _dataframe_from_detected_header(raw_df, header_row=header_row) except Exception as e: if fmt == 'xls': raise ValueError( f"Failed to read .xls file. Try converting to .xlsx format. " f"Error: {e}" ) raise return _clean_generic_dataframe(df, skip_summary_rows=skip_summary_rows) def load_generic_all_sheets(filepath: str, skip_summary_rows=True, header_row='auto') -> pd.DataFrame: """ Load all sheets from an Excel file, merge into a single DataFrame. Adds '_source_sheet' column to track the source sheet. """ fmt = auto_detect_file_format(filepath) if fmt == 'csv': return load_generic_excel(filepath, skip_summary_rows=skip_summary_rows) xl = pd.ExcelFile(filepath) if len(xl.sheet_names) == 1: raw_df = pd.read_excel(filepath, sheet_name=xl.sheet_names[0], header=None) df = _dataframe_from_detected_header(raw_df, header_row=header_row) return _clean_generic_dataframe(df, skip_summary_rows=skip_summary_rows) frames = [] for sheet in xl.sheet_names: try: raw_df = pd.read_excel(filepath, sheet_name=sheet, header=None) df = _dataframe_from_detected_header(raw_df, header_row=header_row) df['_source_sheet'] = sheet frames.append(df) except Exception: continue if not frames: raise ValueError(f"No valid sheets found in {filepath}") combined = pd.concat(frames, ignore_index=True) return _clean_generic_dataframe(combined, skip_summary_rows=skip_summary_rows) def auto_detect_date_column(df: pd.DataFrame) -> str: """Auto-detect the primary date/time column in a DataFrame.""" date_keywords = ['日期', '时间', 'date', 'time', '年', '月', '日'] for col in df.columns: col_str = str(col).lower().strip() if any(kw in col_str for kw in date_keywords): parsed = pd.to_datetime(df[col], errors='coerce') if parsed.notna().sum() > len(df) * 0.5: return col return '' def auto_parse_single_sheet(filepath: str, sheet_name=0) -> pd.DataFrame: """Load and clean a single sheet (shortcut for load_generic_excel).""" return load_generic_excel(filepath, sheet_name=sheet_name) def load_generic_file_info(filepath: str) -> dict: """ Return file metadata without full data loading. Useful for quick inspection before deciding how to load. """ info = {'filepath': filepath, 'format': auto_detect_file_format(filepath)} if info['format'] == 'csv': try: with open(filepath, 'r', encoding='utf-8-sig') as f: sample = f.read(8192) dialect = csv.Sniffer().sniff(sample[:4096]) info['delimiter'] = dialect.delimiter info['has_header'] = csv.Sniffer().has_header(sample) info['approx_rows'] = sample.count('\n') except Exception: info['delimiter'] = ',' else: try: xl = pd.ExcelFile(filepath) info['sheet_names'] = xl.sheet_names info['sheet_count'] = len(xl.sheet_names) except Exception as e: info['error'] = str(e) info['file_size_mb'] = round(os.path.getsize(filepath) / (1024 * 1024), 2) return info if __name__ == '__main__': import sys if len(sys.argv) > 1: fp = sys.argv[1] fmt = auto_detect_file_format(fp) print(f"File: {fp}") print(f"Format: {fmt}") file_info = load_generic_file_info(fp) print(f"Size: {file_info.get('file_size_mb', '?')} MB") if 'sheet_names' in file_info: print(f"Sheets ({file_info['sheet_count']}): {file_info['sheet_names'][:5]}...") df = load_generic_excel(fp) date_col = auto_detect_date_column(df) print(f"Generic load: {len(df)} rows x {len(df.columns)} cols, " f"date column: {date_col}") print(f"Columns: {list(df.columns)}")