Merge branch 'release/v0.3.2' into feature/rag2
* release/v0.3.2: (245 commits) fix(conversation_schema): refine citations field type to Dict[str, Any] fix(tool_controller): re-raise HTTPException to preserve original status codes fix(workflow): add reasoning content, suggested questions, citations and audio status support feat(workflow): augment logging queries and ameliorate error handling fix(api_key): bypass publication check for SERVICE type API keys fix(multimodal_service): add '文档内容:' prefix to document text and simplify image placeholder text fix(api): convert config_id to string in write_router fix(api): convert end_user_id to string in write_router fix(multimodal_service): refactor image processing to use intermediate list before extending result fix(web): node status ui fix(api): correct import paths in memory_read and celery task command fix(api): correct import paths in memory_read and celery task command refactor(tool): flatten request body parameters for model exposure fix(api): correct import paths in memory_read and celery task command refactor(workflow): streamline node execution handling and log service logic feat(web): http request add process feat(web): workflow app logs fix(app_chat_service,draft_run_service): move system_prompt augmentation before LangChainAgent instantiation fix(app_chat_service,draft_run_service): move system_prompt augmentation before LangChainAgent instantiation refactor(http_request): simplify request handling and remove unused fields ... # Conflicts: # api/app/controllers/file_controller.py # api/app/tasks.py
This commit is contained in:
167
api/app/tasks.py
167
api/app/tasks.py
@@ -34,7 +34,7 @@ from app.core.rag.prompts.generator import question_proposal
|
||||
from app.core.rag.vdb.elasticsearch.elasticsearch_vector import (
|
||||
ElasticSearchVectorFactory,
|
||||
)
|
||||
from app.db import get_db, get_db_context
|
||||
from app.db import get_db_context
|
||||
from app.models import Document, File, Knowledge
|
||||
from app.models.end_user_model import EndUser
|
||||
from app.schemas import document_schema, file_schema
|
||||
@@ -280,8 +280,39 @@ def parse_document(file_key: str, document_id: uuid.UUID, file_name: str = ""):
|
||||
# Prepare vision_model for parsing
|
||||
vision_model = _build_vision_model(file_name, db_knowledge)
|
||||
|
||||
# 先将文件读入内存,避免解析过程中依赖 NFS 文件持续可访问
|
||||
# python-docx 等库在 binary=None 时会用路径直接打开文件,
|
||||
# 在 NFS/共享存储上可能因缓存失效导致 "Package not found"
|
||||
max_wait_seconds = 30
|
||||
wait_interval = 2
|
||||
waited = 0
|
||||
file_binary = None
|
||||
while waited <= max_wait_seconds:
|
||||
# os.listdir 强制 NFS 客户端刷新目录缓存
|
||||
parent_dir = os.path.dirname(file_path)
|
||||
try:
|
||||
os.listdir(parent_dir)
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
with open(file_path, "rb") as f:
|
||||
file_binary = f.read()
|
||||
if not file_binary:
|
||||
# NFS 上文件存在但内容为空(可能还在同步中)
|
||||
raise IOError(f"File is empty (0 bytes), NFS may still be syncing: {file_path}")
|
||||
break
|
||||
except (FileNotFoundError, IOError) as e:
|
||||
if waited >= max_wait_seconds:
|
||||
raise type(e)(
|
||||
f"File not accessible at '{file_path}' after waiting {max_wait_seconds}s: {e}"
|
||||
)
|
||||
logger.warning(f"File not ready on this node, retrying in {wait_interval}s: {file_path} ({e})")
|
||||
time.sleep(wait_interval)
|
||||
waited += wait_interval
|
||||
|
||||
from app.core.rag.app.naive import chunk
|
||||
res = chunk(filename=file_name,
|
||||
logger.info(f"[ParseDoc] file_binary size={len(file_binary)} bytes, type={type(file_binary).__name__}, bool={bool(file_binary)}")
|
||||
res = chunk(filename=file_path,
|
||||
binary=file_binary,
|
||||
from_page=0,
|
||||
to_page=DEFAULT_PARSE_TO_PAGE,
|
||||
@@ -485,7 +516,7 @@ def build_graphrag_for_kb(kb_id: uuid.UUID):
|
||||
db_knowledge = db.query(Knowledge).filter(Knowledge.id == kb_id).first()
|
||||
if db_knowledge is None:
|
||||
logger.error(f"[GraphRAG-KB] knowledge={kb_id} not found")
|
||||
return f"build knowledge graph failed: knowledge not found"
|
||||
return "build knowledge graph failed: knowledge not found"
|
||||
|
||||
if not (db_knowledge.parser_config and
|
||||
db_knowledge.parser_config.get("graphrag", {}).get("use_graphrag", False)):
|
||||
@@ -568,7 +599,7 @@ def build_graphrag_for_document(document_id: str, knowledge_id: str):
|
||||
db_knowledge = db.query(Knowledge).filter(Knowledge.id == uuid.UUID(knowledge_id)).first()
|
||||
if db_document is None or db_knowledge is None:
|
||||
logger.error(f"[GraphRAG] document={document_id} or knowledge={knowledge_id} not found")
|
||||
return f"build_graphrag_for_document failed: record not found"
|
||||
return "build_graphrag_for_document failed: record not found"
|
||||
|
||||
graphrag_conf = db_knowledge.parser_config.get("graphrag", {})
|
||||
with_resolution = graphrag_conf.get("resolution", False)
|
||||
@@ -647,7 +678,7 @@ def sync_knowledge_for_kb(kb_id: uuid.UUID):
|
||||
db_knowledge = db.query(Knowledge).filter(Knowledge.id == kb_id).first()
|
||||
if db_knowledge is None:
|
||||
logger.error(f"[SyncKB] knowledge={kb_id} not found")
|
||||
return f"sync knowledge failed: knowledge not found"
|
||||
return "sync knowledge failed: knowledge not found"
|
||||
|
||||
# 1. get vector_service
|
||||
vector_service = ElasticSearchVectorFactory().init_vector(knowledge=db_knowledge)
|
||||
@@ -2023,7 +2054,7 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
|
||||
end_users = db.query(EndUser).all()
|
||||
if not end_users:
|
||||
logger.info("没有终端用户,跳过遗忘周期")
|
||||
return {"status": "SUCCESS", "message": "没有终端用户",
|
||||
return {"status": "SUCCESS", "message": "没有终端用户",
|
||||
"report": {"merged_count": 0, "failed_count": 0, "processed_users": 0},
|
||||
"duration_seconds": time.time() - start_time}
|
||||
|
||||
@@ -2037,7 +2068,7 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
|
||||
# 获取用户配置(自动回退到工作空间默认配置)
|
||||
connected_config = get_end_user_connected_config(str(end_user.id), db)
|
||||
user_config_id = resolve_config_id(connected_config.get("memory_config_id"), db)
|
||||
|
||||
|
||||
if not user_config_id:
|
||||
failed_users.append({"end_user_id": str(end_user.id), "error": "无法获取配置"})
|
||||
continue
|
||||
@@ -2046,13 +2077,13 @@ def run_forgetting_cycle_task(self, config_id: Optional[uuid.UUID] = None) -> Di
|
||||
report = await forget_service.trigger_forgetting_cycle(
|
||||
db=db, end_user_id=str(end_user.id), config_id=user_config_id
|
||||
)
|
||||
|
||||
|
||||
total_merged += report.get('merged_count', 0)
|
||||
total_failed += report.get('failed_count', 0)
|
||||
processed_users += 1
|
||||
|
||||
|
||||
logger.info(f"用户 {end_user.id}: 融合 {report.get('merged_count', 0)} 对节点")
|
||||
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"处理用户 {end_user.id} 失败: {e}", exc_info=True)
|
||||
failed_users.append({"end_user_id": str(end_user.id), "error": str(e)})
|
||||
@@ -2799,18 +2830,18 @@ def run_incremental_clustering(
|
||||
包含任务执行结果的字典
|
||||
"""
|
||||
start_time = time.time()
|
||||
|
||||
|
||||
async def _run() -> Dict[str, Any]:
|
||||
from app.core.logging_config import get_logger
|
||||
from app.repositories.neo4j.neo4j_connector import Neo4jConnector
|
||||
from app.core.memory.storage_services.clustering_engine.label_propagation import LabelPropagationEngine
|
||||
|
||||
|
||||
logger = get_logger(__name__)
|
||||
logger.info(
|
||||
f"[IncrementalClustering] 开始增量聚类任务 - end_user_id={end_user_id}, "
|
||||
f"实体数={len(new_entity_ids)}, llm_model_id={llm_model_id}"
|
||||
)
|
||||
|
||||
|
||||
connector = Neo4jConnector()
|
||||
try:
|
||||
engine = LabelPropagationEngine(
|
||||
@@ -2818,12 +2849,12 @@ def run_incremental_clustering(
|
||||
llm_model_id=llm_model_id,
|
||||
embedding_model_id=embedding_model_id,
|
||||
)
|
||||
|
||||
|
||||
# 执行增量聚类
|
||||
await engine.run(end_user_id=end_user_id, new_entity_ids=new_entity_ids)
|
||||
|
||||
|
||||
logger.info(f"[IncrementalClustering] 增量聚类完成 - end_user_id={end_user_id}")
|
||||
|
||||
|
||||
return {
|
||||
"status": "SUCCESS",
|
||||
"end_user_id": end_user_id,
|
||||
@@ -2834,18 +2865,18 @@ def run_incremental_clustering(
|
||||
raise
|
||||
finally:
|
||||
await connector.close()
|
||||
|
||||
|
||||
try:
|
||||
loop = set_asyncio_event_loop()
|
||||
result = loop.run_until_complete(_run())
|
||||
result["elapsed_time"] = time.time() - start_time
|
||||
result["task_id"] = self.request.id
|
||||
|
||||
|
||||
logger.info(
|
||||
f"[IncrementalClustering] 任务完成 - task_id={self.request.id}, "
|
||||
f"elapsed_time={result['elapsed_time']:.2f}s"
|
||||
)
|
||||
|
||||
|
||||
return result
|
||||
except Exception as e:
|
||||
elapsed_time = time.time() - start_time
|
||||
@@ -3132,29 +3163,11 @@ def extract_user_metadata_task(
|
||||
logger.info(f"[CELERY METADATA] No metadata extracted for end_user_id={end_user_id}")
|
||||
return {"status": "SUCCESS", "result": "no_metadata_extracted"}
|
||||
|
||||
user_metadata, aliases_to_add, aliases_to_remove = extract_result
|
||||
logger.info(f"[CELERY METADATA] LLM 别名新增: {aliases_to_add}, 移除: {aliases_to_remove}")
|
||||
|
||||
# 4. 清洗元数据、覆盖写入元数据和别名
|
||||
def clean_metadata(raw: dict) -> dict:
|
||||
"""递归移除空字符串、空列表、空字典。"""
|
||||
result = {}
|
||||
for k, v in raw.items():
|
||||
if v == "" or v == []:
|
||||
continue
|
||||
if isinstance(v, dict):
|
||||
cleaned = clean_metadata(v)
|
||||
if cleaned:
|
||||
result[k] = cleaned
|
||||
else:
|
||||
result[k] = v
|
||||
return result
|
||||
|
||||
raw_dict = user_metadata.model_dump(exclude_none=True) if user_metadata else {}
|
||||
logger.info(f"[CELERY METADATA] LLM 输出完整元数据: {json.dumps(raw_dict, ensure_ascii=False)}")
|
||||
|
||||
cleaned = clean_metadata(raw_dict) if raw_dict else {}
|
||||
logger.info(f"[CELERY METADATA] 清洗后元数据: {json.dumps(cleaned, ensure_ascii=False)}")
|
||||
metadata_changes, aliases_to_add, aliases_to_remove = extract_result
|
||||
logger.info(
|
||||
f"[CELERY METADATA] LLM 元数据变更: {[c.model_dump() for c in metadata_changes]}, "
|
||||
f"别名新增: {aliases_to_add}, 移除: {aliases_to_remove}"
|
||||
)
|
||||
|
||||
from datetime import datetime as dt, timezone as tz
|
||||
now = dt.now(tz.utc).isoformat()
|
||||
@@ -3182,15 +3195,49 @@ def extract_user_metadata_task(
|
||||
end_user = EndUserRepository(db).get_by_id(end_user_uuid)
|
||||
|
||||
if info:
|
||||
# 元数据覆盖写入
|
||||
if cleaned:
|
||||
existing_meta = info.meta_data if info.meta_data else {}
|
||||
# 4. 元数据增量更新(按 LLM 输出的变更操作逐条执行,所有字段均为列表类型)
|
||||
if metadata_changes:
|
||||
# 深拷贝,确保 SQLAlchemy 能检测到变更
|
||||
import copy
|
||||
existing_meta = copy.deepcopy(info.meta_data) if info.meta_data else {}
|
||||
updated_at = dict(existing_meta.get("_updated_at", {}))
|
||||
_update_timestamps(existing_meta, cleaned, updated_at, now)
|
||||
final = dict(cleaned)
|
||||
final["_updated_at"] = updated_at
|
||||
info.meta_data = final
|
||||
logger.info("[CELERY METADATA] 覆盖写入元数据")
|
||||
|
||||
for change in metadata_changes:
|
||||
field_path = change.field_path
|
||||
action = change.action
|
||||
value = change.value
|
||||
|
||||
if not value or not value.strip():
|
||||
continue
|
||||
|
||||
# 定位到目标字段的父级节点
|
||||
parts = field_path.split(".")
|
||||
target = existing_meta
|
||||
for part in parts[:-1]:
|
||||
target = target.setdefault(part, {})
|
||||
leaf = parts[-1]
|
||||
|
||||
current_list = target.get(leaf, [])
|
||||
|
||||
if action == "set":
|
||||
if value not in current_list:
|
||||
# 新值插入列表头部,保证按时间从新到旧排序
|
||||
current_list.insert(0, value)
|
||||
target[leaf] = current_list
|
||||
logger.info(f"[CELERY METADATA] set {field_path} = {value}")
|
||||
|
||||
elif action == "remove":
|
||||
if value in current_list:
|
||||
current_list.remove(value)
|
||||
target[leaf] = current_list
|
||||
logger.info(f"[CELERY METADATA] remove {value} from {field_path}")
|
||||
|
||||
updated_at[field_path] = now
|
||||
|
||||
existing_meta["_updated_at"] = updated_at
|
||||
# 赋值深拷贝后的新对象,SQLAlchemy 会检测到字段变更并写入
|
||||
info.meta_data = existing_meta
|
||||
logger.info(f"[CELERY METADATA] 增量更新元数据完成: {json.dumps(existing_meta, ensure_ascii=False)}")
|
||||
|
||||
# 别名增量增删:(已有 - remove) + add
|
||||
old_aliases = info.aliases if info.aliases else []
|
||||
@@ -3226,12 +3273,28 @@ def extract_user_metadata_task(
|
||||
from app.models.end_user_info_model import EndUserInfo
|
||||
initial_aliases = filtered_add # 新记录只有 add,没有 remove
|
||||
first_alias = initial_aliases[0] if initial_aliases else ""
|
||||
if first_alias or cleaned:
|
||||
|
||||
# 从变更操作构建初始元数据(所有字段均为列表类型)
|
||||
initial_meta = {}
|
||||
for change in metadata_changes:
|
||||
if change.action == "set" and change.value is not None and change.value.strip():
|
||||
parts = change.field_path.split(".")
|
||||
target = initial_meta
|
||||
for part in parts[:-1]:
|
||||
target = target.setdefault(part, {})
|
||||
leaf = parts[-1]
|
||||
current_list = target.get(leaf, [])
|
||||
if change.value not in current_list:
|
||||
# 新值插入列表头部,保证按时间从新到旧排序
|
||||
current_list.insert(0, change.value)
|
||||
target[leaf] = current_list
|
||||
|
||||
if first_alias or initial_meta:
|
||||
new_info = EndUserInfo(
|
||||
end_user_id=end_user_uuid,
|
||||
other_name=first_alias or "",
|
||||
aliases=initial_aliases,
|
||||
meta_data=cleaned if cleaned else None,
|
||||
meta_data=initial_meta if initial_meta else None,
|
||||
)
|
||||
db.add(new_info)
|
||||
if end_user and first_alias and (
|
||||
|
||||
Reference in New Issue
Block a user