#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 主生成脚本:根据订单信息生成销售合同和Proforma Invoice """ import os import sys import shutil import re from datetime import datetime # 添加脚本目录到路径 script_dir = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, script_dir) from parse_order_info import parse_order_info from match_vehicle import match_vehicle, build_vehicle_description from number_to_words import format_say_us_only, calculate_payment_split # 模板正文默认字号:12pt from docx.shared import Pt DEFAULT_FONT_SIZE = Pt(12) def copy_run_format(src_run, dst_run): """复制源run的格式到目标run,字号默认12pt""" dst_run.bold = src_run.bold dst_run.italic = src_run.italic dst_run.font.size = src_run.font.size or DEFAULT_FONT_SIZE if src_run.font.name: dst_run.font.name = src_run.font.name def set_run_format(run, bold=None, size=None): """设置run格式,字号默认12pt""" run.bold = bold run.font.size = size or DEFAULT_FONT_SIZE def get_asset_path(filename: str) -> str: """获取assets目录下的文件路径""" return os.path.join(script_dir, '..', 'assets', filename) def generate_contract_no() -> str: """生成合同编号: HLXYWPA{年月日}""" return f"HLXYWPA{datetime.now().strftime('%Y%m%d')}" def format_date() -> str: """格式化当前日期""" return datetime.now().strftime('%Y-%m-%d') def format_date_for_cell(): """Excel单元格日期格式(仅日期,无时间)""" from datetime import datetime now = datetime.now() return datetime(now.year, now.month, now.day) def get_port_names(port_input: str) -> tuple: """获取港口的中英文名称,返回 (cn_name, en_name)""" port_map = { '广州南沙': ('中国广州南沙港口', 'Guangzhou nansha Port'), '广州': ('中国广州港口', 'Guangzhou Port'), '深圳': ('中国深圳港口', 'Shenzhen Port'), '上海': ('中国上海港口', 'Shanghai Port'), '天津': ('中国天津港口', 'Tianjin Port'), '青岛': ('中国青岛港口', 'Qingdao Port'), '宁波': ('中国宁波港口', 'Ningbo Port'), '厦门': ('中国厦门港口', 'Xiamen Port'), } port_input = port_input.strip() # 先尝试中文匹配 for cn, (cn_name, en_name) in port_map.items(): if cn in port_input: return cn_name, en_name # 再尝试英文匹配 for cn, (cn_name, en_name) in port_map.items(): if en_name.lower() in port_input.lower(): return cn_name, en_name # 默认返回输入值 return port_input, port_input def number_to_chinese(num: int) -> str: """将整数金额转换为中文大写(壹贰叁肆伍陆柒捌玖拾佰仟万)""" if num == 0: return '零' if num < 0: return '负' + number_to_chinese(-num) units = ['', '拾', '佰', '仟'] big_units = ['', '万', '亿'] nums = '零壹贰叁肆伍陆柒捌玖' def convert_group(n): if n == 0: return '' s = '' zero = False for i in range(3, -1, -1): d = (n // (10 ** i)) % 10 if d == 0: if s and not zero: zero = True else: if zero: s += '零' zero = False s += nums[d] + units[i] return s groups = [] idx = 0 while num > 0: g = num % 10000 num //= 10000 if g > 0: gs = convert_group(g) if gs: groups.append(gs + big_units[idx]) else: groups.append(big_units[idx]) elif groups: groups.append('零') idx += 1 result = ''.join(reversed(groups)) result = result.replace('零零', '零') result = result.replace('零万', '万') result = result.replace('零亿', '亿') result = result.rstrip('零') return result def fill_paragraph_value(para, marker: str, value: str) -> bool: """ 在段落中 marker 标记后填入 value,保留原有格式。 不直接设置 para.text(这会破坏所有 run 格式)。 """ text = para.text if marker not in text: return False parts = text.split(marker, 1) if len(parts) < 2: return False after = parts[1].strip() if after and after != value.strip(): # 已有内容且不是我们要填的值,视为已填充 return False # 找到最后一个 run if not para.runs: run = para.add_run(value) run.font.size = DEFAULT_FONT_SIZE return True # 从后往前找:优先修改空白占位符 run,否则在末尾追加 for i in range(len(para.runs) - 1, -1, -1): r = para.runs[i] if not r.text.strip(): # 空白 run,填入值,并复制前一个非空 run 的格式 r.text = value src = None for j in range(i - 1, -1, -1): if para.runs[j].text: src = para.runs[j] break if src: copy_run_format(src, r) if src.font.color and src.font.color.rgb: r.font.color.rgb = src.font.color.rgb else: r.font.size = DEFAULT_FONT_SIZE return True elif r.text.strip(): # 最后一个非空 run,在其后追加新 run,复制格式 new_run = para.add_run(value) copy_run_format(r, new_run) if r.font.color and r.font.color.rgb: new_run.font.color.rgb = r.font.color.rgb return True return False def delete_table_row(table, row): """从表格中真正删除一行(使用XML操作)""" tbl = table._tbl tr = row._tr tbl.remove(tr) def set_cell_text(cell, text): """设置单元格文本:删除除第一个段落外的所有段落,清空第一个段落并填入新文本""" tc = cell._tc # 删除除第一个外的所有段落(从后往前删) while len(cell.paragraphs) > 1: p = cell.paragraphs[-1]._p tc.remove(p) # 清空并填充第一个段落 para = cell.paragraphs[0] style = para.style para.clear() run = para.add_run(text) para.style = style def set_cell_multiline(cell, text): """设置单元格多段落文本:按换行符拆分为多个段落,保留模板格式。 模板中 Description 单元格的格式为: Para 0: Model: ... Para 1: (配置描述) Para 2: Color: ... Para 3: 型号:... Para 4: (配置描述) Para 5: 颜色:... """ from docx.oxml.ns import qn from copy import deepcopy tc = cell._tc # 保存第一个段落的完整 XML 作为模板 first_p = cell.paragraphs[0]._p template_pPr = first_p.find(qn('w:pPr')) # 删除除第一个外的所有段落(从后往前删) while len(cell.paragraphs) > 1: p = cell.paragraphs[-1]._p tc.remove(p) # 清空第一个段落 first_para = cell.paragraphs[0] first_para.clear() # 按换行符拆分 lines = text.split('\n') # 第一行填入第一个段落 if lines: first_para.add_run(lines[0]) # 后续行追加为新段落 for line in lines[1:]: new_p = tc.makeelement(qn('w:p'), {}) # 复制段落属性(对齐、缩进等) if template_pPr is not None: new_p.append(deepcopy(template_pPr)) # 添加 run new_r = new_p.makeelement(qn('w:r'), {}) new_t = new_r.makeelement(qn('w:t'), {}) new_t.text = line new_t.set(qn('xml:space'), 'preserve') new_r.append(new_t) new_p.append(new_r) tc.append(new_p) def build_full_description(vehicle: dict, match_result: dict = None) -> str: """ 构建销售合同中 Commodity 表格的完整货物描述。 格式参考模板: Model: {model_code}, {engine_code} {中文车型名} {英文车型名} {中文配置} {英文配置} Color: {color} 型号:{model_code}, {engine_code} {英文车型名} """ model_code = vehicle.get('model_code', '') engine_code = vehicle.get('engine_code', '') color = vehicle.get('color', '1') name_cn = vehicle.get('name_cn', '') name_en = vehicle.get('name_en', '') lines = [] if match_result: # 有价格表匹配,使用价格表信息 desc_cn = match_result.get('description_cn', '') series_en = match_result.get('series_en', '') config_desc = match_result.get('config_desc', '') # 分离中英文配置描述 config_cn = '' config_en = '' if config_desc: config_parts = [p.strip() for p in config_desc.split('\n') if p.strip()] for part in config_parts: if re.search(r'[\u4e00-\u9fff]', part): config_cn = part else: config_en = part # Model 行(英文) model_line = f"Model: {model_code}, {engine_code}" if series_en: model_line += f" {series_en}" lines.append(model_line) # 英文配置描述 if config_en: lines.append(config_en) else: # 无价格表匹配,使用订单信息 model_line = f"Model: {model_code}, {engine_code}" if name_en: model_line += f" {name_en}" elif name_cn: model_line += f" {name_cn}" lines.append(model_line) # Color (English) if color and color != '1': lines.append(f"Color: {color}") else: lines.append("Color: ") # 型号行(中文标签)- 带中文描述 model_cn_line = f"型号:{model_code}, {engine_code}" if match_result and match_result.get('description_cn'): model_cn_line += f" {match_result['description_cn']}" elif name_cn: model_cn_line += f" {name_cn}" elif name_en: model_cn_line += f" {name_en}" lines.append(model_cn_line) # 中文配置描述(仅提取包含中文的配置行) if match_result and match_result.get('config_desc'): config_parts = [p.strip() for p in match_result['config_desc'].split('\n') if p.strip()] for part in config_parts: if re.search(r'[\u4e00-\u9fff]', part): lines.append(part) # 颜色行(中文) if color and color != '1': lines.append(f"颜色:{color}") else: lines.append("颜色:") return '\n'.join(lines) def process_vehicles(vehicles: list, trade_term: str) -> list: """ 处理车型列表,匹配价格表或提示用户提供价格 Returns: (processed_vehicles, missing_prices) """ processed = [] missing = [] for v in vehicles: model_code = v.get('model_code', '') # 如果用户已提供单价,直接使用 if v.get('unit_price_usd') is not None: processed.append(v) continue # 从价格表匹配 match_result = match_vehicle(model_code, trade_term) if match_result and match_result.get('unit_price_usd'): v['unit_price_usd'] = match_result['unit_price_usd'] v['config_desc'] = build_vehicle_description(v, match_result) v['_match_result'] = match_result processed.append(v) else: # 未匹配到价格 missing.append(v) return processed, missing def calculate_summary(vehicles: list) -> dict: """计算汇总信息""" total_qty = sum(v['quantity'] for v in vehicles) total_amount = sum(v['unit_price_usd'] * v['quantity'] for v in vehicles) deposit_30, balance_70 = calculate_payment_split(total_amount) return { 'total_qty': total_qty, 'total_amount': round(total_amount, 2), 'total_amount_int': round(total_amount), 'deposit_30': deposit_30, 'balance_70': balance_70, 'amount_in_words': format_say_us_only(total_amount), 'amount_in_chinese': number_to_chinese(round(total_amount)), } def generate_proforma_invoice(order: dict, vehicles: list, summary: dict, contract_no: str, output_dir: str) -> str: """生成Proforma Invoice""" import openpyxl from openpyxl.utils import get_column_letter # 复制模板 template_path = get_asset_path('proforma-invoice-template.xlsx') output_filename = f"{contract_no}-Proforma Invoice.xlsx" output_path = os.path.join(output_dir, output_filename) shutil.copy(template_path, output_path) # 打开并修改 wb = openpyxl.load_workbook(output_path) ws = wb['INVOICE'] # 买方信息 ws['B7'] = f"TO: {order.get('buyer_en', '')}" ws['G7'] = f"Contract NO.: {contract_no}" ws['H8'] = format_date_for_cell() ws['B8'] = f"ADD: {order.get('address_en', '')}" ws['B9'] = f"Tel: {order.get('tel', '')}" # 港口信息 port_cn, port_en = get_port_names(order.get('departure_port', '广州南沙')) trade_term = order.get('trade_term', 'FCA') # 商品明细 (B17-H22) start_row = 17 for i, v in enumerate(vehicles): row = start_row + i if row > 22: break # 模板最多支持到22行 desc = v.get('config_desc') or build_vehicle_description(v, v.get('_match_result')) if not desc: desc = f"Wuling {v['model_code']}, {v['engine_code']}" ws.cell(row=row, column=2, value=desc) # B列: DESCRIPTION ws.cell(row=row, column=3, value=v['quantity']) # C列: QTY ws.cell(row=row, column=4, value='UNIT') # D列 ws.cell(row=row, column=5, value='USD') # E列 ws.cell(row=row, column=6, value=v['unit_price_usd']) # F列: U.PRICE ws.cell(row=row, column=7, value='USD') # G列 # H列保留公式 =C*F # 清除未使用的行(如果车辆少于模板默认行数) # 注意: cell.value = None 在 openpyxl 中不能真正清除单元格,必须用 _value = None for row in range(start_row + len(vehicles), 23): for col in range(2, 9): ws.cell(row=row, column=col)._value = None # 汇总信息 ws['E23'] = f" {trade_term} {port_en} " ws['B24'] = summary['amount_in_words'] # Delivery terms ws['B27'] = f"1. Delivery terms: {trade_term} {port_en} " # Payment terms payment_text = ( f"3. Payment terms:\n" f"1. 30% of the Total Amount (USD {summary['deposit_30']:,}) shall be paid to Seller within 5 working days from order confirmation.\n" f"2. 70% of the Total Amount (USD {summary['balance_70']:,}) shall be paid to Seller within 10 working days before delivery. " ) ws['B29'] = payment_text wb.save(output_path) wb.close() return output_path def accept_all_revisions(doc): """ 清除文档中的所有修订标记(Track Changes)和批注。 解决WPS/Word模板中未接受修订导致内容重复显示的问题。 策略:仅处理正文段落(不遍历表格),避免误删表格中的图片。 """ body = doc.element.body ns = 'http://schemas.openxmlformats.org/wordprocessingml/2006/main' from docx.oxml.ns import qn # 仅处理 body 直接子元素中的段落(跳过表格内的内容,保护图片) for p in body.findall(qn('w:p')): # 删除段落内的 元素 for del_elem in list(p.iter(f'{{{ns}}}del')): parent = del_elem.getparent() if parent is not None: parent.remove(del_elem) # 删除段落内的 元素 for ins_elem in list(p.iter(f'{{{ns}}}ins')): parent = ins_elem.getparent() if parent is not None: parent.remove(ins_elem) # 删除批注标记 for tag in ['commentRangeStart', 'commentRangeEnd', 'commentReference']: for elem in list(p.iter(f'{{{ns}}}{tag}')): parent = elem.getparent() if parent is not None: parent.remove(elem) def generate_sales_contract(order: dict, vehicles: list, summary: dict, contract_no: str, output_dir: str) -> str: """生成销售合同 (Word)""" from docx import Document # 复制模板 template_path = get_asset_path('vehicle-sales-contract-template.docx') output_filename = f"{contract_no}-车辆销售合同.docx" output_path = os.path.join(output_dir, output_filename) shutil.copy(template_path, output_path) # 打开并修改 doc = Document(output_path) # 先接受所有修订、删除批注,避免WPS显示重复内容 accept_all_revisions(doc) port_cn, port_en = get_port_names(order.get('departure_port', '广州南沙')) trade_term = order.get('trade_term', 'FCA') # 1. 替换段落文本(保留原有格式) for para in doc.paragraphs: text = para.text if not text: continue # 合同编号: No.: → No.: HLXYWPA... if 'No.:' in text and not re.search(r'No\.:\s*\S', text): fill_paragraph_value(para, 'No.:', f' {contract_no}') # 签署日期 if 'Signature Date:' in text and not re.search(r'Signature Date:\s*\d', text): fill_paragraph_value(para, 'Signature Date:', f' {format_date()}') # 买方英文 if text.strip() == 'Buyer:': fill_paragraph_value(para, 'Buyer:', f' {order.get("buyer_en", "")}') # 买方中文 if text.strip() == '买方:': fill_paragraph_value(para, '买方:', f'{order.get("buyer_cn", "")}') # 英文地址(匹配 ADD: 或 ADD : 等变体,且内容较短说明是占位符) if re.search(r'ADD\s*[::]', text) and len(text.strip()) < 10: # 查找实际标记(支持空格和不间断空格) m = re.search(r'(ADD\s*[::])', text) if m: fill_paragraph_value(para, m.group(1), f' {order.get("address_en", "")}') # 中文地址 if re.search(r'地址\s*[::]', text) and len(text.strip()) < 10: m = re.search(r'(地址\s*[::])', text) if m: fill_paragraph_value(para, m.group(1), f'{order.get("address_cn", "")}') # 电话 if text.strip() == 'Tel电话:': fill_paragraph_value(para, 'Tel电话:', f' {order.get("tel", "")}') # 出口目的地(英文) if 'Export Destination:' in text: if para.runs: for run in para.runs: if run.text == ' ' and run.bold is None: run.text = f' {order.get("destination_country", "")} ' break # 出口目的地(中文) if '出口目的地:' in text: dest_cn = order.get('destination_country', '') if dest_cn and para.runs: # 在":"后插入国家名,保留后面的(下称"区域") for run in para.runs: if '(下称' in run.text: run.text = run.text.replace('(下称', f'{dest_cn}(下称') break # 贸易条款(中英文同段落) if 'Incoterms:' in text: if para.runs: src = para.runs[0] # 保留第一个run的格式(bold=True) para.clear() # 英文部分 run_en = para.add_run(f"Incoterms: {trade_term} {port_en}") copy_run_format(src, run_en) # 中文部分 run_cn = para.add_run(f"贸易条款:{trade_term} {port_cn}") set_run_format(run_cn, bold=None) # 付款条款英文 if '30% of the Total Amount' in text and 'shall be paid' in text: if para.runs: src = para.runs[-1] para.clear() run = para.add_run( f"30% of the Total Amount shall be paid to the Seller by T/T before the production. " f"70% of the Total Amount shall be paid to the Seller by the Buyer by means of T/T before the delivery of vehicles." ) copy_run_format(src, run) # 付款条款中文 if '生产前,买方向卖方通过电汇支付' in text: if para.runs: src = para.runs[-1] para.clear() run = para.add_run( f"生产前,买方向卖方通过电汇支付本合同项下30% 货款。" f"发运前,买方向卖方通过电汇支付本合同项下70% 货款。" ) copy_run_format(src, run) # 2. 插入买方信息(签字区域,卖方信息之后) buyer_en = order.get('buyer_en', '') buyer_cn = order.get('buyer_cn', '') if buyer_en or buyer_cn: # 找到第二个 "Signature 签字:" (买方签字区) 前的空白段落 sig_count = 0 buyer_sig_idx = -1 for i, para in enumerate(doc.paragraphs): if 'Signature' in para.text and '签字' in para.text: sig_count += 1 if sig_count == 2: buyer_sig_idx = i break if buyer_sig_idx > 0: # 从卖方Title之后向前找空段落,按顺序填入Buyer(EN)和买方(CN) empty_slots = [] for i in range(buyer_sig_idx - 1, max(buyer_sig_idx - 6, 0), -1): if not doc.paragraphs[i].text.strip(): empty_slots.append(i) else: break # 遇到非空段落就停止 empty_slots.reverse() # 正序(从上到下) if len(empty_slots) >= 2 and buyer_en: run = doc.paragraphs[empty_slots[0]].add_run(f'Buyer: {buyer_en}') set_run_format(run, bold=True) if len(empty_slots) >= 2 and buyer_cn: run = doc.paragraphs[empty_slots[1]].add_run(f'买方:{buyer_cn}') set_run_format(run, bold=True) elif len(empty_slots) >= 1 and buyer_cn: run = doc.paragraphs[empty_slots[0]].add_run(f'买方:{buyer_cn}') set_run_format(run, bold=True) # 3. 替换表格内容 for table in doc.tables: first_cell_text = table.rows[0].cells[0].text.strip() if table.rows else '' if 'DESCRIPTION' in first_cell_text or '货物描述' in first_cell_text: _fill_vehicle_table(table, vehicles, summary, trade_term, port_cn, port_en) # 银行账户表和商标表保持原样 doc.save(output_path) return output_path def _fill_vehicle_table(table, vehicles, summary, trade_term, port_cn, port_en): """填充销售合同中的车辆明细表(保留格式 + 删除多余行)""" data_start_row = 1 # 表头后第1行开始数据 # 先找到汇总行位置 summary_row_idx = None for idx, row in enumerate(table.rows): if idx == 0: continue row_text = ' '.join(cell.text for cell in row.cells) if 'Total' in row_text or '总数量' in row_text or 'SAY USD' in row_text: summary_row_idx = idx break # 确定可用的数据行(汇总行之前的行) available_data_rows = [] for idx in range(data_start_row, len(table.rows)): if idx == summary_row_idx: break available_data_rows.append(idx) # 1. 填充车辆数据 for i, v in enumerate(vehicles): if i >= len(available_data_rows): break row = table.rows[available_data_rows[i]] desc = build_full_description(v, v.get('_match_result')) if len(row.cells) > 0: set_cell_multiline(row.cells[0], desc) if len(row.cells) > 1: set_cell_text(row.cells[1], str(v['quantity'])) if len(row.cells) > 2: set_cell_text(row.cells[2], str(v['unit_price_usd'])) if len(row.cells) > 3: set_cell_text(row.cells[3], str(round(v['unit_price_usd'] * v['quantity'], 2))) # 2. 先填充汇总行(在删除操作之前,避免索引失效) if summary_row_idx is not None and summary_row_idx < len(table.rows): summary_row = table.rows[summary_row_idx] amount_en = summary['amount_in_words'].replace('SAY US ', '').replace(' only', '') amount_cn = summary['amount_in_chinese'] total_text = ( f"Total: {summary['total_qty']} units 总数量: {summary['total_qty']} 辆 \n" f"Total Amount: USD {summary['total_amount']:.2f} 总金额: 美元 {summary['total_amount']:.2f}\n" f"(Say USD: {amount_en} ONLY)\n" f"(合计美元:{amount_cn}美元整)\n" f"{trade_term} {port_en} {trade_term} {port_cn}" ) for cell in summary_row.cells: set_cell_multiline(cell, total_text) # 3. 删除多余空白行(从后往前删,跳过汇总行) filled_count = min(len(vehicles), len(available_data_rows)) rows_to_delete = [] for idx in range(data_start_row + filled_count, len(table.rows)): if idx == summary_row_idx: break rows_to_delete.append(table.rows[idx]) for row in reversed(rows_to_delete): delete_table_row(table, row) def generate_contracts(order_text: str, output_dir: str = '.', user_prices: dict = None) -> dict: """ 主函数:根据订单文本生成合同 Args: order_text: 订单信息文本 output_dir: 输出目录 user_prices: 用户手动提供的价格 {model_code: price} Returns: { 'pi_path': str, 'contract_path': str, 'contract_no': str, 'summary': dict, 'missing_prices': list, } """ # 1. 解析订单 order = parse_order_info(order_text) # 2. 生成合同编号 contract_no = generate_contract_no() # 3. 处理车型价格 vehicles = order.get('vehicles', []) trade_term = order.get('trade_term', 'FCA') # 应用用户提供的价格 if user_prices: for v in vehicles: if v['model_code'] in user_prices: v['unit_price_usd'] = user_prices[v['model_code']] processed, missing = process_vehicles(vehicles, trade_term) # 如果有缺失价格,返回提示信息 if missing: return { 'success': False, 'missing_prices': missing, 'message': f"以下车型在价格表中未找到,请提供单价USD: {[v['model_code'] for v in missing]}", } # 4. 计算汇总 summary = calculate_summary(processed) # 5. 生成文件 os.makedirs(output_dir, exist_ok=True) pi_path = generate_proforma_invoice(order, processed, summary, contract_no, output_dir) contract_path = generate_sales_contract(order, processed, summary, contract_no, output_dir) return { 'success': True, 'contract_no': contract_no, 'pi_path': pi_path, 'contract_path': contract_path, 'summary': summary, 'missing_prices': [], } if __name__ == '__main__': import argparse parser = argparse.ArgumentParser(description='Generate export contracts from order info') parser.add_argument('order_file', help='Path to order info text file') parser.add_argument('-o', '--output', default='.', help='Output directory') parser.add_argument('-p', '--prices', help='JSON string of manual prices {"LZW1028SPY": 6176}') args = parser.parse_args() with open(args.order_file, 'r', encoding='utf-8') as f: order_text = f.read() user_prices = None if args.prices: import json user_prices = json.loads(args.prices) result = generate_contracts(order_text, args.output, user_prices) import json as json_mod sys.stdout.reconfigure(encoding='utf-8') print(json_mod.dumps(result, ensure_ascii=False, indent=2))