0


【一起学Rust | 进阶篇 | Fang库】后台任务处理库——Fang

文章目录


前言

Fang是一个Rust的后台任务处理库,采用Postgres DB作为任务队列。同时支持Asynk和Blocking任务。

Asynk任务

采用的是

tokio

的特性,Worker工作在

tokio

下。

Blocking任务

使用的是

std::thread

,Worker工作在一个单独的线程。


一、Fang安装

1. 添加依赖

添加Fang到你的

Cargo.toml

文件中

注意

Fang仅支持rust 1.62+版本

仅使用Blocking

[dependencies]
fang ={ version ="0.7", features =["blocking"], default-features =false}

仅使用Asynk

[dependencies]
fang ={ version ="0.7", features =["asynk"], default-features =false}

同时使用Blocking和Asynk

fang ={ version ="0.7"}

2. 创建数据库

这里需要使用

Diesel CLI

来完成数据库的迁移,将在后面的文章中介绍

在你的Postgres DB中创建

fang_tasks

表,然后运行以下脚本

CREATE EXTENSION IFNOTEXISTS"uuid-ossp";CREATETYPE fang_task_state ASENUM('new','in_progress','failed','finished');CREATETABLE fang_tasks (
     id uuid PRIMARYKEYDEFAULT uuid_generate_v4(),
     metadata jsonb NOTNULL,
     error_message TEXT,
     state fang_task_state default'new'NOTNULL,
     task_type VARCHARdefault'common'NOTNULL,
     created_at TIMESTAMPWITHTIME ZONE NOTNULLDEFAULTNOW(),
     updated_at TIMESTAMPWITHTIME ZONE NOTNULLDEFAULTNOW());CREATEINDEX fang_tasks_state_index ON fang_tasks(state);CREATEINDEX fang_tasks_type_index ON fang_tasks(task_type);CREATEINDEX fang_tasks_created_at_index ON fang_tasks(created_at);CREATEINDEX fang_tasks_metadata_index ON fang_tasks(metadata);CREATETABLE fang_periodic_tasks (
  id uuid PRIMARYKEYDEFAULT uuid_generate_v4(),
  metadata jsonb NOTNULL,
  period_in_seconds INTEGERNOTNULL,
  scheduled_at TIMESTAMPWITHTIME ZONE,
  created_at TIMESTAMPWITHTIME ZONE NOTNULLDEFAULTNOW(),
  updated_at TIMESTAMPWITHTIME ZONE NOTNULLDEFAULTNOW());CREATEINDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at);CREATEINDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata);

这些文件可以在源码目录

migrations

中找到,github搜Fang,然后进入下载源码。

二、使用

1.定义一个任务

Blocking任务

每个要被Fang执行的任务都必须实现

fang::Runnable

特质,特质实现

#[typetag::serde]

使之具有反序列化任务的属性。

usefang::Error;usefang::Runnable;usefang::typetag;usefang::PgConnection;usefang::serde::{Deserialize,Serialize};#[derive(Serialize, Deserialize)]#[serde(crate = "fang::serde")]structMyTask{pub number:u16,}#[typetag::serde]implRunnableforMyTask{fnrun(&self, _connection:&PgConnection)->Result<(),Error>{println!("the number is {}",self.number);Ok(())}}

run函数的第二个参数是PgConnection,你可以重复使用它来操作任务队列,例如在当前作业执行期间添加一个新任务,或者,如果你要复用,可以在自己的查询中重新使用它。如果你不需要它,就忽略它。

Asynk任务

每个要被Fang执行的任务都必须实现

fang::AsyncRunnable

特质

注意

不要实现两个同名的AsyncRunnable,这会导致typetag失败

