diff --git a/api/app/controllers/user_memory_controllers.py b/api/app/controllers/user_memory_controllers.py index 5c23f24c..15c50601 100644 --- a/api/app/controllers/user_memory_controllers.py +++ b/api/app/controllers/user_memory_controllers.py @@ -302,7 +302,7 @@ async def get_end_user_profile( ) api_logger.info(f"成功获取用户信息: end_user_id={end_user_id}") - return success(data=profile_data.model_dump(), msg="查询成功") + return success(data=UserMemoryService.convert_profile_to_dict_with_timestamp(profile_data), msg="查询成功") except Exception as e: api_logger.error(f"用户信息查询失败: end_user_id={end_user_id}, error={str(e)}") @@ -346,15 +346,22 @@ async def update_end_user_profile( # 更新字段(只更新提供的字段,排除 end_user_id) # 允许 None 值来重置字段(如 hire_date) update_data = profile_update.model_dump(exclude_unset=True, exclude={'end_user_id'}) + + # 特殊处理 hire_date:如果提供了时间戳,转换为 DateTime + if 'hire_date' in update_data: + hire_date_timestamp = update_data['hire_date'] + if hire_date_timestamp is not None: + update_data['hire_date'] = UserMemoryService.timestamp_to_datetime(hire_date_timestamp) + # 如果是 None,保持 None(允许清空) + for field, value in update_data.items(): setattr(end_user, field, value) # 更新 updated_at 时间戳 end_user.updated_at = datetime.datetime.now() - # 更新 updatetime_profile 为当前时间戳(毫秒) - current_timestamp = int(datetime.datetime.now().timestamp() * 1000) - end_user.updatetime_profile = current_timestamp + # 更新 updatetime_profile 为当前时间 + end_user.updatetime_profile = datetime.datetime.now() # 提交更改 db.commit() @@ -372,8 +379,8 @@ async def update_end_user_profile( updatetime_profile=end_user.updatetime_profile ) - api_logger.info(f"成功更新用户信息: end_user_id={end_user_id}, updated_fields={list(update_data.keys())}, updatetime_profile={current_timestamp}") - return success(data=profile_data.model_dump(), msg="更新成功") + api_logger.info(f"成功更新用户信息: end_user_id={end_user_id}, updated_fields={list(update_data.keys())}") + return success(data=UserMemoryService.convert_profile_to_dict_with_timestamp(profile_data), msg="更新成功") except Exception as e: db.rollback() diff --git a/api/app/core/memory/analytics/memory_insight.py b/api/app/core/memory/analytics/memory_insight.py index 466ac2e6..39746e58 100644 --- a/api/app/core/memory/analytics/memory_insight.py +++ b/api/app/core/memory/analytics/memory_insight.py @@ -1,7 +1,14 @@ """ This module provides the MemoryInsight class for analyzing user memory data. -This script can be executed directly to generate a memory insight report for a test user. +MemoryInsight 是一个工具类,提供基础的数据获取和分析功能: +- get_domain_distribution(): 获取记忆领域分布 +- get_active_periods(): 获取活跃时段 +- get_social_connections(): 获取社交关联 + +业务逻辑(如生成洞察报告)应该在服务层(user_memory_service.py)中实现。 + +This script can be executed directly to test the memory insight generation for a test user. """ import asyncio @@ -221,25 +228,32 @@ class MemoryInsight: async def get_social_connections(self) -> dict | None: """ Finds the user with whom the most memories are shared. + 使用 Chunk-Statement 的 CONTAINS 关系,因为系统中不创建 Dialogue-Statement 的 MENTIONS 关系。 """ + # 通过 Chunk 和 Statement 的 CONTAINS 关系来查找共同记忆 query = f""" - MATCH (d1:Dialogue {{group_id: '{self.user_id}'}})<-[:MENTIONS]-(s:Statement)-[:MENTIONS]->(d2:Dialogue) - WHERE d1 <> d2 - RETURN d2.group_id AS other_user_id, COUNT(s) AS common_statements + MATCH (c1:Chunk {{group_id: '{self.user_id}'}}) + OPTIONAL MATCH (c1)-[:CONTAINS]->(s:Statement) + OPTIONAL MATCH (s)<-[:CONTAINS]-(c2:Chunk) + WHERE c1.group_id <> c2.group_id AND s IS NOT NULL AND c2 IS NOT NULL + WITH c2.group_id AS other_user_id, COUNT(DISTINCT s) AS common_statements + WHERE common_statements > 0 + RETURN other_user_id, common_statements ORDER BY common_statements DESC LIMIT 1 """ records = await self.neo4j_connector.execute_query(query) - if not records: + if not records or not records[0].get("other_user_id"): return None most_connected_user = records[0]["other_user_id"] common_memories_count = records[0]["common_statements"] + # 使用 Chunk 的时间范围 time_range_query = f""" - MATCH (d:Dialogue) - WHERE d.group_id IN ['{self.user_id}', '{most_connected_user}'] - RETURN min(d.created_at) AS start_time, max(d.created_at) AS end_time + MATCH (c:Chunk) + WHERE c.group_id IN ['{self.user_id}', '{most_connected_user}'] + RETURN min(c.created_at) AS start_time, max(c.created_at) AS end_time """ time_records = await self.neo4j_connector.execute_query(time_range_query) start_year, end_year = "N/A", "N/A" @@ -253,84 +267,6 @@ class MemoryInsight: "time_range": f"{start_year}-{end_year}", } - async def generate_insight_report(self) -> str: - """ - Generates the final insight report in natural language. - """ - domain_dist, active_periods, social_conn = await asyncio.gather( - self.get_domain_distribution(), - self.get_active_periods(), - self.get_social_connections(), - ) - - prompt_parts = [] - - if domain_dist: - top_domains = ", ".join([f"{k}({v:.0%})" for k, v in list(domain_dist.items())[:3]]) - prompt_parts.append(f"- 核心领域: 用户的记忆主要集中在 {top_domains}。") - - if active_periods: - months_str = " 和 ".join(map(str, active_periods)) - prompt_parts.append(f"- 活跃时段: 用户在每年的 {months_str} 月最为活跃。") - - if social_conn: - prompt_parts.append( - f"- 社交关联: 与用户\"{social_conn['user_id']}\"拥有最多共同记忆({social_conn['common_memories_count']}条),时间范围主要在 {social_conn['time_range']}。" - ) - - if not prompt_parts: - return "暂无足够数据生成洞察报告。" - - system_prompt = '''你是一位资深的个人记忆分析师。你的任务是根据我提供的要点,为用户生成一段简洁、自然、个性化的记忆洞察报告。 - -重要规则: -1. 报告需要将所有要点流畅地串联成一个段落 -2. 语言风格要亲切、易于理解,就像和朋友聊天一样 -3. 不要添加任何额外的解释或标题,直接输出报告内容 -4. 只使用我提供的要点,不要编造或推测任何信息 -5. 如果某个维度没有数据(如没有活跃时段信息),就不要在报告中提及该维度 - -例如,如果输入是: -- 核心领域: 用户的记忆主要集中在 旅行(38%), 工作(24%), 家庭(21%)。 -- 活跃时段: 用户在每年的 4 和 10 月最为活跃。 -- 社交关联: 与用户"张明"拥有最多共同记忆(47条),时间范围主要在 2017-2020。 - -你的输出应该是: -"您的记忆集中在旅行(38%)、工作(24%)和家庭(21%)三大领域。每年4月和10月是您最活跃的记录期,可能与春秋季旅行计划相关。您与'张明'共同拥有最多记忆(47条),主要集中在2017-2020年间。" - -如果输入只有: -- 核心领域: 用户的记忆主要集中在 教育(65%), 学习(25%)。 - -你的输出应该是: -"您的记忆主要集中在教育(65%)和学习(25%)两大领域,显示出您对知识和成长的持续关注。"''' - - user_prompt = "\n".join(prompt_parts) - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt} - ] - - response = await self.llm_client.chat(messages=messages) - - # 确保返回字符串类型 - content = response.content - if isinstance(content, list): - # 如果是列表格式(如 [{'type': 'text', 'text': '...'}]),提取文本 - if len(content) > 0: - if isinstance(content[0], dict): - # 尝试提取 'text' 字段 - text = content[0].get('text', content[0].get('content', str(content[0]))) - return str(text) - else: - return str(content[0]) - return "" - elif isinstance(content, dict): - # 如果是字典格式,提取 text 字段 - return str(content.get('text', content.get('content', str(content)))) - else: - # 已经是字符串或其他类型,转为字符串 - return str(content) if content is not None else "" - async def close(self): """ Closes the database connection. @@ -346,10 +282,13 @@ async def main(): test_user_id = DEFAULT_GROUP_ID print(f"正在为用户 {test_user_id} 生成记忆洞察报告...\n") - insight = None try: - insight = MemoryInsight(user_id=test_user_id) - report = await insight.generate_insight_report() + # 使用服务层函数生成报告 + from app.services.user_memory_service import analytics_memory_insight_report + + result = await analytics_memory_insight_report(end_user_id=test_user_id) + report = result.get("report", "") + print("--- 记忆洞察报告 ---") print(report) print("---------------------") @@ -379,9 +318,6 @@ async def main(): print(f"写入 User-Dashboard.json 失败: {e}") except Exception as e: print(f"生成报告时出错: {e}") - finally: - if insight: - await insight.close() if __name__ == "__main__": diff --git a/api/app/core/memory/analytics/user_summary.py b/api/app/core/memory/analytics/user_summary.py index 61f58965..f0283993 100644 --- a/api/app/core/memory/analytics/user_summary.py +++ b/api/app/core/memory/analytics/user_summary.py @@ -80,7 +80,7 @@ class UserSummary: async def close(self): await self.connector.close() - async def _get_recent_statements(self, limit: int = 80) -> List[StatementRecord]: + async def _get_recent_statements(self, limit: int = 80) -> List[StatementRecord]: # TODO Used by user_memory_service """Fetch recent statements authored by the user/group for context.""" query = ( "MATCH (s:Statement) " @@ -100,70 +100,25 @@ class UserSummary: async def _get_top_entities(self, limit: int = 30) -> List[Tuple[str, int]]: """Reuse hot tag logic to get meaningful entities and their frequencies.""" # get_hot_memory_tags internally filters out non-meaningful nouns with LLM - return await get_hot_memory_tags(self.user_id, limit=limit) + return await get_hot_memory_tags(self.user_id, limit=limit) # TODO Used by user_memory_service - async def generate(self) -> str: - """Generate a Chinese '关于我' style summary using the LLM.""" - # 1) Collect context - entities = await self._get_top_entities(limit=40) - statements = await self._get_recent_statements(limit=100) - entity_lines = [f"{name} ({freq})" for name, freq in entities][:20] - statement_samples = [s.statement.strip() for s in statements if (s.statement or '').strip()][:20] - - # 2) Compose prompt - system_prompt = ( - "你是一位中文信息压缩助手。请基于提供的实体与语句," - "生成非常简洁的用户摘要,禁止臆测或虚构。要求:\n" - "- 3–4 句,总字数不超过 120;\n" - "- 先交代身份/城市,其次长期兴趣或习惯,最后给一两项代表性经历;\n" - "- 避免形容词堆砌与空话,不用项目符号,不分段;\n" - "- 使用客观的第三人称描述,语气克制、中立。" - ) - - user_content_parts = [ - f"用户ID: {self.user_id}", - "核心实体与频次: " + (", ".join(entity_lines) if entity_lines else "(空)"), - "代表性语句样本: " + (" | ".join(statement_samples) if statement_samples else "(空)"), - ] - user_prompt = "\n".join(user_content_parts) - - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] - - # 3) Call LLM - response = await self.llm.chat(messages=messages) +async def generate_user_summary(user_id: str | None = None) -> str: # TODO useless + """ + 生成用户摘要的便捷函数 + + Args: + user_id: 可选的用户ID - # 确保返回字符串类型 - content = response.content - if isinstance(content, list): - # 如果是列表格式(如 [{'type': 'text', 'text': '...'}]),提取文本 - if len(content) > 0: - if isinstance(content[0], dict): - # 尝试提取 'text' 字段 - text = content[0].get('text', content[0].get('content', str(content[0]))) - return str(text) - else: - return str(content[0]) - return "" - elif isinstance(content, dict): - # 如果是字典格式,提取 text 字段 - return str(content.get('text', content.get('content', str(content)))) - else: - # 已经是字符串或其他类型,转为字符串 - return str(content) if content is not None else "" - - -async def generate_user_summary(user_id: str | None = None) -> str: - # 默认从环境变量读取 - effective_group_id = user_id or DEFAULT_GROUP_ID - svc = UserSummary(effective_group_id) - try: - return await svc.generate() - finally: - await svc.close() + Returns: + 用户摘要字符串 + """ + # 导入服务层函数 + from app.services.user_memory_service import analytics_user_summary + + # 调用服务层函数 + result = await analytics_user_summary(user_id) + return result.get("summary", "") if __name__ == "__main__": diff --git a/api/app/core/memory/utils/prompt/prompt_utils.py b/api/app/core/memory/utils/prompt/prompt_utils.py index c39a3f89..842f3c82 100644 --- a/api/app/core/memory/utils/prompt/prompt_utils.py +++ b/api/app/core/memory/utils/prompt/prompt_utils.py @@ -316,3 +316,73 @@ async def render_emotion_suggestions_prompt( }) return rendered_prompt + + +async def render_user_summary_prompt( + user_id: str, + entities: str, + statements: str +) -> str: + """ + Renders the user summary prompt using the user_summary.jinja2 template. + + Args: + user_id: User identifier + entities: Core entities with frequency information + statements: Representative statement samples + + Returns: + Rendered prompt content as string + """ + template = prompt_env.get_template("user_summary.jinja2") + rendered_prompt = template.render( + user_id=user_id, + entities=entities, + statements=statements + ) + + # 记录渲染结果到提示日志 + log_prompt_rendering('user summary', rendered_prompt) + # 可选:记录模板渲染信息 + log_template_rendering('user_summary.jinja2', { + 'user_id': user_id, + 'entities_len': len(entities), + 'statements_len': len(statements) + }) + + return rendered_prompt + + +async def render_memory_insight_prompt( + domain_distribution: str = None, + active_periods: str = None, + social_connections: str = None +) -> str: + """ + Renders the memory insight prompt using the memory_insight.jinja2 template. + + Args: + domain_distribution: 核心领域分布信息 + active_periods: 活跃时段信息 + social_connections: 社交关联信息 + + Returns: + Rendered prompt content as string + """ + template = prompt_env.get_template("memory_insight.jinja2") + rendered_prompt = template.render( + domain_distribution=domain_distribution, + active_periods=active_periods, + social_connections=social_connections + ) + + # 记录渲染结果到提示日志 + log_prompt_rendering('memory insight', rendered_prompt) + # 可选:记录模板渲染信息 + log_template_rendering('memory_insight.jinja2', { + 'has_domain_distribution': bool(domain_distribution), + 'has_active_periods': bool(active_periods), + 'has_social_connections': bool(social_connections) + }) + + return rendered_prompt diff --git a/api/app/core/memory/utils/prompt/prompts/memory_insight.jinja2 b/api/app/core/memory/utils/prompt/prompts/memory_insight.jinja2 new file mode 100644 index 00000000..955c214d --- /dev/null +++ b/api/app/core/memory/utils/prompt/prompts/memory_insight.jinja2 @@ -0,0 +1,152 @@ +{% macro tidy(name) -%} + {{ name.replace('_', ' ')}} +{%- endmacro %} + + +===Task=== + +Your task is to generate a comprehensive memory insight report based on the provided data analysis. The report should include four distinct sections that capture different aspects of the user's memory patterns and characteristics. + + +===Inputs=== +{% if domain_distribution %} +- 核心领域分布: {{ domain_distribution }} +{% endif %} +{% if active_periods %} +- 活跃时段: {{ active_periods }} +{% endif %} +{% if social_connections %} +- 社交关联: {{ social_connections }} +{% endif %} + + +===Report Generation Requirements=== + +**General Guidelines:** +1. Base your analysis ONLY on the provided data - do not speculate or fabricate information +2. Use objective third-person descriptions with a professional and analytical tone +3. Avoid excessive adjectives and empty phrases +4. Strictly follow the output format specified below +5. If a dimension lacks data, skip that section or provide a brief note + +**Section-Specific Requirements:** + +1. **总体概述 (Overview)** (100-150 Chinese characters) + - Focus on: Overall analysis of user profile based on interaction logs + - Describe the user's main role, work network, and collaboration spirit + - Use professional, data-driven language style + - Example reference: "通过对156次交互日志的深度分析,系统发现三层一位主要用户档案和数据分析的产品经理。他的工作网络体现出鲜明的目标导向和团队协作精神。" + +2. **行为模式 (Behavior Pattern)** (80-120 Chinese characters) + - Focus on: Work patterns, time regularity, and behavioral characteristics + - Describe weekly work patterns and time preferences + - Use objective, analytical language + - Example reference: "张三的工作模式呈现出鲜明的周期性:周一通常用于规划和会议,周三周四专注于产品设计和用户研究,周五进行总结和复盘。他倾向于在上午进行头脑风暴,下午处理执行性工作。" + +3. **关键发现 (Key Findings)** (3-4 bullet points, 30-50 characters each) + - Focus on: Specific, insightful observations about user behavior and preferences + - Use bullet points (•) format + - Each finding should be concrete and data-supported + - Example reference: + "• 在产品决策中,张三总是优先考虑用户反应,这在68%的决策记录中得到体现 + • 他善于使用数据可视化工具来支持论点,这种习惯在项目管理中发挥了重要作用 + • 团队成员对他的评价中,"思路清晰"和"思路敏捷"两个关键词出现频率最高 + • 他对AI机器学习领域保持持续关注,近3个月参加了7次相关培训" + +4. **成长轨迹 (Growth Trajectory)** (100-150 Chinese characters) + - Focus on: User's growth journey, key milestones, and capability improvements + - Organize content chronologically + - Highlight role changes and achievements + - Use positive, encouraging tone + - Example reference: "从入职时的产品经理成长为高级产品经理,张三在产品单独、团队管理和技术理解三个方面都有显著提升。特别是在最近一年,他开始独立主导更复杂的项目,展现出更强的战略思维能力。他的成长轨迹显示出对新技术的持续学习和对产品思维的不断深化。" + + +===Output Format (MUST STRICTLY FOLLOW)=== + +【总体概述】 +[100-150 characters describing overall user profile and work network based on interaction analysis] + +【行为模式】 +[80-120 characters describing work patterns, time regularity, and behavioral characteristics] + +【关键发现】 +• [First key finding with data support, 30-50 characters] +• [Second key finding with data support, 30-50 characters] +• [Third key finding with data support, 30-50 characters] +• [Fourth key finding with data support, 30-50 characters] + +【成长轨迹】 +[100-150 characters describing growth journey, milestones, and capability improvements] + + +===Example=== + +Example Input: +- 核心领域分布: 产品管理(38%), 数据分析(24%), 团队协作(21%) +- 活跃时段: 用户在每年的 4 和 10 月最为活跃 +- 社交关联: 与用户"李明"拥有最多共同记忆(47条),时间范围主要在 2020-2023 + +Example Output: +【总体概述】 +通过对156次交互日志的深度分析,系统发现张三是一位主要从事用户档案和数据分析的产品经理。他的工作网络体现出鲜明的目标导向和团队协作精神,在产品管理、数据分析和团队协作三个领域都有深入的实践。 + +【行为模式】 +张三的工作模式呈现出鲜明的周期性:周一通常用于规划和会议,周三周四专注于产品设计和用户研究,周五进行总结和复盘。他倾向于在上午进行头脑风暴,下午处理执行性工作。每年4月和10月是他最活跃的时期。 + +【关键发现】 +• 在产品决策中,张三总是优先考虑用户反应,这在68%的决策记录中得到体现 +• 他善于使用数据可视化工具来支持论点,这种习惯在项目管理中发挥了重要作用 +• 团队成员对他的评价中,"思路清晰"和"思路敏捷"两个关键词出现频率最高 +• 他对AI机器学习领域保持持续关注,近3个月参加了7次相关培训 + +【成长轨迹】 +从入职时的产品经理成长为高级产品经理,张三在产品规划、团队管理和技术理解三个方面都有显著提升。特别是在最近一年,他开始独立主导更复杂的项目,展现出更强的战略思维能力。他与李明的47条共同记忆见证了他的成长历程。 + +===End of Example=== + + +===Reflection Process=== + +After generating the report, perform the following self-review steps: + +**Step 1: Data Grounding Check** +- Verify all statements are supported by the provided data +- Ensure no fabricated or speculated information is included +- Confirm all claims can be traced back to the input data + +**Step 2: Format Compliance** +- Verify each section follows the specified format with section headers +- Check character count limits for each section +- Ensure proper use of section markers (【】) +- Verify bullet points format for Key Findings section + +**Step 3: Tone and Style Review** +- Confirm objective third-person perspective is maintained +- Check for excessive adjectives or empty phrases +- Verify professional and analytical tone throughout + +**Step 4: Completeness Check** +- Ensure all four sections are present and complete +- Verify each section addresses its specific focus area +- Confirm the report provides actionable insights + + +===Output Requirements=== + +**LANGUAGE REQUIREMENT:** +- The output language should ALWAYS be Chinese (Simplified) +- All section content must be in Chinese +- Section headers must use the specified Chinese format: 【总体概述】【行为模式】【关键发现】【成长轨迹】 + +**FORMAT REQUIREMENT:** +- Each section must start with its header on a new line +- Content follows immediately after the header +- Sections are separated by blank lines +- Key Findings section must use bullet points (•) +- Strictly adhere to character limits for each section + +**CONTENT REQUIREMENT:** +- Only use provided data points +- Do not fabricate or speculate information +- If data is insufficient for a section, provide a brief note or skip +- Maintain professional, analytical tone throughout diff --git a/api/app/core/memory/utils/prompt/prompts/user_summary.jinja2 b/api/app/core/memory/utils/prompt/prompts/user_summary.jinja2 new file mode 100644 index 00000000..373ab31e --- /dev/null +++ b/api/app/core/memory/utils/prompt/prompts/user_summary.jinja2 @@ -0,0 +1,124 @@ +{% macro tidy(name) -%} + {{ name.replace('_', ' ')}} +{%- endmacro %} + + +===Task=== + +Your task is to generate a comprehensive user profile based on the provided entities and statements. The profile should include four distinct sections that capture different aspects of the user's identity and characteristics. + + +===Inputs=== +{% if user_id %} +- User ID: {{ user_id }} +{% endif %} +{% if entities %} +- Core Entities & Frequency: {{ entities }} +{% endif %} +{% if statements %} +- Representative Statement Samples: {{ statements }} +{% endif %} + + +===Profile Generation Requirements=== + +**General Guidelines:** +1. Base your analysis ONLY on the provided data - do not speculate or fabricate information +2. Use objective third-person descriptions with a restrained and neutral tone +3. Avoid excessive adjectives and empty phrases +4. Strictly follow the output format specified below + +**Section-Specific Requirements:** + +1. **Basic Introduction** (4-5 sentences, max 150 Chinese characters) + - Focus on: identity, occupation, location, and other basic demographic information + - Provide factual background about who the user is + +2. **Personality Traits** (2-3 sentences, max 80 Chinese characters) + - Focus on: personality characteristics, behavioral habits, communication style + - Describe observable patterns in how the user interacts and behaves + +3. **Core Values** (1-2 sentences, max 50 Chinese characters) + - Focus on: values, beliefs, goals, and aspirations + - Capture what matters most to the user and what drives their decisions + +4. **One-Sentence Summary** (1 sentence, max 40 Chinese characters) + - Provide a highly condensed characterization of the user's core traits + - Similar to a personal tagline or motto that captures their essence + + +===Output Format (MUST STRICTLY FOLLOW)=== + +【基本介绍】 +[4-5 sentences describing the user's basic identity, occupation, and location] + +【性格特点】 +[2-3 sentences describing the user's personality traits, behavioral habits, and communication style] + +【核心价值观】 +[1-2 sentences describing the user's values, beliefs, and goals] + +【一句话总结】 +[1 sentence providing a highly condensed summary of the user's core characteristics] + + +===Example=== + +Example Input: +- User ID: user_12345 +- Core Entities & Frequency: 产品经理 (15), AI (12), 深圳 (10), 数据分析 (8), 团队协作 (7) +- Representative Statement Samples: 我在深圳从事产品经理工作已经5年了 | 我相信好的产品源于对用户需求的深刻理解 | 我喜欢在团队中起到协调作用 | 数据驱动决策是我的工作原则 + +Example Output: +【基本介绍】 +我是张三,一名充满热情的高级产品经理。在过去的5年里,我专注于AI和数据驱动的产品设计,致力于创造能够真正改善用户生活的产品。我相信好的产品源于对用户需求的深刻理解和对技术可能性的不断探索。 + +【性格特点】 +性格开朗,善于沟通,注重细节。喜欢在团队中起到协调作用,帮助大家达成共识。面对挑战时保持乐观,相信每个问题都有解决方案。 + +【核心价值观】 +用户至上、数据驱动、持续学习、团队协作 + +【一句话总结】 +"让每一个产品决策都充满温度。" + +===End of Example=== + + +===Reflection Process=== + +After generating the profile, perform the following self-review steps: + +**Step 1: Data Grounding Check** +- Verify all statements are supported by the provided entities and statements +- Ensure no fabricated or speculated information is included +- Confirm all claims can be traced back to the input data + +**Step 2: Format Compliance** +- Verify each section follows the specified format with section headers +- Check character count limits for each section +- Ensure proper use of section markers (【】) + +**Step 3: Tone and Style Review** +- Confirm objective third-person perspective is maintained +- Check for excessive adjectives or empty phrases +- Verify neutral and restrained tone throughout + +**Step 4: Completeness Check** +- Ensure all four sections are present and complete +- Verify each section addresses its specific focus area +- Confirm the one-sentence summary effectively captures the user's essence + + +===Output Requirements=== + +**LANGUAGE REQUIREMENT:** +- The output language should ALWAYS be Chinese (Simplified) +- All section content must be in Chinese +- Section headers must use the specified Chinese format: 【基本介绍】【性格特点】【核心价值观】【一句话总结】 + +**FORMAT REQUIREMENT:** +- Each section must start with its header on a new line +- Content follows immediately after the header +- Sections are separated by blank lines +- Strictly adhere to character limits for each section diff --git a/api/app/models/end_user_model.py b/api/app/models/end_user_model.py index 1e1ce4f3..aa76e03c 100644 --- a/api/app/models/end_user_model.py +++ b/api/app/models/end_user_model.py @@ -23,14 +23,22 @@ class EndUser(Base): department = Column(String, nullable=True, comment="部门") contact = Column(String, nullable=True, comment="联系方式") phone = Column(String, nullable=True, comment="电话") - hire_date = Column(BigInteger, nullable=True, comment="入职日期(时间戳,毫秒)") - updatetime_profile = Column(BigInteger, nullable=True, comment="核心档案信息最后更新时间(时间戳,毫秒)") + hire_date = Column(DateTime, nullable=True, comment="入职日期") + updatetime_profile = Column(DateTime, nullable=True, comment="核心档案信息最后更新时间") - # 缓存字段 - Cache fields for pre-computed analytics - memory_insight = Column(Text, nullable=True, comment="缓存的记忆洞察报告") - user_summary = Column(Text, nullable=True, comment="缓存的用户摘要") - memory_insight_updated_at = Column(DateTime, nullable=True, comment="洞察报告最后更新时间") + # 用户摘要四个维度 - User Summary Four Dimensions + user_summary = Column(Text, nullable=True, comment="缓存的用户摘要(基本介绍)") + personality_traits = Column(Text, nullable=True, comment="性格特点") + core_values = Column(Text, nullable=True, comment="核心价值观") + one_sentence_summary = Column(Text, nullable=True, comment="一句话总结") user_summary_updated_at = Column(DateTime, nullable=True, comment="用户摘要最后更新时间") + + # 记忆洞察四个维度 - Memory Insight Four Dimensions + memory_insight = Column(Text, nullable=True, comment="缓存的记忆洞察报告(总体概述)") + behavior_pattern = Column(Text, nullable=True, comment="行为模式") + key_findings = Column(Text, nullable=True, comment="关键发现") + growth_trajectory = Column(Text, nullable=True, comment="成长轨迹") + memory_insight_updated_at = Column(DateTime, nullable=True, comment="洞察报告最后更新时间") # 与 App 的反向关系 app = relationship( diff --git a/api/app/repositories/end_user_repository.py b/api/app/repositories/end_user_repository.py index 69932101..b9e82693 100644 --- a/api/app/repositories/end_user_repository.py +++ b/api/app/repositories/end_user_repository.py @@ -123,13 +123,19 @@ class EndUserRepository: def update_memory_insight( self, end_user_id: uuid.UUID, - insight: str + memory_insight: str, + behavior_pattern: str, + key_findings: str, + growth_trajectory: str ) -> bool: - """更新记忆洞察缓存 + """更新记忆洞察缓存(四个维度) Args: end_user_id: 终端用户ID - insight: 记忆洞察内容 + memory_insight: 总体概述 + behavior_pattern: 行为模式 + key_findings: 关键发现 + growth_trajectory: 成长轨迹 Returns: bool: 更新成功返回True,否则返回False @@ -140,7 +146,10 @@ class EndUserRepository: .filter(EndUser.id == end_user_id) .update( { - EndUser.memory_insight: insight, + EndUser.memory_insight: memory_insight, # 总体概述存储在 memory_insight + EndUser.behavior_pattern: behavior_pattern, + EndUser.key_findings: key_findings, + EndUser.growth_trajectory: growth_trajectory, EndUser.memory_insight_updated_at: datetime.datetime.now() }, synchronize_session=False @@ -150,7 +159,7 @@ class EndUserRepository: self.db.commit() if updated_count > 0: - db_logger.info(f"成功更新终端用户 {end_user_id} 的记忆洞察缓存") + db_logger.info(f"成功更新终端用户 {end_user_id} 的记忆洞察缓存(四维度)") return True else: db_logger.warning(f"未找到终端用户 {end_user_id},无法更新记忆洞察缓存") @@ -164,13 +173,19 @@ class EndUserRepository: def update_user_summary( self, end_user_id: uuid.UUID, - summary: str + user_summary: str, + personality: str, + core_values: str, + one_sentence: str ) -> bool: - """更新用户摘要缓存 + """更新用户摘要缓存(四个部分) Args: end_user_id: 终端用户ID - summary: 用户摘要内容 + user_summary: 基本介绍 + personality: 性格特点 + core_values: 核心价值观 + one_sentence: 一句话总结 Returns: bool: 更新成功返回True,否则返回False @@ -181,7 +196,10 @@ class EndUserRepository: .filter(EndUser.id == end_user_id) .update( { - EndUser.user_summary: summary, + EndUser.user_summary: user_summary, # 基本介绍存储在 user_summary + EndUser.personality_traits: personality, + EndUser.core_values: core_values, + EndUser.one_sentence_summary: one_sentence, EndUser.user_summary_updated_at: datetime.datetime.now() }, synchronize_session=False @@ -191,7 +209,7 @@ class EndUserRepository: self.db.commit() if updated_count > 0: - db_logger.info(f"成功更新终端用户 {end_user_id} 的用户摘要缓存") + db_logger.info(f"成功更新终端用户 {end_user_id} 的用户摘要缓存(四部分)") return True else: db_logger.warning(f"未找到终端用户 {end_user_id},无法更新用户摘要缓存") @@ -300,15 +318,29 @@ def get_by_id(db: Session, end_user_id: uuid.UUID) -> Optional[EndUser]: repo = EndUserRepository(db) return repo.get_by_id(end_user_id) -def update_memory_insight(db: Session, end_user_id: uuid.UUID, insight: str) -> bool: - """更新记忆洞察缓存""" +def update_memory_insight( + db: Session, + end_user_id: uuid.UUID, + memory_insight: str, + behavior_pattern: str, + key_findings: str, + growth_trajectory: str +) -> bool: + """更新记忆洞察缓存(四个维度)""" repo = EndUserRepository(db) - return repo.update_memory_insight(end_user_id, insight) + return repo.update_memory_insight(end_user_id, memory_insight, behavior_pattern, key_findings, growth_trajectory) -def update_user_summary(db: Session, end_user_id: uuid.UUID, summary: str) -> bool: - """更新用户摘要缓存""" +def update_user_summary( + db: Session, + end_user_id: uuid.UUID, + user_summary: str, + personality: str, + core_values: str, + one_sentence: str +) -> bool: + """更新用户摘要缓存(四个部分)""" repo = EndUserRepository(db) - return repo.update_user_summary(end_user_id, summary) + return repo.update_user_summary(end_user_id, user_summary, personality, core_values, one_sentence) def get_all_by_workspace(db: Session, workspace_id: uuid.UUID) -> List[EndUser]: """获取工作空间的所有终端用户""" diff --git a/api/app/schemas/end_user_schema.py b/api/app/schemas/end_user_schema.py index 07188096..c9f9146d 100644 --- a/api/app/schemas/end_user_schema.py +++ b/api/app/schemas/end_user_schema.py @@ -22,8 +22,12 @@ class EndUser(BaseModel): department: Optional[str] = Field(description="部门", default=None) contact: Optional[str] = Field(description="联系方式", default=None) phone: Optional[str] = Field(description="电话", default=None) - hire_date: Optional[int] = Field(description="入职日期(时间戳,毫秒)", default=None) - updatetime_profile: Optional[int] = Field(description="核心档案信息最后更新时间(时间戳,毫秒)", default=None) + hire_date: Optional[datetime.datetime] = Field(description="入职日期", default=None) + updatetime_profile: Optional[datetime.datetime] = Field(description="核心档案信息最后更新时间", default=None) + + # 用户摘要和洞察更新时间 + user_summary_updated_at: Optional[datetime.datetime] = Field(description="用户摘要最后更新时间", default=None) + memory_insight_updated_at: Optional[datetime.datetime] = Field(description="洞察报告最后更新时间", default=None) class EndUserProfileResponse(BaseModel): @@ -36,8 +40,8 @@ class EndUserProfileResponse(BaseModel): department: Optional[str] = Field(description="部门", default=None) contact: Optional[str] = Field(description="联系方式", default=None) phone: Optional[str] = Field(description="电话", default=None) - hire_date: Optional[int] = Field(description="入职日期(时间戳,毫秒)", default=None) - updatetime_profile: Optional[int] = Field(description="核心档案信息最后更新时间(时间戳,毫秒)", default=None) + hire_date: Optional[datetime.datetime] = Field(description="入职日期", default=None) + updatetime_profile: Optional[datetime.datetime] = Field(description="核心档案信息最后更新时间", default=None) class EndUserProfileUpdate(BaseModel): diff --git a/api/app/services/user_memory_service.py b/api/app/services/user_memory_service.py index 6a444f27..bf0375fb 100644 --- a/api/app/services/user_memory_service.py +++ b/api/app/services/user_memory_service.py @@ -4,20 +4,25 @@ User Memory Service 处理用户记忆相关的业务逻辑,包括记忆洞察、用户摘要、节点统计和图数据等。 """ +import os import uuid -from typing import Any, Dict, List, Optional +from collections import Counter +from dataclasses import dataclass +from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple from app.core.logging_config import get_logger -from app.core.memory.analytics.memory_insight import MemoryInsight -from app.core.memory.analytics.user_summary import generate_user_summary +from app.core.memory.utils.llm.llm_utils import MemoryClientFactory +from app.db import get_db_context from app.repositories.end_user_repository import EndUserRepository from app.repositories.neo4j.neo4j_connector import Neo4jConnector +from app.services.memory_config_service import MemoryConfigService +from pydantic import BaseModel, Field from sqlalchemy.orm import Session logger = get_logger(__name__) -# Neo4j connector instance -_neo4j_connector = Neo4jConnector() +# Neo4j connector instan class UserMemoryService: @@ -26,13 +31,42 @@ class UserMemoryService: def __init__(self): logger.info("UserMemoryService initialized") + @staticmethod + def _datetime_to_timestamp(dt: Optional[Any]) -> Optional[int]: + """将 DateTime 对象转换为时间戳(毫秒)""" + if dt is None: + return None + if hasattr(dt, 'timestamp'): + return int(dt.timestamp() * 1000) + return None + + @staticmethod + def convert_profile_to_dict_with_timestamp(profile_data: Any) -> dict: + """ + 将 Pydantic 模型转换为字典,自动转换所有 DateTime 字段为时间戳(毫秒) + + Args: + profile_data: Pydantic 模型对象 + + Returns: + 包含时间戳的字典 + """ + data = profile_data.model_dump() + # 自动转换所有 datetime 类型的字段 + for key, value in data.items(): + if hasattr(profile_data, key): + original_value = getattr(profile_data, key) + if hasattr(original_value, 'timestamp'): + data[key] = UserMemoryService._datetime_to_timestamp(original_value) + return data + async def get_cached_memory_insight( self, db: Session, end_user_id: str ) -> Dict[str, Any]: """ - 从数据库获取缓存的记忆洞察 + 从数据库获取缓存的记忆洞察(四个维度) Args: db: 数据库会话 @@ -40,8 +74,11 @@ class UserMemoryService: Returns: { - "report": str, - "updated_at": datetime, + "memory_insight": str, # 总体概述 + "behavior_pattern": str, # 行为模式 + "key_findings": List[str], # 关键发现(数组) + "growth_trajectory": str, # 成长轨迹 + "updated_at": int, # 时间戳(毫秒) "is_cached": bool } """ @@ -54,24 +91,52 @@ class UserMemoryService: if not end_user: logger.warning(f"未找到 end_user_id 为 {end_user_id} 的用户") return { - "report": None, + "memory_insight": None, + "behavior_pattern": None, + "key_findings": None, + "growth_trajectory": None, "updated_at": None, "is_cached": False, "message": "用户不存在" } - # 检查是否有缓存数据 - if end_user.memory_insight: - logger.info(f"成功获取 end_user_id {end_user_id} 的缓存记忆洞察") + # 检查是否有缓存数据(至少有一个字段不为空) + has_cache = any([ + end_user.memory_insight, + end_user.behavior_pattern, + end_user.key_findings, + end_user.growth_trajectory + ]) + + if has_cache: + # 反序列化 key_findings(从 JSON 字符串转为数组) + key_findings_value = end_user.key_findings + if key_findings_value: + try: + import json + key_findings_array = json.loads(key_findings_value) + except (json.JSONDecodeError, TypeError): + # 如果解析失败,尝试按 • 分割(兼容旧数据) + key_findings_array = [item.strip() for item in key_findings_value.split('•') if item.strip()] + else: + key_findings_array = [] + + logger.info(f"成功获取 end_user_id {end_user_id} 的缓存记忆洞察(四维度)") return { - "report": end_user.memory_insight, - "updated_at": end_user.memory_insight_updated_at, + "memory_insight": end_user.memory_insight, # 总体概述存储在 memory_insight + "behavior_pattern": end_user.behavior_pattern, + "key_findings": key_findings_array, # 返回数组 + "growth_trajectory": end_user.growth_trajectory, + "updated_at": self._datetime_to_timestamp(end_user.memory_insight_updated_at), "is_cached": True } else: logger.info(f"end_user_id {end_user_id} 的记忆洞察缓存为空") return { - "report": None, + "memory_insight": None, + "behavior_pattern": None, + "key_findings": None, + "growth_trajectory": None, "updated_at": None, "is_cached": False, "message": "数据尚未生成,请稍后重试或联系管理员" @@ -80,7 +145,10 @@ class UserMemoryService: except ValueError: logger.error(f"无效的 end_user_id 格式: {end_user_id}") return { - "report": None, + "memory_insight": None, + "behavior_pattern": None, + "key_findings": None, + "growth_trajectory": None, "updated_at": None, "is_cached": False, "message": "无效的用户ID格式" @@ -95,7 +163,7 @@ class UserMemoryService: end_user_id: str ) -> Dict[str, Any]: """ - 从数据库获取缓存的用户摘要 + 从数据库获取缓存的用户摘要(四个部分) Args: db: 数据库会话 @@ -103,7 +171,10 @@ class UserMemoryService: Returns: { - "summary": str, + "user_summary": str, + "personality": str, + "core_values": str, + "one_sentence": str, "updated_at": datetime, "is_cached": bool } @@ -117,24 +188,40 @@ class UserMemoryService: if not end_user: logger.warning(f"未找到 end_user_id 为 {end_user_id} 的用户") return { - "summary": None, + "user_summary": None, + "personality": None, + "core_values": None, + "one_sentence": None, "updated_at": None, "is_cached": False, "message": "用户不存在" } - # 检查是否有缓存数据 - if end_user.user_summary: + # 检查是否有缓存数据(至少有一个字段不为空) + has_cache = any([ + end_user.user_summary, + end_user.personality_traits, + end_user.core_values, + end_user.one_sentence_summary + ]) + + if has_cache: logger.info(f"成功获取 end_user_id {end_user_id} 的缓存用户摘要") return { - "summary": end_user.user_summary, - "updated_at": end_user.user_summary_updated_at, + "user_summary": end_user.user_summary, + "personality": end_user.personality_traits, + "core_values": end_user.core_values, + "one_sentence": end_user.one_sentence_summary, + "updated_at": self._datetime_to_timestamp(end_user.user_summary_updated_at), "is_cached": True } else: logger.info(f"end_user_id {end_user_id} 的用户摘要缓存为空") return { - "summary": None, + "user_summary": None, + "personality": None, + "core_values": None, + "one_sentence": None, "updated_at": None, "is_cached": False, "message": "数据尚未生成,请稍后重试或联系管理员" @@ -143,7 +230,10 @@ class UserMemoryService: except ValueError: logger.error(f"无效的 end_user_id 格式: {end_user_id}") return { - "summary": None, + "user_summary": None, + "personality": None, + "core_values": None, + "one_sentence": None, "updated_at": None, "is_cached": False, "message": "无效的用户ID格式" @@ -151,7 +241,8 @@ class UserMemoryService: except Exception as e: logger.error(f"获取缓存用户摘要时出错: {str(e)}") raise - + +# for user async def generate_and_cache_insight( self, db: Session, @@ -169,7 +260,10 @@ class UserMemoryService: Returns: { "success": bool, - "report": str, + "memory_insight": str, + "behavior_pattern": str, + "key_findings": List[str], # 数组格式 + "growth_trajectory": str, "error": Optional[str] } """ @@ -185,7 +279,10 @@ class UserMemoryService: logger.error(f"end_user_id {end_user_id} 不存在") return { "success": False, - "report": None, + "memory_insight": None, + "behavior_pattern": None, + "key_findings": None, + "growth_trajectory": None, "error": "用户不存在" } @@ -193,31 +290,55 @@ class UserMemoryService: try: logger.info(f"使用 end_user_id={end_user_id} 生成记忆洞察") result = await analytics_memory_insight_report(end_user_id) - report = result.get("report", "") - if not report: + memory_insight = result.get("memory_insight", "") + behavior_pattern = result.get("behavior_pattern", "") + key_findings_array = result.get("key_findings", []) # 现在是数组 + growth_trajectory = result.get("growth_trajectory", "") + + # 将 key_findings 数组序列化为 JSON 字符串以存储到数据库 + import json + key_findings_json = json.dumps(key_findings_array, ensure_ascii=False) if key_findings_array else "" + + if not any([memory_insight, behavior_pattern, key_findings_array, growth_trajectory]): logger.warning(f"end_user_id {end_user_id} 的记忆洞察生成结果为空") return { "success": False, - "report": None, + "memory_insight": None, + "behavior_pattern": None, + "key_findings": None, + "growth_trajectory": None, "error": "生成的洞察报告为空,可能Neo4j中没有该用户的数据" } - # 更新数据库缓存 - success = repo.update_memory_insight(user_uuid, report) + # 更新数据库缓存(四个维度) + # 注意:key_findings 存储为 JSON 字符串 + success = repo.update_memory_insight( + user_uuid, + memory_insight, + behavior_pattern, + key_findings_json, # 存储 JSON 字符串 + growth_trajectory + ) if success: - logger.info(f"成功为 end_user_id {end_user_id} 生成并缓存记忆洞察") + logger.info(f"成功为 end_user_id {end_user_id} 生成并缓存记忆洞察(四维度)") return { "success": True, - "report": report, + "memory_insight": memory_insight, + "behavior_pattern": behavior_pattern, + "key_findings": key_findings_array, # 返回数组 + "growth_trajectory": growth_trajectory, "error": None } else: logger.error(f"更新 end_user_id {end_user_id} 的记忆洞察缓存失败") return { "success": False, - "report": report, + "memory_insight": memory_insight, + "behavior_pattern": behavior_pattern, + "key_findings": key_findings_array, # 返回数组 + "growth_trajectory": growth_trajectory, "error": "数据库更新失败" } @@ -225,7 +346,10 @@ class UserMemoryService: logger.error(f"调用分析函数生成记忆洞察时出错: {str(e)}") return { "success": False, - "report": None, + "memory_insight": None, + "behavior_pattern": None, + "key_findings": None, + "growth_trajectory": None, "error": f"Neo4j或LLM服务不可用: {str(e)}" } @@ -233,14 +357,20 @@ class UserMemoryService: logger.error(f"无效的 end_user_id 格式: {end_user_id}") return { "success": False, - "report": None, + "memory_insight": None, + "behavior_pattern": None, + "key_findings": None, + "growth_trajectory": None, "error": "无效的用户ID格式" } except Exception as e: logger.error(f"生成并缓存记忆洞察时出错: {str(e)}") return { "success": False, - "report": None, + "memory_insight": None, + "behavior_pattern": None, + "key_findings": None, + "growth_trajectory": None, "error": str(e) } @@ -251,7 +381,7 @@ class UserMemoryService: workspace_id: Optional[uuid.UUID] = None ) -> Dict[str, Any]: """ - 生成并缓存用户摘要 + 生成并缓存用户摘要(四个部分) Args: db: 数据库会话 @@ -261,7 +391,10 @@ class UserMemoryService: Returns: { "success": bool, - "summary": str, + "user_summary": str, + "personality": str, + "core_values": str, + "one_sentence": str, "error": Optional[str] } """ @@ -277,38 +410,61 @@ class UserMemoryService: logger.error(f"end_user_id {end_user_id} 不存在") return { "success": False, - "summary": None, + "user_summary": None, + "personality": None, + "core_values": None, + "one_sentence": None, "error": "用户不存在" } # 使用 end_user_id 调用分析函数 try: logger.info(f"使用 end_user_id={end_user_id} 生成用户摘要") - summary = await generate_user_summary(end_user_id) + result = await analytics_user_summary(end_user_id) - if not summary: + user_summary = result.get("user_summary", "") + personality = result.get("personality", "") + core_values = result.get("core_values", "") + one_sentence = result.get("one_sentence", "") + + if not any([user_summary, personality, core_values, one_sentence]): logger.warning(f"end_user_id {end_user_id} 的用户摘要生成结果为空") return { "success": False, - "summary": None, + "user_summary": None, + "personality": None, + "core_values": None, + "one_sentence": None, "error": "生成的用户摘要为空,可能Neo4j中没有该用户的数据" } # 更新数据库缓存 - success = repo.update_user_summary(user_uuid, summary) + success = repo.update_user_summary( + user_uuid, + user_summary, + personality, + core_values, + one_sentence + ) if success: logger.info(f"成功为 end_user_id {end_user_id} 生成并缓存用户摘要") return { "success": True, - "summary": summary, + "user_summary": user_summary, + "personality": personality, + "core_values": core_values, + "one_sentence": one_sentence, "error": None } else: logger.error(f"更新 end_user_id {end_user_id} 的用户摘要缓存失败") return { "success": False, - "summary": summary, + "user_summary": user_summary, + "personality": personality, + "core_values": core_values, + "one_sentence": one_sentence, "error": "数据库更新失败" } @@ -316,7 +472,10 @@ class UserMemoryService: logger.error(f"调用分析函数生成用户摘要时出错: {str(e)}") return { "success": False, - "summary": None, + "user_summary": None, + "personality": None, + "core_values": None, + "one_sentence": None, "error": f"Neo4j或LLM服务不可用: {str(e)}" } @@ -324,17 +483,24 @@ class UserMemoryService: logger.error(f"无效的 end_user_id 格式: {end_user_id}") return { "success": False, - "summary": None, + "user_summary": None, + "personality": None, + "core_values": None, + "one_sentence": None, "error": "无效的用户ID格式" } except Exception as e: logger.error(f"生成并缓存用户摘要时出错: {str(e)}") return { "success": False, - "summary": None, + "user_summary": None, + "personality": None, + "core_values": None, + "one_sentence": None, "error": str(e) } - + +# for workspace async def generate_cache_for_workspace( self, db: Session, @@ -432,34 +598,212 @@ class UserMemoryService: async def analytics_memory_insight_report(end_user_id: Optional[str] = None) -> Dict[str, Any]: """ - 生成记忆洞察报告 + 生成记忆洞察报告(四个维度) + + 这个函数包含完整的业务逻辑: + 1. 使用 MemoryInsight 工具类获取基础数据(领域分布、活跃时段、社交关联) + 2. 使用 Jinja2 模板渲染提示词 + 3. 调用 LLM 生成四个维度的自然语言报告 + 4. 解析并返回四个部分 Args: end_user_id: 可选的终端用户ID Returns: - 包含报告的字典 + 包含四个维度报告的字典: { + "memory_insight": str, # 总体概述 + "behavior_pattern": str, # 行为模式 + "key_findings": List[str], # 关键发现(数组) + "growth_trajectory": str # 成长轨迹 + } """ + from app.core.memory.utils.prompt.prompt_utils import render_memory_insight_prompt + import re + insight = MemoryInsight(end_user_id) - report = await insight.generate_insight_report() - await insight.close() - data = {"report": report} - return data + + try: + # 1. 并行获取三个维度的数据 + import asyncio + domain_dist, active_periods, social_conn = await asyncio.gather( + insight.get_domain_distribution(), + insight.get_active_periods(), + insight.get_social_connections(), + ) + + # 2. 构建数据字符串 + domain_distribution_str = None + if domain_dist: + top_domains = ", ".join([f"{k}({v:.0%})" for k, v in list(domain_dist.items())[:3]]) + domain_distribution_str = f"用户的记忆主要集中在 {top_domains}" + + active_periods_str = None + if active_periods: + months_str = " 和 ".join(map(str, active_periods)) + active_periods_str = f"用户在每年的 {months_str} 月最为活跃" + + social_connections_str = None + if social_conn: + social_connections_str = f"与用户\"{social_conn['user_id']}\"拥有最多共同记忆({social_conn['common_memories_count']}条),时间范围主要在 {social_conn['time_range']}" + + # 3. 如果没有足够数据,返回默认消息 + if not any([domain_distribution_str, active_periods_str, social_connections_str]): + return { + "memory_insight": "暂无足够数据生成洞察报告。", + "behavior_pattern": "", + "key_findings": "", + "growth_trajectory": "" + } + + # 4. 使用 Jinja2 模板渲染提示词 + user_prompt = await render_memory_insight_prompt( + domain_distribution=domain_distribution_str, + active_periods=active_periods_str, + social_connections=social_connections_str + ) + + messages = [ + {"role": "user", "content": user_prompt} + ] + + # 5. 调用 LLM 生成报告 + response = await insight.llm_client.chat(messages=messages) + + # 6. 处理 LLM 响应,确保返回字符串类型 + content = response.content + if isinstance(content, list): + if len(content) > 0: + if isinstance(content[0], dict): + text = content[0].get('text', content[0].get('content', str(content[0]))) + full_response = str(text) + else: + full_response = str(content[0]) + else: + full_response = "" + elif isinstance(content, dict): + full_response = str(content.get('text', content.get('content', str(content)))) + else: + full_response = str(content) if content is not None else "" + + # 7. 解析四个部分 + # 使用正则表达式提取四个部分 + memory_insight_match = re.search(r'【总体概述】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL) + behavior_match = re.search(r'【行为模式】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL) + findings_match = re.search(r'【关键发现】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL) + trajectory_match = re.search(r'【成长轨迹】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL) + + memory_insight = memory_insight_match.group(1).strip() if memory_insight_match else "" + behavior_pattern = behavior_match.group(1).strip() if behavior_match else "" + key_findings_text = findings_match.group(1).strip() if findings_match else "" + growth_trajectory = trajectory_match.group(1).strip() if trajectory_match else "" + + # 将 key_findings 从文本转换为数组 + # 按 • 符号分割,并清理每个条目 + key_findings_array = [] + if key_findings_text: + # 分割并清理每个条目 + items = [item.strip() for item in key_findings_text.split('•') if item.strip()] + key_findings_array = items + + return { + "memory_insight": memory_insight, + "behavior_pattern": behavior_pattern, + "key_findings": key_findings_array, # 返回数组而不是字符串 + "growth_trajectory": growth_trajectory + } + + finally: + # 确保关闭连接 + await insight.close() async def analytics_user_summary(end_user_id: Optional[str] = None) -> Dict[str, Any]: """ - 生成用户摘要 + 生成用户摘要(包含四个部分) + + 这个函数包含完整的业务逻辑: + 1. 使用 UserSummary 工具类获取基础数据(实体、语句) + 2. 使用 prompt_utils 渲染提示词 + 3. 调用 LLM 生成四部分内容:基本介绍、性格特点、核心价值观、一句话总结 Args: end_user_id: 可选的终端用户ID Returns: - 包含摘要的字典 + 包含四部分摘要的字典: { + "user_summary": str, + "personality": str, + "core_values": str, + "one_sentence": str + } """ - summary = await generate_user_summary(end_user_id) - data = {"summary": summary} - return data + from app.core.memory.analytics.user_summary import UserSummary + from app.core.memory.utils.prompt.prompt_utils import render_user_summary_prompt + import re + + # 创建 UserSummary 实例 + user_summary_tool = UserSummary(end_user_id or os.getenv("SELECTED_GROUP_ID", "group_123")) + + try: + # 1) 收集上下文数据 + entities = await user_summary_tool._get_top_entities(limit=40) + statements = await user_summary_tool._get_recent_statements(limit=100) + + entity_lines = [f"{name} ({freq})" for name, freq in entities][:20] + statement_samples = [s.statement.strip() for s in statements if (s.statement or '').strip()][:20] + + # 2) 使用 prompt_utils 渲染提示词 + user_prompt = await render_user_summary_prompt( + user_id=user_summary_tool.user_id, + entities=", ".join(entity_lines) if entity_lines else "(空)", + statements=" | ".join(statement_samples) if statement_samples else "(空)" + ) + + messages = [ + {"role": "user", "content": user_prompt}, + ] + + # 3) 调用 LLM 生成摘要 + response = await user_summary_tool.llm.chat(messages=messages) + + # 4) 处理 LLM 响应,确保返回字符串类型 + content = response.content + if isinstance(content, list): + if len(content) > 0: + if isinstance(content[0], dict): + text = content[0].get('text', content[0].get('content', str(content[0]))) + full_response = str(text) + else: + full_response = str(content[0]) + else: + full_response = "" + elif isinstance(content, dict): + full_response = str(content.get('text', content.get('content', str(content)))) + else: + full_response = str(content) if content is not None else "" + + # 5) 解析四个部分 + # 使用正则表达式提取四个部分 + user_summary_match = re.search(r'【基本介绍】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL) + personality_match = re.search(r'【性格特点】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL) + core_values_match = re.search(r'【核心价值观】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL) + one_sentence_match = re.search(r'【一句话总结】\s*\n(.*?)(?=\n【|$)', full_response, re.DOTALL) + + user_summary = user_summary_match.group(1).strip() if user_summary_match else "" + personality = personality_match.group(1).strip() if personality_match else "" + core_values = core_values_match.group(1).strip() if core_values_match else "" + one_sentence = one_sentence_match.group(1).strip() if one_sentence_match else "" + + return { + "user_summary": user_summary, + "personality": personality, + "core_values": core_values, + "one_sentence": one_sentence + } + + finally: + # 确保关闭连接 + await user_summary_tool.close() async def analytics_node_statistics(