889 lines
32 KiB
Python
889 lines
32 KiB
Python
"""
Shared helper functions for the translation tool.

Contains all utility functions and processing logic.
"""
|
||
|
||
import os
|
||
import pandas as pd
|
||
import re
|
||
import openpyxl
|
||
from openpyxl import Workbook
|
||
import jieba
|
||
import datetime
|
||
from collections import defaultdict
|
||
import random
|
||
from autogen import ConversableAgent
|
||
import json
|
||
from pathlib import Path
|
||
from typing import Union, List
|
||
|
||
from backend.config import (
|
||
ENGLISH_STOPWORDS,
|
||
DEFAULT_MIN_WORD_LEN, DEFAULT_MAX_STR_LEN, DEFAULT_TOP_K,
|
||
DEFAULT_SAMPLE_K, DEFAULT_SAMPLE_R, get_llm_config_ag2
|
||
)
|
||
|
||
# ============================================================================
|
||
# 文件操作函数
|
||
# ============================================================================
|
||
|
||
def load_txt(txt_path):
    """Return the full contents of a UTF-8 encoded text file.

    Args:
        txt_path: Path of the text file to read.

    Returns:
        The decoded file contents as a single string.
    """
    with open(txt_path, mode='r', encoding='utf-8') as handle:
        return handle.read()
|
||
|
||
def write_to_txt(file_path, text, mode, line=False):
    """Write text to a UTF-8 text file, either appending or overwriting.

    Args:
        file_path: Path of the destination text file.
        text: Text to write.
        mode: Write mode, 'a' (append) or 'w' (overwrite).
        line: When true, split ``text`` on the separator used below and write
            each piece followed by a newline; otherwise write ``text`` as-is.

    Returns:
        The string "error mode" when ``mode`` is neither 'a' nor 'w';
        otherwise None.
    """
    if mode != 'a' and mode != 'w':
        return "error mode"

    # Append only when appending was requested AND the file already holds
    # data; every other combination opens the file for (over)writing.
    has_content = os.path.exists(file_path) and os.path.getsize(file_path) > 0
    file_mode = 'a' if (has_content and mode == 'a') else 'w'

    with open(file_path, file_mode, encoding='utf-8') as out:
        if not line:
            out.write(text)
        else:
            # NOTE(review): the separator is the two-character sequence
            # backslash + "n", not a real newline character -- confirm that
            # this is intended.
            for piece in text.split('\\n'):
                out.write(piece + '\n')
|
||
|
||
# ============================================================================
|
||
# Excel处理函数
|
||
# ============================================================================
|
||
|
||
def read_character_styles(excel_path, sheet_name=0, skip_columns=2,
                          name_row=1, style_row=16):
    """Read a character style guide sheet into a ``{name: style}`` dict.

    The sheet is read without a header row. Character names are taken from
    ``name_row`` and the matching style text from ``style_row`` of the same
    column, starting after the first ``skip_columns`` columns.

    Args:
        excel_path (str): Path of the Excel file.
        sheet_name (str/int): Sheet name or index; defaults to the first sheet.
        skip_columns (int): Number of leading columns to skip; defaults to 2.
        name_row (int): 1-based row number holding the character names.
        style_row (int): 1-based row number holding the style reference.

    Returns:
        dict: Character name (stripped) mapped to its style text (stripped,
        or "" when the style cell is empty). An empty dict is returned when
        no names are found or when any error occurs; errors are printed
        rather than raised.
    """
    try:
        df = pd.read_excel(excel_path, sheet_name=sheet_name, header=None)

        # Convert the 1-based row numbers into 0-based positions.
        name_row_idx = name_row - 1
        style_row_idx = style_row - 1

        required_rows = max(name_row, style_row)
        if df.shape[0] < required_rows:
            raise IndexError(f"表格行数不足,需要至少{required_rows}行")

        if df.shape[1] <= skip_columns:
            raise IndexError(f"表格列数不足,需要超过{skip_columns}列")

        name_cells = df.iloc[name_row_idx, skip_columns:]
        if name_cells.dropna().empty:
            print("警告:未找到任何角色名")
            return {}

        style_dict = {}
        # Enumerate the *unfiltered* name cells so that the offset always
        # equals the real column distance from ``skip_columns``. Dropping
        # empty cells first would shift every later character onto the wrong
        # style column.
        for offset, name in enumerate(name_cells):
            if pd.isna(name):
                continue
            key = str(name).strip()
            if not key:
                continue
            style = df.iloc[style_row_idx, skip_columns + offset]
            style_dict[key] = str(style).strip() if pd.notna(style) else ""

        print(f"成功读取{len(style_dict)}个角色的风格信息")
        return style_dict

    except FileNotFoundError:
        print(f"错误:找不到文件 {excel_path}")
        return {}
    except IndexError as e:
        print(f"错误:{e}")
        return {}
    except Exception as e:
        # Best-effort reader: any other failure is reported and swallowed.
        print(f"读取Excel文件时发生错误:{e}")
        return {}
