0


文件服务器FastDFS 消息队列中间件RabbitMQ

新标签页 (chinaunix.net)

FastDFS - Browse Files at SourceForge.net

一、FastDFS

  1. TrackerStorage
  2. tracker用来管理所有的storage,只是管理服务器,负责负载均衡。
  3. storage是存储服务器,每一个storage服务器都是一个单独的个体,storage服务器之间没有交互关系。
  4. storage中根目录包含256个一级目录、每个一级目录中包含256个二级子目录,在二级子目录中存储图片。存储图片时服务器会返回相应的groupremote,访问文件时通过这两个键值对获取图片。



①、在虚拟机中使用docker安装FastDFS

  1. docker load -i fastdfs.tar //在docker中加载镜像源文件
  2. mkdir -p /opt/fdfs/tracker
  3. docker run -d --network=host --name tracker -v /opt/fdfs/tracker:/var/fdfs delron/fastdfs tracker
  4. mkdir -p /opt/fdfs/storage
  5. docker run -d --network=host --name storage -e TRACKER_SERVER=192.168.222.128:22122 -v /opt/fdfs/storage:/var/fdfs -e GROUP_NAME=group1 delron/fastdfs storage
  6. //重启服务器需要删除文件。
  7. rm -rf /opt/fdfs/tracker/data/*.pid
  8. rm -rf /opt/fdfs/storage/data/*.pid
  9. //更改服务器的磁盘限制
  10. docker exec -it tracker bash
  11. cd /etc/fdfs
  12. vi /etc/fdfs/tracker.conf
  13. reserved_storage_space = 10K //磁盘剩余多少空间时关闭上传文件功能


②、java代码使用FastDFS

1)首先编写FastDFS配置信息fdfs.properties

  1. fastdfs.connect_timeout_in_seconds=10
  2. fastdfs.network_timeout_in_seconds=30
  3. fastdfs.charset=UTF-8
  4. # tracker????????????????
  5. fastdfs.tracker_servers=192.168.222.128:22122
  6. # tracker????????tracker.conf??http??????????
  7. fastdfs.http_tracker_http_port=8080

2)FastDFS 操作工具类

  1. 封装了加载配置文件fdfs.properties的静态代码块,还有上传和下载的方法。

  1. package com.xja.util;
  2. import org.csource.common.NameValuePair;
  3. import org.csource.fastdfs.*;
  4. import java.io.InputStream;
  5. import java.util.Properties;
  6. /**
  7. * FastDFS Java客户端工具
  8. */
  9. public final class FastDFSUtils {
  10. /**
  11. * 定义静态属性,Properties和StorageClient
  12. */
  13. private final static Properties PROPERTIES;
  14. private final static StorageClient STORAGE_CLIENT;
  15. /**
  16. * 静态初始化代码块,初始化静态属性
  17. * 静态初始化代码块有异常如何处理?
  18. * 处理的时候,try。。catch。。 抛出一个Error,终止虚拟机。
  19. */
  20. static{
  21. try {
  22. PROPERTIES = new Properties();
  23. // 读取配置文件
  24. PROPERTIES.load(
  25. FastDFSUtils.class
  26. .getClassLoader()
  27. .getResourceAsStream("fdfs.properties")
  28. );
  29. // 使用ClientGlobal初始化FastDFS客户端配置
  30. ClientGlobal.initByProperties(PROPERTIES);
  31. // 创建Tracker客户端对象
  32. TrackerClient trackerClient = new TrackerClient();
  33. // 基于Tracker客户端对象,获取Tracker服务器对象
  34. TrackerServer trackerServer = trackerClient.getConnection();
  35. // 基于Tracker服务器和客户端对象,获取Storage服务器对象
  36. StorageServer storageServer = trackerClient.getStoreStorage(trackerServer);
  37. // 创建Storage客户端对象
  38. STORAGE_CLIENT = new StorageClient(trackerServer, storageServer);
  39. }catch (Exception e){
  40. throw new ExceptionInInitializerError(e);
  41. }
  42. }
  43. /**
  44. * 删除文件
  45. * int delete_file(String 卷名, String 路径及文件名);
  46. * 返回值: 0代表成功,其他数字代表错误编码
  47. */
  48. public static int remote(String group, String remote){
  49. try {
  50. return STORAGE_CLIENT.delete_file(group, remote);
  51. }catch (Exception e){
  52. e.printStackTrace();
  53. return -1;
  54. }
  55. }
  56. /**
  57. * 查询某文件的元数据
  58. * @param group 卷名
  59. * @param remote 路径及文件名
  60. * @return 返回文件的元数据数组。发生错误返回null
  61. */
  62. public static NameValuePair[] getMetaData(String group, String remote){
  63. try{
  64. return STORAGE_CLIENT.get_metadata(group, remote);
  65. }catch (Exception e){
  66. e.printStackTrace();
  67. return null;
  68. }
  69. }
  70. /**
  71. * 下载文件工具方法
  72. * 下载方法
  73. * byte[] download_file(String 卷名, String 路径及文件名)
  74. * 返回要下载的文件内容
  75. * @param group 卷名
  76. * @param remote 路径及文件名
  77. * @return 返回下载的文件内容,发生错误返回null
  78. */
  79. public static byte[] download(String group, String remote){
  80. try {
  81. return STORAGE_CLIENT.download_file(group, remote);
  82. }catch (Exception e){
  83. e.printStackTrace();
  84. return null;
  85. }
  86. }
  87. /**
  88. * 上传文件的工具方法
  89. * 一定保存文件到FastDFS,一定保存至少一个元数据(文件原始名称)
  90. * @param inputStream 要上传的文件的输入流
  91. * @param fileName 上传文件的原始名称
  92. * @param metaProperties 上传文件的元数据,成对提供,如: 名,值,名,值
  93. * @return
  94. */
  95. public static String[] uploadFile(InputStream inputStream, String fileName, String... metaProperties){
  96. try {
  97. int length = inputStream.available();
  98. byte[] datas = new byte[length];
  99. inputStream.read(datas, 0, length);
  100. // 处理元数据
  101. NameValuePair[] nameValuePairs = null;
  102. if (metaProperties.length % 2 == 0) {
  103. // 参数数量满足要求,开始处理
  104. nameValuePairs = new NameValuePair[metaProperties.length / 2 + 1];
  105. for (int i = 0; i < nameValuePairs.length; i = i + 2) {
  106. nameValuePairs[i / 2] = new NameValuePair(metaProperties[i], metaProperties[i + 1]);
  107. }
  108. } else {
  109. nameValuePairs = new NameValuePair[1];
  110. }
  111. nameValuePairs[nameValuePairs.length - 1] = new NameValuePair("fileName", fileName);
  112. // 获取文件后缀
  113. String extName = getExtName(fileName);
  114. // 上传文件到FastDFS
  115. String[] result = STORAGE_CLIENT.upload_file(datas, extName, nameValuePairs);
  116. for (String s : result) {
  117. System.out.println("s = " + s);
  118. //s = group1
  119. //s = M00/00/00/wKjegGbqfoKACfLIAAAnJ2OoZs8411.txt
  120. }
  121. System.out.println("============= " + nameValuePairs[0]+ "/n"+ nameValuePairs[1]);
  122. //============= org.csource.common.NameValuePair@4ae28001/norg.csource.common.NameValuePair@3e472f5d
  123. return result;
  124. }catch (Exception e){
  125. // 发生任何异常,上传文件失败。返回null
  126. e.printStackTrace();
  127. return null;
  128. }
  129. }
  130. /**
  131. * 截取文件后缀
  132. * @param fileName
  133. * @return
  134. */
  135. private static String getExtName(String fileName){
  136. if(fileName.lastIndexOf(".") > -1){
  137. // 文件名称中包含字符 .
  138. return fileName.substring(fileName.lastIndexOf(".") + 1);
  139. }else{
  140. // 文件名称中不包含字符 .
  141. return "";
  142. }
  143. }
  144. /**
  145. * 提供获取Storage客户端对象的工具方法
  146. */
  147. public static StorageClient getStorageClient(){
  148. return STORAGE_CLIENT;
  149. }
  150. private FastDFSUtils(){}
  151. }