usefang::AsyncRunnable;usefang::asynk::async_queue::AsyncQueueable;usefang::serde::{Deserialize,Serialize};usefang::async_trait;#[derive(Serialize, Deserialize)]#[serde(crate = "fang::serde")]structAsyncTask{pub number:u16,}#[typetag::serde]#[async_trait]implAsyncRunnableforAsyncTask{asyncfnrun(&self, _queueable:&mutdynAsyncQueueable)->Result<(),Error>{Ok(())}// this func is optional to impl// Default task-type it is commonfntask_type(&self)->String{"my-task-type".to_string()}}

2.任务队列

Blocking任务

需要使用

Queue::enqueue_task

来入队列

usefang::Queue;...Queue::enqueue_task(&MyTask{ number:10}).unwrap();

上面的示例在每次调用时都会创建一个新的 postgres 连接

重用相同的 postgres 连接来将多个任务排入队列

let queue =Queue::new();for id in&unsynced_feed_ids {
    queue.push_task(&SyncFeedMyTask{ feed_id:*id }).unwrap();}

或者使用PgConnection结构体

Queue::push_task_query(pg_connection,&new_task).unwrap();

Asynk任务

使用

AsyncQueueable::insert_task

来入队,可以根据你自己后端来进行操作,默认为

Postgres
usefang::asynk::async_queue::AsyncQueue;usefang::NoTls;usefang::AsyncRunnable;// 创建异步队列let max_pool_size:u32=2;letmut queue =AsyncQueue::builder()// Postgres 数据库 url.uri("postgres://postgres:postgres@localhost/fang")// 允许的最大连接数控i昂.max_pool_size(max_pool_size)// 如果希望任务中的唯一性,则为false.duplicated_tasks(true).build();// 要进行操作之前,总是要先连接
queue.connect(NoTls).await.unwrap();

举个简单例子我们用的是NoTls,如果你有特殊需求,如果出于某种原因你想加密 postgres 流量。

let task =AsyncTask{8};let task_returned = queue
  .insert_task(&task as&dynAsyncRunnable).await.unwrap();

3. 启动Worker

Blocking任务

每个Worker都在一个单独的线程中运行。如果panic,会重新启动。
使用

WorkerPool

来启动Worker,

WorkerPool::new

接收一个整型参数,Worker的数量

usefang::WorkerPool;WorkerPool::new(10).start();

使用

shutdown

停止线程

usefang::WorkerPool;

worker_pool =WorkerPool::new(10).start().unwrap;

worker_pool.shutdown()

Asynk任务

每个Worker都在一个单独的 tokio 任务中运行。如果panic,会重新启动。
使用

AsyncWorkerPool

来启动Worker

usefang::asynk::async_worker_pool::AsyncWorkerPool;// 必须创建一个队列// 插入一些任务letmut pool:AsyncWorkerPool<AsyncQueue<NoTls>>=AsyncWorkerPool::builder().number_of_workers(max_pool_size).queue(queue.clone()).build();

pool.start().await;

4. 配置

Blocking任务

在创建Blocking任务任务的时候,默认只能传入Worker数量参数,如果想要进行自定义配置,需要使用

WorkerPool.new_with_params

来创建,它接受两个参数——工人数量和WorkerParams结构体。

WorkerParams

的定义是这样的

pubstructWorkerParams{pub retention_mode:Option<RetentionMode>,pub sleep_params:Option<SleepParams>,pub task_type:Option<String>,}pubenumRetentionMode{KeepAll,RemoveAll,RemoveFinished,}pubstructSleepParams{pub sleep_period:u64,pub max_sleep_period:u64,pub min_sleep_period:u64,pub sleep_step:u64,}

Asynk任务

使用

AsyncWorkerPool

的builder方法即可。需要链式调用,创建一个

AsyncWorkerPool

,然后调用.queue(…),.sleep_params(…)(可选),.retention_mode(…)(可选),.number_of_workers(…)配置,最后调用.build()构建对象。

5. 配置Worker类型

可以指定Worker类型,来指定指定类型Worker执行指定类型的任务

Blocking任务

Runnable

特质中添加方法