|
||
|
||
def read_ori(source, index, head=False):
    """Read source-language text from an Excel file and tag each row.

    Every row becomes one ``<o={row}>...</o>`` entry followed by a newline.
    With more than one column, a ":" is inserted in front of the second
    column's value, giving ``<o=0>name :text</o>``.

    Args:
        source: Path of the Excel file.
        index: Pair ``(s, j)`` where ``s`` is a sheet index (or a list of
            sheet indices) and ``j`` is a list of column letters.
        head: When true, the first row of each sheet is treated as a header
            and is not returned as data.

    Returns:
        A tuple ``(ori_str_all, ori_str_all_list, ori_len)``:
        the concatenation of all entries, the list of entries, and the
        character count. With a single column every value is counted; with
        several columns only the second column is counted, so speaker names
        are excluded.
    """
    sheet_indices, col_letters = index
    if isinstance(sheet_indices, int):
        sheet_indices = [sheet_indices]

    def _col_to_index(col_letter):
        # Base-26 conversion so multi-letter columns such as "AA" work too.
        position = 0
        for char in col_letter.upper():
            position = position * 26 + (ord(char) - ord('A') + 1)
        return position - 1

    col_indices = [_col_to_index(col) for col in col_letters]
    single_column = len(col_indices) == 1

    # The context manager guarantees the workbook handle is released.
    with pd.ExcelFile(source) as xl:
        frames = [
            xl.parse(sheet_index, header=0 if head else None)
            for sheet_index in sheet_indices
        ]
    df = pd.concat(frames, ignore_index=True)

    ori_len = 0
    ori_str_all_list = []
    for i, row in df.iterrows():
        formatted_str = f"<o={i}>"
        for col_index in col_indices:
            value = row.iloc[col_index]

            # Empty cells are labelled as narration; player placeholders are
            # normalised to a single speaker label.
            if pd.isna(value):
                value = "旁白"
            if value == "<ref=PlayerName>" or value == "团长":
                value = "玩家"

            if single_column:
                ori_len += len(str(value))
            elif col_index == col_indices[1]:
                # Separator between the first column and the second one;
                # only the second column counts towards the length.
                formatted_str += ":"
                ori_len += len(str(value))
            formatted_str += f"{value} "

        ori_str_all_list.append(formatted_str.strip() + "</o>\n")

    return "".join(ori_str_all_list), ori_str_all_list, ori_len
|
||
|
||
def merge_excel_sheets(input_folder: Union[str, Path], output_file: Union[str, Path],
                       file_pattern: str = "*.xlsx") -> None:
    """Copy one sheet from every matching Excel file into a single workbook.

    Files are processed in sorted order. Each file is read with
    ``pd.read_excel`` (header-less, default sheet) and written to a sheet
    named after its 1-based position in that order. A file that fails is
    reported and skipped.

    Args:
        input_folder: Folder containing the Excel files.
        output_file: Path of the workbook to create.
        file_pattern: Glob pattern selecting the files; defaults to "*.xlsx".

    Raises:
        FileNotFoundError: If ``input_folder`` does not exist.
        ValueError: If no file in the folder matches ``file_pattern``.
    """
    source_dir = Path(input_folder)
    target_path = Path(output_file)
    input_folder, output_file = source_dir, target_path

    if not source_dir.exists():
        raise FileNotFoundError(f"输入文件夹 '{input_folder}' 不存在")

    excel_files = sorted(source_dir.glob(file_pattern))
    if len(excel_files) == 0:
        raise ValueError(f"在文件夹 '{input_folder}' 中没有找到Excel文件")

    with pd.ExcelWriter(target_path, engine='openpyxl') as writer:
        position = 0
        for excel_file in excel_files:
            position += 1
            try:
                # header=None keeps the first row as data instead of a header.
                frame = pd.read_excel(excel_file, header=None)
                sheet_name = str(position)
                frame.to_excel(writer, sheet_name=sheet_name, index=False, header=False)
                print(f"成功添加sheet {sheet_name}: {excel_file.name}")
            except Exception as e:
                # Best effort: report the failing file and move on.
                print(f"处理文件 '{excel_file}' 时出错: {str(e)}")

    print(f"\n所有sheet已成功合并到文件: {output_file}")
