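Series articles
[The Future of AI - AI Agent Series] [MetaGPT] 0. Your First MetaGPT Program
[The Future of AI - AI Agent Series] [MetaGPT] 1. How AI Agents Are Reshaping the World
[The Future of AI - AI Agent Series] [MetaGPT] 2. Building Your Own First Agent
What this article covers
Using Task4 of the course "MetaGPT智能体开发入门" (Introduction to MetaGPT Agent Development) as the example, this article builds a subscription agent from scratch with MetaGPT. The agent collects web page information on a schedule, summarizes it, and automatically sends the summary to WeChat and email.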
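Following the tutorial above, you have learned how to develop a practical news-collection assistant for a specific scenario. Now try to build an Agent that subscribes to the news you are interested in:
- Based on the crawler basics you learned earlier (if you are not comfortable writing crawler code, let GPT help you), define two custom Action classes that fetch news for your Agent:
  - Action 1: following the guidance in Chapter 4, Sections 3.2.1 and 3.2.2, independently crawl the Github Trending page (https://github.com/trending) and collect each project's name, URL, and description.
  - Action 2: independently crawl the Huggingface Papers page (https://huggingface.co/papers). First get the link to each Paper (hint: the href attribute of the title element), then follow the link to the Paper's detail page (for example: https://huggingface.co/papers/2312.03818) and extract the Paper's title and abstract.
- Referring to Chapter 3, Section 1.4, override the relevant methods so that your Agent automatically generates a table of contents for the summary, splits the content into chunks by second-level heading, summarizes each chunk, and assembles a news document;
- Customize the Agent's SubscriptionRunner class and independently implement the Trigger and Callback so that your Agent sends the summarized news document to a notification channel on a schedule (try implementing email delivery; it earns bonus points)
This lesson essentially tells us what the main components of a subscription agent are, and then has us practice wiring them together with MetaGPT.
- The main components of a subscription agent:
  - Timer: triggers the subscription task on a schedule
  - Fetching and summarizing the subscribed information: crawler + LLM
  - Callback: the summarized information is passed to a callback, which sends a message to a third party (WeChat, email, etc.)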
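The three components above can be sketched without MetaGPT at all, which may help in seeing how the data flows. This is a minimal sketch under stated assumptions: `interval_trigger`, `fake_agent`, and `run_subscription` are hypothetical names invented for illustration, and the "agent" only returns a fixed string instead of crawling and calling an LLM.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, List


async def interval_trigger(url: str, times: int, delay: float = 0.0) -> AsyncIterator[str]:
    """Hypothetical timer: yields the URL `times` times, sleeping `delay` seconds before each."""
    for _ in range(times):
        await asyncio.sleep(delay)
        yield url


async def fake_agent(url: str) -> str:
    """Stand-in for 'crawler + LLM summary'; a real agent would fetch and summarize."""
    return f"summary of {url}"


async def run_subscription(
    trigger: AsyncIterator[str],
    agent: Callable[[str], Awaitable[str]],
    callbacks: List[Callable[[str], Awaitable[None]]],
) -> None:
    """Each time the trigger fires, run the agent and fan the result out to every callback."""
    async for url in trigger:
        summary = await agent(url)
        await asyncio.gather(*(cb(summary) for cb in callbacks))
```

In the MetaGPT code later in this article, `SubscriptionRunner` plays the role of `run_subscription`, the cron trigger replaces `interval_trigger`, and the WeChat and email senders are the callbacks.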
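Task4 - Task 1: Independently crawl the Github Trending page and collect each project's name, URL, and description
Just follow the tutorial step by step here. I did not run into any major problems; the main issue was missing imports…
Complete code and comments
- First, the result: the push notification arrives on WeChat
- Complete code with detailed comments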
# Load .env into environment variables
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())

import asyncio
from metagpt.subscription import SubscriptionRunner
from metagpt.roles import Searcher
from metagpt.schema import Message
import aiohttp
from bs4 import BeautifulSoup
from metagpt.actions.action import Action
from metagpt.logs import logger

# 1. Action 1: crawler for the GitHub Trending page
class CrawlOSSTrending(Action):
    """
    Crawl the GitHub Trending page.
    """

    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            # async with client.get(url, proxy=CONFIG.global_proxy) as response:
            async with client.get(url) as response:  # 1.1 I can reach GitHub directly, so no proxy is needed
                response.raise_for_status()
                html = await response.text()
        # logger.debug(html)
        soup = BeautifulSoup(html, 'html.parser')
        repositories = []
        ## 1.2 Parse the HTML elements to get the data we want: name, url, description, language, stars, forks
        for article in soup.select('article.Box-row'):
            repo_info = {}
            repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")
            repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()
            # Description
            description_element = article.select_one('p')
            repo_info['description'] = description_element.text.strip() if description_element else None
            # Language
            language_element = article.select_one('span[itemprop="programmingLanguage"]')
            repo_info['language'] = language_element.text.strip() if language_element else None
            # Stars and Forks
            stars_element = article.select('a.Link--muted')[0]
            forks_element = article.select('a.Link--muted')[1]
            repo_info['stars'] = stars_element.text.strip()
            repo_info['forks'] = forks_element.text.strip()
            # Today's Stars
            today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
            repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None
            repositories.append(repo_info)
        return repositories

from typing import Any
from metagpt.actions.action import Action
TRENDING_ANALYSIS_PROMPT ="""# Requirements
You are a GitHub Trending Analyst, aiming to provide users with insightful and personalized recommendations based on the latest
GitHub Trends. Based on the context, fill in the following missing information, generate engaging and informative titles,
ensuring users discover repositories aligned with their interests.
# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example
# [Title]
## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.
## The Trends Categories
1. Generative AI
- [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
- [Project2](https://github/xx/project2): ...
...
## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
---
# Github Trending
{trending}
"""

## 2. Summarize the crawled data: essentially prompt + crawled data, handed to the LLM to get the result
class AnalysisOSSTrending(Action):
    async def run(
        self,
        trending: Any
    ):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))

from metagpt.roles import Role

## 3. Define the role: initialize the two actions above and run them in order
class OssWatcher(Role):
    def __init__(
        self,
        name="Codey",
        profile="OssWatcher",
        goal="Generate an insightful GitHub Trending analysis report.",
        constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name, profile, goal, constraints)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])  ## 3.1 initialize the two actions
        self._set_react_mode(react_mode="by_order")  ## 3.2 run them in order

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        todo = self._rc.todo
        msg = self.get_memories(k=1)[0]  # 3.4 get the latest memory; the crawled data is stored there for the analysis action
        result = await todo.run(msg.content)
        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)  # 3.3 after the first action finishes, store its result in memory
        return msg

##============================ The agent definition ends here =================================================

from pydantic import BaseModel, Field
import time

# Trigger
class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)

from typing import Optional
from pytz import BaseTzInfo
from aiocron import crontab

## 4. The timer, implemented with crontab from aiocron
class GithubTrendingCronTrigger():
    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)  ## 4.1 pass the schedule into crontab
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):  ## 4.3 __anext__ is called on every iteration
        await self.crontab.next()  ## 4.2 wait for the next crontab firing; until then this blocks and nothing below runs
        return Message(self.url, OssInfo(url=self.url))

##======================== The timer definition ends here =================================================

import os

# 5. WeChat subscription-message callback, implemented with WxPusher
# WxPusherClient pushes messages to the specified users
class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        # 5.1 The token is read from the environment, so set WXPUSHER_TOKEN
        # in your environment variables or in the config file
        self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[str]] = None,  # the env fallback below yields strings
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            # 5.2 The uids are read from the environment, so set WXPUSHER_UIDS.
            # A uid identifies the WeChat account to push to; only accounts that
            # follow your subscription account have a uid you can look up.
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()

# 5.3 WeChat callback wrapper: uses WxPusherClient to push the message to the specified WeChat accounts
async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)

##======================== The callback and WeChat push definitions end here =========================================

# 6. Entry point. For easy testing, the schedule is set to the current time + 1 minute
from datetime import datetime, timedelta
current_time = datetime.now()  ## 6.1 current time
target_time = current_time + timedelta(minutes=1)  ## 6.2 target time: current time + 1 minute
cron_expression = target_time.strftime('%M %H %d %m %w')
spec = cron_expression
logger.info(f"cron expression: {spec}")

async def main(spec: str = spec, wxpusher: bool = True):
    callbacks = []
    callbacks.append(wxpusher_callback)

    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks))  # 6.3 call every registered callback to distribute the message

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)  # subscribe the agent; SubscriptionRunner ties all the pieces together
    await runner.run()

if __name__ == "__main__":
    import fire
    fire.Fire(main)
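The "current time + 1 minute" schedule in step 6 is just a formatted timestamp. The sketch below isolates that step so it can be checked with a fixed time; `cron_spec_after` is a hypothetical helper name, not part of MetaGPT or aiocron.

```python
from datetime import datetime, timedelta


def cron_spec_after(now: datetime, minutes: int = 1) -> str:
    """Build a 'minute hour day month weekday' cron string for now + minutes."""
    target = now + timedelta(minutes=minutes)
    # %w is the weekday as a number with Sunday = 0, matching cron's convention.
    return target.strftime("%M %H %d %m %w")


print(cron_spec_after(datetime(2024, 1, 15, 9, 59)))  # 00 10 15 01 1
```

Because the expression pins minute, hour, day, and month, it is only a convenience for testing; a real subscription would use a recurring spec such as a fixed time every day.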
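- Only WeChat push is implemented here; I did not do Discord. Running the whole flow end to end with WeChat is complete enough.
- I am not very familiar with crawlers, so this uses the code from the tutorial. I also tried having GPT write crawler code on my own, which appears later.
- I am not familiar with timers and had never used aiohttp for network requests… but the code is enough to learn how to use them, and following the example is easy. I will study them separately later; they are not the focus of this course.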
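For readers who are also new to crawlers, the parsing step can be tried offline on a hand-written snippet before touching the network. The sketch below uses only the standard library instead of BeautifulSoup, and it assumes the markup shape implied by the tutorial's selectors (`article.Box-row`, `h2 a`, `p`); the real GitHub page may differ or change. `TrendingParser` and `parse_trending` are names made up for this example.

```python
from html.parser import HTMLParser
from typing import Dict, List, Optional


class TrendingParser(HTMLParser):
    """Collects name, URL and description from <article class="Box-row"> blocks."""

    def __init__(self) -> None:
        super().__init__()
        self.repos: List[Dict[str, str]] = []
        self._repo: Optional[Dict[str, str]] = None  # repo currently being parsed
        self._in_h2 = False
        self._field: Optional[str] = None  # field that incoming text belongs to

    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)
        if tag == "article" and "Box-row" in (attrs.get("class") or "").split():
            self._repo = {"name": "", "url": "", "description": ""}
        elif self._repo is None:
            return  # ignore everything outside a repository block
        elif tag == "h2":
            self._in_h2 = True
        elif tag == "a" and self._in_h2:
            self._repo["url"] = "https://github.com" + (attrs.get("href") or "").strip()
            self._field = "name"
        elif tag == "p":
            self._field = "description"

    def handle_data(self, data):
        if self._repo is not None and self._field:
            self._repo[self._field] += data

    def handle_endtag(self, tag):
        if self._repo is None:
            return
        if tag == "a" and self._field == "name":
            self._field = None
        elif tag == "p" and self._field == "description":
            self._field = None
        elif tag == "h2":
            self._in_h2 = False
        elif tag == "article":
            # Remove all whitespace from the name, like the tutorial code does.
            self._repo["name"] = "".join(self._repo["name"].split())
            self._repo["description"] = self._repo["description"].strip()
            self.repos.append(self._repo)
            self._repo = None


def parse_trending(html: str) -> List[Dict[str, str]]:
    """Parse an HTML string and return one dict per repository block."""
    parser = TrendingParser()
    parser.feed(html)
    return parser.repos
```

BeautifulSoup's CSS selectors are much shorter for the same job, which is why the article's code uses them; the point here is only to show what the selectors are picking out.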
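Task4 - Task 2: Independently crawl the Huggingface Papers page
I cannot reach Hugging Face without a proxy, so I switched the target page to https://github.com/topics, GitHub's topics page. The crawler reads the popular topics there and then follows each topic's link to fetch its introduction.
Code and comments
The full code is not repeated here. The timer and callback are exactly the same; the main change is the crawler Action.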
......

## 1. Crawler Action
class CrawlOSSTrending(Action):

    ## 1.2 Open the topic's detail page and get the topic's introduction
    async def _get_brief(self, url):
        async with aiohttp.ClientSession() as client:
            async with client.get(url) as response:
                response.raise_for_status()
                html = await response.text()
        soup = BeautifulSoup(html, 'html.parser')
        topic_list = soup.find_all('div', class_='markdown-body f5 mb-2')
        for topic_item in topic_list:
            topic_intro_tag = topic_item.find('p')
            topic_intro = topic_intro_tag.text.strip() if topic_intro_tag else 'No introduction available.'
            return topic_intro
        # No matching block on the page: fall back to the same placeholder text
        return 'No introduction available.'

    ## 1.1 Crawl the topics page
    async def run(self, url: str = "https://github.com/topics"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url) as response:
                response.raise_for_status()
                html = await response.text()
        soup = BeautifulSoup(html, 'html.parser')
        repositories = []
        topic_list = soup.find_all('li', class_='col-12 col-sm-6 col-md-4 mb-4')
        for topic_item in topic_list:
            repo_info = {}
            topic_link = topic_item.find('a', class_='no-underline')['href']
            ## 1.1.1 Parse the topic name
            repo_info['name'] = topic_item.find('p', class_='f3').text.strip()
            ## 1.1.2 Build the topic URL
            repo_info['url'] = f'https://github.com{topic_link}'
            ## 1.1.3 Use _get_brief to open the detail page and get the introduction
            repo_info['description'] = await self._get_brief(repo_info['url'])
            repositories.append(repo_info)
        return repositories
......
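The crawler above awaits one detail page at a time inside the loop. If that becomes slow, one option is to fetch the detail pages concurrently with a cap on parallel requests. The following is a sketch of that idea only: `fetch_briefs` and its `max_concurrency` parameter are hypothetical, and `get_brief` stands for any coroutine function with the same shape as `_get_brief`.

```python
import asyncio
from typing import Awaitable, Callable, Dict, List


async def fetch_briefs(
    urls: List[str],
    get_brief: Callable[[str], Awaitable[str]],
    max_concurrency: int = 5,
) -> Dict[str, str]:
    """Fetch every detail page concurrently, with at most max_concurrency in flight."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def guarded(url: str) -> str:
        async with semaphore:
            return await get_brief(url)

    # gather returns results in the same order as the input URLs.
    briefs = await asyncio.gather(*(guarded(url) for url in urls))
    return dict(zip(urls, briefs))
```

Inside `run`, this would mean collecting the topic URLs first and calling something like `await fetch_briefs(urls, self._get_brief)` afterwards, instead of awaiting `_get_brief` per item.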
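- Run result
Task4 - Task 3: Produce a news document
I did not really understand what this task was asking, so I implemented one version based on my own understanding.
I think this task is not about practicing anything new, but about learning to connect what we already have and get the data flowing end to end, as in my implementation steps below.
- Crawl the GitHub Trending information
- Summarize the GitHub Trending information; in the earlier tasks the flow ended here and the result went to the callback
- In this task, pass the GitHub Trending summary to WriteDirectory to generate a table of contents
- Pass the generated table of contents to the WriteContent Action, which writes the content for each entry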
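The hand-off between the last two steps is plain data manipulation: the directory dictionary is flattened into an outline plus a list of second-level headings, and each heading becomes one writing job. Here is a minimal sketch of that flattening, assuming the `{"title": ..., "directory": [{"dir 1": ["sub dir 1", ...]}]}` shape used by the prompt in the code that follows; `flatten_directory` is a hypothetical helper, not a MetaGPT function.

```python
from typing import Dict, List, Tuple


def flatten_directory(titles: Dict) -> Tuple[str, List[str]]:
    """Return an indented outline string plus the list of second-level headings."""
    outline = f"{titles.get('title')}\n"
    sections: List[str] = []
    for first_dir in titles.get("directory", []):
        for heading, sub_headings in first_dir.items():
            outline += f"- {heading}\n"
            for sub in sub_headings:
                outline += f"  - {sub}\n"
                sections.append(sub)  # one content-writing job per second-level heading
    return outline, sections
```

In the role code, each entry of `sections` corresponds to one dynamically added `WriteContent` action.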
Code and comments
- First, the result: WeChat push
- First, the result: Markdown file
- Code with detailed comments
# Load .env into environment variables
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())

import asyncio
from metagpt.subscription import SubscriptionRunner
from metagpt.roles import Searcher
from metagpt.schema import Message
import aiohttp
from bs4 import BeautifulSoup
from metagpt.actions.action import Action
from metagpt.logs import logger
from datetime import datetime
from typing import Any, Dict
from metagpt.actions.write_tutorial import WriteDirectory, WriteContent
from metagpt.const import TUTORIAL_PATH
from metagpt.roles import Role
from metagpt.utils.file import File
import fire
import time
from metagpt.prompts.tutorial_assistant import DIRECTORY_PROMPT, CONTENT_PROMPT
from metagpt.utils.common import OutputParser

## 1. The directory-writing class: extracts first- and second-level headings from the GitHub Trending summary
class WriteDirectory(Action):
    """Action class for writing tutorial directories."""

    def __init__(self, name: str = "", language: str = "Chinese", *args, **kwargs):
        super().__init__(name, *args, **kwargs)
        self.language = language

    async def run(self, topic: str, *args, **kwargs) -> Dict:
        """Execute the action to generate a tutorial directory according to the topic.

        Args:
            topic: The tutorial topic.

        Returns:
            the tutorial directory information, including {"title": "xxx", "directory": [{"dir 1": ["sub dir 1", "sub dir 2"]}]}.
        """
        COMMON_PROMPT = """
        Extract the headings from the topic below, including first-level and second-level headings. The topic is "{topic}".
        Note: first-level headings start with "##", and second-level headings start with a number followed by ".".
        """
        DIRECTORY_PROMPT = COMMON_PROMPT + """
        Please provide the specific table of contents for this tutorial, strictly following the following requirements:
        1. The output must be strictly in the specified language, {language}.
        2. Answer strictly in the dictionary format like {{"title": "xxx", "directory": [{{"dir 1": ["sub dir 1", "sub dir 2"]}}, {{"dir 2": ["sub dir 3", "sub dir 4"]}}]}}.
        3. The directory should be as specific and sufficient as possible, with a primary and secondary directory. The secondary directory is in the array.
        4. Do not have extra spaces or line breaks.
        5. Each directory title has practical significance.
        """
        prompt = DIRECTORY_PROMPT.format(topic=topic, language=self.language)
        resp = await self._aask(prompt=prompt)
        return OutputParser.extract_struct(resp, dict)

## 2. Write the news article from the directory headings
class WriteContent(Action):
    """Action class for writing tutorial content."""

    def __init__(self, name: str = "", directory: str = "", language: str = "Chinese", *args, **kwargs):
        super().__init__(name, *args, **kwargs)
        self.language = language
        self.directory = directory
        logger.info(f"init writeContent, {directory}")

    async def run(self, topic: str, *args, **kwargs) -> str:
        """Execute the action to write document content according to the directory and topic."""
        COMMON_PROMPT = """
        You are a seasoned GitHub expert. You now need to write a news article about the daily trending projects on GitHub. The headline is "{topic}".
        """
        CONTENT_PROMPT = COMMON_PROMPT + """
        I will now give you the module directory title for this topic.
        Based on this title, write a news article of no fewer than 500 words.
        The module directory titles for the topic is as follows:
        {directory}
        Strictly limit output according to the following requirements:
        1. Follow the Markdown syntax format for layout.
        2. The output must be strictly in the specified language, {language}.
        3. Do not have redundant output, including concluding remarks.
        4. Strict requirement not to output the topic "{topic}".
        """
        prompt = CONTENT_PROMPT.format(
            topic=topic, language=self.language, directory=self.directory)
        return await self._aask(prompt=prompt)

## 3. Crawler: fetch the trending information
class CrawlOSSTrending(Action):
    async def run(self, url: str = "https://github.com/trending"):
        # ...... identical to the earlier code
        return repositories

# ...... TRENDING_ANALYSIS_PROMPT is identical to the earlier code

## 3. The Action that summarizes the trending information
class AnalysisOSSTrending(Action):
    async def run(
        self,
        trending: Any
    ):
        result = await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))
        return result

from metagpt.roles import Role

## 4. The subscription agent Role
class OssWatcher(Role):
    def __init__(
        self,
        name="Codey",
        profile="OssWatcher",
        goal="Generate an insightful GitHub Trending analysis report.",
        constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name, profile, goal, constraints)
        ## 4.1 The key point of this task: once the trending summary is done, hand it straight to WriteDirectory to extract the outline
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending, WriteDirectory(language="Chinese")])
        ## 4.2 Unlike before, do NOT add the line below! With it, execution stops at WriteDirectory and the
        ## dynamically added WriteContent actions never run. Probably related to execution order; I will check the source later.
        # self._set_react_mode(react_mode="by_order")
        self.topic = ""
        self.main_title = ""
        self.total_content = ""
        self.language = "Chinese"

    ## 4.3 Runs the trending crawler and summary actions
    async def _act1(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        todo = self._rc.todo
        msg = self.get_memories(k=1)[0]
        result = await todo.run(msg.content)
        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        ## 4.4 Question 1: the crawl result is added to memory here and the same msg is returned below.
        ## Does MetaGPT avoid adding the returned msg to memory again? When must it be added manually, and when not?
        self._rc.memory.add(msg)
        return msg

    async def _think(self) -> None:
        """Determine the next action to be taken by the role."""
        logger.info(self._rc.state)
        # ...... same as the earlier code

    async def _handle_directory(self, titles: Dict) -> Message:
        """Handle the directories for the tutorial document."""
        self.main_title = titles.get("title")
        directory = f"{self.main_title}\n"
        self.total_content += f"# {self.main_title}"
        actions = list()
        for first_dir in titles.get("directory"):
            key = list(first_dir.keys())[0]
            directory += f"- {key}\n"
            for second_dir in first_dir[key]:
                directory += f"  - {second_dir}\n"
                ## 4.5 Dynamically add one WriteContent Action per directory entry
                actions.append(WriteContent(
                    language=self.language, directory=second_dir))
        self._init_actions(actions)
        self._rc.todo = None
        return Message(content=directory)

    async def _act(self) -> Message:
        """Perform an action as determined by the role."""
        todo = self._rc.todo
        if type(todo) is CrawlOSSTrending or type(todo) is AnalysisOSSTrending:
            return await self._act1()  ## 4.6 the trending-related actions go through _act1
        ## 4.7 To stay under OpenAI's request-rate and token limits, rest 20 s before each call... a workaround for lack of a better one.
        ## asyncio.sleep is used instead of time.sleep so the event loop is not blocked.
        await asyncio.sleep(20)
        if type(todo) is WriteDirectory:
            msg = self._rc.memory.get(k=1)[0]  ## 4.8 this is the AnalysisOSSTrending result, a large block of text
            self.topic = msg.content
            resp = await todo.run(topic=self.topic)
            return await self._handle_directory(resp)
        ## 4.9 A fixed headline is passed here; WriteContent writes from this headline plus its own directory entry
        resp = await todo.run(topic="Today's trending GitHub projects")
        if self.total_content != "":
            self.total_content += "\n\n\n"
        self.total_content += resp
        ## 4.10 Question 2: here the msg is returned directly, without first calling self._rc.memory.add(msg)
        ## as in Question 1. This needs to be cleared up.
        return Message(content=resp, role=self.profile)

    async def _react(self) -> Message:
        """Execute the assistant's think and actions."""
        while True:
            await self._think()
            if self._rc.todo is None:
                break
            msg = await self._act()
        ## 4.11 Write the file
        root_path = TUTORIAL_PATH / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        await File.write(root_path, f"{self.main_title}.md", self.total_content.encode('utf-8'))
        return msg

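...... The timer, the callback, and the entry point are unchanged from the previous tasks
Possible problems and open questions
- I did not verify in detail whether the generated project introductions and feature descriptions are fully accurate; they just look reasonable overall. Verifying them in detail, or guaranteeing fully correct results, needs finer-grained control.
- Only the last section is pushed, not the full text. This is probably because the msg passed to the callback contains only the last section. I will debug it when I have time.
- Some directory entries got no generated content. Did this hit an LLM limit? How can it be solved? To be investigated.
- The prompts I wrote myself are poor and barely usable…
- My Python is not great and I wrote things as they came to mind; please bear with me…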
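On the rate-limit question, a common alternative to a fixed 20-second pause is to retry a failed call with a growing delay. The sketch below shows the general pattern only. It assumes the failing call raises an exception, which I have not verified for MetaGPT's LLM wrapper (it may already retry internally); `retry_with_backoff` and its parameters are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_with_backoff(
    call: Callable[[], Awaitable[T]],
    retries: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Await call(); on failure wait base_delay * 2**attempt seconds and try again."""
    for attempt in range(retries):
        try:
            return await call()
        except Exception:
            if attempt == retries - 1:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2**attempt)
    raise ValueError("retries must be at least 1")
```

Hypothetical usage inside `_act` would look like `resp = await retry_with_backoff(lambda: todo.run(topic=...))`, so that a pause only happens after a call actually fails.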
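Task4 - Task 4: Implement email delivery
Implementing email delivery means implementing an email callback that calls the mail interface to send the message.
Code and comments
- First, the result
- Code with detailed comments
...... The Actions, the role, and the timer code are unchanged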
import email.utils
from metagpt.schema import Message
import smtplib
from email.mime.text import MIMEText

class EmailClient:
    def __init__(self):
        pass

    async def send_email(self, sender_email, receiver_email, subject, message):
        # Set the sender and receiver addresses
        sender = sender_email
        receiver = receiver_email
        # Build the message
        msg = MIMEText(message, 'plain', 'utf-8')
        msg['Subject'] = subject
        msg['From'] = email.utils.formataddr(('同学小张', sender))
        msg['To'] = receiver_email
        # Connect to the SMTP server
        smtp_server = 'smtp.qq.com'
        smtp_port = 25
        smtp_username = '[email protected]'
        smtp_password = 'xxxxxxxxx'  # generated when enabling SMTP in the QQ Mail settings; not the account password
        try:
            server = smtplib.SMTP(smtp_server, smtp_port)
            server.login(smtp_username, smtp_password)
            server.sendmail(sender, receiver, msg.as_string())
            server.quit()
            print("Email sent successfully!")
        except Exception as e:
            print("Failed to send email:", str(e))

async def email_pusher_callback(msg: Message):
    client = EmailClient()
    # Call the function to send the email
    sender_email = '[email protected]'  # sender address
    receiver_email = '[email protected]'  # receiver address
    subject = "Today's GitHub Trending Analysis"
    message = msg.content
    await client.send_email(sender_email, receiver_email, subject, message)

# asyncio.run(email_pusher_callback(Message("test")))  # quick test of the email interface

# Entry point
from datetime import datetime, timedelta
current_time = datetime.now()
target_time = current_time + timedelta(minutes=1)
cron_expression = target_time.strftime('%M %H %d %m %w')
spec = cron_expression
logger.info(f"cron expression: {spec}")

async def main(spec: str = spec, wxpusher: bool = True):
    callbacks = []
    callbacks.append(wxpusher_callback)  # WeChat callback
    callbacks.append(email_pusher_callback)  # email callback
    if not callbacks:
        async def _print(msg: Message):
            print(msg.content)
        callbacks.append(_print)

    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
    await runner.run()

if __name__ == "__main__":
    import fire
    fire.Fire(main)
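One thing worth noting about the email callback: `smtplib` is blocking, so calling it directly inside an `async def` holds up the other callbacks while the mail is being sent. Below is a minimal sketch of one way around that, assuming Python 3.9+ for `asyncio.to_thread` and a provider that accepts implicit TLS (commonly port 465, but check your provider's settings). The function names and the display name "Subscription Agent" are placeholders for illustration.

```python
import asyncio
import email.utils
import smtplib
from email.mime.text import MIMEText


def build_message(sender: str, receiver: str, subject: str, body: str) -> MIMEText:
    """Build a UTF-8 plain-text email."""
    msg = MIMEText(body, "plain", "utf-8")
    msg["Subject"] = subject
    msg["From"] = email.utils.formataddr(("Subscription Agent", sender))
    msg["To"] = receiver
    return msg


def send_blocking(msg: MIMEText, host: str, port: int, username: str, password: str) -> None:
    """Send over implicit TLS; this call blocks until the server answers."""
    with smtplib.SMTP_SSL(host, port) as server:
        server.login(username, password)
        server.send_message(msg)  # sender and recipients are taken from the headers


async def send_email(msg: MIMEText, host: str, port: int, username: str, password: str) -> None:
    """Run the blocking send in a worker thread so other callbacks keep running."""
    await asyncio.to_thread(send_blocking, msg, host, port, username, password)
```

Reading the username and authorization code from environment variables, as the WxPusher client does for its token, would also keep credentials out of the source file.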
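That is all for now. Overall, the subscription agent task runs end to end, and I now have a working understanding of the concepts and of how MetaGPT is used. When I have time I will break down the implementation and details of each step, along with the pitfalls I hit and how I got around them. The supporting skills used in this lesson also deserve their own write-ups: how to write crawler code with GPT, how the WeChat push was built, how the email push was built, and a brief look at timers.
Copyright belongs to the original author, 同学小张. If there is any infringement, please contact us for removal.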