文章目录
0. 前置推荐阅读
- 订阅智能体实战- 【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件
- ActionNode基础与实战- 【AI的未来 - AI Agent系列】【MetaGPT】4. ActionNode从理论到实战- 【AI的未来 - AI Agent系列】【MetaGPT】4.1 细说我在ActionNode实战中踩的那些坑
1. 本文内容
跟着《MetaGPT智能体开发入门》教程,对上次实现的订阅智能体进行优化和进阶,也用上ActionNode模块。
上次实现订阅智能体的文章可参考 【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件 。在上次这篇文章中我们虽然完成了订阅智能体的功能,但是这个智能体的作用是被局限在某一特定领域的,当我们需要订阅另外一个数据源的分析结果时,我们需要手动再写一个Role,这个智能体不太通用。
如何实现一个更通用的订阅智能体?实现思路如下:
(1)解析用户指令(分析用户需求,我们期望用户用一句自然语言下指令即可得到想要的)
(2)爬取网页
(3)让大模型写从html中提取用户需要的数据的代码
(4)将爬取网页和大模型写的提取用户需要数据的代码结合,得到爬取指定网页信息的Action
(5)将提取后的数据,让大模型进行处理和分析,得到网页数据分析的Action
(6)将这两个Action组合,就可以得到一个特定网页的Watcher Role
下面我们一步一步来实现。
2. 解析用户指令(分析用户需求)
写爬虫代码是需要先知道要爬取的网页URL、网页数据提取需求是什么,最简单的方式就是用户自己填写,但是这就让我们的入参变得复杂。所以,我们首先实现一个解析用户指令的代码,用户只需用一句自然语言输入自己的需求,代码自动解析出网页URL、网页数据提取需求等,即:将用户的输入结构化。
2.1 完整代码及注释
# 加载 .env 到环境变量# from dotenv import load_dotenv, find_dotenv# _ = load_dotenv(find_dotenv())from metagpt.actions.action_node import ActionNode
from metagpt.actions.action import Action
import asyncio
## 分析用户的要求语言
LANGUAGE = ActionNode(
key="language",
expected_type=str,
instruction="Provide the language used in the project, typically matching the user's requirement language.",
example="en_us",)## 分析用户的订阅推送时间
CRON_EXPRESSION = ActionNode(
key="Cron Expression",
expected_type=str,
instruction="If the user requires scheduled triggering, please provide the corresponding 5-field cron expression. ""Otherwise, leave it blank.",
example="",)## 分析用户订阅的网址URL,可以是列表
CRAWLER_URL_LIST = ActionNode(
key="Crawler URL List",
expected_type=list[str],
instruction="List the URLs user want to crawl. Leave it blank if not provided in the User Requirement.",
example=["https://example1.com","https://example2.com"],)## 分析用户所需要的网站数据
PAGE_CONTENT_EXTRACTION = ActionNode(
key="Page Content Extraction",
expected_type=str,
instruction="Specify the requirements and tips to extract from the crawled web pages based on User Requirement.",
example="Retrieve the titles and content of articles published today.",)## 分析用户所需要的汇总数据的方式
CRAWL_POST_PROCESSING = ActionNode(
key="Crawl Post Processing",
expected_type=str,
instruction="Specify the processing to be applied to the crawled content, such as summarizing today's news.",
example="Generate a summary of today's news articles.",)## 补充说明,如果url或定时器解析为空,则提示用户补充
INFORMATION_SUPPLEMENT = ActionNode(
key="Information Supplement",
expected_type=str,
instruction="If unable to obtain the Cron Expression, prompt the user to provide the time to receive subscription ""messages. If unable to obtain the URL List Crawler, prompt the user to provide the URLs they want to crawl. Keep it ""blank if everything is clear",
example="",)
NODES =[
LANGUAGE,
CRON_EXPRESSION,
CRAWLER_URL_LIST,
PAGE_CONTENT_EXTRACTION,
CRAWL_POST_PROCESSING,
INFORMATION_SUPPLEMENT,]
PARSE_SUB_REQUIREMENTS_NODE = ActionNode.from_children("ParseSubscriptionReq", NODES)## 解析用户的需求的Action
PARSE_SUB_REQUIREMENT_TEMPLATE ="""
### User Requirement
{requirements}
"""classParseSubRequirement(Action):asyncdefrun(self, requirements):
requirements ="\n".join(i.content for i in requirements)
context = PARSE_SUB_REQUIREMENT_TEMPLATE.format(requirements=requirements)
node =await PARSE_SUB_REQUIREMENTS_NODE.fill(context=context, llm=self.llm)return node
if __name__ =="__main__":from metagpt.schema import Message
asyncio.run(ParseSubRequirement().run([Message("从36kr创投平台https://pitchhub.36kr.com/financing-flash 爬取所有初创企业融资的信息,获取标题,链接, 时间,总结今天的融资新闻,然后在晚上七点半送给我")]))
2.2 运行结果
3. 利用大模型写爬虫代码
3.1 对html内容进行精简
一般html元素太多,所以调用大模型时token数非常大,甚至一次对话都无法接收一个完整的html,我们可以先对网页内容做一下简化,因为对元素进行定位一般用css selector就够了,所以我们可以主要提供html的class属性信息,另外可以将html转成css表达式和对应的内容提供给llm,从而减少token的消耗。
defget_outline(page):
soup = _get_soup(page.html)
outline =[]defprocess_element(element, depth):
name = element.name
ifnot name:returnif name in["script","style"]:return
element_info ={"name": element.name,"depth": depth}if name in["svg"]:
element_info["text"]=None
outline.append(element_info)return
element_info["text"]= element.string
# Check if the element has an "id" attributeif"id"in element.attrs:
element_info["id"]= element["id"]if"class"in element.attrs:
element_info["class"]= element["class"]
outline.append(element_info)for child in element.children:
process_element(child, depth +1)for element in soup.body.children:
process_element(element,1)return outline
3.2 利用大模型写爬虫代码
PROMPT_TEMPLATE ="""Please complete the web page crawler parse function to achieve the User Requirement. The parse \
function should take a BeautifulSoup object as input, which corresponds to the HTML outline provided in the Context.
```python
from bs4 import BeautifulSoup
# only complete the parse function
def parse(soup: BeautifulSoup):
...
# Return the object that the user wants to retrieve, don't use print
## User Requirement
{requirement}
## Context
The outline of html page to scrabe is show like below:
```tree
{outline}
"""classWriteCrawlerCode(Action):asyncdefrun(self, requirement):
requirement: Message = requirement[-1]## 上一步解析完成的用户需求结果
data = requirement.instruct_content.dict()
urls = data["Crawler URL List"]## 获取用户想要的网页URL
query = data["Page Content Extraction"]## 获取用户想关注的网页数据
codes ={}for url in urls:
codes[url]=await self._write_code(url, query)## 每个URL的爬虫程序return"\n".join(f"# {url}\n{code}"for url, code in codes.items())asyncdef_write_code(self, url, query):
page =await WebBrowserEngine().run(url)
outline = get_outline(page)
outline ="\n".join(f"{' '*i['depth']}{'.'.join([i['name'],*i.get('class',[])])}: {i['text']if i['text']else''}"for i in outline
)
code_rsp =await self._aask(PROMPT_TEMPLATE.format(outline=outline, requirement=query))
code = CodeParser.parse_code(block="", text=code_rsp)return code
3.3 补充代码,测试本节程序
- 引入所需包
import asyncio
from metagpt.actions.action import Action
from metagpt.schema import Message
from metagpt.tools.web_browser_engine import WebBrowserEngine
from metagpt.utils.common import CodeParser
from metagpt.utils.parse_html import _get_soup
- main运行函数
if __name__ =="__main__":from metagpt.actions.action_node import ActionNode
cls = ActionNode.create_model_class("ActionModel",{"Cron Expression":(str,...),"Crawler URL List":(list[str],...),"Page Content Extraction":(str,...),"Crawl Post Processing":(str,...),})
data ={"Cron Expression":"0 30 19 * * *","Crawler URL List":["https://pitchhub.36kr.com/financing-flash"],"Page Content Extraction":"从36kr创投平台爬取所有初创企业融资的信息,获取标题,链接, 时间。","Crawl Post Processing":"总结今天的融资新闻。",}
asyncio.run(WriteCrawlerCode().run([Message(instruct_content=cls(**data))]))
3.4 运行结果及踩坑
3.4.1 运行结果
3.4.2 坑一:No module named ‘playwright’
- 解决方法:
pip install playwright
4. 爬虫工程师角色定义:CrawlerEngineer
# 定义爬虫工程师角色from metagpt.roles import Role
classCrawlerEngineer(Role):
name:str="同学小张的专属爬虫工程师"
profile:str="Crawling Engineer"
goal:str="Write elegant, readable, extensible, efficient code"
constraints:str="The code should conform to standards like PEP8 and be modular and maintainable"def__init__(self,**kwargs)->None:super().__init__(**kwargs)
self._init_actions([WriteCrawlerCode])
self._watch([ParseSubRequirement])## 触发?
爬虫工程师角色的代码中,初始化了一个写爬虫代码的Action,为WriteCrawlerCode。那这个角色什么时候触发运行呢?答案在这一句:
self._watch([ParseSubRequirement])
,当解析完用户需求之后,会触发该角色的运行,触发写代码的动作。
5. 订阅助手角色定义:SubscriptionAssistant
classSubscriptionAssistant(Role):"""Analyze user subscription requirements."""
name:str="同学小张的订阅助手"
profile:str="Subscription Assistant"
goal:str="analyze user subscription requirements to provide personalized subscription services."
constraints:str="utilize the same language as the User Requirement"def__init__(self,**kwargs)->None:super().__init__(**kwargs)
self._init_actions([ParseSubRequirement, RunSubscription])## 2. 先解析用户需求,然后运行订阅
self._watch([UserRequirement, WriteCrawlerCode])## 触发?asyncdef_think(self)->bool:
cause_by = self.rc.history[-1].cause_by
if cause_by == any_to_str(UserRequirement):
state =0elif cause_by == any_to_str(WriteCrawlerCode):
state =1if self.rc.state == state:
self.rc.todo =NonereturnFalse
self._set_state(state)returnTrue
订阅助手角色定义的代码中,初始化了两个Action,一个是解析用户需求,一个是RunSubscription,运行智能体。
看下它是怎么运作的:
(1)
self._watch([UserRequirement, WriteCrawlerCode])
它观察了这两个输入,用户输入和写完爬虫代码。
(2)_think函数
当上一次的信息是来自用户输入,则运行ParseSubRequirement,如果上一次输入是写爬虫代码的Action,则运行RunSubscription。
6. 运行订阅智能体的Action:RunSubscription
6.1 总结信息的Action
有了用户需求,有了爬虫代码,还缺的就是将爬下来的数据总结成用户需求中想要的内容。下面的SubAction就是干这事儿的(类比之前订阅智能体中的
AnalysisOSSTrending
):
data是从所有Url网页中爬下来的数据,process是之前解析的用户对数据的处理需求。
SUB_ACTION_TEMPLATE ="""
## Requirements
Answer the question based on the provided context {process}. If the question cannot be answered, please summarize the context.
## context
{data}"
"""classSubAction(Action):asyncdefrun(self,*args,**kwargs):
pages =await WebBrowserEngine().run(*urls)iflen(urls)==1:
pages =[pages]
data =[]for url, page inzip(urls, pages):
data.append(getattr(modules[url],"parse")(page.soup))## 8. 根据用户的数据需求,和爬取的网页数据,让大模型总结returnawait self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))
6.2 运行订阅智能体的Action
# 运行订阅智能体的ActionclassRunSubscription(Action):asyncdefrun(self, msgs):from metagpt.roles.role import Role
from metagpt.subscription import SubscriptionRunner
code = msgs[-1].content ## 4. 获取爬虫代码
req = msgs[-2].instruct_content.dict()## 5. 获取用户需求
urls = req["Crawler URL List"]
process = req["Crawl Post Processing"]
spec = req["Cron Expression"]
SubAction = self.create_sub_action_cls(urls, code, process)## 6. 创建一个Action,urls网页链接、code爬虫代码、process用户需求的数据
SubRole =type("SubRole",(Role,),{})## 7. 定时触发的Role
role = SubRole()
role.init_actions([SubAction])
runner = SubscriptionRunner()asyncdefcallback(msg):print(msg)await runner.subscribe(role, CronTrigger(spec), callback)await runner.run()@staticmethoddefcreate_sub_action_cls(urls:list[str], code:str, process:str):
modules ={}for url in urls[::-1]:
code, current = code.rsplit(f"# {url}", maxsplit=1)
name = uuid4().hex
module =type(sys)(name)exec(current, module.__dict__)
modules[url]= module
classSubAction(Action):asyncdefrun(self,*args,**kwargs):
pages =await WebBrowserEngine().run(*urls)iflen(urls)==1:
pages =[pages]
data =[]for url, page inzip(urls, pages):
data.append(getattr(modules[url],"parse")(page.soup))## 8. 根据用户的数据需求,和爬取的网页数据,让大模型总结returnawait self.llm.aask(SUB_ACTION_TEMPLATE.format(process=process, data=data))return SubAction
7. 定时器代码和callback代码
这部分代码就可以直接复用原来订阅智能体的代码了,通用的。
7.1 定时器
classCronTrigger:def__init__(self, spec:str, tz: Optional[BaseTzInfo]=None)->None:
self.crontab = crontab(spec, tz=tz)def__aiter__(self):return self
asyncdef__anext__(self):await self.crontab.next()return Message()
7.2 callback
classWxPusherClient:def__init__(self, token: Optional[str]=None, base_url:str="http://wxpusher.zjiecode.com"):
self.base_url = base_url
self.token = token or os.environ["WXPUSHER_TOKEN"]# 5.1 从环境变量中获取token,所以你需要在环境变量中配置WXPUSHER_TOKEN或在配置文件中设置WXPUSHER_TOKENasyncdefsend_message(
self,
content,
summary: Optional[str]=None,
content_type:int=1,
topic_ids: Optional[list[int]]=None,
uids: Optional[list[int]]=None,
verify:bool=False,
url: Optional[str]=None,):
payload ={"appToken": self.token,"content": content,"summary": summary,"contentType": content_type,"topicIds": topic_ids or[],# 5.2 从环境变量中获取uids,所以你需要在环境变量中配置WXPUSHER_UIDS# uids是你想推送给哪个微信,必须是关注了你这个订阅号的微信才可以知道uid"uids": uids or os.environ["WXPUSHER_UIDS"].split(","),"verifyPay": verify,"url": url,}
url =f"{self.base_url}/api/send/message"returnawait self._request("POST", url, json=payload)asyncdef_request(self, method, url,**kwargs):asyncwith aiohttp.ClientSession()as session:asyncwith session.request(method, url,**kwargs)as response:
response.raise_for_status()returnawait response.json()# 5.3 微信callback wrapper,使用WxPusherClient给指定微信推送消息asyncdefwxpusher_callback(msg: Message):
client = WxPusherClient()await client.send_message(msg.content, content_type=3)
7.3 集成callback
# 运行订阅智能体的ActionclassRunSubscription(Action):asyncdefrun(self, msgs):
······ 与前文代码一样
callbacks =[]
callbacks.append(wxpusher_callback)asyncdefcallback(msg):print(msg)await asyncio.gather(*(call(msg)for call in callbacks))# 遍历所有回调函数,触发回调,分发消息
······ 与前文代码一样
8. 整体运行
创建了一个Team,其中有SubscriptionAssistant角色和CrawlerEngineer角色。
用户输入信息,然后
run_project
,SubscriptionAssistant首先观察到用户输入信息,触发ParseSubRequirement动作。
if __name__ =="__main__":import asyncio
from metagpt.team import Team
team = Team()
team.hire([SubscriptionAssistant(), CrawlerEngineer()])
team.run_project("从36kr创投平台https://pitchhub.36kr.com/financing-flash爬取所有初创企业融资的信息,获取标题,链接, 时间,总结今天的融资新闻,然后在早上10:56送给我")
asyncio.run(team.run())
运行结果
9. 可能有的坑
9.1 更新MetaGPT源码后,运行报错:No module named ‘zhipuai.types’
- 解决方法
pip install zhipuai
本文完。按照教程跑通了订阅智能体进阶的流程。
这好像是教程中唯一的一个多智能体的例子。所以里面涉及了多智能体之间的交互方式,懵懵懂懂,需要深入了解下(_watch、_observe等)。咱们后面的文章细讲。
版权归原作者 同学小张 所有, 如有侵权,请联系我们删除。