fix(app):

1. Handling of large file upload issues;
2. Handling of abnormal display of conversation titles when the opening remarks function is enabled.
This commit is contained in:
Timebomb2018
2026-03-27 16:27:09 +08:00
parent ad5dc3c138
commit 46fa99a8b8
7 changed files with 121 additions and 35 deletions

View File

@@ -44,6 +44,8 @@ class OSSStorage(StorageBackend):
access_key_id: str,
access_key_secret: str,
bucket_name: str,
connect_timeout: int = 30,
multipart_threshold: int = 10 * 1024 * 1024, # 10MB
):
"""
Initialize the OSSStorage backend.
@@ -53,6 +55,8 @@ class OSSStorage(StorageBackend):
access_key_id: The Aliyun access key ID.
access_key_secret: The Aliyun access key secret.
bucket_name: The name of the OSS bucket.
connect_timeout: Connection timeout in seconds (default: 30).
multipart_threshold: File size threshold for multipart upload (default: 10MB).
Raises:
StorageConfigError: If any required configuration is missing.
@@ -69,10 +73,17 @@ class OSSStorage(StorageBackend):
self.endpoint = endpoint
self.bucket_name = bucket_name
self.multipart_threshold = multipart_threshold
try:
auth = oss2.Auth(access_key_id, access_key_secret)
self.bucket = oss2.Bucket(auth, endpoint, bucket_name)
# 设置超时和重试
self.bucket = oss2.Bucket(
auth,
endpoint,
bucket_name,
connect_timeout=connect_timeout
)
logger.info(
f"OSSStorage initialized with endpoint: {endpoint}, bucket: {bucket_name}"
)
@@ -108,21 +119,38 @@ class OSSStorage(StorageBackend):
if content_type:
headers["Content-Type"] = content_type
self.bucket.put_object(file_key, content, headers=headers if headers else None)
# 大文件使用分片上传
if len(content) > self.multipart_threshold:
logger.info(f"Using multipart upload for large file: {file_key} ({len(content)} bytes)")
upload_id = self.bucket.init_multipart_upload(file_key, headers=headers if headers else None).upload_id
parts = []
part_size = 5 * 1024 * 1024 # 5MB per part
part_num = 1
for offset in range(0, len(content), part_size):
chunk = content[offset:offset + part_size]
result = self.bucket.upload_part(file_key, upload_id, part_num, chunk)
parts.append(oss2.models.PartInfo(part_num, result.etag))
part_num += 1
self.bucket.complete_multipart_upload(file_key, upload_id, parts)
else:
self.bucket.put_object(file_key, content, headers=headers if headers else None)
logger.info(f"File uploaded to OSS successfully: {file_key}")
return file_key
except OssError as e:
logger.error(f"OSS error uploading file {file_key}: {e}")
raise StorageUploadError(
message=f"Failed to upload file to OSS: {e.message}",
message=f"Failed to upload file to OSS: {str(e)}",
file_key=file_key,
cause=e,
)
except Exception as e:
logger.error(f"Failed to upload file to OSS {file_key}: {e}")
raise StorageUploadError(
message=f"Failed to upload file to OSS: {e}",
message=f"Failed to upload file to OSS: {str(e)}",
file_key=file_key,
cause=e,
)
@@ -135,28 +163,73 @@ class OSSStorage(StorageBackend):
) -> int:
"""Upload from async stream to OSS. Returns total bytes written."""
buf = io.BytesIO()
headers = {"Content-Type": content_type} if content_type else None
upload_id = None
try:
# 收集流数据
total_size = 0
async for chunk in stream:
if not chunk:
continue
buf.write(chunk)
total_size += len(chunk)
content = buf.getvalue()
headers = {"Content-Type": content_type} if content_type else None
self.bucket.put_object(file_key, content, headers=headers)
logger.info(f"File stream uploaded to OSS successfully: {file_key}")
return len(content)
if not content:
raise StorageUploadError(
message="Empty stream content",
file_key=file_key,
)
# 大文件使用分片上传
if len(content) > self.multipart_threshold:
logger.info(f"Using multipart upload for stream: {file_key} ({len(content)} bytes)")
upload_id = self.bucket.init_multipart_upload(file_key, headers=headers).upload_id
parts = []
part_size = 5 * 1024 * 1024 # 5MB
part_num = 1
for offset in range(0, len(content), part_size):
chunk = content[offset:offset + part_size]
result = self.bucket.upload_part(file_key, upload_id, part_num, chunk)
parts.append(oss2.models.PartInfo(part_num, result.etag))
part_num += 1
self.bucket.complete_multipart_upload(file_key, upload_id, parts)
else:
self.bucket.put_object(file_key, content, headers=headers)
logger.info(f"File stream uploaded to OSS successfully: {file_key} ({total_size} bytes)")
return total_size
except OssError as e:
if upload_id:
try:
self.bucket.abort_multipart_upload(file_key, upload_id)
except:
pass
logger.error(f"OSS error stream uploading file {file_key}: {e}")
raise StorageUploadError(
message=f"Failed to stream upload file to OSS: {e.message}",
message=f"Failed to stream upload file to OSS: {str(e)}",
file_key=file_key,
cause=e,
)
except Exception as e:
if upload_id:
try:
self.bucket.abort_multipart_upload(file_key, upload_id)
except:
pass
logger.error(f"Failed to stream upload file to OSS {file_key}: {e}")
raise StorageUploadError(
message=f"Failed to stream upload file to OSS: {e}",
message=f"Failed to stream upload file to OSS: {str(e)}",
file_key=file_key,
cause=e,
)
finally:
buf.close()
async def download(self, file_key: str) -> bytes:
"""
@@ -182,14 +255,14 @@ class OSSStorage(StorageBackend):
except OssError as e:
logger.error(f"OSS error downloading file {file_key}: {e}")
raise StorageDownloadError(
message=f"Failed to download file from OSS: {e.message}",
message=f"Failed to download file from OSS: {str(e)}",
file_key=file_key,
cause=e,
)
except Exception as e:
logger.error(f"Failed to download file from OSS {file_key}: {e}")
raise StorageDownloadError(
message=f"Failed to download file from OSS: {e}",
message=f"Failed to download file from OSS: {str(e)}",
file_key=file_key,
cause=e,
)
@@ -215,14 +288,14 @@ class OSSStorage(StorageBackend):
except OssError as e:
logger.error(f"OSS error deleting file {file_key}: {e}")
raise StorageDeleteError(
message=f"Failed to delete file from OSS: {e.message}",
message=f"Failed to delete file from OSS: {str(e)}",
file_key=file_key,
cause=e,
)
except Exception as e:
logger.error(f"Failed to delete file from OSS {file_key}: {e}")
raise StorageDeleteError(
message=f"Failed to delete file from OSS: {e}",
message=f"Failed to delete file from OSS: {str(e)}",
file_key=file_key,
cause=e,
)