|
||
|
||
def write_data_to_excel(s_data, o_data, sheet_index, excel_file_path, include_name):
    """Write translated rows, plus their originals, into an Excel sheet.

    Column layout per row ``i + 1``:
        * ``include_name`` true:  name | translation | term | original
        * ``include_name`` false: translation | term | original

    Args:
        s_data: Iterable of ``(i, name, translated_sentence, term)`` tuples.
        o_data: Iterable of ``(i, name, sentence)`` tuples; the sentence is
            written next to the translation that has the same ``i``.
        sheet_index: 0-based index of the target worksheet. A new sheet is
            created when the index is beyond the existing sheets.
        excel_file_path: Workbook path; a new workbook is created when the
            file does not exist.
        include_name: Whether the name column is written.
    """
    try:
        wb = openpyxl.load_workbook(excel_file_path)
    except FileNotFoundError:
        wb = openpyxl.Workbook()

    if sheet_index < len(wb.sheetnames):
        sheet = wb.worksheets[sheet_index]
    else:
        sheet = wb.create_sheet(f"Sheet{sheet_index + 1}")

    # Original sentences keyed by row number for direct lookup.
    o_dict = {idx: sentence for idx, _name, sentence in o_data}

    # Column of the translated sentence; term and original follow it.
    translation_col = 2 if include_name else 1

    for i, name, translated_sentence, term in s_data:
        print(f"Writing row {i}: name={name}, translated_sentence={translated_sentence}, term={term}")
        excel_row = i + 1
        if include_name:
            sheet.cell(row=excel_row, column=1, value=name)
        sheet.cell(row=excel_row, column=translation_col, value=translated_sentence)
        sheet.cell(row=excel_row, column=translation_col + 1, value=term)

        if i not in o_dict:
            continue
        sheet.cell(row=excel_row, column=translation_col + 2, value=o_dict[i])
        print(f"Writing to column {4 if include_name else 3}: {o_dict[i]}")

    wb.save(excel_file_path)
|
||
|
||
def post_process_excel(excel_file_path):
    """Post-process a finished translation workbook in place.

    For every worksheet:
        1. Delete each row that has a string cell containing ``<t>``.
        2. Clear the contents of column B.

    Any error is printed together with its traceback and is not re-raised.

    Args:
        excel_file_path: Path of the Excel file to modify.
    """
    def _has_marker(value):
        return isinstance(value, str) and '<t>' in value

    try:
        wb = openpyxl.load_workbook(excel_file_path)

        for sheet in wb.worksheets:
            # Step 1: collect the rows where any cell carries the marker.
            rows_to_delete = [
                row_num
                for row_num in range(1, sheet.max_row + 1)
                if any(
                    _has_marker(sheet.cell(row=row_num, column=col_num).value)
                    for col_num in range(1, sheet.max_column + 1)
                )
            ]

            # Delete bottom-up so earlier row numbers stay valid.
            for row_num in reversed(rows_to_delete):
                sheet.delete_rows(row_num)
                print(f"删除包含<t>的行: {row_num}")

            # Step 2: blank out column B (column number 2).
            for row_num in range(1, sheet.max_row + 1):
                target = sheet.cell(row=row_num, column=2)
                if target.value:
                    target.value = None

            print(f"已清空工作表 '{sheet.title}' 的B列内容")

        wb.save(excel_file_path)
        print(f"Excel文件后处理完成: {excel_file_path}")

    except Exception as e:
        print(f"Excel文件后处理时发生错误: {e}")
        import traceback
        traceback.print_exc()
|
||
|
||
# ============================================================================
|
||
# 文本处理函数
|
||
# ============================================================================
|
||
|
||
def col_letter_to_index(col_letter):
    """Convert an Excel column label (e.g. "A", "AA") to a 0-based index.

    Args:
        col_letter: Column label; case-insensitive.

    Returns:
        The 0-based column index ("A" -> 0, "Z" -> 25, "AA" -> 26).
    """
    letters = col_letter.upper()
    # Treat the label as a base-26 number whose digits run from 1 to 26,
    # least significant letter last.
    one_based = sum(
        (ord(letter) - ord('A') + 1) * 26 ** power
        for power, letter in enumerate(reversed(letters))
    )
    return one_based - 1
