Merge pull request #715 from SuanmoSuanyangTechnology/fix/Timebomb_029
fix(app)
This commit is contained in:
@@ -44,6 +44,8 @@ class OSSStorage(StorageBackend):
|
||||
access_key_id: str,
|
||||
access_key_secret: str,
|
||||
bucket_name: str,
|
||||
connect_timeout: int = 30,
|
||||
multipart_threshold: int = 10 * 1024 * 1024, # 10MB
|
||||
):
|
||||
"""
|
||||
Initialize the OSSStorage backend.
|
||||
@@ -53,6 +55,8 @@ class OSSStorage(StorageBackend):
|
||||
access_key_id: The Aliyun access key ID.
|
||||
access_key_secret: The Aliyun access key secret.
|
||||
bucket_name: The name of the OSS bucket.
|
||||
connect_timeout: Connection timeout in seconds (default: 30).
|
||||
multipart_threshold: File size threshold for multipart upload (default: 10MB).
|
||||
|
||||
Raises:
|
||||
StorageConfigError: If any required configuration is missing.
|
||||
@@ -69,10 +73,17 @@ class OSSStorage(StorageBackend):
|
||||
|
||||
self.endpoint = endpoint
|
||||
self.bucket_name = bucket_name
|
||||
self.multipart_threshold = multipart_threshold
|
||||
|
||||
try:
|
||||
auth = oss2.Auth(access_key_id, access_key_secret)
|
||||
self.bucket = oss2.Bucket(auth, endpoint, bucket_name)
|
||||
# 设置超时和重试
|
||||
self.bucket = oss2.Bucket(
|
||||
auth,
|
||||
endpoint,
|
||||
bucket_name,
|
||||
connect_timeout=connect_timeout
|
||||
)
|
||||
logger.info(
|
||||
f"OSSStorage initialized with endpoint: {endpoint}, bucket: {bucket_name}"
|
||||
)
|
||||
@@ -108,21 +119,38 @@ class OSSStorage(StorageBackend):
|
||||
if content_type:
|
||||
headers["Content-Type"] = content_type
|
||||
|
||||
self.bucket.put_object(file_key, content, headers=headers if headers else None)
|
||||
# 大文件使用分片上传
|
||||
if len(content) > self.multipart_threshold:
|
||||
logger.info(f"Using multipart upload for large file: {file_key} ({len(content)} bytes)")
|
||||
upload_id = self.bucket.init_multipart_upload(file_key, headers=headers if headers else None).upload_id
|
||||
parts = []
|
||||
part_size = 5 * 1024 * 1024 # 5MB per part
|
||||
part_num = 1
|
||||
|
||||
for offset in range(0, len(content), part_size):
|
||||
chunk = content[offset:offset + part_size]
|
||||
result = self.bucket.upload_part(file_key, upload_id, part_num, chunk)
|
||||
parts.append(oss2.models.PartInfo(part_num, result.etag))
|
||||
part_num += 1
|
||||
|
||||
self.bucket.complete_multipart_upload(file_key, upload_id, parts)
|
||||
else:
|
||||
self.bucket.put_object(file_key, content, headers=headers if headers else None)
|
||||
|
||||
logger.info(f"File uploaded to OSS successfully: {file_key}")
|
||||
return file_key
|
||||
|
||||
except OssError as e:
|
||||
logger.error(f"OSS error uploading file {file_key}: {e}")
|
||||
raise StorageUploadError(
|
||||
message=f"Failed to upload file to OSS: {e.message}",
|
||||
message=f"Failed to upload file to OSS: {str(e)}",
|
||||
file_key=file_key,
|
||||
cause=e,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to upload file to OSS {file_key}: {e}")
|
||||
raise StorageUploadError(
|
||||
message=f"Failed to upload file to OSS: {e}",
|
||||
message=f"Failed to upload file to OSS: {str(e)}",
|
||||
file_key=file_key,
|
||||
cause=e,
|
||||
)
|
||||
@@ -135,28 +163,73 @@ class OSSStorage(StorageBackend):
|
||||
) -> int:
|
||||
"""Upload from async stream to OSS. Returns total bytes written."""
|
||||
buf = io.BytesIO()
|
||||
headers = {"Content-Type": content_type} if content_type else None
|
||||
upload_id = None
|
||||
|
||||
try:
|
||||
# 收集流数据
|
||||
total_size = 0
|
||||
async for chunk in stream:
|
||||
if not chunk:
|
||||
continue
|
||||
buf.write(chunk)
|
||||
total_size += len(chunk)
|
||||
|
||||
content = buf.getvalue()
|
||||
headers = {"Content-Type": content_type} if content_type else None
|
||||
self.bucket.put_object(file_key, content, headers=headers)
|
||||
logger.info(f"File stream uploaded to OSS successfully: {file_key}")
|
||||
return len(content)
|
||||
|
||||
if not content:
|
||||
raise StorageUploadError(
|
||||
message="Empty stream content",
|
||||
file_key=file_key,
|
||||
)
|
||||
|
||||
# 大文件使用分片上传
|
||||
if len(content) > self.multipart_threshold:
|
||||
logger.info(f"Using multipart upload for stream: {file_key} ({len(content)} bytes)")
|
||||
upload_id = self.bucket.init_multipart_upload(file_key, headers=headers).upload_id
|
||||
parts = []
|
||||
part_size = 5 * 1024 * 1024 # 5MB
|
||||
part_num = 1
|
||||
|
||||
for offset in range(0, len(content), part_size):
|
||||
chunk = content[offset:offset + part_size]
|
||||
result = self.bucket.upload_part(file_key, upload_id, part_num, chunk)
|
||||
parts.append(oss2.models.PartInfo(part_num, result.etag))
|
||||
part_num += 1
|
||||
|
||||
self.bucket.complete_multipart_upload(file_key, upload_id, parts)
|
||||
else:
|
||||
self.bucket.put_object(file_key, content, headers=headers)
|
||||
|
||||
logger.info(f"File stream uploaded to OSS successfully: {file_key} ({total_size} bytes)")
|
||||
return total_size
|
||||
|
||||
except OssError as e:
|
||||
if upload_id:
|
||||
try:
|
||||
self.bucket.abort_multipart_upload(file_key, upload_id)
|
||||
except:
|
||||
pass
|
||||
logger.error(f"OSS error stream uploading file {file_key}: {e}")
|
||||
raise StorageUploadError(
|
||||
message=f"Failed to stream upload file to OSS: {e.message}",
|
||||
message=f"Failed to stream upload file to OSS: {str(e)}",
|
||||
file_key=file_key,
|
||||
cause=e,
|
||||
)
|
||||
except Exception as e:
|
||||
if upload_id:
|
||||
try:
|
||||
self.bucket.abort_multipart_upload(file_key, upload_id)
|
||||
except:
|
||||
pass
|
||||
logger.error(f"Failed to stream upload file to OSS {file_key}: {e}")
|
||||
raise StorageUploadError(
|
||||
message=f"Failed to stream upload file to OSS: {e}",
|
||||
message=f"Failed to stream upload file to OSS: {str(e)}",
|
||||
file_key=file_key,
|
||||
cause=e,
|
||||
)
|
||||
finally:
|
||||
buf.close()
|
||||
|
||||
async def download(self, file_key: str) -> bytes:
|
||||
"""
|
||||
@@ -182,14 +255,14 @@ class OSSStorage(StorageBackend):
|
||||
except OssError as e:
|
||||
logger.error(f"OSS error downloading file {file_key}: {e}")
|
||||
raise StorageDownloadError(
|
||||
message=f"Failed to download file from OSS: {e.message}",
|
||||
message=f"Failed to download file from OSS: {str(e)}",
|
||||
file_key=file_key,
|
||||
cause=e,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to download file from OSS {file_key}: {e}")
|
||||
raise StorageDownloadError(
|
||||
message=f"Failed to download file from OSS: {e}",
|
||||
message=f"Failed to download file from OSS: {str(e)}",
|
||||
file_key=file_key,
|
||||
cause=e,
|
||||
)
|
||||
@@ -215,14 +288,14 @@ class OSSStorage(StorageBackend):
|
||||
except OssError as e:
|
||||
logger.error(f"OSS error deleting file {file_key}: {e}")
|
||||
raise StorageDeleteError(
|
||||
message=f"Failed to delete file from OSS: {e.message}",
|
||||
message=f"Failed to delete file from OSS: {str(e)}",
|
||||
file_key=file_key,
|
||||
cause=e,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete file from OSS {file_key}: {e}")
|
||||
raise StorageDeleteError(
|
||||
message=f"Failed to delete file from OSS: {e}",
|
||||
message=f"Failed to delete file from OSS: {str(e)}",
|
||||
file_key=file_key,
|
||||
cause=e,
|
||||
)
|
||||
|
||||
@@ -57,6 +57,12 @@ class Conversation(Base):
|
||||
workspace = relationship("Workspace")
|
||||
messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
|
||||
|
||||
@property
|
||||
def is_first_user_message(self):
|
||||
"""判断当前是否是用户的第一条消息(无视开场白)"""
|
||||
user_message_count = sum(1 for msg in self.messages if msg.role == "user")
|
||||
return user_message_count == 1
|
||||
|
||||
|
||||
class ConversationDetail(Base):
|
||||
__tablename__ = "conversation_details"
|
||||
|
||||
@@ -276,7 +276,7 @@ class AgentConfigCreate(BaseModel):
|
||||
|
||||
# 记忆配置
|
||||
memory: MemoryConfig = Field(
|
||||
default_factory=lambda: MemoryConfig(enabled=True),
|
||||
default_factory=lambda: MemoryConfig(enabled=False),
|
||||
description="对话历史记忆配置"
|
||||
)
|
||||
|
||||
|
||||
@@ -140,13 +140,13 @@ class AppChatService:
|
||||
# 如果是新会话且有开场白,作为第一条 assistant 消息写入数据库
|
||||
is_new_conversation = len(history) == 0
|
||||
if is_new_conversation:
|
||||
opening = self.agent_service._get_opening_statement(features_config, True, variables)
|
||||
opening, suggested_questions = self.agent_service._get_opening_statement(features_config, True, variables)
|
||||
if opening:
|
||||
self.conversation_service.add_message(
|
||||
conversation_id=conversation_id,
|
||||
role="assistant",
|
||||
content=opening,
|
||||
meta_data={}
|
||||
meta_data={"suggested_questions": suggested_questions}
|
||||
)
|
||||
# 重新加载历史(包含刚写入的开场白)
|
||||
history = await self.conversation_service.get_conversation_history(
|
||||
@@ -367,13 +367,13 @@ class AppChatService:
|
||||
# 如果是新会话且有开场白,作为第一条 assistant 消息写入数据库
|
||||
is_new_conversation = len(history) == 0
|
||||
if is_new_conversation:
|
||||
opening = self.agent_service._get_opening_statement(features_config, True, variables)
|
||||
opening, suggested_questions = self.agent_service._get_opening_statement(features_config, True, variables)
|
||||
if opening:
|
||||
self.conversation_service.add_message(
|
||||
conversation_id=conversation_id,
|
||||
role="assistant",
|
||||
content=opening,
|
||||
meta_data={}
|
||||
meta_data={"suggested_questions": suggested_questions}
|
||||
)
|
||||
# 重新加载历史(包含刚写入的开场白)
|
||||
history = await self.conversation_service.get_conversation_history(
|
||||
|
||||
@@ -1084,7 +1084,6 @@ class AppService:
|
||||
if not exists:
|
||||
cleaned["memory_config_id"] = None
|
||||
cleaned.pop("memory_content", None)
|
||||
cleaned["enabled"] = False
|
||||
return cleaned
|
||||
|
||||
exists = self.db.query(
|
||||
@@ -1096,7 +1095,6 @@ class AppService:
|
||||
if not exists:
|
||||
cleaned["memory_config_id"] = None
|
||||
cleaned.pop("memory_content", None)
|
||||
cleaned["enabled"] = False
|
||||
|
||||
return cleaned
|
||||
|
||||
|
||||
@@ -214,14 +214,14 @@ class ConversationService:
|
||||
|
||||
conversation.message_count += 1
|
||||
|
||||
if conversation.message_count == 1 and role == "user":
|
||||
self.db.commit()
|
||||
self.db.refresh(message)
|
||||
|
||||
if conversation.is_first_user_message and role == "user":
|
||||
conversation.title = (
|
||||
content[:50] + ("..." if len(content) > 50 else "")
|
||||
)
|
||||
|
||||
self.db.commit()
|
||||
self.db.refresh(message)
|
||||
|
||||
logger.info(
|
||||
"Message added successfully",
|
||||
extra={
|
||||
|
||||
@@ -449,15 +449,16 @@ class AgentRunService:
|
||||
features_config: Dict[str, Any],
|
||||
is_new_conversation: bool,
|
||||
variables: Optional[Dict[str, Any]] = None
|
||||
) -> Optional[str]:
|
||||
) -> tuple[Any, Any]:
|
||||
"""首轮对话时返回开场白文本(支持变量替换),否则返回 None"""
|
||||
if not is_new_conversation:
|
||||
return None
|
||||
return None, None
|
||||
opening = features_config.get("opening_statement", {})
|
||||
if not (isinstance(opening, dict) and opening.get("enabled") and opening.get("statement")):
|
||||
return None
|
||||
return None, None
|
||||
|
||||
statement = opening["statement"]
|
||||
suggested_questions = opening["suggested_questions"]
|
||||
|
||||
# 如果有变量,进行替换(仅支持 {{var_name}} 格式)
|
||||
if variables:
|
||||
@@ -465,7 +466,7 @@ class AgentRunService:
|
||||
placeholder = f"{{{{{var_name}}}}}"
|
||||
statement = statement.replace(placeholder, str(var_value))
|
||||
|
||||
return statement
|
||||
return statement, suggested_questions
|
||||
|
||||
@staticmethod
|
||||
def _filter_citations(
|
||||
@@ -599,13 +600,16 @@ class AgentRunService:
|
||||
|
||||
# 5. 处理会话ID(创建或验证),新会话时写入开场白
|
||||
is_new_conversation = not conversation_id
|
||||
opening = self._get_opening_statement(features_config, is_new_conversation, variables)
|
||||
opening, suggested_questions = None, None
|
||||
if not sub_agent:
|
||||
opening, suggested_questions = self._get_opening_statement(features_config, is_new_conversation, variables)
|
||||
conversation_id = await self._ensure_conversation(
|
||||
conversation_id=conversation_id,
|
||||
app_id=agent_config.app_id,
|
||||
workspace_id=workspace_id,
|
||||
user_id=user_id,
|
||||
opening_statement=opening
|
||||
opening_statement=opening,
|
||||
suggested_questions=suggested_questions
|
||||
)
|
||||
|
||||
model_info = ModelInfo(
|
||||
@@ -845,14 +849,17 @@ class AgentRunService:
|
||||
|
||||
# 5. 处理会话ID(创建或验证),新会话时写入开场白
|
||||
is_new_conversation = not conversation_id
|
||||
opening = self._get_opening_statement(features_config, is_new_conversation, variables)
|
||||
opening, suggested_questions = None, None
|
||||
if not sub_agent:
|
||||
opening, suggested_questions = self._get_opening_statement(features_config, is_new_conversation, variables)
|
||||
conversation_id = await self._ensure_conversation(
|
||||
conversation_id=conversation_id,
|
||||
app_id=agent_config.app_id,
|
||||
workspace_id=workspace_id,
|
||||
user_id=user_id,
|
||||
sub_agent=sub_agent,
|
||||
opening_statement=opening
|
||||
opening_statement=opening,
|
||||
suggested_questions=suggested_questions
|
||||
)
|
||||
|
||||
model_info = ModelInfo(
|
||||
@@ -1061,7 +1068,8 @@ class AgentRunService:
|
||||
workspace_id: uuid.UUID,
|
||||
user_id: Optional[str],
|
||||
sub_agent: bool = False,
|
||||
opening_statement: Optional[str] = None
|
||||
opening_statement: Optional[str] = None,
|
||||
suggested_questions: Optional[List[str]] = None
|
||||
) -> str:
|
||||
"""确保会话存在(创建或验证)
|
||||
|
||||
@@ -1072,6 +1080,7 @@ class AgentRunService:
|
||||
user_id: 用户ID
|
||||
sub_agent: 是否为子代理
|
||||
opening_statement: 开场白(新会话时作为第一条消息写入)
|
||||
suggested_questions: 预设问题列表
|
||||
|
||||
Returns:
|
||||
str: 会话ID
|
||||
@@ -1115,7 +1124,7 @@ class AgentRunService:
|
||||
conversation_id=uuid.UUID(new_conv_id),
|
||||
role="assistant",
|
||||
content=opening_statement,
|
||||
meta_data={}
|
||||
meta_data={"suggested_questions": suggested_questions}
|
||||
)
|
||||
logger.debug(f"已保存开场白到会话 {new_conv_id}")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user