文章目录
前言
Fang是一个Rust的后台任务处理库,采用Postgres DB作为任务队列。同时支持Asynk和Blocking任务。
Asynk任务
采用的是
tokio
的特性,Worker工作在
tokio
下。
Blocking任务
使用的是
std::thread
,Worker工作在一个单独的线程。
一、Fang安装
1. 添加依赖
添加Fang到你的
Cargo.toml
文件中
注意
Fang仅支持rust 1.62+版本
仅使用Blocking
[dependencies]
fang ={ version ="0.7", features =["blocking"], default-features =false}
仅使用Asynk
[dependencies]
fang ={ version ="0.7", features =["asynk"], default-features =false}
同时使用Blocking和Asynk
fang ={ version ="0.7"}
2. 创建数据库
这里需要使用
Diesel CLI
来完成数据库的迁移,将在后面的文章中介绍
在你的Postgres DB中创建
fang_tasks
表,然后运行以下脚本
CREATE EXTENSION IFNOTEXISTS"uuid-ossp";CREATETYPE fang_task_state ASENUM('new','in_progress','failed','finished');CREATETABLE fang_tasks (
id uuid PRIMARYKEYDEFAULT uuid_generate_v4(),
metadata jsonb NOTNULL,
error_message TEXT,
state fang_task_state default'new'NOTNULL,
task_type VARCHARdefault'common'NOTNULL,
created_at TIMESTAMPWITHTIME ZONE NOTNULLDEFAULTNOW(),
updated_at TIMESTAMPWITHTIME ZONE NOTNULLDEFAULTNOW());CREATEINDEX fang_tasks_state_index ON fang_tasks(state);CREATEINDEX fang_tasks_type_index ON fang_tasks(task_type);CREATEINDEX fang_tasks_created_at_index ON fang_tasks(created_at);CREATEINDEX fang_tasks_metadata_index ON fang_tasks(metadata);CREATETABLE fang_periodic_tasks (
id uuid PRIMARYKEYDEFAULT uuid_generate_v4(),
metadata jsonb NOTNULL,
period_in_seconds INTEGERNOTNULL,
scheduled_at TIMESTAMPWITHTIME ZONE,
created_at TIMESTAMPWITHTIME ZONE NOTNULLDEFAULTNOW(),
updated_at TIMESTAMPWITHTIME ZONE NOTNULLDEFAULTNOW());CREATEINDEX fang_periodic_tasks_scheduled_at_index ON fang_periodic_tasks(scheduled_at);CREATEINDEX fang_periodic_tasks_metadata_index ON fang_periodic_tasks(metadata);
这些文件可以在源码目录
migrations
中找到,github搜Fang,然后进入下载源码。
二、使用
1.定义一个任务
Blocking任务
每个要被Fang执行的任务都必须实现
fang::Runnable
特质,特质实现
#[typetag::serde]
使之具有反序列化任务的属性。
usefang::Error;usefang::Runnable;usefang::typetag;usefang::PgConnection;usefang::serde::{Deserialize,Serialize};#[derive(Serialize, Deserialize)]#[serde(crate = "fang::serde")]structMyTask{pub number:u16,}#[typetag::serde]implRunnableforMyTask{fnrun(&self, _connection:&PgConnection)->Result<(),Error>{println!("the number is {}",self.number);Ok(())}}
run函数的第二个参数是PgConnection,你可以重复使用它来操作任务队列,例如在当前作业执行期间添加一个新任务,或者,如果你要复用,可以在自己的查询中重新使用它。如果你不需要它,就忽略它。
Asynk任务
每个要被Fang执行的任务都必须实现
fang::AsyncRunnable
特质
注意
不要实现两个同名的AsyncRunnable,这会导致typetag失败
usefang::AsyncRunnable;usefang::asynk::async_queue::AsyncQueueable;usefang::serde::{Deserialize,Serialize};usefang::async_trait;#[derive(Serialize, Deserialize)]#[serde(crate = "fang::serde")]structAsyncTask{pub number:u16,}#[typetag::serde]#[async_trait]implAsyncRunnableforAsyncTask{asyncfnrun(&self, _queueable:&mutdynAsyncQueueable)->Result<(),Error>{Ok(())}// this func is optional to impl// Default task-type it is commonfntask_type(&self)->String{"my-task-type".to_string()}}
2.任务队列
Blocking任务
需要使用
Queue::enqueue_task
来入队列
usefang::Queue;...Queue::enqueue_task(&MyTask{ number:10}).unwrap();
上面的示例在每次调用时都会创建一个新的 postgres 连接
重用相同的 postgres 连接来将多个任务排入队列
let queue =Queue::new();for id in&unsynced_feed_ids {
queue.push_task(&SyncFeedMyTask{ feed_id:*id }).unwrap();}
或者使用PgConnection结构体
Queue::push_task_query(pg_connection,&new_task).unwrap();
Asynk任务
使用
AsyncQueueable::insert_task
来入队,可以根据你自己后端来进行操作,默认为
Postgres
usefang::asynk::async_queue::AsyncQueue;usefang::NoTls;usefang::AsyncRunnable;// 创建异步队列let max_pool_size:u32=2;letmut queue =AsyncQueue::builder()// Postgres 数据库 url.uri("postgres://postgres:postgres@localhost/fang")// 允许的最大连接数控i昂.max_pool_size(max_pool_size)// 如果希望任务中的唯一性,则为false.duplicated_tasks(true).build();// 要进行操作之前,总是要先连接
queue.connect(NoTls).await.unwrap();
举个简单例子我们用的是NoTls,如果你有特殊需求,如果出于某种原因你想加密 postgres 流量。
let task =AsyncTask{8};let task_returned = queue
.insert_task(&task as&dynAsyncRunnable).await.unwrap();
3. 启动Worker
Blocking任务
每个Worker都在一个单独的线程中运行。如果panic,会重新启动。
使用
WorkerPool
来启动Worker,
WorkerPool::new
接收一个整型参数,Worker的数量
usefang::WorkerPool;WorkerPool::new(10).start();
使用
shutdown
停止线程
usefang::WorkerPool;
worker_pool =WorkerPool::new(10).start().unwrap;
worker_pool.shutdown()
Asynk任务
每个Worker都在一个单独的 tokio 任务中运行。如果panic,会重新启动。
使用
AsyncWorkerPool
来启动Worker
usefang::asynk::async_worker_pool::AsyncWorkerPool;// 必须创建一个队列// 插入一些任务letmut pool:AsyncWorkerPool<AsyncQueue<NoTls>>=AsyncWorkerPool::builder().number_of_workers(max_pool_size).queue(queue.clone()).build();
pool.start().await;
4. 配置
Blocking任务
在创建Blocking任务任务的时候,默认只能传入Worker数量参数,如果想要进行自定义配置,需要使用
WorkerPool.new_with_params
来创建,它接受两个参数——工人数量和WorkerParams结构体。
WorkerParams
的定义是这样的
pubstructWorkerParams{pub retention_mode:Option<RetentionMode>,pub sleep_params:Option<SleepParams>,pub task_type:Option<String>,}pubenumRetentionMode{KeepAll,RemoveAll,RemoveFinished,}pubstructSleepParams{pub sleep_period:u64,pub max_sleep_period:u64,pub min_sleep_period:u64,pub sleep_step:u64,}
Asynk任务
使用
AsyncWorkerPool
的builder方法即可。需要链式调用,创建一个
AsyncWorkerPool
,然后调用.queue(…),.sleep_params(…)(可选),.retention_mode(…)(可选),.number_of_workers(…)配置,最后调用.build()构建对象。
5. 配置Worker类型
可以指定Worker类型,来指定指定类型Worker执行指定类型的任务
Blocking任务
在
Runnable
特质中添加方法
...#[typetag::serde]implRunnableforMyTask{fnrun(&self)->Result<(),Error>{println!("the number is {}",self.number);Ok(())}fntask_type(&self)->String{"number".to_string()}}
设置
task_type
letmut worker_params =WorkerParams::new();
worker_params.set_task_type("number".to_string());WorkerPool::new_with_params(10, worker_params).start();
没有设置
task_type
的Worker可以执行任何任务
Asynk任务
功能与
Blocking任务
相同。使用
AsyncWorker
的builer来设置
6. 配置保留模式
默认情况下,所有成功完成的任务都会从数据库中删除,失败的任务不会。可以使用三种保留模式:
pubenumRetentionMode{KeepAll, \\ 不删除任务
RemoveAll, \\ 删除所有任务
RemoveFinished, \\ 默认值,完成就删除
}
Blocking任务
使用
set_retention_mode
设置保留模式
letmut worker_params =WorkerParams::new();
worker_params.set_retention_mode(RetentionMode::RemoveAll);WorkerPool::new_with_params(10, worker_params).start();
Asynk任务
使用
AsyncWorker
的builder。
7. 配置睡眠值
Blocking任务
使用
useSleepParams
来配置睡眠值:
pubstructSleepParams{pub sleep_period:u64, \\ 默认值 5pub max_sleep_period:u64, \\ 默认值 15pub min_sleep_period:u64, \\ 默认值 5pub sleep_step:u64, \\ 默认值 5}
如果数据库中没有任务,则Worker会休眠sleep_period,并且每次该值都会增加sleep_step,直到达到max_sleep_period. min_sleep_period是sleep_period的初始值。所有值都以秒为单位。
使用
set_sleep_params
来设置
let sleep_params =SleepParams{
sleep_period:2,
max_sleep_period:6,
min_sleep_period:2,
sleep_step:1,};letmut worker_params =WorkerParams::new();
worker_params.set_sleep_params(sleep_params);WorkerPool::new_with_params(10, worker_params).start();
Asynk任务
使用
AsyncWorker
的builder。
8. 定时任务
如果你从头到尾看的本文,那么什么也不需要做,否则你需要创建
fang_periodic_tasks
表,就在本文安装那个部分。
Blocking任务
usefang::Scheduler;usefang::Queue;let queue =Queue::new();
queue
.push_periodic_task(&SyncMyTask::default(),120).unwrap();
queue
.push_periodic_task(&DeliverMyTask::default(),60).unwrap();Scheduler::start(10,5);
在上面的示例中,push_periodic_task用于将指定的任务保存到表fang_periodic_tasks中,该表将fang_tasks每隔指定的秒数排队(保存到表中)。
Scheduler::start(10, 5)启动调度程序。它接受两个参数:
- 数据库检查周期(以秒为单位)
- 可接受的错误限制(以秒为单位)
Asynk任务
usefang::asynk::async_scheduler::Scheduler;usefang::asynk::async_queue::AsyncQueueable;usefang::asynk::async_queue::AsyncQueue;// 在此之前构建一个Async队列let schedule_in_future =Utc::now()+OtherDuration::seconds(5);let _periodic_task = queue.insert_periodic_task(&AsyncTask{ number:1},
schedule_in_future,10,).await;let check_period:u64=1;let error_margin_seconds:u64=2;letmut scheduler =Scheduler::builder().check_period(check_period).error_margin_seconds(error_margin_seconds).queue(&mut queue as&mutdynAsyncQueueable).build();// 在其他线程或循环之前添加更多任务// 调度程序循环
scheduler.start().await.unwrap();
总结
以上就是本文的所有内容,介绍了Rust中借助
Fang库
来实现后台任务,进行后台任务的处理,还有定时任务,配置等。
版权归原作者 广龙宇 所有, 如有侵权,请联系我们删除。