新标签页 (chinaunix.net)
FastDFS - Browse Files at SourceForge.net
一、FastDFS
Tracker和Storage:
tracker用来管理所有的storage,只是管理服务器,负责负载均衡。
storage是存储服务器,每一个storage服务器都是一个单独的个体,storage服务器之间没有交互关系。
在storage中根目录包含256个一级目录、每个一级目录中包含256个二级子目录,在二级子目录中存储图片。存储图片时服务器会返回相应的group和remote,访问文件时通过这两个键值对获取图片。
①、在虚拟机中使用docker安装FastDFS
docker load -i fastdfs.tar //在docker中加载镜像源文件
mkdir -p /opt/fdfs/tracker
docker run -d --network=host --name tracker -v /opt/fdfs/tracker:/var/fdfs delron/fastdfs tracker
mkdir -p /opt/fdfs/storage
docker run -d --network=host --name storage -e TRACKER_SERVER=192.168.222.128:22122 -v /opt/fdfs/storage:/var/fdfs -e GROUP_NAME=group1 delron/fastdfs storage
//重启服务器需要删除文件。
rm -rf /opt/fdfs/tracker/data/*.pid
rm -rf /opt/fdfs/storage/data/*.pid
//更改服务器的磁盘限制
docker exec -it tracker bash
cd /etc/fdfs
vi /etc/fdfs/tracker.conf
reserved_storage_space = 10K //磁盘剩余多少空间时关闭上传文件功能
②、java代码使用FastDFS
1)首先编写FastDFS配置信息fdfs.properties
fastdfs.connect_timeout_in_seconds=10
fastdfs.network_timeout_in_seconds=30
fastdfs.charset=UTF-8
# tracker????????????????
fastdfs.tracker_servers=192.168.222.128:22122
# tracker????????tracker.conf??http??????????
fastdfs.http_tracker_http_port=8080
2)FastDFS 操作工具类
封装了加载配置文件fdfs.properties的静态代码块,还有上传和下载的方法。
package com.xja.util;
import org.csource.common.NameValuePair;
import org.csource.fastdfs.*;
import java.io.InputStream;
import java.util.Properties;
/**
* FastDFS Java客户端工具
*/
public final class FastDFSUtils {
/**
* 定义静态属性,Properties和StorageClient
*/
private final static Properties PROPERTIES;
private final static StorageClient STORAGE_CLIENT;
/**
* 静态初始化代码块,初始化静态属性
* 静态初始化代码块有异常如何处理?
* 处理的时候,try。。catch。。 抛出一个Error,终止虚拟机。
*/
static{
try {
PROPERTIES = new Properties();
// 读取配置文件
PROPERTIES.load(
FastDFSUtils.class
.getClassLoader()
.getResourceAsStream("fdfs.properties")
);
// 使用ClientGlobal初始化FastDFS客户端配置
ClientGlobal.initByProperties(PROPERTIES);
// 创建Tracker客户端对象
TrackerClient trackerClient = new TrackerClient();
// 基于Tracker客户端对象,获取Tracker服务器对象
TrackerServer trackerServer = trackerClient.getConnection();
// 基于Tracker服务器和客户端对象,获取Storage服务器对象
StorageServer storageServer = trackerClient.getStoreStorage(trackerServer);
// 创建Storage客户端对象
STORAGE_CLIENT = new StorageClient(trackerServer, storageServer);
}catch (Exception e){
throw new ExceptionInInitializerError(e);
}
}
/**
* 删除文件
* int delete_file(String 卷名, String 路径及文件名);
* 返回值: 0代表成功,其他数字代表错误编码
*/
public static int remote(String group, String remote){
try {
return STORAGE_CLIENT.delete_file(group, remote);
}catch (Exception e){
e.printStackTrace();
return -1;
}
}
/**
* 查询某文件的元数据
* @param group 卷名
* @param remote 路径及文件名
* @return 返回文件的元数据数组。发生错误返回null
*/
public static NameValuePair[] getMetaData(String group, String remote){
try{
return STORAGE_CLIENT.get_metadata(group, remote);
}catch (Exception e){
e.printStackTrace();
return null;
}
}
/**
* 下载文件工具方法
* 下载方法
* byte[] download_file(String 卷名, String 路径及文件名)
* 返回要下载的文件内容
* @param group 卷名
* @param remote 路径及文件名
* @return 返回下载的文件内容,发生错误返回null
*/
public static byte[] download(String group, String remote){
try {
return STORAGE_CLIENT.download_file(group, remote);
}catch (Exception e){
e.printStackTrace();
return null;
}
}
/**
* 上传文件的工具方法
* 一定保存文件到FastDFS,一定保存至少一个元数据(文件原始名称)
* @param inputStream 要上传的文件的输入流
* @param fileName 上传文件的原始名称
* @param metaProperties 上传文件的元数据,成对提供,如: 名,值,名,值
* @return
*/
public static String[] uploadFile(InputStream inputStream, String fileName, String... metaProperties){
try {
int length = inputStream.available();
byte[] datas = new byte[length];
inputStream.read(datas, 0, length);
// 处理元数据
NameValuePair[] nameValuePairs = null;
if (metaProperties.length % 2 == 0) {
// 参数数量满足要求,开始处理
nameValuePairs = new NameValuePair[metaProperties.length / 2 + 1];
for (int i = 0; i < nameValuePairs.length; i = i + 2) {
nameValuePairs[i / 2] = new NameValuePair(metaProperties[i], metaProperties[i + 1]);
}
} else {
nameValuePairs = new NameValuePair[1];
}
nameValuePairs[nameValuePairs.length - 1] = new NameValuePair("fileName", fileName);
// 获取文件后缀
String extName = getExtName(fileName);
// 上传文件到FastDFS
String[] result = STORAGE_CLIENT.upload_file(datas, extName, nameValuePairs);
for (String s : result) {
System.out.println("s = " + s);
//s = group1
//s = M00/00/00/wKjegGbqfoKACfLIAAAnJ2OoZs8411.txt
}
System.out.println("============= " + nameValuePairs[0]+ "/n"+ nameValuePairs[1]);
//============= org.csource.common.NameValuePair@4ae28001/norg.csource.common.NameValuePair@3e472f5d
return result;
}catch (Exception e){
// 发生任何异常,上传文件失败。返回null
e.printStackTrace();
return null;
}
}
/**
* 截取文件后缀
* @param fileName
* @return
*/
private static String getExtName(String fileName){
if(fileName.lastIndexOf(".") > -1){
// 文件名称中包含字符 .
return fileName.substring(fileName.lastIndexOf(".") + 1);
}else{
// 文件名称中不包含字符 .
return "";
}
}
/**
* 提供获取Storage客户端对象的工具方法
*/
public static StorageClient getStorageClient(){
return STORAGE_CLIENT;
}
private FastDFSUtils(){}
}
3)编写上传和下载路由
下载路由中需要加上内容类型和响应头,否则可能出现访问下载却出现在线预览图片
@Controller
public class LoginController {
@RequestMapping("/index")
public String uploadPage(){
return "index";
}
@RequestMapping("/upload")
public String upload(MultipartFile file, HttpSession session) throws IOException {
InputStream inputStream = file.getInputStream();
String originalFilename = file.getOriginalFilename();
String[] strings = FastDFSUtils.uploadFile(inputStream, originalFilename, "s", "");
System.out.println(strings);
return "a";
}
@RequestMapping("/download")
public void download(String group,String remote,HttpServletResponse response) throws IOException {
byte[] datas = FastDFSUtils.download(group, remote);
response.setContentType("application/octet-stream");
// 设置下载文件的附件名称
NameValuePair[] metaData = FastDFSUtils.getMetaData(group, remote);
String fileName = "";
for (NameValuePair metaDatum : metaData) {
if ("fileName".equals(metaDatum.getName()))
fileName = metaDatum.getValue();
}
response.setHeader("content-disposition",
"attachment;filename="+fileName.toString());
// 输出要下载的文件内容到客户端
// byte[] datas = (byte[]) result.get("datas");
response.getOutputStream().write(datas, 0, datas.length);
response.getOutputStream().flush();
}
}
4)上传一张图片,在浏览器中访问图片
这里的端口号为8888,fastdfs中内置有nginx服务器,请求从nginx服务器转发到storage服务器.
二、RabbitMQ
** publisher项目使用RabbitMQ软件将消息推送至交换机,交换机根据路由键将消息推送至相应队列中。Consumer项目中的监听器时刻监听提前设置好的监听队列,如果有消息进入队列中,会调用单元方法将消息中的数据取出消费,消费成功后返回信息在队列中删除消息**
** 如果消息在Consumer项目中拿取数据或者消费过程中出现错误,这个时候不会被删除,而是会多次尝试再次获取 消息 消费。达到一定次数,停止尝试。**
** 多个消费者同时监听一个消息队列时,采用轮循策略依次发送消息。**
1)虚拟机使用docker安装RabbitMQ
0、docker命令:
docker pull rabbitmq:management
docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 --restart=always -e DEFAULT_USER=wollo -e DEFAULT_PASS=wollo rabbitmq:management
** ①、 访问 http:192.168.222.128:15672: //Docker宿主机IP:15672**
** 这个是RabbitMQ提供的可视化界面,类似于Navicat。**
②、使用可视化界面操作RabbitMQ:
创建交换机、创建队列、将交换机和队列绑定并设置路由键
2)使用java代码连接操作RabbitMQ
1.新建一个父工程统一管理springBoot的版本号
打包方式为 pom
2.新建子工程publisher、consumer 及依赖配置
两个子工程都添加springAMQP依赖。
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
publisher配置文件:
#配置RabbitMq的链接参数
spring:
rabbitmq:
host: 192.168.222.128 # RabbitMQ服务器的IP。默认localhost
port: 5672 # RabbitMQ服务器的端口。
username: wollo # RabbitMQ的访问用户名。默认guest。
password: wollo # RabbitMQ的访问密码。默认guest
virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
publisher-confirm-type: correlated # 开启到达交换器确认机制。默认值:none,不开启确认机制。
publisher-returns: true # 开启到达消息队列确认机制。默认值:none,不开启确认机制。
consumer:
#配置RabbitMq的链接参数
spring:
rabbitmq:
host: 192.168.222.128 # RabbitMQ服务器的IP。默认localhost
port: 5672 # RabbitMQ服务器的端口。
username: wollo # RabbitMQ的访问用户名。默认guest。
password: wollo # RabbitMQ的访问密码。默认guest
virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
listener:
simple:
retry:
enabled: true # 开启重试机制
max-attempts: 3 # 重试消费1次
2.1)第一次使用Publisher发送消息,Consumer消费消息
** 1.前面在可视化界面中已经配置了交换机,绑定的消息队列,路由键**
** 交换机 rk.direct**
** 消息队列 rk.queue**
** 路由键 route.regex**
** 2.在publisher发送消息运行test()方法 ,别忘了编写启动类文件**
** 3.在Consumer中接收消息,需要指定接收的消息队列,同样别忘了编写启动类**
** 4.在运行的Consumer项目控制台中会打印"皇室"。**
2.2)发送和接收一个序列化对象User
** 1.新建commen子模块,publisher和consumer引入该模块**
** commen子模块新建User类,实现Serializable接口**
**2.可视化界面新建交换机rk.direct1 , 消息队列rk.queue1 ,发送user对象:**
** publisher的测试类:**
** **
** consumer的消息队列: **
** consumer项目控制台:**
2.3)其他类型交换器 基于注解创建交换机、消息队列
** 1.交换机类型有四种:**
** direct为完全匹配路由键**
** fanout为广播**
** topic为模糊匹配**
** 2.这里创建一个广播类型的交换机**
** key值任意写,因为广播针对绑定的所有消息队列。**
@Component
public class FanoutConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "fanout.queue",durable = "true",autoDelete="false"),
exchange = @Exchange(name = "rk.fanout",type = "fanout",durable = "true",autoDelete = "false"),
key = {"route.regex"}
)
})
public void OnMessage(String messageBody){
System.out.println("messageBody = " + messageBody);
}
}
** 3.topic交换机**
@Component
public class MyTopicConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "queue.topic.1", autoDelete = "false"),
exchange = @Exchange(name = "topic.first", type = "topic"),
key = {"小学.同学"}
)
})
public void onMessage1(String message){
System.out.println("小学.同学 - " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "queue.topic.2", autoDelete = "false"),
exchange = @Exchange(name = "topic.first", type = "topic"),
key = {"小学.老师"}
)
})
public void onMessage2(String message){
System.out.println("小学.老师 - " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "queue.topic.3", autoDelete = "false"),
exchange = @Exchange(name = "topic.first", type = "topic"),
key = {"小学.*"}
)
})
public void onMessage3(String message){
System.out.println("小学.* - " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "queue.topic.4", autoDelete = "false"),
exchange = @Exchange(name = "topic.first", type = "topic"),
key = {"大学.老师"}
)
})
public void onMessage4(String message){
System.out.println("大学.老师 - " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "queue.topic.5", autoDelete = "false"),
exchange = @Exchange(name = "topic.first", type = "topic"),
key = {"*.老师"}
)
})
public void onMessage5(String message){
System.out.println("*.老师 - " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "queue.topic.6", autoDelete = "false"),
exchange = @Exchange(name = "topic.first", type = "topic"),
key = {"#"}
)
})
public void onMessage6(String message){
System.out.println("# - " + message);
}
}
2.4)消息传递失败的处理方法
①publisher推送的消息失败的回调方法
1. 开启回调方法的确认机制。
2.实现回调接口
@Component
public class PublisherHandle implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
this.rabbitTemplate.setConfirmCallback(this);
this.rabbitTemplate.setReturnsCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("到达交换机:"+b);
}
/**
* @description:
* System.out.println( 交换器 : returnedMessage.getExchange ());
* System.out.println("路由键 : " + returnedMessage.getRoutingKey());
* System.out.println("路由失败编码 : " + returnedMessage.getReplyCode());
* System.out.println("路由失败描述 : " + returnedMessage.getReplyText());
* System.out.println("消息 : " + returnedMessage.getMessage());
* @author:rk
* @date: 2024/9/24 10:47
* @param: returnedMessage
**/
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
System.out.println("到达消息队列:"+returnedMessage.getReplyCode());
}
}
②consumer
** 配置中设置重试次数、或者开启手工ACK方式解决消息处理失败问题**
2.5)手动发送确认ACK
1.配置开启手工确认
2. 创建交换机、消息队列并设置手工ACK
@Component
public class MyAckConsumer {
//测试消费的ACK:
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "ack.queue"),
exchange = @Exchange(name = "ack.direct"),
key = {"ack.routing"}
)
})
public void testAckMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)Long deliveryTag) throws Exception {
System.out.println("消息消费:"+message+"---"+deliveryTag);
//主动通知RabbitMq消息被正确消费,RabbitMq会将消息从队列中移除
//channel.basicAck(deliveryTag, false);
//主动通知RabbitMq消息没有被正确消费(消息的ID,消费结果,是否重试消费false表示删除消息true表示重复消费)
//channel.basicNack(deliveryTag,false,true);
//主动通知RabbitMq消息没有消费,直接丢弃(消息的ID,false表示删除消息true表示尝试重复消费),丢失一般是在消息消费的代码之前
channel.basicReject(deliveryTag, false);
}
}
2.6)同步消息处理
1.配置同步时长
2.消费者代码
@Component
public class SynchronizationMessageConsumer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(name = "return.queue"),
exchange = @Exchange(name = "return.direct"),
key = {"return.routing"}
)
})
public String testAckMessage(String message , Channel channel, @Header(AmqpHeaders.DELIVERY_TAG)Long deliveryTag) throws Exception {
System.out.println("消息的消费内容为:"+message);
return "今天天气不错,适合学习";
}
}
3.推送者代码:
4.测试
版权归原作者 别挡 所有, 如有侵权,请联系我们删除。