...#[typetag::serde]implRunnableforMyTask{fnrun(&self)->Result<(),Error>{println!("the number is {}",self.number);Ok(())}fntask_type(&self)->String{"number".to_string()}}

设置

task_type
letmut worker_params =WorkerParams::new();
worker_params.set_task_type("number".to_string());WorkerPool::new_with_params(10, worker_params).start();

没有设置

task_type

的Worker可以执行任何任务

Asynk任务

功能与

Blocking任务

相同。使用

AsyncWorker

的builer来设置

6. 配置保留模式

默认情况下,所有成功完成的任务都会从数据库中删除,失败的任务不会。可以使用三种保留模式:

pubenumRetentionMode{KeepAll,        \\ 不删除任务
    RemoveAll,      \\ 删除所有任务
    RemoveFinished, \\ 默认值,完成就删除
}

Blocking任务

使用

set_retention_mode

设置保留模式

letmut worker_params =WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::RemoveAll);WorkerPool::new_with_params(10, worker_params).start();

Asynk任务

使用

AsyncWorker

的builder。

7. 配置睡眠值

Blocking任务

使用

useSleepParams

来配置睡眠值:

pubstructSleepParams{pub sleep_period:u64,     \\ 默认值 5pub max_sleep_period:u64, \\ 默认值 15pub min_sleep_period:u64, \\ 默认值 5pub sleep_step:u64,       \\ 默认值 5}

如果数据库中没有任务,则Worker会休眠sleep_period,并且每次该值都会增加sleep_step,直到达到max_sleep_period. min_sleep_period是sleep_period的初始值。所有值都以秒为单位。

使用

set_sleep_params

来设置

let sleep_params =SleepParams{
    sleep_period:2,
    max_sleep_period:6,
    min_sleep_period:2,
    sleep_step:1,};letmut worker_params =WorkerParams::new();
worker_params.set_sleep_params(sleep_params);WorkerPool::new_with_params(10, worker_params).start();

Asynk任务

使用

AsyncWorker

的builder。

8. 定时任务

如果你从头到尾看的本文,那么什么也不需要做,否则你需要创建

fang_periodic_tasks

表,就在本文安装那个部分。

Blocking任务

usefang::Scheduler;usefang::Queue;let queue =Queue::new();

queue
     .push_periodic_task(&SyncMyTask::default(),120).unwrap();

queue
     .push_periodic_task(&DeliverMyTask::default(),60).unwrap();Scheduler::start(10,5);

在上面的示例中,push_periodic_task用于将指定的任务保存到表fang_periodic_tasks中,该表将fang_tasks每隔指定的秒数排队(保存到表中)。

Scheduler::start(10, 5)启动调度程序。它接受两个参数:

  • 数据库检查周期(以秒为单位)
  • 可接受的错误限制(以秒为单位)

Asynk任务

usefang::asynk::async_scheduler::Scheduler;usefang::asynk::async_queue::AsyncQueueable;usefang::asynk::async_queue::AsyncQueue;// 在此之前构建一个Async队列let schedule_in_future =Utc::now()+OtherDuration::seconds(5);let _periodic_task = queue.insert_periodic_task(&AsyncTask{ number:1},
    schedule_in_future,10,).await;let check_period:u64=1;let error_margin_seconds:u64=2;letmut scheduler =Scheduler::builder().check_period(check_period).error_margin_seconds(error_margin_seconds).queue(&mut queue as&mutdynAsyncQueueable).build();// 在其他线程或循环之前添加更多任务// 调度程序循环
scheduler.start().await.unwrap();

总结

以上就是本文的所有内容,介绍了Rust中借助

Fang库

来实现后台任务,进行后台任务的处理,还有定时任务,配置等。


本文转载自: https://blog.csdn.net/weixin_47754149/article/details/126230426
版权归原作者 广龙宇 所有, 如有侵权,请联系我们删除。

“【一起学Rust | 进阶篇 | Fang库】后台任务处理库——Fang”的评论:

还没有评论