3)编写上传和下载路由

  1. 下载路由中需要加上内容类型和响应头,否则可能出现访问下载却出现在线预览图片

  1. @Controller
  2. public class LoginController {
  3. @RequestMapping("/index")
  4. public String uploadPage(){
  5. return "index";
  6. }
  7. @RequestMapping("/upload")
  8. public String upload(MultipartFile file, HttpSession session) throws IOException {
  9. InputStream inputStream = file.getInputStream();
  10. String originalFilename = file.getOriginalFilename();
  11. String[] strings = FastDFSUtils.uploadFile(inputStream, originalFilename, "s", "");
  12. System.out.println(strings);
  13. return "a";
  14. }
  15. @RequestMapping("/download")
  16. public void download(String group,String remote,HttpServletResponse response) throws IOException {
  17. byte[] datas = FastDFSUtils.download(group, remote);
  18. response.setContentType("application/octet-stream");
  19. // 设置下载文件的附件名称
  20. NameValuePair[] metaData = FastDFSUtils.getMetaData(group, remote);
  21. String fileName = "";
  22. for (NameValuePair metaDatum : metaData) {
  23. if ("fileName".equals(metaDatum.getName()))
  24. fileName = metaDatum.getValue();
  25. }
  26. response.setHeader("content-disposition",
  27. "attachment;filename="+fileName.toString());
  28. // 输出要下载的文件内容到客户端
  29. // byte[] datas = (byte[]) result.get("datas");
  30. response.getOutputStream().write(datas, 0, datas.length);
  31. response.getOutputStream().flush();
  32. }
  33. }

