文章目录
第三章:Kafka,构建TB级异步消息系统
一、阻塞队列
BlockingQueue
- 解决线程通信的问题。- 阻塞方法:put
、take
。- 生产者消费者模式- 生产者:产生数据的线程。- 消费者:使用数据的线程。
- 实现类-
ArrayBlockingQueue
-LinkedBlockingQueue
-PriorityBlockingQueue、SynchronousQueue、DelayQueue
等。
1. 阻塞队列测试方法
在
test
中添加
BlockingQueueTests
类,来表示阻塞队列的测试方法,代码如下:
public class BlockingQueueTests {
public static void main(String[] args) {
BlockingQueue queue = new ArrayBlockingQueue(10);
new Thread(new Producer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
new Thread(new Consumer(queue)).start();
}
}
class Producer implements Runnable{
private BlockingQueue<Integer> queue;
public Producer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run(){
try{
for(int i = 0; i < 20; i ++ ) {
Thread.sleep(20);
queue.put(i);
System.out.println(Thread.currentThread().getName()+"生产:"+ queue.size());
}
}catch (Exception e){
e.printStackTrace();
}
}
}
class Consumer implements Runnable{
private BlockingQueue<Integer> queue;
public Consumer(BlockingQueue<Integer> queue) {
this.queue = queue;
}
@Override
public void run() {
try {
while (true){
Thread.sleep(new Random().nextInt(1000));
queue.take();
System.out.println(Thread.currentThread().getName()+"消费:"+ queue.size());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2. 测试结果
Thread-0生产:1
Thread-0生产:2
Thread-0生产:3
Thread-0生产:4
Thread-0生产:5
Thread-0生产:6
Thread-0生产:7
Thread-0生产:8
Thread-0生产:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-2消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-3消费:9
Thread-0生产:10
Thread-1消费:9
Thread-0生产:10
Thread-1消费:9
Thread-1消费:8
Thread-2消费:7
Thread-1消费:6
Thread-2消费:5
Thread-1消费:4
Thread-3消费:3
Thread-3消费:2
Thread-2消费:1
Thread-3消费:0
二、Kafka入门
- Kafka简介- Kafka是一个分布式的流媒体平台。- 应用:消息系统、日志收集、用户行为追踪、流式处理。
- Kafka特点- 高吞吐量、消息持久化、高可靠性、高扩展性。
- Kafka术语-
Broker
、Zookeeper
-Topic
、Partition
、Offset
-Leader Replica
、Follower Replica
1. Kafka下载
Kafka官网:
https://kafka.apache.org/
2. Kafka安装与配置
下载Kafka的安装包后进行解压,就相当于安装成功了。
需要进行以下配置:
修改 config包下的 zookeeper.properties:
修改 config包下的 server.properties:
3. Kafka的启动
首先在命令行中启动
Zookeeper
:
C:\Users\Andrew> d:
D:\> cd d:\kafka_2.13-3.2.3
d:\kafka_2.13-3.2.3>bin\windows\zookeeper-server-start.bat config\zookeeper.properties
启动成功后不关闭此窗口,重新打开一个新的命令窗口,用于启动
kafka
:
C:\Users\Andrew> d:
D:\> cd d:\kafka_2.13-3.2.3
d:\kafka_2.13-3.2.3>bin\windows\kafka-server-start.bat config\server.properties
注意: 当遇到“‘wmic’不是内部或外部命令,也不是可运行程序”。
在C盘下找到wbem文件夹,且里面包含WMIC.exe,将其添加到系统变量path中去。
比如我的路径是:
C:\Windows\System32\wbem
,在系统变量path中新建该路径。就可以正常启动Kafka了。
4. Kafka使用
- 创建主题 cd到
…\kafka_2.13-2.8.0\bin\windows
这里,然后输入kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
创建服务器端口号为9092(Kafka
默认端口号)的topic
,指生产者发布消息存储的位置在该服务器上localhost:9092
。--replication-factor
1 指1个副本。--partitions
1 指1个分区。--topic test
指该主题名为test
。 - 以生产者身份发送消息 输入:
kafka-console-producer.bat --broker-list localhost:9092 --topic test
生产者身份打开服务器列表中为localhost:9092
的服务器上的test
主题。--broker-list
指服务器列表。 并且输入要发送的消息: - 以消费者身份读取消息 新打开一个命令行窗口,且cd到
…\kafka_2.13-2.8.0\bin\windows
,并输入:kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
可以看到生产者发送的消息。并且这个消息队列中可以实时传送消息。 比如在生产者的命令行中继续输入信息,很快在消费者这边也能得到消息。
三、Spring整合Kafka
- 引入依赖-
spring-kafka
- 配置Kafka- 配置
server
、consumer
- 访问Kafka- 生产者:
kafkaTemplate.send(topic, data);
- 消费者:@KafkaListener(topics = {"test"}``````public void handleMessage(ConsumerRecord record) {}
1. 引入依赖
在
pom.xml
添加相关的依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.6</version>
</dependency>
2. 配置Kafka
#KafkaProperties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=3000
3. 测试
在
test
包下添加
KafkaTests
类,代码如下:
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(classes = CommunityApplication.class)
public class KafkaTests {
@Autowired
private KafkaProducer kafkaProducer;
@Test
public void testKafka() {
kafkaProducer.sendMessage("test", "你好");
kafkaProducer.sendMessage("test", "在吗");
try {
Thread.sleep(1000 * 20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
@Component
class KafkaProducer {
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendMessage(String topic, String content) {
kafkaTemplate.send(topic, content);
}
}
@Component
class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void handleMessage(ConsumerRecord record) {
System.out.println(record.value());
}
}
4. 测试结果
创作不易,如果有帮助到你,请给题解点个赞和收藏,让更多的人看到!!!
关注博主不迷路,内容持续更新中。
版权归原作者 Java技术一点通 所有, 如有侵权,请联系我们删除。