|
||
|
||
def clean_text(text, lang="CN"):
    """Strip characters that are irrelevant for matching.

    Args:
        text: Input text; non-string values are converted with ``str()``.
        lang: "CN" keeps only characters in the range U+4E00..U+9FA5.
            "EN" keeps letters, digits, apostrophes, periods and hyphens,
            drops trailing periods and collapses whitespace. Any other
            value returns the text unchanged.

    Returns:
        The cleaned string.
    """
    raw = text if isinstance(text, str) else str(text)

    if lang == "CN":
        return re.sub(r'[^\u4e00-\u9fa5]', '', raw)

    if lang != "EN":
        # Other languages are passed through untouched.
        return raw

    # Replace every disallowed character with a space.
    spaced = re.sub(r"[^a-zA-Z0-9'.-]", ' ', raw)
    # Drop the period(s) that end the text, keeping any whitespace after them.
    without_final_period = re.sub(r'\.+(\s*)$', r'\1', spaced)
    # Collapse whitespace runs and trim both ends.
    return re.sub(r'\s+', ' ', without_final_period).strip()
|
||
|
||
def cut_EN(ori_str_clean):
    """Tokenise cleaned English text into a set of lower-cased keywords.

    Args:
        ori_str_clean: English text, split on whitespace.

    Returns:
        A set of lower-cased tokens, excluding stop words
        (``ENGLISH_STOPWORDS``), tokens shorter than two characters and
        purely numeric tokens.
    """
    keywords = set()
    for token in ori_str_clean.split():
        lowered = token.lower()
        if lowered in ENGLISH_STOPWORDS:
            continue
        if len(lowered) < 2 or lowered.isdigit():
            continue
        keywords.add(lowered)
    return keywords
|
||
|
||
# ============================================================================
|
||
# 术语处理函数
|
||
# ============================================================================
|
||
|
||
def extract_terms_from_text(text, excel_path, sheet_details):
    """Find glossary terms that occur in ``text`` and return their translations.

    Terms are matched by plain substring search so that long multi-word
    terms are found as well.

    Args:
        text: Text to scan for glossary terms.
        excel_path: Path of the glossary workbook, or None.
        sheet_details: Tuple ``(sheet_index_or_name, source_col, target_col)``
            where both columns are Excel column letters.

    Returns:
        ``''`` when ``excel_path`` is None; otherwise a dict mapping each
        source term found in ``text`` to the value of its target column.
    """
    if excel_path is None:
        return ''

    sheet_index_or_name, source_col, target_col = sheet_details

    # Column letters -> 0-based sheet positions.
    source_col_index = col_letter_to_index(source_col)
    target_col_index = col_letter_to_index(target_col)

    df = pd.read_excel(excel_path, sheet_name=sheet_index_or_name,
                       usecols=[source_col_index, target_col_index])

    # The loaded frame only has the two requested columns, in sheet order,
    # so whichever column sits further left in the sheet is at position 0.
    if source_col_index > target_col_index:
        source_pos, target_pos = 1, 0
    else:
        source_pos, target_pos = 0, 1

    found_translations = {}
    for _, row in df.iterrows():
        source_value = row.iloc[source_pos]
        # Blank glossary cells are read as NaN; stringifying them would
        # yield the term "nan", which matches any text containing "nan".
        if pd.isna(source_value):
            continue
        term = str(source_value)
        # An empty term is a substring of every text, so skip it as well.
        if term and term in text:
            found_translations[term] = row.iloc[target_pos]

    return found_translations
|
||
|
||
def get_glossary(lang, user_glossary_path=None):
    """Return the user-configured glossary path, if one was supplied.

    Args:
        lang: Language code; not used by the current implementation.
        user_glossary_path: Glossary path taken from the user configuration.

    Returns:
        ``user_glossary_path`` when it is truthy, otherwise None.
    """
    if not user_glossary_path:
        print(f"[WARNING] 未提供用户术语表路径")
        return None

    print(f"[DEBUG] 使用用户术语表路径: {user_glossary_path}")
    return user_glossary_path