4)上传一张图片,在浏览器中访问图片

  1. 这里的端口号为8888fastdfs中内置有nginx服务器,请求从nginx服务器转发到storage服务器.

二、RabbitMQ

  1. ** publisher项目使用RabbitMQ软件将消息推送至交换机,交换机根据路由键将消息推送至相应队列中。Consumer项目中的监听器时刻监听提前设置好的监听队列,如果有消息进入队列中,会调用单元方法将消息中的数据取出消费,消费成功后返回信息在队列中删除消息**

** 如果消息在Consumer项目中拿取数据或者消费过程中出现错误,这个时候不会被删除,而是会多次尝试再次获取 消息 消费。达到一定次数,停止尝试。**

** 多个消费者同时监听一个消息队列时,采用轮循策略依次发送消息。**

1)虚拟机使用docker安装RabbitMQ

  1. 0docker命令:
  1. docker pull rabbitmq:management
  2. docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 --restart=always -e DEFAULT_USER=wollo -e DEFAULT_PASS=wollo rabbitmq:management

** ①、 访问 http:192.168.222.128:15672: //Docker宿主机IP:15672**

** 这个是RabbitMQ提供的可视化界面,类似于Navicat。**

  1. ②、使用可视化界面操作RabbitMQ
  2. 创建交换机、创建队列、将交换机和队列绑定并设置路由键



2)使用java代码连接操作RabbitMQ

1.新建一个父工程统一管理springBoot的版本号

  1. 打包方式为 pom


2.新建子工程publisher、consumer 及依赖配置

  1. 两个子工程都添加springAMQP依赖。
  1. <dependencies>
  2. <dependency>
  3. <groupId>org.springframework.boot</groupId>
  4. <artifactId>spring-boot-starter-amqp</artifactId>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.springframework.boot</groupId>
  8. <artifactId>spring-boot-starter-test</artifactId>
  9. <scope>test</scope>
  10. </dependency>
  11. </dependencies>

  1. publisher配置文件:

  1. #配置RabbitMq的链接参数
  2. spring:
  3. rabbitmq:
  4. host: 192.168.222.128 # RabbitMQ服务器的IP。默认localhost
  5. port: 5672 # RabbitMQ服务器的端口。
  6. username: wollo # RabbitMQ的访问用户名。默认guest。
  7. password: wollo # RabbitMQ的访问密码。默认guest
  8. virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
  9. publisher-confirm-type: correlated # 开启到达交换器确认机制。默认值:none,不开启确认机制。
  10. publisher-returns: true # 开启到达消息队列确认机制。默认值:none,不开启确认机制。

  1. consumer

  1. #配置RabbitMq的链接参数
  2. spring:
  3. rabbitmq:
  4. host: 192.168.222.128 # RabbitMQ服务器的IP。默认localhost
  5. port: 5672 # RabbitMQ服务器的端口。
  6. username: wollo # RabbitMQ的访问用户名。默认guest。
  7. password: wollo # RabbitMQ的访问密码。默认guest
  8. virtual-host: / # 连接RabbitMQ中的哪一个虚拟主机。默认 /
  9. listener:
  10. simple:
  11. retry:
  12. enabled: true # 开启重试机制
  13. max-attempts: 3 # 重试消费1次


