| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380 |
- """
- Universal Excel/CSV data loader for the data report generator.
- Supports auto-detection of file format, encoding, header rows, and data cleaning.
- """
- import pandas as pd
- import re
- import os
- import csv
- # =====================================================================
- # GENERIC LOADING — Universal loaders for any Excel/CSV data
- # =====================================================================
- # Summary row keywords (Chinese and English) to auto-detect and skip
- SUMMARY_KEYWORDS = [
- '合计', '总计', '小计', '汇总', '累计', '总和',
- 'total', 'sum', 'subtotal', 'grand total', '合计:', '总计:',
- '平均', 'avg', 'average',
- ]
- # Encoding priority list for CSV detection
- CSV_ENCODINGS = ['utf-8', 'utf-8-sig', 'gbk', 'gb2312', 'gb18030', 'latin-1', 'cp1252']
- def auto_detect_file_format(filepath: str) -> str:
- """Auto-detect file format: xlsx, xls, csv, or unknown."""
- ext = os.path.splitext(filepath)[1].lower()
- if ext in ('.xlsx', '.xls'):
- return ext[1:]
- if ext == '.csv':
- return 'csv'
- if ext in ('.xlsm', '.xlsb'):
- return ext[1:]
- return 'unknown'
- def load_generic_csv(filepath: str, encoding=None, **kwargs) -> pd.DataFrame:
- """
- Load a CSV file with auto-encoding detection.
- Tries common encodings until one succeeds.
- """
- if encoding:
- try:
- return pd.read_csv(filepath, encoding=encoding, **kwargs)
- except (UnicodeDecodeError, UnicodeError):
- raise ValueError(f"Failed to decode {filepath} with encoding {encoding}")
- last_error = None
- for enc in CSV_ENCODINGS:
- try:
- df = pd.read_csv(filepath, encoding=enc, **kwargs)
- if len(df.columns) > 0:
- return df
- except (UnicodeDecodeError, UnicodeError, pd.errors.ParserError) as e:
- last_error = e
- continue
- raise ValueError(
- f"Unable to decode CSV file {filepath}. Tried encodings: "
- f"{CSV_ENCODINGS}. Last error: {last_error}"
- )
- def _detect_and_skip_footer_rows(df_raw: pd.DataFrame) -> pd.DataFrame:
- """Detect and remove summary/aggregation rows at the end of the data."""
- if df_raw.empty:
- return df_raw
- rows_to_drop = []
- text_cols = [c for c in df_raw.columns if df_raw[c].dtype == 'object']
- for idx in range(len(df_raw) - 1, -1, -1):
- row = df_raw.iloc[idx]
- is_summary = False
- for col in text_cols:
- val = str(row.get(col, '')).strip().lower()
- if any(kw in val for kw in SUMMARY_KEYWORDS):
- is_summary = True
- break
- if is_summary:
- rows_to_drop.append(idx)
- else:
- break
- if len(rows_to_drop) > 20:
- break
- if rows_to_drop:
- df_raw = df_raw.drop(index=rows_to_drop)
- df_raw = df_raw.reset_index(drop=True)
- return df_raw
- def _detect_empty_or_notes_rows(df_raw: pd.DataFrame) -> pd.DataFrame:
- """Remove leading empty rows and trailing fully-empty rows."""
- if df_raw.empty:
- return df_raw
- non_empty_rows = df_raw.notna().any(axis=1)
- first_valid = non_empty_rows.idxmax() if non_empty_rows.any() else 0
- last_valid = non_empty_rows[non_empty_rows].index[-1] if non_empty_rows.any() else len(df_raw)
- df_raw = df_raw.iloc[first_valid:last_valid + 1].reset_index(drop=True)
- return df_raw
- def normalize_column_names(col_name: str) -> str:
- """
- Normalize a single column name: strip whitespace, unify brackets, remove special chars.
- """
- if not isinstance(col_name, str):
- return col_name
- name = col_name.strip()
- name = name.replace('(', '(').replace(')', ')')
- name = name.replace('【', '[').replace('】', ']')
- name = name.replace('\n', ' ').replace('\r', ' ')
- name = re.sub(r'\s+', ' ', name)
- return name
- def _is_unnamed_column(col) -> bool:
- return str(col).strip().lower().startswith('unnamed')
- def _dedupe_columns(columns) -> list:
- seen = {}
- result = []
- for idx, col in enumerate(columns):
- name = normalize_column_names(str(col).strip()) if col is not None else ''
- if not name or name.lower() in ('nan', 'none') or _is_unnamed_column(name):
- name = f'column_{idx + 1}'
- base = name
- count = seen.get(base, 0)
- if count:
- name = f'{base}_{count + 1}'
- seen[base] = count + 1
- result.append(name)
- return result
- def _row_header_score(values) -> float:
- cells = [str(v).strip() for v in values if pd.notna(v) and str(v).strip()]
- if not cells:
- return 0
- non_numeric = 0
- unique = set()
- for cell in cells:
- unique.add(cell)
- try:
- float(cell.replace(',', '').replace('%', ''))
- is_numeric = True
- except Exception:
- is_numeric = False
- if not is_numeric:
- non_numeric += 1
- return len(cells) + non_numeric * 1.5 + len(unique) * 0.5
- def _detect_header_row(raw_df: pd.DataFrame, max_scan_rows: int = 8) -> int:
- if raw_df.empty:
- return 0
- scan_rows = min(max_scan_rows, len(raw_df))
- best_idx = 0
- best_score = -1
- for idx in range(scan_rows):
- row = raw_df.iloc[idx]
- score = _row_header_score(row)
- next_non_empty = 0
- if idx + 1 < len(raw_df):
- next_non_empty = raw_df.iloc[idx + 1].notna().sum()
- # Prefer rows with many non-numeric labels and data underneath.
- score += min(next_non_empty, len(raw_df.columns)) * 0.2
- if score > best_score:
- best_score = score
- best_idx = idx
- return best_idx
- def _dataframe_from_detected_header(raw_df: pd.DataFrame, header_row='auto') -> pd.DataFrame:
- raw_df = raw_df.dropna(how='all').reset_index(drop=True)
- raw_df = raw_df.dropna(axis=1, how='all')
- if raw_df.empty:
- return raw_df
- if header_row == 'auto':
- header_idx = _detect_header_row(raw_df)
- elif header_row is None:
- header_idx = 0
- else:
- header_idx = int(header_row)
- header_idx = max(0, min(header_idx, len(raw_df) - 1))
- columns = _dedupe_columns(raw_df.iloc[header_idx].tolist())
- df = raw_df.iloc[header_idx + 1:].copy().reset_index(drop=True)
- df.columns = columns
- return df
- def _clean_generic_dataframe(df: pd.DataFrame, skip_summary_rows=True) -> pd.DataFrame:
- """
- Universal DataFrame cleaning:
- - Remove fully empty rows/columns
- - Drop 'Unnamed' columns
- - Normalize column names (strip whitespace, unify brackets)
- - Auto-detect and remove summary/total rows
- - Try to parse date columns
- - Try to parse numeric columns
- """
- if df.empty:
- return df
- df = df.dropna(how='all').reset_index(drop=True)
- df = df.dropna(axis=1, how='all')
- unnamed_mask = df.columns.map(_is_unnamed_column)
- if unnamed_mask.any() and (~unnamed_mask).any():
- df = df.loc[:, ~unnamed_mask]
- df = df.rename(columns=normalize_column_names)
- if skip_summary_rows:
- df = _detect_and_skip_footer_rows(df)
- df = _detect_empty_or_notes_rows(df)
- for col in df.columns:
- if df[col].dtype == 'object':
- # Try numeric first to avoid small integers being mis-parsed as dates
- try:
- numeric = pd.to_numeric(df[col], errors='coerce')
- if numeric.notna().sum() > len(df) * 0.7:
- df[col] = numeric
- continue
- except Exception:
- pass
- # Then try datetime
- try:
- try:
- parsed = pd.to_datetime(df[col], errors='coerce', format='mixed')
- except TypeError:
- parsed = pd.to_datetime(df[col], errors='coerce')
- if parsed.notna().sum() > len(df) * 0.7:
- df[col] = parsed
- continue
- except Exception:
- pass
- return df
- def load_generic_excel(filepath: str, sheet_name=0, skip_summary_rows=True,
- encoding=None, dtype_backend=None, header_row='auto') -> pd.DataFrame:
- """
- Load any Excel/CSV file into a cleaned DataFrame.
- Args:
- filepath: Path to the data file (.xlsx, .xls, or .csv)
- sheet_name: Sheet name or index (for Excel). Ignored for CSV.
- skip_summary_rows: Auto-detect and remove summary/total footer rows
- encoding: File encoding (auto-detected for CSV if None)
- dtype_backend: Optional pandas dtype backend ('numpy_nullable', 'pyarrow')
- header_row: Excel header row index or 'auto'. CSV keeps its native header.
- """
- fmt = auto_detect_file_format(filepath)
- kwargs = {}
- if dtype_backend:
- kwargs['dtype_backend'] = dtype_backend
- if fmt == 'csv':
- df = load_generic_csv(filepath, encoding=encoding, **kwargs)
- else:
- try:
- raw_df = pd.read_excel(filepath, sheet_name=sheet_name, header=None, **kwargs)
- df = _dataframe_from_detected_header(raw_df, header_row=header_row)
- except Exception as e:
- if fmt == 'xls':
- raise ValueError(
- f"Failed to read .xls file. Try converting to .xlsx format. "
- f"Error: {e}"
- )
- raise
- return _clean_generic_dataframe(df, skip_summary_rows=skip_summary_rows)
- def load_generic_all_sheets(filepath: str, skip_summary_rows=True, header_row='auto') -> pd.DataFrame:
- """
- Load all sheets from an Excel file, merge into a single DataFrame.
- Adds '_source_sheet' column to track the source sheet.
- """
- fmt = auto_detect_file_format(filepath)
- if fmt == 'csv':
- return load_generic_excel(filepath, skip_summary_rows=skip_summary_rows)
- xl = pd.ExcelFile(filepath)
- if len(xl.sheet_names) == 1:
- raw_df = pd.read_excel(filepath, sheet_name=xl.sheet_names[0], header=None)
- df = _dataframe_from_detected_header(raw_df, header_row=header_row)
- return _clean_generic_dataframe(df, skip_summary_rows=skip_summary_rows)
- frames = []
- for sheet in xl.sheet_names:
- try:
- raw_df = pd.read_excel(filepath, sheet_name=sheet, header=None)
- df = _dataframe_from_detected_header(raw_df, header_row=header_row)
- df['_source_sheet'] = sheet
- frames.append(df)
- except Exception:
- continue
- if not frames:
- raise ValueError(f"No valid sheets found in {filepath}")
- combined = pd.concat(frames, ignore_index=True)
- return _clean_generic_dataframe(combined, skip_summary_rows=skip_summary_rows)
- def auto_detect_date_column(df: pd.DataFrame) -> str:
- """Auto-detect the primary date/time column in a DataFrame."""
- date_keywords = ['日期', '时间', 'date', 'time', '年', '月', '日']
- for col in df.columns:
- col_str = str(col).lower().strip()
- if any(kw in col_str for kw in date_keywords):
- parsed = pd.to_datetime(df[col], errors='coerce')
- if parsed.notna().sum() > len(df) * 0.5:
- return col
- return ''
- def auto_parse_single_sheet(filepath: str, sheet_name=0) -> pd.DataFrame:
- """Load and clean a single sheet (shortcut for load_generic_excel)."""
- return load_generic_excel(filepath, sheet_name=sheet_name)
- def load_generic_file_info(filepath: str) -> dict:
- """
- Return file metadata without full data loading.
- Useful for quick inspection before deciding how to load.
- """
- info = {'filepath': filepath, 'format': auto_detect_file_format(filepath)}
- if info['format'] == 'csv':
- try:
- with open(filepath, 'r', encoding='utf-8-sig') as f:
- sample = f.read(8192)
- dialect = csv.Sniffer().sniff(sample[:4096])
- info['delimiter'] = dialect.delimiter
- info['has_header'] = csv.Sniffer().has_header(sample)
- info['approx_rows'] = sample.count('\n')
- except Exception:
- info['delimiter'] = ','
- else:
- try:
- xl = pd.ExcelFile(filepath)
- info['sheet_names'] = xl.sheet_names
- info['sheet_count'] = len(xl.sheet_names)
- except Exception as e:
- info['error'] = str(e)
- info['file_size_mb'] = round(os.path.getsize(filepath) / (1024 * 1024), 2)
- return info
- if __name__ == '__main__':
- import sys
- if len(sys.argv) > 1:
- fp = sys.argv[1]
- fmt = auto_detect_file_format(fp)
- print(f"File: {fp}")
- print(f"Format: {fmt}")
- file_info = load_generic_file_info(fp)
- print(f"Size: {file_info.get('file_size_mb', '?')} MB")
- if 'sheet_names' in file_info:
- print(f"Sheets ({file_info['sheet_count']}): {file_info['sheet_names'][:5]}...")
- df = load_generic_excel(fp)
- date_col = auto_detect_date_column(df)
- print(f"Generic load: {len(df)} rows x {len(df.columns)} cols, "
- f"date column: {date_col}")
- print(f"Columns: {list(df.columns)}")
|