|
||
|
||
# ============================================================================
|
||
# 文本匹配和参考提取函数
|
||
# ============================================================================
|
||
|
||
def partial_match_translation(ref, sheet_info_list, ori_str, min_word_len=DEFAULT_MIN_WORD_LEN,
                              max_str_len=DEFAULT_MAX_STR_LEN, lang="CN"):
    """Collect reference translations that share words with ``ori_str``.

    Args:
        ref: Path of the workbook holding already translated content.
        sheet_info_list: Iterable of ``(sheet_index, source_col, target_col)``
            where the columns are 0-based positions.
        ori_str: Text that is about to be translated.
        min_word_len: Minimum token length kept for Chinese segmentation.
        max_str_len: Reference source texts of this length or longer are
            ignored.
        lang: Source language, "CN" or "EN".

    Returns:
        A list of ``[source_text, translation, shared_words]`` entries, one
        per reference row that shares at least one word with ``ori_str`` and
        has a non-blank string translation.
    """
    source_texts = []
    target_texts = []
    for sheet_index, col_cn, col_en in sheet_info_list:
        sheet_df = pd.read_excel(ref, sheet_name=sheet_index)
        source_texts += sheet_df.iloc[:, col_cn].tolist()
        target_texts += sheet_df.iloc[:, col_en].tolist()

    # Tokenise the text to translate.
    ori_str_clean = clean_text(ori_str, lang=lang)
    if lang == 'CN':
        ori_words = {w for w in jieba.cut(ori_str_clean) if len(w) >= min_word_len}
    elif lang == 'EN':
        ori_words = cut_EN(ori_str_clean)
    else:
        print("lang err")
        ori_words = set()

    matches = []
    for cn, en in zip(source_texts, target_texts):
        cn_clean = clean_text(cn, lang=lang)

        if lang == 'CN':
            cn_words = {w for w in jieba.cut(cn_clean) if len(w) >= min_word_len}
        elif lang == 'EN':
            cn_words = {
                w.lower() for w in cn_clean.split()
                if w.lower() not in ENGLISH_STOPWORDS
            }
        else:
            cn_words = set()

        intersection = ori_words & cn_words
        if not intersection or len(str(cn)) >= max_str_len:
            continue
        # Only rows with a real, non-blank translation are useful.
        if isinstance(en, str) and en.strip():
            matches.append([str(cn), en, list(intersection)])

    return matches
|
||
|
||
def select_top_k_matches(matches, ori_words, k=DEFAULT_TOP_K):
    """Keep, for every source word, the ``k`` shortest matching references.

    Args:
        matches: Entries of the form ``[source_text, translation, words]``.
        ori_words: Tokens of the text to translate.
        k: Number of references kept per token.

    Returns:
        A flat list with up to ``k`` entries per token, ordered by source
        text length within each token. An entry shared by several tokens
        can appear more than once.
    """
    buckets = {word: [] for word in ori_words}

    for match in matches:
        _source, _translation, words = match
        for word in words:
            bucket = buckets.get(word)
            if bucket is not None:
                bucket.append(match)

    top_k_matches = []
    for bucket in buckets.values():
        shortest_first = sorted(bucket, key=lambda entry: len(entry[0]))
        top_k_matches.extend(shortest_first[:k])

    return top_k_matches
|
||
|
||
def get_ref(ref, sheet_info_list, ori_str, ori_lang):
    """Return de-duplicated reference translations for ``ori_str``.

    Args:
        ref: Path of the reference workbook.
        sheet_info_list: Sheet/column descriptors forwarded to
            ``partial_match_translation``.
        ori_str: Text to translate.
        ori_lang: Source language, "CN" or "EN".

    Returns:
        A list holding the first two items (source text, translation) of
        each unique match, limited to the two shortest matches per word.
    """
    matches = partial_match_translation(ref, sheet_info_list, ori_str, min_word_len=2, lang=ori_lang)

    if ori_lang == "CN":
        ori_words = {word for word in jieba.cut(clean_text(ori_str)) if len(word) >= 2}
    elif ori_lang == "EN":
        ori_words = cut_EN(clean_text(ori_str, "EN"))
    else:
        ori_words = set()
        print("ori lang err")

    top_k_matches = select_top_k_matches(matches, ori_words, 2)

    # The same entry can be selected for several words; keep its first
    # occurrence only.
    unique_top_k_matches = []
    seen = set()
    for match in top_k_matches:
        cn, en, words = match
        fingerprint = (cn, en, tuple(sorted(words)))
        if fingerprint in seen:
            continue
        seen.add(fingerprint)
        unique_top_k_matches.append(match[0:2])

    print(f"模糊匹配的前k条结果:\n{unique_top_k_matches}\n")
    return unique_top_k_matches
|
||
|
||
# ============================================================================
|
||
# 对话处理函数
|
||
# ============================================================================
|
||
|
||
def parse_dialogues(input_ori):
    """Group tagged dialogue lines by speaker.

    Args:
        input_ori: Iterable of strings shaped like ``<o=N>name :text</o>``.

    Returns:
        A list of lists, one per speaker in order of first appearance, each
        holding that speaker's lines in input order. Lines that do not match
        the expected shape are reported and left out.
    """
    dialogue_pattern = re.compile(r'<o=\d+>([^:]+) :[\s\S]*?</o>')

    dialogues_by_name = {}
    for dialogue in input_ori:
        match = dialogue_pattern.match(dialogue)
        if match is None:
            print(f"Warning in parse_dialogues: Dialogue format is incorrect: {dialogue}")
            continue
        dialogues_by_name.setdefault(match.group(1), []).append(dialogue)

    return list(dialogues_by_name.values())
