data_loader.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380
  1. """
  2. Universal Excel/CSV data loader for the data report generator.
  3. Supports auto-detection of file format, encoding, header rows, and data cleaning.
  4. """
  5. import pandas as pd
  6. import re
  7. import os
  8. import csv
  9. # =====================================================================
  10. # GENERIC LOADING — Universal loaders for any Excel/CSV data
  11. # =====================================================================
  12. # Summary row keywords (Chinese and English) to auto-detect and skip
  13. SUMMARY_KEYWORDS = [
  14. '合计', '总计', '小计', '汇总', '累计', '总和',
  15. 'total', 'sum', 'subtotal', 'grand total', '合计:', '总计:',
  16. '平均', 'avg', 'average',
  17. ]
  18. # Encoding priority list for CSV detection
  19. CSV_ENCODINGS = ['utf-8', 'utf-8-sig', 'gbk', 'gb2312', 'gb18030', 'latin-1', 'cp1252']
  20. def auto_detect_file_format(filepath: str) -> str:
  21. """Auto-detect file format: xlsx, xls, csv, or unknown."""
  22. ext = os.path.splitext(filepath)[1].lower()
  23. if ext in ('.xlsx', '.xls'):
  24. return ext[1:]
  25. if ext == '.csv':
  26. return 'csv'
  27. if ext in ('.xlsm', '.xlsb'):
  28. return ext[1:]
  29. return 'unknown'
  30. def load_generic_csv(filepath: str, encoding=None, **kwargs) -> pd.DataFrame:
  31. """
  32. Load a CSV file with auto-encoding detection.
  33. Tries common encodings until one succeeds.
  34. """
  35. if encoding:
  36. try:
  37. return pd.read_csv(filepath, encoding=encoding, **kwargs)
  38. except (UnicodeDecodeError, UnicodeError):
  39. raise ValueError(f"Failed to decode {filepath} with encoding {encoding}")
  40. last_error = None
  41. for enc in CSV_ENCODINGS:
  42. try:
  43. df = pd.read_csv(filepath, encoding=enc, **kwargs)
  44. if len(df.columns) > 0:
  45. return df
  46. except (UnicodeDecodeError, UnicodeError, pd.errors.ParserError) as e:
  47. last_error = e
  48. continue
  49. raise ValueError(
  50. f"Unable to decode CSV file {filepath}. Tried encodings: "
  51. f"{CSV_ENCODINGS}. Last error: {last_error}"
  52. )
  53. def _detect_and_skip_footer_rows(df_raw: pd.DataFrame) -> pd.DataFrame:
  54. """Detect and remove summary/aggregation rows at the end of the data."""
  55. if df_raw.empty:
  56. return df_raw
  57. rows_to_drop = []
  58. text_cols = [c for c in df_raw.columns if df_raw[c].dtype == 'object']
  59. for idx in range(len(df_raw) - 1, -1, -1):
  60. row = df_raw.iloc[idx]
  61. is_summary = False
  62. for col in text_cols:
  63. val = str(row.get(col, '')).strip().lower()
  64. if any(kw in val for kw in SUMMARY_KEYWORDS):
  65. is_summary = True
  66. break
  67. if is_summary:
  68. rows_to_drop.append(idx)
  69. else:
  70. break
  71. if len(rows_to_drop) > 20:
  72. break
  73. if rows_to_drop:
  74. df_raw = df_raw.drop(index=rows_to_drop)
  75. df_raw = df_raw.reset_index(drop=True)
  76. return df_raw
  77. def _detect_empty_or_notes_rows(df_raw: pd.DataFrame) -> pd.DataFrame:
  78. """Remove leading empty rows and trailing fully-empty rows."""
  79. if df_raw.empty:
  80. return df_raw
  81. non_empty_rows = df_raw.notna().any(axis=1)
  82. first_valid = non_empty_rows.idxmax() if non_empty_rows.any() else 0
  83. last_valid = non_empty_rows[non_empty_rows].index[-1] if non_empty_rows.any() else len(df_raw)
  84. df_raw = df_raw.iloc[first_valid:last_valid + 1].reset_index(drop=True)
  85. return df_raw
  86. def normalize_column_names(col_name: str) -> str:
  87. """
  88. Normalize a single column name: strip whitespace, unify brackets, remove special chars.
  89. """
  90. if not isinstance(col_name, str):
  91. return col_name
  92. name = col_name.strip()
  93. name = name.replace('(', '(').replace(')', ')')
  94. name = name.replace('【', '[').replace('】', ']')
  95. name = name.replace('\n', ' ').replace('\r', ' ')
  96. name = re.sub(r'\s+', ' ', name)
  97. return name
  98. def _is_unnamed_column(col) -> bool:
  99. return str(col).strip().lower().startswith('unnamed')
  100. def _dedupe_columns(columns) -> list:
  101. seen = {}
  102. result = []
  103. for idx, col in enumerate(columns):
  104. name = normalize_column_names(str(col).strip()) if col is not None else ''
  105. if not name or name.lower() in ('nan', 'none') or _is_unnamed_column(name):
  106. name = f'column_{idx + 1}'
  107. base = name
  108. count = seen.get(base, 0)
  109. if count:
  110. name = f'{base}_{count + 1}'
  111. seen[base] = count + 1
  112. result.append(name)
  113. return result
  114. def _row_header_score(values) -> float:
  115. cells = [str(v).strip() for v in values if pd.notna(v) and str(v).strip()]
  116. if not cells:
  117. return 0
  118. non_numeric = 0
  119. unique = set()
  120. for cell in cells:
  121. unique.add(cell)
  122. try:
  123. float(cell.replace(',', '').replace('%', ''))
  124. is_numeric = True
  125. except Exception:
  126. is_numeric = False
  127. if not is_numeric:
  128. non_numeric += 1
  129. return len(cells) + non_numeric * 1.5 + len(unique) * 0.5
  130. def _detect_header_row(raw_df: pd.DataFrame, max_scan_rows: int = 8) -> int:
  131. if raw_df.empty:
  132. return 0
  133. scan_rows = min(max_scan_rows, len(raw_df))
  134. best_idx = 0
  135. best_score = -1
  136. for idx in range(scan_rows):
  137. row = raw_df.iloc[idx]
  138. score = _row_header_score(row)
  139. next_non_empty = 0
  140. if idx + 1 < len(raw_df):
  141. next_non_empty = raw_df.iloc[idx + 1].notna().sum()
  142. # Prefer rows with many non-numeric labels and data underneath.
  143. score += min(next_non_empty, len(raw_df.columns)) * 0.2
  144. if score > best_score:
  145. best_score = score
  146. best_idx = idx
  147. return best_idx
  148. def _dataframe_from_detected_header(raw_df: pd.DataFrame, header_row='auto') -> pd.DataFrame:
  149. raw_df = raw_df.dropna(how='all').reset_index(drop=True)
  150. raw_df = raw_df.dropna(axis=1, how='all')
  151. if raw_df.empty:
  152. return raw_df
  153. if header_row == 'auto':
  154. header_idx = _detect_header_row(raw_df)
  155. elif header_row is None:
  156. header_idx = 0
  157. else:
  158. header_idx = int(header_row)
  159. header_idx = max(0, min(header_idx, len(raw_df) - 1))
  160. columns = _dedupe_columns(raw_df.iloc[header_idx].tolist())
  161. df = raw_df.iloc[header_idx + 1:].copy().reset_index(drop=True)
  162. df.columns = columns
  163. return df
  164. def _clean_generic_dataframe(df: pd.DataFrame, skip_summary_rows=True) -> pd.DataFrame:
  165. """
  166. Universal DataFrame cleaning:
  167. - Remove fully empty rows/columns
  168. - Drop 'Unnamed' columns
  169. - Normalize column names (strip whitespace, unify brackets)
  170. - Auto-detect and remove summary/total rows
  171. - Try to parse date columns
  172. - Try to parse numeric columns
  173. """
  174. if df.empty:
  175. return df
  176. df = df.dropna(how='all').reset_index(drop=True)
  177. df = df.dropna(axis=1, how='all')
  178. unnamed_mask = df.columns.map(_is_unnamed_column)
  179. if unnamed_mask.any() and (~unnamed_mask).any():
  180. df = df.loc[:, ~unnamed_mask]
  181. df = df.rename(columns=normalize_column_names)
  182. if skip_summary_rows:
  183. df = _detect_and_skip_footer_rows(df)
  184. df = _detect_empty_or_notes_rows(df)
  185. for col in df.columns:
  186. if df[col].dtype == 'object':
  187. # Try numeric first to avoid small integers being mis-parsed as dates
  188. try:
  189. numeric = pd.to_numeric(df[col], errors='coerce')
  190. if numeric.notna().sum() > len(df) * 0.7:
  191. df[col] = numeric
  192. continue
  193. except Exception:
  194. pass
  195. # Then try datetime
  196. try:
  197. try:
  198. parsed = pd.to_datetime(df[col], errors='coerce', format='mixed')
  199. except TypeError:
  200. parsed = pd.to_datetime(df[col], errors='coerce')
  201. if parsed.notna().sum() > len(df) * 0.7:
  202. df[col] = parsed
  203. continue
  204. except Exception:
  205. pass
  206. return df
  207. def load_generic_excel(filepath: str, sheet_name=0, skip_summary_rows=True,
  208. encoding=None, dtype_backend=None, header_row='auto') -> pd.DataFrame:
  209. """
  210. Load any Excel/CSV file into a cleaned DataFrame.
  211. Args:
  212. filepath: Path to the data file (.xlsx, .xls, or .csv)
  213. sheet_name: Sheet name or index (for Excel). Ignored for CSV.
  214. skip_summary_rows: Auto-detect and remove summary/total footer rows
  215. encoding: File encoding (auto-detected for CSV if None)
  216. dtype_backend: Optional pandas dtype backend ('numpy_nullable', 'pyarrow')
  217. header_row: Excel header row index or 'auto'. CSV keeps its native header.
  218. """
  219. fmt = auto_detect_file_format(filepath)
  220. kwargs = {}
  221. if dtype_backend:
  222. kwargs['dtype_backend'] = dtype_backend
  223. if fmt == 'csv':
  224. df = load_generic_csv(filepath, encoding=encoding, **kwargs)
  225. else:
  226. try:
  227. raw_df = pd.read_excel(filepath, sheet_name=sheet_name, header=None, **kwargs)
  228. df = _dataframe_from_detected_header(raw_df, header_row=header_row)
  229. except Exception as e:
  230. if fmt == 'xls':
  231. raise ValueError(
  232. f"Failed to read .xls file. Try converting to .xlsx format. "
  233. f"Error: {e}"
  234. )
  235. raise
  236. return _clean_generic_dataframe(df, skip_summary_rows=skip_summary_rows)
  237. def load_generic_all_sheets(filepath: str, skip_summary_rows=True, header_row='auto') -> pd.DataFrame:
  238. """
  239. Load all sheets from an Excel file, merge into a single DataFrame.
  240. Adds '_source_sheet' column to track the source sheet.
  241. """
  242. fmt = auto_detect_file_format(filepath)
  243. if fmt == 'csv':
  244. return load_generic_excel(filepath, skip_summary_rows=skip_summary_rows)
  245. xl = pd.ExcelFile(filepath)
  246. if len(xl.sheet_names) == 1:
  247. raw_df = pd.read_excel(filepath, sheet_name=xl.sheet_names[0], header=None)
  248. df = _dataframe_from_detected_header(raw_df, header_row=header_row)
  249. return _clean_generic_dataframe(df, skip_summary_rows=skip_summary_rows)
  250. frames = []
  251. for sheet in xl.sheet_names:
  252. try:
  253. raw_df = pd.read_excel(filepath, sheet_name=sheet, header=None)
  254. df = _dataframe_from_detected_header(raw_df, header_row=header_row)
  255. df['_source_sheet'] = sheet
  256. frames.append(df)
  257. except Exception:
  258. continue
  259. if not frames:
  260. raise ValueError(f"No valid sheets found in {filepath}")
  261. combined = pd.concat(frames, ignore_index=True)
  262. return _clean_generic_dataframe(combined, skip_summary_rows=skip_summary_rows)
  263. def auto_detect_date_column(df: pd.DataFrame) -> str:
  264. """Auto-detect the primary date/time column in a DataFrame."""
  265. date_keywords = ['日期', '时间', 'date', 'time', '年', '月', '日']
  266. for col in df.columns:
  267. col_str = str(col).lower().strip()
  268. if any(kw in col_str for kw in date_keywords):
  269. parsed = pd.to_datetime(df[col], errors='coerce')
  270. if parsed.notna().sum() > len(df) * 0.5:
  271. return col
  272. return ''
  273. def auto_parse_single_sheet(filepath: str, sheet_name=0) -> pd.DataFrame:
  274. """Load and clean a single sheet (shortcut for load_generic_excel)."""
  275. return load_generic_excel(filepath, sheet_name=sheet_name)
  276. def load_generic_file_info(filepath: str) -> dict:
  277. """
  278. Return file metadata without full data loading.
  279. Useful for quick inspection before deciding how to load.
  280. """
  281. info = {'filepath': filepath, 'format': auto_detect_file_format(filepath)}
  282. if info['format'] == 'csv':
  283. try:
  284. with open(filepath, 'r', encoding='utf-8-sig') as f:
  285. sample = f.read(8192)
  286. dialect = csv.Sniffer().sniff(sample[:4096])
  287. info['delimiter'] = dialect.delimiter
  288. info['has_header'] = csv.Sniffer().has_header(sample)
  289. info['approx_rows'] = sample.count('\n')
  290. except Exception:
  291. info['delimiter'] = ','
  292. else:
  293. try:
  294. xl = pd.ExcelFile(filepath)
  295. info['sheet_names'] = xl.sheet_names
  296. info['sheet_count'] = len(xl.sheet_names)
  297. except Exception as e:
  298. info['error'] = str(e)
  299. info['file_size_mb'] = round(os.path.getsize(filepath) / (1024 * 1024), 2)
  300. return info
  301. if __name__ == '__main__':
  302. import sys
  303. if len(sys.argv) > 1:
  304. fp = sys.argv[1]
  305. fmt = auto_detect_file_format(fp)
  306. print(f"File: {fp}")
  307. print(f"Format: {fmt}")
  308. file_info = load_generic_file_info(fp)
  309. print(f"Size: {file_info.get('file_size_mb', '?')} MB")
  310. if 'sheet_names' in file_info:
  311. print(f"Sheets ({file_info['sheet_count']}): {file_info['sheet_names'][:5]}...")
  312. df = load_generic_excel(fp)
  313. date_col = auto_detect_date_column(df)
  314. print(f"Generic load: {len(df)} rows x {len(df.columns)} cols, "
  315. f"date column: {date_col}")
  316. print(f"Columns: {list(df.columns)}")