GitHub Trending WeChat Push Bot: MetaGPT Study Notes on the OSS Subscription Agent

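Introduction

Background: personal study notes on Chapter 4 of MetaGPT agent development.

Techniques and tools used in this article:

  • Python tooling
  • A third-party WeChat official account used as the message push channel, such as Server酱, wxpusher, or Pushplus. This article uses wxpusher, which requires obtaining a UID and a token.
  • Basic web scraping concepts; if they are unfamiliar, ChatGPT can explain them.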

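Main text

OSS subscription agent

1. OSSWatcher Role implementation

Before implementing the OSSWatcher Role, we need to decide which tasks it should perform, that is, which Actions to implement. The goal is to analyze trending open source projects, so the project information has to be fetched first. That splits OSSWatcher into two Actions: one crawls the trending open source projects, the other analyzes them.

1.1 GitHub Trending crawling

  • Function: crawl the day's trending repositories, without filtering by spoken language or programming language, for analysis.
  • Method: scrape the page content directly; filter conditions can be added.

1.2 GitHub Trending summary

  • Function: analyze the crawled information about trending open source projects and summarize it.
  • Implementation: the LLM analyzes the data according to the prompt and outputs the result in a specified format.

1.3 OSSWatcher Role implementation

  • Method: put the two Actions above in metagpt/actions/oss_trending.py, and create metagpt/roles/oss_watcher.py for the Role code.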
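The filter conditions mentioned in 1.1 can be expressed in the URL itself. A minimal sketch, assuming GitHub Trending accepts a programming-language path segment plus `since` and `spoken_language_code` query parameters (this mirrors the URLs produced by the site's own filter menus, but it is not a documented API); `build_trending_url` is a hypothetical helper, not part of MetaGPT:

```python
from typing import Optional
from urllib.parse import quote, urlencode

BASE_URL = "https://github.com/trending"


def build_trending_url(
    language: Optional[str] = None,
    since: str = "daily",
    spoken_language_code: Optional[str] = None,
) -> str:
    """Build a GitHub Trending URL with optional filters (hypothetical helper)."""
    if since not in {"daily", "weekly", "monthly"}:
        raise ValueError(f"unsupported time range: {since}")
    url = BASE_URL
    if language:
        # Assumption: the programming language is a path segment, e.g. /trending/python
        url += "/" + quote(language.lower())
    params = {"since": since}
    if spoken_language_code:
        params["spoken_language_code"] = spoken_language_code
    return f"{url}?{urlencode(params)}"
```

The resulting string could be passed as the `url` argument that both the crawl Action and the trigger in this article accept.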

Implementation code:

# Role implementation
class OssWatcher(Role):
    # No trailing commas here: `name: str = "Codey",` would assign a tuple, not a str
    name: str = "Codey"
    profile: str = "OssWatcher"
    goal: str = "Generate an insightful GitHub Trending analysis report."
    constraints: str = "Only analyze based on the provided GitHub Trending data."

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # In "by_order" mode the Actions are chosen in sequence under the hood:
        # todo is first CrawlOSSTrending, then AnalysisOSSTrending
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]  # the most recent message
        result = await todo.run(msg.content)

        msg = Message(content=str(result))
        # msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)
        return msg
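The `by_order` mode runs the Actions in the order they were registered, and `_act` hands the latest message in memory to the next Action. The framework-free sketch below mimics that flow with plain asyncio; `fake_crawl`, `fake_analyze`, and `run_by_order` are hypothetical stand-ins for illustration, not MetaGPT APIs:

```python
import asyncio
from typing import Any, Awaitable, Callable, List

ActionFn = Callable[[Any], Awaitable[Any]]


async def fake_crawl(url: str) -> List[dict]:
    # Stand-in for CrawlOSSTrending: returns canned data instead of scraping
    return [{"name": "demo/repo", "url": "https://github.com/demo/repo", "language": "Python"}]


async def fake_analyze(trending: Any) -> str:
    # Stand-in for AnalysisOSSTrending: a real Role would ask the LLM here
    return f"report based on: {trending}"


async def run_by_order(actions: List[ActionFn], first_input: str) -> List[str]:
    """Run actions in sequence, passing each result on as a string."""
    memory = [first_input]
    for action in actions:
        result = await action(memory[-1])  # mirrors get_memories(k=1)[0]
        memory.append(str(result))  # mirrors Message(content=str(result))
    return memory
```

Because each result is stored with `str(result)`, the second step receives the text form of the repository list, which is also what the analysis prompt ends up containing in the Role above.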

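Trigger implementation

  • Trigger method: the simplest option is a timed trigger. It is commonly implemented with crontab; the Python async library aiocron also works.
  • Implementation: combine aiocron with either a function or a class to build the timed Trigger.

Implementation code: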

# Trigger
class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)

class GithubTrendingCronTrigger():
    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()
        logger.info(self.url)
        logger.info(OssInfo(url=self.url))
        logger.info(Message(self.url))
        return Message(self.url)
        # return Message(self.url, OssInfo(url=self.url))
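The trigger only has to be an async iterator that produces a value whenever the role should run. To show the `__aiter__`/`__anext__` protocol in isolation, the sketch below swaps aiocron for a fixed `asyncio.sleep` interval and yields plain strings rather than `Message` objects; `IntervalTrigger` and its `max_ticks` parameter are assumptions made for this example and are not part of the original code:

```python
import asyncio
from typing import List, Optional


class IntervalTrigger:
    """Yield the same URL every `interval` seconds (stand-in for a cron trigger)."""

    def __init__(self, url: str, interval: float, max_ticks: Optional[int] = None) -> None:
        self.url = url
        self.interval = interval
        self.max_ticks = max_ticks  # None means "run forever"
        self._ticks = 0

    def __aiter__(self):
        return self

    async def __anext__(self) -> str:
        if self.max_ticks is not None and self._ticks >= self.max_ticks:
            raise StopAsyncIteration
        # The cron-based trigger awaits crontab.next() at this point instead
        await asyncio.sleep(self.interval)
        self._ticks += 1
        return self.url


async def collect(trigger: IntervalTrigger) -> List[str]:
    # Consume the trigger with `async for`, as a subscription loop would
    return [item async for item in trigger]
```

A bounded `max_ticks` is handy for trying out a role and callback without waiting for a real cron schedule.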

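Callback design

  • Function: define how the information generated by the agent is handled.
  • Implementation: send the information to an app in daily use, such as WeChat.

This article uses wxpusher, which needs WXPUSHER_TOKEN and WXPUSHER_UIDS.

WXPUSHER_TOKEN is the wxpusher APP_TOKEN; see the official documentation for how to obtain the appToken.

WXPUSHER_UIDS holds the user UIDs, which are listed on the application management page under "User Management -> User List". To send to several users, separate their UIDs with commas.

Implementation code: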

# callback

class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or WXPUSHER_TOKEN
        # self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
            self,
            content,
            summary: Optional[str] = None,
            content_type: int = 1,
            topic_ids: Optional[list[int]] = None,
            uids: Optional[list[str]] = None,  # wxpusher UIDs are strings
            verify: bool = False,
            url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or WXPUSHER_UIDS.split(","),
            # "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs, ssl=False) as response:
                response.raise_for_status()
                return await response.json()

async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)
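The commented-out lines in the client hint at reading the credentials from environment variables instead of hard-coding them. A sketch of that variant follows; `load_wxpusher_settings` is a hypothetical helper, and it takes the environment as a mapping so it can be exercised without touching the real one:

```python
import os
from typing import List, Mapping, Tuple


def load_wxpusher_settings(env: Mapping[str, str] = os.environ) -> Tuple[str, List[str]]:
    """Return (app_token, uids) from WXPUSHER_TOKEN and WXPUSHER_UIDS."""
    token = env.get("WXPUSHER_TOKEN", "").strip()
    if not token:
        raise ValueError("WXPUSHER_TOKEN is not set")
    # UIDs are comma-separated; drop blanks caused by stray commas or spaces
    uids = [uid.strip() for uid in env.get("WXPUSHER_UIDS", "").split(",") if uid.strip()]
    if not uids:
        raise ValueError("WXPUSHER_UIDS is not set")
    return token, uids
```

Failing early with a clear error is one design choice; the original code would instead send a request with an empty token or UID list.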

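Besides the code, a few small settings are essential:

1. Proxy configuration: GitHub is hosted overseas, so network problems may occur. Because aiohttp does not use the system proxy by default, the proxy has to be configured explicitly. Add your own proxy server to the config/key.yaml configuration file to work around this:

GLOBAL_PROXY: http://127.0.0.1:8118  # change to your own proxy server address

On Windows, the proxy address and port are shown under Control Panel > Network and Internet > Internet Options > Connections > LAN settings. Be sure to copy your own port; otherwise the request fails with an error saying the target machine refused the connection.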
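A wrong port typically shows up as a refused connection, and that can be checked before starting the bot. The sketch below uses only the standard library; `parse_proxy` and `proxy_is_listening` are hypothetical helpers, and a successful TCP connection only shows that something is listening on that port, not that it is a working proxy:

```python
import socket
from typing import Tuple
from urllib.parse import urlsplit


def parse_proxy(proxy_url: str) -> Tuple[str, int]:
    """Split a proxy URL such as http://127.0.0.1:8118 into (host, port)."""
    parts = urlsplit(proxy_url)
    if not parts.hostname or parts.port is None:
        raise ValueError(f"proxy URL needs a host and a port: {proxy_url}")
    return parts.hostname, parts.port


def proxy_is_listening(proxy_url: str, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to the proxy address succeeds."""
    host, port = parse_proxy(proxy_url)
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers "connection refused", the usual symptom of a wrong port
        return False
```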

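2. Customize the analysis: edit the TRENDING_ANALYSIS_PROMPT string to focus on the directions and angles you care about, for example:

  1. Programming language trends: look at the languages used in the Trending list to see which ones are currently more popular in the developer community.
  2. Project types and uses: analyze which categories the Trending projects fall into and what each one is used for.
  3. Community activity: check each project's star count and number of contributors.
  4. Emerging technologies and tools: watch for new projects and newly surfacing technologies to follow current technical trends.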
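Some of these angles can also be computed directly from the crawled data before it reaches the LLM. As a side sketch, separate from the prompt itself, the function below counts programming languages across repository dicts shaped like the ones `CrawlOSSTrending` returns; `language_ranking` is a hypothetical helper:

```python
from collections import Counter
from typing import List, Optional, Tuple


def language_ranking(repositories: List[dict], top: Optional[int] = None) -> List[Tuple[str, int]]:
    """Count repositories per programming language, most common first."""
    # The crawler stores None when a repository shows no language
    counts = Counter(repo.get("language") or "Unknown" for repo in repositories)
    return counts.most_common(top)
```

Such a ranking could be appended to the data handed to the prompt, so the LLM summarizes counted figures rather than estimating them.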
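
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending analyst who gives users in-depth insights and personalized recommendations based on the latest GitHub trends. Based on the context, fill in the missing information below and generate engaging, informative titles, so that users discover repositories matching their interests. Remember to write the output in Chinese.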

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example

# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
    ```

# Github Trending

{trending}
"""


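3. Schedule: to see the generated output quickly, the author set the bot to push the subscription message every minute. To keep the bot running long term, switch to a fixed schedule, which is simple to do.

Method 1: change the spec parameter of the main function to a suitable cron expression so that the task fires at a specific time each day.

# main function
async def main(spec: str = "0 9 * * *", discord: bool = False, wxpusher: bool = True):
    ...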


Method 2: with aiocron, the schedule can be configured very flexibly using cron syntax.

from pytz import timezone
beijing_tz = timezone('Asia/Shanghai')  # time zone for Beijing time
cron_trigger = GithubTrendingCronTrigger("0 8 * * *", tz=beijing_tz)


Final result:

![](https://img-blog.csdnimg.cn/direct/46f551e712194ae8aac1f5a0dda0164a.png)

![](https://img-blog.csdnimg.cn/direct/22e49b54ae5a4a819b51608cf31a4977.png)

Full code:

import os

# Set the API key before the metagpt imports below
os.environ["ZHIPUAI_API_KEY"] = "fill in your own key"

from metagpt.environment import Environment

import asyncio
import time
from typing import Any, AsyncGenerator, Awaitable, Callable, Optional

import aiohttp
from aiocron import crontab
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from pytz import BaseTzInfo

from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message

WXPUSHER_TOKEN = "fill in your own appToken"
WXPUSHER_UIDS = "fill in your own UID(s), comma-separated"

# Subscription module. It can also be imported with
# `from metagpt.subscription import SubscriptionRunner`; the code is pasted here for reference.

class SubscriptionRunner(BaseModel):
    """A simple wrapper to manage subscription tasks for different roles using asyncio.

    Example:
        import asyncio
        from metagpt.subscription import SubscriptionRunner
        from metagpt.roles import Searcher
        from metagpt.schema import Message

        async def trigger():
            while True:
                yield Message("the latest news about OpenAI")
                await asyncio.sleep(3600 * 24)

        async def callback(msg: Message):
            print(msg.content)

        async def main():
            pb = SubscriptionRunner()
            await pb.subscribe(Searcher(), trigger(), callback)
            await pb.run()

        asyncio.run(main())
    """

    tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)

    class Config:
        arbitrary_types_allowed = True

    async def subscribe(
            self,
            role: Role,
            trigger: AsyncGenerator[Message, None],
            callback: Callable[
                [
                    Message,
                ],
                Awaitable[None],
            ],
    ):
        """Subscribes a role to a trigger and sets up a callback to be called with the role's response.
        Args:
            role: The role to subscribe.
            trigger: An asynchronous generator that yields Messages to be processed by the role.
            callback: An asynchronous function to be called with the response from the role.
        """
        loop = asyncio.get_running_loop()

        async def _start_role():
            async for msg in trigger:
                # Debug logging of each triggered message
                logger.info("===log===" * 3)
                logger.info(msg)
                logger.info(msg.content)
                resp = await role.run(msg.content)
                await callback(resp)

        self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

    async def unsubscribe(self, role: Role):
        """Unsubscribes a role from its trigger and cancels the associated task.
        Args:
            role: The role to unsubscribe.
        """
        task = self.tasks.pop(role)
        task.cancel()

    async def run(self, raise_exception: bool = True):
        """Runs all subscribed tasks and handles their completion or exception.
        Args:
            raise_exception: _description_. Defaults to True.
        Raises:
            task.exception: _description_
        """
        while True:
            for role, task in self.tasks.items():
                if task.done():
                    if task.exception():
                        if raise_exception:
                            raise task.exception()
                        logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")
                    else:
                        logger.warning(
                            f"Task {task.get_name()} has completed. "
                            "If this is unexpected behavior, please check the trigger function."
                        )
                    # Remove the finished task, then restart the scan of the dict
                    self.tasks.pop(role)
                    break
            else:
                await asyncio.sleep(1)

# Actions implementation

TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending analyst who gives users in-depth insights and personalized recommendations based on the latest GitHub trends. Based on the context, fill in the missing information below and generate engaging, informative titles, so that users discover repositories matching their interests. Remember to write the output in Chinese.

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example

# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...

# Github Trending
{trending}
"""

class CrawlOSSTrending(Action):
    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url, proxy=CONFIG.global_proxy, ssl=False) as response:
                response.raise_for_status()
                html = await response.text()

        soup = BeautifulSoup(html, 'html.parser')

        repositories = []

        # Each trending repository is rendered as one <article class="Box-row">
        for article in soup.select('article.Box-row'):
            repo_info = {}

            repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")
            repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()

            # Description
            description_element = article.select_one('p')
            repo_info['description'] = description_element.text.strip() if description_element else None

            # Language
            language_element = article.select_one('span[itemprop="programmingLanguage"]')
            repo_info['language'] = language_element.text.strip() if language_element else None

            # Stars and Forks
            stars_element = article.select('a.Link--muted')[0]
            forks_element = article.select('a.Link--muted')[1]
            repo_info['stars'] = stars_element.text.strip()
            repo_info['forks'] = forks_element.text.strip()

            # Today's Stars
            today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
            repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None

            repositories.append(repo_info)

        return repositories


class AnalysisOSSTrending(Action):

    async def run(
            self,
            trending: Any
    ):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))

# Role implementation

class OssWatcher(Role):
    # No trailing commas here: `name: str = "Codey",` would assign a tuple, not a str
    name: str = "Codey"
    profile: str = "OssWatcher"
    goal: str = "Generate an insightful GitHub Trending analysis report."
    constraints: str = "Only analyze based on the provided GitHub Trending data."

    def __init__(self, **kwargs) -> None:
        super().__init__(**kwargs)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # In "by_order" mode the Actions are chosen in sequence under the hood:
        # todo is first CrawlOSSTrending, then AnalysisOSSTrending
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]  # the most recent message
        result = await todo.run(msg.content)

        msg = Message(content=str(result))
        # msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)
        return msg

# Trigger

class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)


class GithubTrendingCronTrigger():
    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Wait until the next time the cron spec fires
        await self.crontab.next()
        logger.info(self.url)
        logger.info(OssInfo(url=self.url))
        logger.info(Message(self.url))
        return Message(self.url)
        # return Message(self.url, OssInfo(url=self.url))

# callback

class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or WXPUSHER_TOKEN
        # self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
            self,
            content,
            summary: Optional[str] = None,
            content_type: int = 1,
            topic_ids: Optional[list[int]] = None,
            uids: Optional[list[str]] = None,  # wxpusher UIDs are strings
            verify: bool = False,
            url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or WXPUSHER_UIDS.split(","),
            # "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs, ssl=False) as response:
                response.raise_for_status()
                return await response.json()


async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)

# Entry point

async def main(spec: str = "* * * * *", discord: bool = False, wxpusher: bool = True):
    callbacks = []

    if wxpusher:
        callbacks.append(wxpusher_callback)

    # Fall back to printing when no push channel is enabled
    if not callbacks:
        async def _print(msg: Message):
            print(msg.content)

        callbacks.append(_print)

    async def callback(msg):
        # Send the message to every registered callback concurrently
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
    await runner.run()


if __name__ == "__main__":
    import fire

    fire.Fire(main)

```


This article is reposted from: https://blog.csdn.net/m0_73382195/article/details/136061949
Copyright belongs to the original author, 小楼一夜赏海棠. In case of infringement, please contact us for removal.