|
||
|
||
def extract_name(group):
    """Return the speaker name of a group of dialogue lines.

    Only the first line is inspected, so the caller is expected to pass
    lines that all belong to the same speaker.

    Args:
        group: List of strings shaped like ``<o=N>name :text</o>``.

    Returns:
        The speaker name, or None when ``group`` is empty or its first line
        does not match the expected shape.
    """
    if not group:
        return None

    first_dialogue = group[0]
    match = re.match(r'<o=\d+>([^:]+) :[\s\S]*?</o>', first_dialogue)
    if match is None:
        print(f"Warning in extract name: Dialogue format is incorrect: {first_dialogue}")
        return None
    return match.group(1)
|
||
|
||
def filter_and_sample_dialogues(dialogues, k=DEFAULT_SAMPLE_K, r=DEFAULT_SAMPLE_R):
    """Randomly sample dialogues from the ``k`` longest ones.

    Args:
        dialogues: Sequence of records whose item at index 1 is the text
            used for the length ranking.
        k: Number of longest records that form the sampling pool.
        r: Number of records to draw from the pool.

    Returns:
        A list of at most ``r`` records, drawn without replacement.
    """
    longest_first = sorted(dialogues, key=lambda record: len(record[1]), reverse=True)
    pool = longest_first[:k]

    sample_size = min(r, len(pool))
    return random.sample(pool, sample_size)
|
||
|
||
def extract_dialogues(files, character_name):
    """Collect one character's lines from every sheet of several workbooks.

    The result serves as reference material when translating story text.

    Args:
        files: List of ``(file_path, columns_to_extract)`` tuples.
            ``columns_to_extract`` is a list of 0-based column positions;
            its first entry is the column holding the character name.
        character_name: Name of the character whose rows are wanted.

    Returns:
        A list of tuples holding the values of ``columns_to_extract`` for
        every row of that character. Rows are kept only when the 2nd and
        3rd extracted values are both strings.
    """
    result = []

    for file_path, columns_to_extract in files:
        print(file_path)
        character_column_index = columns_to_extract[0]

        # The context manager closes the workbook once all sheets are read,
        # so file handles do not pile up across files.
        with pd.ExcelFile(file_path) as xls:
            for sheet_name in xls.sheet_names:
                print(sheet_name)
                df = pd.read_excel(xls, sheet_name=sheet_name)

                character_rows = df[df.iloc[:, character_column_index] == character_name]

                for _, row in character_rows.iterrows():
                    extracted_data = tuple(row.iloc[columns_to_extract].values)
                    # Skip rows where either text cell is empty or non-text.
                    if isinstance(extracted_data[1], str) and isinstance(extracted_data[2], str):
                        result.append(extracted_data)

    return result
|
||
|
||
def get_feature(ref, lang, user_feature_prompt_path=None, llm=None):
    """Ask the LLM to summarise a character's tone from reference text.

    Args:
        ref: Reference text to analyse.
        lang: Language code; not used by the current implementation.
        user_feature_prompt_path: Path of the system prompt file taken from
            the user configuration.
        llm: Not used by the current implementation.

    Returns:
        The agent's reply, or a fixed message when ``ref`` is empty or no
        prompt path was supplied.
    """
    if len(ref) < 1:
        return "无语气或用词倾向"

    if not user_feature_prompt_path:
        print(f"[ERROR] 未提供用户角色特征提示词路径,无法进行角色特征分析")
        return "无法获取角色特征:缺少用户配置"

    print(f"[DEBUG] 使用用户角色特征提示词: {user_feature_prompt_path}")
    feature_prompt_path = user_feature_prompt_path

    llm_config = get_llm_config_ag2()
    system_prompt = load_txt(feature_prompt_path)
    agent = ConversableAgent(
        "chatbot",
        llm_config=llm_config,
        system_message=system_prompt,
        code_execution_config=False,
        function_map=None,
        human_input_mode="NEVER",
    )

    user_message = {"content": "\n以下是需要分析的文本:\n" + ref, "role": "user"}
    reply = agent.generate_reply(messages=[user_message])
    print(reply)
    return reply
|
||
|
||
# ============================================================================
|
||
# 结果处理函数
|
||
# ============================================================================
|
||
|
||
def extract_and_sort_texts(input_string):
    """Extract ``<s=N>...</s>`` segments, sort them by N and drop speaker names.

    Args:
        input_string: Text containing any number of ``<s=N>...</s>`` segments.

    Returns:
        The segments ordered by their numeric index, with everything between
        the opening tag and the first ":" (inclusive) removed. An empty list
        is returned when no segment is found.
    """
    segments = re.findall(r'(<s=\d+>.*?</s>)', input_string, flags=re.DOTALL)
    if not segments:
        return []

    def sequence_number(segment):
        found = re.search(r'<s=(\d+)>', segment)
        return int(found.group(1)) if found else float('inf')

    segments.sort(key=sequence_number)

    # Removes "<speaker>:" that directly follows the opening tag.
    speaker_prefix = re.compile(r'(<s=\d+>)[^:]+:')
    return [speaker_prefix.sub(r'\1', segment) for segment in segments]