2.1)第一次使用Publisher发送消息,Consumer消费消息

** 1.前面在可视化界面中已经配置了交换机,绑定的消息队列,路由键**

** 交换机 rk.direct**

** 消息队列 rk.queue**

** 路由键 route.regex**

** 2.在publisher发送消息运行test()方法 ,别忘了编写启动类文件**

** 3.在Consumer中接收消息,需要指定接收的消息队列,同样别忘了编写启动类**

** 4.在运行的Consumer项目控制台中会打印"皇室"。**



2.2)发送和接收一个序列化对象User

** 1.新建commen子模块,publisher和consumer引入该模块**

** commen子模块新建User类,实现Serializable接口**

  1. **2.可视化界面新建交换机rk.direct1 , 消息队列rk.queue1 ,发送user对象:**

** publisher的测试类:**

  1. ** **

** consumer的消息队列: **



** consumer项目控制台:**



2.3)其他类型交换器 基于注解创建交换机、消息队列

  1. ** 1.交换机类型有四种:**

** direct为完全匹配路由键**

** fanout为广播**

** topic为模糊匹配**

  1. ** 2.这里创建一个广播类型的交换机**

** key值任意写,因为广播针对绑定的所有消息队列。**

  1. @Component
  2. public class FanoutConsumer {
  3. @RabbitListener(bindings = {
  4. @QueueBinding(
  5. value = @Queue(name = "fanout.queue",durable = "true",autoDelete="false"),
  6. exchange = @Exchange(name = "rk.fanout",type = "fanout",durable = "true",autoDelete = "false"),
  7. key = {"route.regex"}
  8. )
  9. })
  10. public void OnMessage(String messageBody){
  11. System.out.println("messageBody = " + messageBody);
  12. }
  13. }

** 3.topic交换机**

  1. @Component
  2. public class MyTopicConsumer {
  3. @RabbitListener(bindings = {
  4. @QueueBinding(
  5. value = @Queue(name = "queue.topic.1", autoDelete = "false"),
  6. exchange = @Exchange(name = "topic.first", type = "topic"),
  7. key = {"小学.同学"}
  8. )
  9. })
  10. public void onMessage1(String message){
  11. System.out.println("小学.同学 - " + message);
  12. }
  13. @RabbitListener(bindings = {
  14. @QueueBinding(
  15. value = @Queue(name = "queue.topic.2", autoDelete = "false"),
  16. exchange = @Exchange(name = "topic.first", type = "topic"),
  17. key = {"小学.老师"}
  18. )
  19. })
  20. public void onMessage2(String message){
  21. System.out.println("小学.老师 - " + message);
  22. }
  23. @RabbitListener(bindings = {
  24. @QueueBinding(
  25. value = @Queue(name = "queue.topic.3", autoDelete = "false"),
  26. exchange = @Exchange(name = "topic.first", type = "topic"),
  27. key = {"小学.*"}
  28. )
  29. })
  30. public void onMessage3(String message){
  31. System.out.println("小学.* - " + message);
  32. }
  33. @RabbitListener(bindings = {
  34. @QueueBinding(
  35. value = @Queue(name = "queue.topic.4", autoDelete = "false"),
  36. exchange = @Exchange(name = "topic.first", type = "topic"),
  37. key = {"大学.老师"}
  38. )
  39. })
  40. public void onMessage4(String message){
  41. System.out.println("大学.老师 - " + message);
  42. }
  43. @RabbitListener(bindings = {
  44. @QueueBinding(
  45. value = @Queue(name = "queue.topic.5", autoDelete = "false"),
  46. exchange = @Exchange(name = "topic.first", type = "topic"),
  47. key = {"*.老师"}
  48. )
  49. })
  50. public void onMessage5(String message){
  51. System.out.println("*.老师 - " + message);
  52. }
  53. @RabbitListener(bindings = {
  54. @QueueBinding(
  55. value = @Queue(name = "queue.topic.6", autoDelete = "false"),
  56. exchange = @Exchange(name = "topic.first", type = "topic"),
  57. key = {"#"}
  58. )
  59. })
  60. public void onMessage6(String message){
  61. System.out.println("# - " + message);
  62. }
  63. }