View File

@@ -57,6 +57,12 @@ class Conversation(Base):
workspace = relationship("Workspace")
messages = relationship("Message", back_populates="conversation", cascade="all, delete-orphan")
@property
def is_first_user_message(self):
    """Whether exactly one user-authored message exists in this conversation.

    Only messages with role == "user" are counted, so assistant messages
    (e.g. an auto-inserted opening statement) never affect the result.
    """
    user_messages = [msg for msg in self.messages if msg.role == "user"]
    return len(user_messages) == 1
class ConversationDetail(Base):
__tablename__ = "conversation_details"

View File

@@ -276,7 +276,7 @@ class AgentConfigCreate(BaseModel):
# 记忆配置
memory: MemoryConfig = Field(
default_factory=lambda: MemoryConfig(enabled=True),
default_factory=lambda: MemoryConfig(enabled=False),
description="对话历史记忆配置"
)

View File

@@ -140,13 +140,13 @@ class AppChatService:
# 如果是新会话且有开场白,作为第一条 assistant 消息写入数据库
is_new_conversation = len(history) == 0
if is_new_conversation:
opening = self.agent_service._get_opening_statement(features_config, True, variables)
opening, suggested_questions = self.agent_service._get_opening_statement(features_config, True, variables)
if opening:
self.conversation_service.add_message(
conversation_id=conversation_id,
role="assistant",
content=opening,
meta_data={}
meta_data={"suggested_questions": suggested_questions}
)
# 重新加载历史(包含刚写入的开场白)
history = await self.conversation_service.get_conversation_history(
@@ -367,13 +367,13 @@ class AppChatService:
# 如果是新会话且有开场白,作为第一条 assistant 消息写入数据库
is_new_conversation = len(history) == 0
if is_new_conversation:
opening = self.agent_service._get_opening_statement(features_config, True, variables)
opening, suggested_questions = self.agent_service._get_opening_statement(features_config, True, variables)
if opening:
self.conversation_service.add_message(
conversation_id=conversation_id,
role="assistant",
content=opening,
meta_data={}
meta_data={"suggested_questions": suggested_questions}
)
# 重新加载历史(包含刚写入的开场白)
history = await self.conversation_service.get_conversation_history(

View File

@@ -1084,7 +1084,6 @@ class AppService:
if not exists:
cleaned["memory_config_id"] = None
cleaned.pop("memory_content", None)
cleaned["enabled"] = False
return cleaned
exists = self.db.query(
@@ -1096,7 +1095,6 @@ class AppService:
if not exists:
cleaned["memory_config_id"] = None
cleaned.pop("memory_content", None)
cleaned["enabled"] = False
return cleaned

View File

@@ -214,14 +214,14 @@ class ConversationService:
conversation.message_count += 1
if conversation.message_count == 1 and role == "user":
self.db.commit()
self.db.refresh(message)
if conversation.is_first_user_message and role == "user":
conversation.title = (
content[:50] + ("..." if len(content) > 50 else "")
)
self.db.commit()
self.db.refresh(message)
logger.info(
"Message added successfully",
extra={

View File

@@ -449,15 +449,16 @@ class AgentRunService:
features_config: Dict[str, Any],
is_new_conversation: bool,
variables: Optional[Dict[str, Any]] = None
) -> Optional[str]:
) -> tuple[Any, Any]:
"""首轮对话时返回开场白文本(支持变量替换),否则返回 None"""
if not is_new_conversation:
return None
return None, None
opening = features_config.get("opening_statement", {})
if not (isinstance(opening, dict) and opening.get("enabled") and opening.get("statement")):
return None
return None, None
statement = opening["statement"]
suggested_questions = opening["suggested_questions"]
# 如果有变量,进行替换(仅支持 {{var_name}} 格式)
if variables:
@@ -465,7 +466,7 @@ class AgentRunService:
placeholder = f"{{{{{var_name}}}}}"
statement = statement.replace(placeholder, str(var_value))
return statement
return statement, suggested_questions
@staticmethod
def _filter_citations(
@@ -599,13 +600,16 @@ class AgentRunService:
# 5. 处理会话ID创建或验证新会话时写入开场白
is_new_conversation = not conversation_id
opening = self._get_opening_statement(features_config, is_new_conversation, variables)
opening, suggested_questions = None, None
if not sub_agent:
opening, suggested_questions = self._get_opening_statement(features_config, is_new_conversation, variables)
conversation_id = await self._ensure_conversation(
conversation_id=conversation_id,
app_id=agent_config.app_id,
workspace_id=workspace_id,
user_id=user_id,
opening_statement=opening
opening_statement=opening,
suggested_questions=suggested_questions
)
model_info = ModelInfo(
@@ -845,14 +849,17 @@ class AgentRunService:
# 5. 处理会话ID创建或验证新会话时写入开场白
is_new_conversation = not conversation_id
opening = self._get_opening_statement(features_config, is_new_conversation, variables)
opening, suggested_questions = None, None
if not sub_agent:
opening, suggested_questions = self._get_opening_statement(features_config, is_new_conversation, variables)
conversation_id = await self._ensure_conversation(
conversation_id=conversation_id,
app_id=agent_config.app_id,
workspace_id=workspace_id,
user_id=user_id,
sub_agent=sub_agent,
opening_statement=opening
opening_statement=opening,
suggested_questions=suggested_questions
)
model_info = ModelInfo(
@@ -1061,7 +1068,8 @@ class AgentRunService:
workspace_id: uuid.UUID,
user_id: Optional[str],
sub_agent: bool = False,
opening_statement: Optional[str] = None
opening_statement: Optional[str] = None,
suggested_questions: Optional[List[str]] = None
) -> str:
"""确保会话存在(创建或验证)
@@ -1072,6 +1080,7 @@ class AgentRunService:
user_id: 用户ID
sub_agent: 是否为子代理
opening_statement: 开场白(新会话时作为第一条消息写入)
suggested_questions: 预设问题列表
Returns:
str: 会话ID
@@ -1115,7 +1124,7 @@ class AgentRunService:
conversation_id=uuid.UUID(new_conv_id),
role="assistant",
content=opening_statement,
meta_data={}
meta_data={"suggested_questions": suggested_questions}
)
logger.debug(f"已保存开场白到会话 {new_conv_id}")