|
||
|
||
def process_file(file_path):
    """Add an empty ``<t></t>`` pair to every ``<s=N>`` segment lacking one.

    Translation output that lost its ``<t>`` tags would not match the
    extraction patterns used later, so the file is rewritten in place with
    the missing pair inserted right before ``</s>``.

    Args:
        file_path: Path of the UTF-8 text file to repair.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        content = file.read()

    segment_pattern = re.compile(r'<s=(\d+)>(.*?)</s>', re.DOTALL)

    updated_content = content
    for match in segment_pattern.finditer(content):
        i, text = match.groups()
        print(f"匹配到的内容:<s={i}>{text}</s>")

        if re.search(r'<t>.*?</t>', text, re.DOTALL) is not None:
            print(f"第{i}条语句包含t")
            continue

        print(f"第{i}条语句缺失t")
        new_text = text + '<t></t>'
        updated_content = updated_content.replace(match.group(0), f'<s={i}>{new_text}</s>')

    print("\n***" + updated_content + "***\n")

    with open(file_path, 'w', encoding='utf-8') as file:
        file.write(updated_content)
|
||
|
||
def extract_data_from_text(file_path, pattern):
    """Return every match of a compiled pattern in a UTF-8 text file.

    Args:
        file_path: Path of the text file to read.
        pattern: Compiled regular expression.

    Returns:
        The result of ``pattern.findall`` applied to the file contents.
    """
    with open(file_path, mode='r', encoding='utf-8') as handle:
        contents = handle.read()
    return pattern.findall(contents)
|
||
|
||
def write_res(input_ori, all_res, ori_path, mode='plot', sheetidx=None):
    """Persist translation results as text dumps and as an Excel workbook.

    Writes ``./res/input_s_<ts>.txt`` (translations), ``./res/input_o_<ts>.txt``
    (originals) and ``./res/<ts>-record.txt`` (stringified results), then
    builds and post-processes the result workbook.

    Args:
        input_ori: Iterable of tagged source strings (``<o=N>...</o>``).
        all_res: Translation results. In 'plot' mode every item must expose
            ``.summary``; in 'sys' mode an item may also be a plain string.
        ori_path: Path of the source file; its base name is used to name the
            output directory and workbook.
        mode: 'plot' (story dialogue; results are sorted and speaker names
            removed) or 'sys' (system text; results are concatenated as-is).
        sheetidx: Optional sheet number that is embedded in the file name.

    Raises:
        ValueError: If ``mode`` is neither 'plot' nor 'sys'.
    """
    # Fail early and clearly instead of hitting an undefined variable or a
    # missing file further down.
    if mode not in ('plot', 'sys'):
        raise ValueError(f"unsupported mode: {mode!r} (expected 'plot' or 'sys')")

    timestamp = datetime.datetime.now().strftime("%m%d%H%M%S")

    # The text dumps live in ./res; make sure it exists before writing.
    os.makedirs('./res', exist_ok=True)
    s_txt_file_path = f"./res/input_s_{timestamp}.txt"
    o_txt_file_path = f"./res/input_o_{timestamp}.txt"

    if mode == 'plot':
        smr = ''.join(r.summary for r in all_res)
        sort_res = extract_and_sort_texts(smr)
        print(f"all smr of res:\n{smr}\n")
        s_all = ''.join(r + '\n' for r in sort_res)
    else:
        s_all = ''.join(r if isinstance(r, str) else r.summary for r in all_res)
    record_all = ''.join(str(r) for r in all_res)

    write_to_txt(s_txt_file_path, s_all, 'w')
    write_to_txt(o_txt_file_path, ''.join(input_ori), 'w')

    # Repair segments that lost their <t></t> pair so the patterns below match.
    process_file(s_txt_file_path)

    # Target sheet inside the result workbook (0 = first sheet).
    sheet_index = 0

    # Whether a name column is written: False for system text, True for
    # story dialogue. Currently fixed to False.
    include_name = False

    if include_name:
        s_pattern = re.compile(r'<s=(\d+)>([^:]+):(.*?)<t>(.*?)</t></s>', re.DOTALL)
        s_data = extract_data_from_text(s_txt_file_path, s_pattern)
        s_data = [(int(i), name, translated_sentence, term if term else None)
                  for i, name, translated_sentence, term in s_data]

        o_pattern = re.compile(r'<o=(\d+)>([^:]+):(.*?)</o>', re.DOTALL)
        o_data = extract_data_from_text(o_txt_file_path, o_pattern)
        o_data = [(int(i), name, sentence) for i, name, sentence in o_data]
    else:
        s_pattern = re.compile(r'<s=(\d+)>(.*?)<t>(.*?)</t></s>', re.DOTALL)
        s_data = extract_data_from_text(s_txt_file_path, s_pattern)
        s_data = [(int(i), None, translated_sentence, term if term else None)
                  for i, translated_sentence, term in s_data]

        o_pattern = re.compile(r'<o=(\d+)>(.*?)</o>', re.DOTALL)
        o_data = extract_data_from_text(o_txt_file_path, o_pattern)
        o_data = [(int(i), None, sentence) for i, sentence in o_data]

    base_name = os.path.basename(ori_path)
    # NOTE(review): this yields "./res<name>" (no path separator), i.e. a
    # sibling of ./res rather than a sub-directory -- confirm it is intended.
    resdir = f'./res{base_name}'
    os.makedirs(resdir, exist_ok=True)
    # NOTE(review): a sheetidx of 0 is falsy and therefore produces a file
    # name without the sheet suffix -- confirm it is intended.
    if sheetidx:
        print(type(sheetidx))
        excel_file_path = f'{resdir}/{base_name}-sheet{int(sheetidx):03}-{timestamp}.xlsx'
    else:
        excel_file_path = f'{resdir}/{base_name}-{timestamp}.xlsx'

    write_data_to_excel(s_data, o_data, sheet_index, excel_file_path, include_name)
    post_process_excel(excel_file_path)

    write_to_txt(f'./res/{timestamp}-record.txt', record_all, 'w')
|
||
|
||
def append_to_log(start_time, ori_len, filename, json_path='translation_log.json'):
    """Append one translation-task entry to a JSON log file.

    The log is a JSON list. An existing file holding a single non-list value
    is wrapped into a list; an empty or malformed file is treated as an
    empty log.

    Args:
        start_time: Start time of the task (must be JSON serializable).
        ori_len: Length of the source text.
        filename: Name of the translated file.
        json_path: Path of the JSON log file.

    Raises:
        TypeError: If the new entry is not JSON serializable. The existing
            log file is left untouched in that case.
    """
    new_data = {
        "start_time": start_time,
        "original_length": ori_len,
        "file_name": filename
    }

    data = []
    if os.path.exists(json_path):
        try:
            with open(json_path, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except json.JSONDecodeError:
            # Empty or malformed file: start over with a fresh list.
            loaded = []
        data = loaded if isinstance(loaded, list) else [loaded]

    data.append(new_data)

    # Serialize BEFORE opening the file for writing: opening with 'w'
    # truncates it, so a serialization error raised mid-dump would destroy
    # the existing log.
    serialized = json.dumps(data, ensure_ascii=False, indent=4)
    with open(json_path, 'w', encoding='utf-8') as f:
        f.write(serialized)
|
||
|
||
def call_turn_cnt(ori_text, timestamp):
    """Build a decorator that counts translation rounds and logs each one.

    The returned decorator wraps a reply function with the signature
    ``(recipient, messages, sender, config, *args, **kwargs)``. On every
    call the wrapper increments the round counter, adds the content of the
    last message from ``recipient.chat_messages_for_summary(sender)`` to the
    running record, appends that record to ``./res/<timestamp>-record.txt``
    and then delegates to the wrapped function.

    Args:
        ori_text: Source text; it becomes the first entry of the record.
        timestamp: Timestamp string used in the record file name.

    Returns:
        A decorator for reply functions.
    """
    import functools

    count = 0
    trans_rec = str(ori_text) + '\n'

    def decorator(func):
        # functools.wraps keeps the wrapped function's name and docstring
        # visible to logging and introspection.
        @functools.wraps(func)
        def wrapper(recipient, messages, sender, config, *args, **kwargs):
            nonlocal count
            nonlocal trans_rec
            count += 1
            print(f"当前为第 {count} 轮")
            latest = recipient.chat_messages_for_summary(sender)[-1]['content']
            trans_rec = trans_rec + latest + '\n'
            # NOTE(review): the whole accumulated record is appended on every
            # round, so earlier rounds are repeated in the file -- confirm
            # that this is intended.
            write_to_txt(f'./res/{timestamp}-record.txt', trans_rec, 'a', line=True)
            return func(recipient, messages, sender, config, *args, **kwargs)
        return wrapper
    return decorator