2.4)消息传递失败的处理方法

①publisher推送的消息失败的回调方法

  1. 1. 开启回调方法的确认机制。![](https://i-blog.csdnimg.cn/direct/9e69a8893f084447a9e0794ceda1ed7e.png)
  2. 2.实现回调接口
  1. @Component
  2. public class PublisherHandle implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback{
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. @PostConstruct
  6. public void init(){
  7. this.rabbitTemplate.setConfirmCallback(this);
  8. this.rabbitTemplate.setReturnsCallback(this);
  9. }
  10. @Override
  11. public void confirm(CorrelationData correlationData, boolean b, String s) {
  12. System.out.println("到达交换机:"+b);
  13. }
  14. /**
  15. * @description:
  16. * System.out.println( 交换器 : returnedMessage.getExchange ());
  17. * System.out.println("路由键 : " + returnedMessage.getRoutingKey());
  18. * System.out.println("路由失败编码 : " + returnedMessage.getReplyCode());
  19. * System.out.println("路由失败描述 : " + returnedMessage.getReplyText());
  20. * System.out.println("消息 : " + returnedMessage.getMessage());
  21. * @author:rk
  22. * @date: 2024/9/24 10:47
  23. * @param: returnedMessage
  24. **/
  25. @Override
  26. public void returnedMessage(ReturnedMessage returnedMessage) {
  27. System.out.println("到达消息队列:"+returnedMessage.getReplyCode());
  28. }
  29. }


②consumer

** 配置中设置重试次数、或者开启手工ACK方式解决消息处理失败问题**



2.5)手动发送确认ACK

  1. 1.配置开启手工确认![](https://i-blog.csdnimg.cn/direct/c13beb4a1b3d4755be39a9055c810a84.png)
  2. 2. 创建交换机、消息队列并设置手工ACK
  1. @Component
  2. public class MyAckConsumer {
  3. //测试消费的ACK:
  4. @RabbitListener(bindings = {
  5. @QueueBinding(
  6. value = @Queue(name = "ack.queue"),
  7. exchange = @Exchange(name = "ack.direct"),
  8. key = {"ack.routing"}
  9. )
  10. })
  11. public void testAckMessage(String message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG)Long deliveryTag) throws Exception {
  12. System.out.println("消息消费:"+message+"---"+deliveryTag);
  13. //主动通知RabbitMq消息被正确消费,RabbitMq会将消息从队列中移除
  14. //channel.basicAck(deliveryTag, false);
  15. //主动通知RabbitMq消息没有被正确消费(消息的ID,消费结果,是否重试消费false表示删除消息true表示重复消费)
  16. //channel.basicNack(deliveryTag,false,true);
  17. //主动通知RabbitMq消息没有消费,直接丢弃(消息的ID,false表示删除消息true表示尝试重复消费),丢失一般是在消息消费的代码之前
  18. channel.basicReject(deliveryTag, false);
  19. }
  20. }


2.6)同步消息处理

  1. 1.配置同步时长


  1. 2.消费者代码
  1. @Component
  2. public class SynchronizationMessageConsumer {
  3. @RabbitListener(bindings = {
  4. @QueueBinding(
  5. value = @Queue(name = "return.queue"),
  6. exchange = @Exchange(name = "return.direct"),
  7. key = {"return.routing"}
  8. )
  9. })
  10. public String testAckMessage(String message , Channel channel, @Header(AmqpHeaders.DELIVERY_TAG)Long deliveryTag) throws Exception {
  11. System.out.println("消息的消费内容为:"+message);
  12. return "今天天气不错,适合学习";
  13. }
  14. }

  1. 3.推送者代码:


  1. 4.测试


本文转载自: https://blog.csdn.net/weixin_45939821/article/details/142467935
版权归原作者 别挡 所有, 如有侵权,请联系我们删除。

“文件服务器FastDFS 消息队列中间件RabbitMQ”的评论:

还没有评论