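Introduction
Background: my study notes for Chapter 4 of a MetaGPT agent-development tutorial
Techniques and tools used in this post:
- Python
- A third-party WeChat official-account service for push notifications, such as Server酱 (ServerChan), WxPusher or PushPlus. This post uses WxPusher, for which you need to obtain a UID and a token
- Basic web-scraping concepts (if they are new to you, ChatGPT can help)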
Main text
OSS subscription agent
1. Implementing the OSSWatcher Role
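Before implementing the OSSWatcher Role, we first need to decide which tasks OSSWatcher should perform, that is, which Actions it needs. Since our goal is to analyze popular open-source projects, we first have to fetch information about them. OSSWatcher can therefore be split into two Actions: one crawls the trending open-source projects, and the other analyzes them.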
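The crawling Action essentially locates each repository card on the Trending page. `CrawlOSSTrending` in the complete code does this with BeautifulSoup and the `article.Box-row` / `h2 a` selectors. Below is a stdlib-only sketch of the same idea using `html.parser`. It assumes GitHub's current markup, which can change at any time, and uses a hand-written sample instead of a live page; `TrendingLinkParser` is a hypothetical name.

```python
from html.parser import HTMLParser


class TrendingLinkParser(HTMLParser):
    """Collects repo URLs from <article class="Box-row"> ... <h2> <a href="/owner/repo">."""

    def __init__(self) -> None:
        super().__init__()
        self.in_article = False
        self.in_h2 = False
        self.repos: list[str] = []

    def handle_starttag(self, tag, attrs):
        attrs_dict = dict(attrs)
        classes = (attrs_dict.get("class") or "").split()
        if tag == "article" and "Box-row" in classes:
            self.in_article = True
        elif tag == "h2" and self.in_article:
            self.in_h2 = True
        elif tag == "a" and self.in_h2 and attrs_dict.get("href"):
            # Same URL construction as CrawlOSSTrending: prefix the relative href
            self.repos.append("https://github.com" + attrs_dict["href"].strip())

    def handle_endtag(self, tag):
        if tag == "h2":
            self.in_h2 = False
        elif tag == "article":
            self.in_article = False


if __name__ == "__main__":
    parser = TrendingLinkParser()
    parser.feed('<article class="Box-row"><h2><a href="/owner/repo">owner / repo</a></h2></article>')
    print(parser.repos)
```

Links outside the `<h2>` (for example the stargazers link) are ignored, which mirrors `select_one('h2 a')` in the original crawler.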
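1.1 Crawling GitHub Trending
- Purpose: crawl today's trending repositories, without filtering by spoken language or programming language, for later analysis.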
- Approach: scrape the page content directly; filter conditions can be added if needed.
1.2 Summarizing GitHub Trending
- Purpose: analyze and summarize the crawled information about trending open-source projects.
- Implementation: the LLM analyzes the data according to a prompt and outputs the result in a specified format.
1.3 Implementing the OSSWatcher Role
- Approach: put the two Actions above in `metagpt/actions/oss_trending.py`, and create `metagpt/roles/oss_watcher.py` for the Role code.
Implementation:
```python
# Role implementation (metagpt/roles/oss_watcher.py)
from metagpt.actions.oss_trending import AnalysisOSSTrending, CrawlOSSTrending
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message


class OssWatcher(Role):
    # No trailing commas here: `name: str = "Codey",` would make the default a tuple
    name: str = "Codey"
    profile: str = "OssWatcher"
    goal: str = "Generate an insightful GitHub Trending analysis report."
    constraints: str = "Only analyze based on the provided GitHub Trending data."

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        # Execute the Actions in the order they were registered
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # In by_order mode, todo is first CrawlOSSTrending, then AnalysisOSSTrending
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]  # the most recent message
        result = await todo.run(msg.content)

        msg = Message(content=str(result))
        # msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)
        return msg
```
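In `by_order` mode each `_act` call feeds the most recent message in memory into the current Action, so the crawl result becomes the input of the analysis. A stdlib-only sketch of that data flow, assuming plain async functions in place of MetaGPT Actions (`fake_crawl`, `fake_analyze` and `run_by_order` are hypothetical names, not MetaGPT APIs):

```python
import asyncio
from typing import Awaitable, Callable


async def fake_crawl(url: str) -> list[dict]:
    # Stand-in for CrawlOSSTrending: a real crawler would fetch `url`
    return [{"name": "owner/repo", "url": url + "/owner/repo"}]


async def fake_analyze(trending: str) -> str:
    # Stand-in for AnalysisOSSTrending: a real Action would prompt the LLM
    return f"report based on: {trending}"


async def run_by_order(first_input: str, actions: list[Callable[[str], Awaitable[object]]]) -> list[str]:
    """Run actions in order; each one consumes the latest message in memory."""
    memory: list[str] = [first_input]  # the trigger message starts the chain
    for action in actions:
        latest = memory[-1]  # like self.get_memories(k=1)[0].content
        result = await action(latest)
        memory.append(str(result))  # like Message(content=str(result))
    return memory


if __name__ == "__main__":
    history = asyncio.run(run_by_order("https://github.com/trending", [fake_crawl, fake_analyze]))
    print(history[-1])
```

Because each result is stored with `str(result)`, the analysis step receives the crawled list as text, which is also what happens in `OssWatcher._act`.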
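Implementing the Trigger
- Trigger mechanism: the simplest option is a scheduled trigger, commonly implemented with crontab or with the Python async library aiocron.
- Implementation: combine aiocron with either a function-style or a class-style trigger to fire on a schedule; the code here uses the class style.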
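Implementation:
```python
# Trigger
import time
from typing import Optional

from aiocron import crontab
from pydantic import BaseModel, Field
from pytz import BaseTzInfo

from metagpt.logs import logger
from metagpt.schema import Message


class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)


class GithubTrendingCronTrigger:
    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)  # fires according to the cron expression `spec`
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()  # wait for the next scheduled tick
        logger.info(self.url)
        logger.info(OssInfo(url=self.url))
        logger.info(Message(self.url))
        return Message(self.url)
        # return Message(self.url, OssInfo(url=self.url))
```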
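`GithubTrendingCronTrigger` works because it implements the async-iterator protocol (`__aiter__` / `__anext__`), so the subscription runner can consume it with `async for`. A stdlib-only sketch of the same protocol, assuming a fixed sleep interval in place of aiocron's cron schedule, plain strings instead of `Message`, and a `max_fires` limit added only so the example terminates (`IntervalTrigger` and `collect` are hypothetical names):

```python
import asyncio
from typing import Optional


class IntervalTrigger:
    """Yields the same URL every `interval` seconds, like a cron trigger would."""

    def __init__(self, interval: float, url: str = "https://github.com/trending",
                 max_fires: Optional[int] = None) -> None:
        self.interval = interval
        self.url = url
        self.max_fires = max_fires  # None means fire forever
        self.fired = 0

    def __aiter__(self):
        return self

    async def __anext__(self) -> str:
        if self.max_fires is not None and self.fired >= self.max_fires:
            raise StopAsyncIteration  # ends the `async for` loop
        await asyncio.sleep(self.interval)  # aiocron awaits the next cron tick here instead
        self.fired += 1
        return self.url


async def collect(trigger: IntervalTrigger) -> list[str]:
    return [url async for url in trigger]


if __name__ == "__main__":
    print(asyncio.run(collect(IntervalTrigger(0.01, max_fires=3))))
```

The cron-based trigger never raises `StopAsyncIteration`, which is why its subscription keeps running until it is cancelled.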
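Designing the Callback
- Purpose: define how the information generated by the agent is handled.
- Implementation: send the information to an app you use every day, such as WeChat.
Here we use WxPusher, which needs WXPUSHER_TOKEN and WXPUSHER_UIDS.
WXPUSHER_TOKEN is WxPusher's APP_TOKEN; see the official documentation for how to obtain an appToken.
WXPUSHER_UIDS: get each user's UID on the application management page under "User Management -> User List". To send to several users, separate their UIDs with commas.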
Implementation:
```python
# callback
import os
from typing import Optional

import aiohttp

from metagpt.schema import Message

# Read the credentials from environment variables (or hard-code them instead)
WXPUSHER_TOKEN = os.environ.get("WXPUSHER_TOKEN", "")
WXPUSHER_UIDS = os.environ.get("WXPUSHER_UIDS", "")


class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or WXPUSHER_TOKEN

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[str]] = None,  # WxPusher UIDs are strings such as "UID_xxx"
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or WXPUSHER_UIDS.split(","),  # comma-separated UIDs
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs, ssl=False) as response:
                response.raise_for_status()
                return await response.json()


async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    # content_type=3 sends the report as Markdown
    await client.send_message(msg.content, content_type=3)
```
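Since `send_message` only assembles a JSON body and POSTs it to `/api/send/message`, the payload logic can be checked without any network access. A minimal sketch mirroring the fields used above, assuming the UIDs arrive as a comma-separated string; `build_wxpusher_payload` is a hypothetical helper, and stripping whitespace and dropping empty entries are additions that the original code does not do:

```python
from typing import Optional


def build_wxpusher_payload(
    token: str,
    content: str,
    uids_csv: str,
    content_type: int = 3,  # WxPusher: 1 = text, 2 = HTML, 3 = Markdown
    summary: Optional[str] = None,
    topic_ids: Optional[list[int]] = None,
    url: Optional[str] = None,
) -> dict:
    # "UID_a, UID_b," -> ["UID_a", "UID_b"]; empty entries are dropped
    uids = [uid.strip() for uid in uids_csv.split(",") if uid.strip()]
    return {
        "appToken": token,
        "content": content,
        "summary": summary,
        "contentType": content_type,
        "topicIds": topic_ids or [],
        "uids": uids,
        "verifyPay": False,
        "url": url,
    }


if __name__ == "__main__":
    print(build_wxpusher_payload("AT_xxx", "# hello", "UID_a, UID_b"))
```

With the original `WXPUSHER_UIDS.split(",")`, a value like `"UID_a, UID_b"` would keep the leading space in the second UID, so it is worth writing the setting without spaces.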
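Beyond that, a few small but essential configuration steps:
1. Proxy configuration: GitHub is hosted overseas, so you may run into network problems, and aiohttp does not use the system proxy by default. To fix this, add your own proxy server to the `config/key.yaml` configuration file: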
```yaml
GLOBAL_PROXY: http://127.0.0.1:8118  # change this to your own proxy server address
```
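On Windows, you can find the proxy address and port under Control Panel > Network and Internet > Internet Options > Connections > LAN settings. Make sure to copy your own port number! Otherwise you will get a "connection refused" error, so be careful.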
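A wrong port usually shows up as "connection refused", so it can help to check the configured `GLOBAL_PROXY` value before starting the agent. A stdlib sketch of such a check; it only tests whether something accepts TCP connections on that host and port, not whether it is a working HTTP proxy (`proxy_reachable` is a hypothetical helper):

```python
import socket
from urllib.parse import urlparse


def proxy_reachable(proxy_url: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the proxy's host:port succeeds."""
    parsed = urlparse(proxy_url)
    if parsed.hostname is None or parsed.port is None:
        raise ValueError(f"proxy URL needs an explicit host and port: {proxy_url!r}")
    try:
        with socket.create_connection((parsed.hostname, parsed.port), timeout=timeout):
            return True
    except OSError:  # ConnectionRefusedError, timeouts, unreachable host, ...
        return False


if __name__ == "__main__":
    print(proxy_reachable("http://127.0.0.1:8118"))
```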
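2. Customize the content: edit the prompt in the code below to steer the analysis toward the directions and angles you are interested in, for example:
- Programming-language trends: look at the languages used in the Trending list to see which are currently most popular in the developer community
- Project types and uses: analyze which categories the Trending projects belong to and what they are used for
- Community activity: check each project's star count and number of contributors
- Emerging technologies and tools: watch for new projects and emerging technologies to understand current technical trends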
```python
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending analyst whose aim is to give users in-depth insights and personalized recommendations based on the latest GitHub trends. Based on the context, fill in the missing information below and generate engaging, informative titles, making sure users discover repositories that match their interests. Remember to write in Chinese.

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example
# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
---
# Github Trending
{trending}
"""
```
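When adapting the prompt, keep in mind that `AnalysisOSSTrending` fills it with `str.format(trending=...)`. Any literal `{` or `}` you add (for example a JSON sample) must therefore be doubled as `{{` and `}}`, otherwise `format` raises an error. A minimal sketch with a hypothetical custom prompt (`CUSTOM_PROMPT` and `render` are illustrative names):

```python
CUSTOM_PROMPT = """# Requirements
Focus on programming-language trends and star growth. Remember to write in Chinese.
Return one line per project like {{"name": ..., "why": ...}}.
---
# Github Trending
{trending}
"""


def render(trending: object) -> str:
    # Only {trending} is a placeholder; {{ and }} turn into literal braces
    return CUSTOM_PROMPT.format(trending=trending)


if __name__ == "__main__":
    print(render([{"name": "owner/repo", "today_stars": "120 stars today"}]))
```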
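3. Scheduling: because I wanted to see the output right away, I set the subscription to push a message every minute. If you want to keep this push bot running afterwards, switch to a scheduled push, which is easy to do:
Method 1: change the `spec` parameter of the `main` function to a suitable cron expression so that the task fires at a specific time every day.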
```python
# main function; "0 9 * * *" fires at 09:00 every day
async def main(spec: str = "0 9 * * *", discord: bool = False, wxpusher: bool = True):
    ...
```
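The five fields of a cron expression are minute, hour, day of month, month and day of week, so `"0 9 * * *"` means 09:00 every day and `"* * * * *"` means every minute. A simplified sketch that checks whether a given time matches an expression; it supports only `*`, `*/n`, plain numbers and comma lists, whereas real cron (and aiocron) also support ranges, names and more (`cron_matches` is a hypothetical helper):

```python
from datetime import datetime

# Lowest allowed value of each field: minute, hour, day of month, month, day of week
_LOWEST = (0, 0, 1, 1, 0)


def _field_matches(field: str, value: int, lowest: int) -> bool:
    """Check one cron field; supports '*', '*/n', plain numbers and comma lists."""
    for part in field.split(","):
        if part == "*":
            return True
        if part.startswith("*/"):
            # Steps count from the field's lowest value (e.g. day 1, 1+n, ...)
            if (value - lowest) % int(part[2:]) == 0:
                return True
        elif int(part) == value:
            return True
    return False


def cron_matches(spec: str, when: datetime) -> bool:
    """Simplified: all five fields must match (real cron ORs day-of-month
    and day-of-week when both are restricted)."""
    fields = spec.split()
    if len(fields) != 5:
        raise ValueError(f"expected 5 fields, got {len(fields)}: {spec!r}")
    cron_weekday = (when.weekday() + 1) % 7  # cron: Sunday=0; Python: Monday=0
    values = (when.minute, when.hour, when.day, when.month, cron_weekday)
    return all(
        _field_matches(field, value, lowest)
        for field, value, lowest in zip(fields, values, _LOWEST)
    )


if __name__ == "__main__":
    print(cron_matches("0 9 * * *", datetime(2024, 1, 1, 9, 0)))
```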
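Method 2: since the trigger is built on aiocron, scheduling rules can be configured very flexibly with cron syntax, including a time zone: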
```python
from pytz import timezone

beijing_tz = timezone("Asia/Shanghai")  # get the Beijing time zone
cron_trigger = GithubTrendingCronTrigger("0 8 * * *", tz=beijing_tz)  # 08:00 Beijing time every day
```
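With `tz=beijing_tz`, `"0 8 * * *"` fires at 08:00 Beijing time regardless of the server's local time zone. A stdlib sketch that computes the next daily run; it assumes a fixed UTC+8 offset, which matches Asia/Shanghai because China does not observe daylight saving time (`next_daily_run` is a hypothetical helper):

```python
from datetime import datetime, timedelta, timezone

BEIJING = timezone(timedelta(hours=8), "Asia/Shanghai")  # fixed offset, no DST


def next_daily_run(now: datetime, hour: int, minute: int = 0, tz: timezone = BEIJING) -> datetime:
    """Return the next time (in `tz`) at hour:minute strictly after `now`.

    `now` should be timezone-aware so the conversion is unambiguous.
    """
    local_now = now.astimezone(tz)
    candidate = local_now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if candidate <= local_now:
        candidate += timedelta(days=1)  # today's slot has passed, use tomorrow
    return candidate


if __name__ == "__main__":
    now_utc = datetime(2024, 1, 1, 0, 30, tzinfo=timezone.utc)  # 08:30 in Beijing
    print(next_daily_run(now_utc, 8))
```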
### Final result
![](https://img-blog.csdnimg.cn/direct/46f551e712194ae8aac1f5a0dda0164a.png)
![](https://img-blog.csdnimg.cn/direct/22e49b54ae5a4a819b51608cf31a4977.png)
### Complete code
```python
import os

# Set the LLM API key before importing metagpt so its configuration can pick it up
os.environ["ZHIPUAI_API_KEY"] = "YOUR_ZHIPUAI_API_KEY"  # fill in your own key

import asyncio
import time
from typing import Any, AsyncGenerator, Awaitable, Callable, Optional

import aiohttp
from aiocron import crontab
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from pytz import BaseTzInfo

from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message

WXPUSHER_TOKEN = "YOUR_WXPUSHER_APP_TOKEN"  # fill in your own
WXPUSHER_UIDS = "YOUR_WXPUSHER_UIDS"  # fill in your own; separate multiple UIDs with commas
# Subscription module: it can also be imported with `from metagpt.subscription import SubscriptionRunner`;
# the code is reproduced here for reference.
class SubscriptionRunner(BaseModel):
    """A simple wrapper to manage subscription tasks for different roles using asyncio.

    Example:
        import asyncio
        from metagpt.subscription import SubscriptionRunner
        from metagpt.roles import Searcher
        from metagpt.schema import Message

        async def trigger():
            while True:
                yield Message("the latest news about OpenAI")
                await asyncio.sleep(3600 * 24)

        async def callback(msg: Message):
            print(msg.content)

        async def main():
            pb = SubscriptionRunner()
            await pb.subscribe(Searcher(), trigger(), callback)
            await pb.run()

        asyncio.run(main())
    """

    tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)

    class Config:
        arbitrary_types_allowed = True

    async def subscribe(
        self,
        role: Role,
        trigger: AsyncGenerator[Message, None],
        callback: Callable[[Message], Awaitable[None]],
    ):
        """Subscribes a role to a trigger and sets up a callback to be called with the role's response.

        Args:
            role: The role to subscribe.
            trigger: An asynchronous generator that yields Messages to be processed by the role.
            callback: An asynchronous function to be called with the response from the role.
        """
        loop = asyncio.get_running_loop()

        async def _start_role():
            # Every trigger message is run through the role; the reply goes to the callback
            async for msg in trigger:
                logger.info("===log===" * 3)
                logger.info(msg)
                logger.info(msg.content)
                resp = await role.run(msg.content)
                await callback(resp)

        self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

    async def unsubscribe(self, role: Role):
        """Unsubscribes a role from its trigger and cancels the associated task.

        Args:
            role: The role to unsubscribe.
        """
        task = self.tasks.pop(role)
        task.cancel()

    async def run(self, raise_exception: bool = True):
        """Runs all subscribed tasks and handles their completion or exception.

        Args:
            raise_exception: If True, re-raise the exception of a failed task;
                otherwise only log it. Defaults to True.

        Raises:
            Exception: The exception raised by a failed task, when raise_exception is True.
        """
        while True:
            for role, task in self.tasks.items():
                if task.done():
                    if task.exception():
                        if raise_exception:
                            raise task.exception()
                        logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")
                    else:
                        logger.warning(
                            f"Task {task.get_name()} has completed. "
                            "If this is unexpected behavior, please check the trigger function."
                        )
                    self.tasks.pop(role)
                    break
            else:
                # No task has finished: check again in one second
                await asyncio.sleep(1)
# Actions implementation
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending analyst whose aim is to give users in-depth insights and personalized recommendations based on the latest GitHub trends. Based on the context, fill in the missing information below and generate engaging, informative titles, making sure users discover repositories that match their interests. Remember to write in Chinese.

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example
# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
---
# Github Trending
{trending}
"""
class CrawlOSSTrending(Action):
    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url, proxy=CONFIG.global_proxy, ssl=False) as response:
                response.raise_for_status()
                html = await response.text()

        soup = BeautifulSoup(html, 'html.parser')

        repositories = []

        for article in soup.select('article.Box-row'):
            repo_info = {}

            repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")
            repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()

            # Description
            description_element = article.select_one('p')
            repo_info['description'] = description_element.text.strip() if description_element else None

            # Language
            language_element = article.select_one('span[itemprop="programmingLanguage"]')
            repo_info['language'] = language_element.text.strip() if language_element else None

            # Stars and Forks
            stars_element = article.select('a.Link--muted')[0]
            forks_element = article.select('a.Link--muted')[1]
            repo_info['stars'] = stars_element.text.strip()
            repo_info['forks'] = forks_element.text.strip()

            # Today's Stars
            today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
            repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None

            repositories.append(repo_info)

        return repositories


class AnalysisOSSTrending(Action):
    async def run(
        self,
        trending: Any
    ):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))
# Role implementation
class OssWatcher(Role):
    # No trailing commas: `name: str = "Codey",` would make the default a tuple
    name: str = "Codey"
    profile: str = "OssWatcher"
    goal: str = "Generate an insightful GitHub Trending analysis report."
    constraints: str = "Only analyze based on the provided GitHub Trending data."

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # In by_order mode, todo is first CrawlOSSTrending, then AnalysisOSSTrending
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]  # the most recent message
        result = await todo.run(msg.content)

        msg = Message(content=str(result))
        # msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)
        return msg
# Trigger
class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)


class GithubTrendingCronTrigger:
    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()
        logger.info(self.url)
        logger.info(OssInfo(url=self.url))
        logger.info(Message(self.url))
        return Message(self.url)
        # return Message(self.url, OssInfo(url=self.url))
# callback
class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or WXPUSHER_TOKEN
        # self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[str]] = None,
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or WXPUSHER_UIDS.split(","),
            # "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs, ssl=False) as response:
                response.raise_for_status()
                return await response.json()


async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)  # 3 = Markdown
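# Entry point
async def main(spec: str = "* * * * *", discord: bool = False, wxpusher: bool = True):
    # "* * * * *" fires every minute (handy for testing); use e.g. "0 9 * * *" for a daily push
    # Note: the discord flag is accepted but not used in this version
    callbacks = []
    if wxpusher:
        callbacks.append(wxpusher_callback)

    if not callbacks:
        # Fall back to printing the report when no push channel is enabled
        async def _print(msg: Message):
            print(msg.content)

        callbacks.append(_print)

    async def callback(msg):
        # Deliver the report to every enabled channel concurrently
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
    await runner.run()


if __name__ == "__main__":
    import fire

    fire.Fire(main)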
```
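`SubscriptionRunner` ties the three pieces together: every message from the trigger is passed to `role.run`, and the role's reply is handed to the callback, all inside one asyncio task per role. A stdlib-only sketch of that loop, assuming plain async functions in place of a MetaGPT Role and a finite trigger so the example finishes (`run_subscription`, `demo_trigger` and `demo` are hypothetical names):

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable


async def run_subscription(
    trigger: AsyncIterator[str],
    role: Callable[[str], Awaitable[str]],
    callback: Callable[[str], Awaitable[None]],
) -> None:
    # Same shape as SubscriptionRunner's _start_role: trigger -> role -> callback
    async for msg in trigger:
        reply = await role(msg)
        await callback(reply)


async def demo_trigger(n: int) -> AsyncIterator[str]:
    for _ in range(n):
        yield "https://github.com/trending"
        await asyncio.sleep(0)  # a cron trigger would wait for the next tick here


async def demo() -> list[str]:
    received: list[str] = []

    async def demo_role(content: str) -> str:
        return f"analysis of {content}"

    async def demo_callback(reply: str) -> None:
        received.append(reply)  # a real callback would push to WxPusher

    task = asyncio.create_task(run_subscription(demo_trigger(2), demo_role, demo_callback))
    await task  # SubscriptionRunner.run polls task.done() instead of awaiting directly
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Polling `task.done()` as `SubscriptionRunner.run` does lets one loop supervise several roles at once, at the cost of up to a one-second delay in noticing a finished task.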
Copyright belongs to the original author 小楼一夜赏海棠. In case of infringement, please contact us for removal.