0


Kafka安装与配置

1.下载Kafka2.13-3.1.0

最新版为 kafka_2.13-3.1.0.tgz

下载Zookeper

最新版为 zookeeper-3.8.0

2.单机安装zookeper

Kafka依赖于zookeeper,官方承诺将来会移除.

解压文件:

tar zxvf apache-zookeeper-3.8.0-bin.tar.gz -C /opt/
mv /opt/apache-zookeeper-3.8.0-bin/ /opt/zookeeper

在/opt/zookeeper/ 目录下创建数据文件目录和日志文件目录

mkdir /opt/zookeeper/zkData
mkdir /opt/zookeeper/zkLog

复制一份配置文件并修改

cd /opt/zookeeper/conf/
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# 修改如下内容
dataDir=/opt/zookeeper/zkData
dataLogDir=/opt/zookeeper/zkLog

启动

cd /opt/zookeeper/bin/
# 启动zookeeper
./zkServer.sh start
# 查看进程是否启动
jps
# 查看状态
./zkServer.sh status
# 停止zookeeper
./zkServer.sh stop

3.安装Kafka

解压到指定目录

cd /home
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0

修改config目录下vi server.propertie文件

listeners = PLAINTEXT://192.168.2.40:9092

#多个可用逗号分隔

#zookeeper.connect=server1:2181,server2:2181,server3:2181

zookeeper.connect=192.168.2.40:2181

启动命令:

bin/kafka-server-start.sh config/server.properties

此方式可以实时查看日志.

后台启动方式:

./kafka-server-start.sh -daemon ../config/server.properties

查询进程和关闭命令

jps
./kafka-server-stop.sh

登录zookeeper客户端,查看/brokers/ids

cd /opt/zookeeper/bin/
zkCli.sh
# 查询结果如下:
[zk: localhost:2181(CONNECTED) 0] ls /brokers/ids
[0]
[zk: localhost:2181(CONNECTED) 1] quit

kafka常见命令

#创建主题 主题名是 quickstart-events
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
#查询主题
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
#主题中写入消息
 bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
This is my first event
This is my second event
#主题中读取消息
 bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

kafka集群

假如现在有两台服务器192.168.2.40,192.168.2.41

kafka的安装与配置如上,两台服务器唯一不同的地方就是配置文件中的broker.id和listeners的配置

修改config目录下vi server.propertie文件

192.168.2.40

listeners = PLAINTEXT://192.168.2.40:9092
broker.id=0

192.168.2.41

listeners = PLAINTEXT://192.168.2.41:9092
broker.id=1

bin目录启动命令都添加

vi kafka-server-start.sh
#添加  export JMX_PORT="9999"
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi

登录zookeeper客户端,查看/brokers/ids

4.可视化工具kafka-eagle

下载:kafka-eaglev2.1.0.tar.gz

解压

cd /home
tar -zxvf efak-web-2.1.0-bin.tar.gz

在/etc/profile文件中添加环境变量KE_HOME

vi /etc/profile
# 在profile文件中添加
#解压后的efak目录
export KE_HOME=/home/efak-web-2.1.0
export PATH=$PATH:$KE_HOME/bin
# 使修改后的profile文件生效
. /etc/profile

安装MySQL并添加数据库ke,kafka-eagle之后会用到它;
修改配置文件$KE_HOME/conf/system-config.properties,主要是修改Zookeeper的配置和数据库配置,注释掉sqlite配置,改为使用MySQL.

######################################
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
 ######################################
# kafka eagle webui port
######################################
kafka.eagle.webui.port=8048
 ######################################
# kafka mysql jdbc driver address
######################################
kafka.eagle.driver=com.mysql.cj.jdbc.Driver
kafka.eagle.url=jdbc:mysql://localhost:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=123456

启动后会自动使用上面的数据库连接,创建并初始化数据库.名称ke.有时候ke数据库会初始化失败,这时可以手动去执行创建数据库的脚本。脚本如下:


/*!40101 SET NAMES utf8 */;

/*!40101 SET SQL_MODE=''*/;

/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`ke` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;

USE `ke`;

/*Table structure for table `ke_alarm_clusters` */

DROP TABLE IF EXISTS `ke_alarm_clusters`;

CREATE TABLE `ke_alarm_clusters` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `type` varchar(32) DEFAULT NULL,
  `cluster` varchar(64) DEFAULT NULL,
  `server` text,
  `alarm_group` varchar(128) DEFAULT NULL,
  `alarm_times` int DEFAULT NULL,
  `alarm_max_times` int DEFAULT NULL,
  `alarm_level` varchar(4) DEFAULT NULL,
  `is_normal` varchar(2) DEFAULT 'Y',
  `is_enable` varchar(2) DEFAULT 'Y',
  `created` varchar(32) DEFAULT NULL,
  `modify` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_alarm_clusters` */

/*Table structure for table `ke_alarm_config` */

DROP TABLE IF EXISTS `ke_alarm_config`;

CREATE TABLE `ke_alarm_config` (
  `cluster` varchar(64) NOT NULL,
  `alarm_group` varchar(128) NOT NULL,
  `alarm_type` varchar(16) DEFAULT NULL,
  `alarm_url` text,
  `http_method` varchar(16) DEFAULT NULL,
  `alarm_address` text,
  `created` varchar(32) DEFAULT NULL,
  `modify` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`cluster`,`alarm_group`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_alarm_config` */

/*Table structure for table `ke_alarm_consumer` */

DROP TABLE IF EXISTS `ke_alarm_consumer`;

CREATE TABLE `ke_alarm_consumer` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `cluster` varchar(64) DEFAULT NULL,
  `group` varchar(128) DEFAULT NULL,
  `topic` varchar(128) DEFAULT NULL,
  `lag` bigint DEFAULT NULL,
  `alarm_group` varchar(128) DEFAULT NULL,
  `alarm_times` int DEFAULT NULL,
  `alarm_max_times` int DEFAULT NULL,
  `alarm_level` varchar(4) DEFAULT NULL,
  `is_normal` varchar(2) DEFAULT 'Y',
  `is_enable` varchar(2) DEFAULT 'Y',
  `created` varchar(32) DEFAULT NULL,
  `modify` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_alarm_consumer` */

/*Table structure for table `ke_alarm_crontab` */

DROP TABLE IF EXISTS `ke_alarm_crontab`;

CREATE TABLE `ke_alarm_crontab` (
  `id` bigint NOT NULL,
  `type` varchar(64) NOT NULL,
  `crontab` varchar(32) DEFAULT NULL,
  `is_enable` varchar(2) DEFAULT 'Y',
  `created` varchar(32) DEFAULT NULL,
  `modify` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`,`type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_alarm_crontab` */

/*Table structure for table `ke_connect_config` */

DROP TABLE IF EXISTS `ke_connect_config`;

CREATE TABLE `ke_connect_config` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `cluster` varchar(64) DEFAULT NULL,
  `connect_uri` varchar(128) DEFAULT NULL,
  `version` varchar(32) DEFAULT NULL,
  `alive` varchar(16) DEFAULT NULL,
  `created` varchar(32) DEFAULT NULL,
  `modify` varchar(32) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_connect_config` */

/*Table structure for table `ke_consumer_bscreen` */

DROP TABLE IF EXISTS `ke_consumer_bscreen`;

CREATE TABLE `ke_consumer_bscreen` (
  `cluster` varchar(64) DEFAULT NULL,
  `group` varchar(128) DEFAULT NULL,
  `topic` varchar(64) DEFAULT NULL,
  `logsize` bigint DEFAULT NULL,
  `difflogsize` bigint DEFAULT NULL,
  `offsets` bigint DEFAULT NULL,
  `diffoffsets` bigint DEFAULT NULL,
  `lag` bigint DEFAULT NULL,
  `timespan` bigint DEFAULT NULL,
  `tm` varchar(16) DEFAULT NULL,
  KEY `idx_timespan` (`timespan`),
  KEY `idx_tm_cluster_diffoffsets` (`tm`,`cluster`,`diffoffsets`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_consumer_bscreen` */

/*Table structure for table `ke_consumer_group` */

DROP TABLE IF EXISTS `ke_consumer_group`;

CREATE TABLE `ke_consumer_group` (
  `cluster` varchar(64) NOT NULL,
  `group` varchar(128) NOT NULL,
  `topic` varchar(128) NOT NULL,
  `status` int DEFAULT NULL,
  PRIMARY KEY (`cluster`,`group`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_consumer_group` */

/*Table structure for table `ke_consumer_group_summary` */

DROP TABLE IF EXISTS `ke_consumer_group_summary`;

CREATE TABLE `ke_consumer_group_summary` (
  `cluster` varchar(64) NOT NULL,
  `group` varchar(128) NOT NULL,
  `topic_number` varchar(128) NOT NULL,
  `coordinator` varchar(128) DEFAULT NULL,
  `active_topic` int DEFAULT NULL,
  `active_thread_total` int DEFAULT NULL,
  PRIMARY KEY (`cluster`,`group`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_consumer_group_summary` */

/*Table structure for table `ke_logsize` */

DROP TABLE IF EXISTS `ke_logsize`;

CREATE TABLE `ke_logsize` (
  `cluster` varchar(64) DEFAULT NULL,
  `topic` varchar(64) DEFAULT NULL,
  `logsize` bigint DEFAULT NULL,
  `diffval` bigint DEFAULT NULL,
  `timespan` bigint DEFAULT NULL,
  `tm` varchar(16) DEFAULT NULL,
  KEY `idx_timespan` (`timespan`),
  KEY `idx_tm_topic` (`tm`,`topic`),
  KEY `idx_tm_cluster_diffval` (`tm`,`cluster`,`diffval`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_logsize` */

/*Table structure for table `ke_metrics` */

DROP TABLE IF EXISTS `ke_metrics`;

CREATE TABLE `ke_metrics` (
  `cluster` varchar(64) DEFAULT NULL,
  `broker` text,
  `type` varchar(32) DEFAULT NULL,
  `key` varchar(64) DEFAULT NULL,
  `value` varchar(128) DEFAULT NULL,
  `timespan` bigint DEFAULT NULL,
  `tm` varchar(16) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_metrics` */

/*Table structure for table `ke_metrics_offline` */

DROP TABLE IF EXISTS `ke_metrics_offline`;

CREATE TABLE `ke_metrics_offline` (
  `cluster` varchar(64) NOT NULL,
  `key` varchar(128) NOT NULL,
  `one` varchar(128) DEFAULT NULL,
  `mean` varchar(128) DEFAULT NULL,
  `five` varchar(128) DEFAULT NULL,
  `fifteen` varchar(128) DEFAULT NULL,
  PRIMARY KEY (`cluster`,`key`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_metrics_offline` */

/*Table structure for table `ke_p_role` */

DROP TABLE IF EXISTS `ke_p_role`;

CREATE TABLE `ke_p_role` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'role name',
  `seq` tinyint NOT NULL COMMENT 'rank',
  `description` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'role describe',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_p_role` */

insert  into `ke_p_role`(`id`,`name`,`seq`,`description`) values 
(1,'Administrator',1,'Have all permissions'),
(2,'Devs',2,'Own add or delete'),
(3,'Tourist',3,'Only viewer');

/*Table structure for table `ke_resources` */

DROP TABLE IF EXISTS `ke_resources`;

CREATE TABLE `ke_resources` (
  `resource_id` bigint NOT NULL AUTO_INCREMENT,
  `name` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'resource name',
  `url` varchar(255) NOT NULL,
  `parent_id` int NOT NULL,
  PRIMARY KEY (`resource_id`)
) ENGINE=InnoDB AUTO_INCREMENT=25 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_resources` */

insert  into `ke_resources`(`resource_id`,`name`,`url`,`parent_id`) values 
(1,'System','/system',-1),
(2,'User','/system/user',1),
(3,'Role','/system/role',1),
(4,'Resource','/system/resource',1),
(5,'Notice','/system/notice',1),
(6,'Topic','/topic',-1),
(7,'Message','/topic/message',6),
(8,'Create','/topic/create',6),
(9,'Alarm','/alarm',-1),
(10,'Add','/alarm/add',9),
(11,'Modify','/alarm/modify',9),
(12,'Cluster','/cluster',-1),
(13,'ZkCli','/cluster/zkcli',12),
(14,'UserDelete','/system/user/delete',1),
(15,'UserModify','/system/user/modify',1),
(16,'Mock','/topic/mock',6),
(18,'Create','/alarm/create',9),
(19,'History','/alarm/history',9),
(20,'Manager','/topic/manager',6),
(21,'PasswdReset','/system/user/reset',1),
(22,'Config','/alarm/config',9),
(23,'List','/alarm/list',9),
(24,'Hub','/topic/hub',6);

/*Table structure for table `ke_role_resource` */

DROP TABLE IF EXISTS `ke_role_resource`;

CREATE TABLE `ke_role_resource` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `role_id` int NOT NULL,
  `resource_id` int NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_role_resource` */

insert  into `ke_role_resource`(`id`,`role_id`,`resource_id`) values 
(1,1,1),
(2,1,2),
(3,1,3),
(4,1,4),
(5,1,5),
(6,1,7),
(7,1,8),
(8,1,10),
(9,1,11),
(10,1,13),
(11,2,7),
(12,2,8),
(13,2,13),
(14,2,10),
(15,2,11),
(16,1,14),
(17,1,15),
(18,1,16),
(19,1,18),
(20,1,19),
(21,1,20),
(22,1,21),
(23,1,22),
(24,1,23),
(25,1,24);

/*Table structure for table `ke_sql_history` */

DROP TABLE IF EXISTS `ke_sql_history`;

CREATE TABLE `ke_sql_history` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `cluster` varchar(64) DEFAULT NULL,
  `username` varchar(64) DEFAULT NULL,
  `host` varchar(128) DEFAULT NULL,
  `ksql` text,
  `status` varchar(16) DEFAULT NULL,
  `spend_time` bigint DEFAULT NULL,
  `created` varchar(32) DEFAULT NULL,
  `tm` varchar(16) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_sql_history` */

/*Table structure for table `ke_topic_rank` */

DROP TABLE IF EXISTS `ke_topic_rank`;

CREATE TABLE `ke_topic_rank` (
  `cluster` varchar(64) NOT NULL,
  `topic` varchar(64) NOT NULL,
  `tkey` varchar(64) NOT NULL,
  `tvalue` bigint DEFAULT NULL,
  PRIMARY KEY (`cluster`,`topic`,`tkey`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_topic_rank` */

/*Table structure for table `ke_user_role` */

DROP TABLE IF EXISTS `ke_user_role`;

CREATE TABLE `ke_user_role` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `user_id` int NOT NULL,
  `role_id` tinyint NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_user_role` */

insert  into `ke_user_role`(`id`,`user_id`,`role_id`) values 
(1,1,1);

/*Table structure for table `ke_users` */

DROP TABLE IF EXISTS `ke_users`;

CREATE TABLE `ke_users` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `rtxno` int NOT NULL,
  `username` varchar(64) NOT NULL,
  `password` varchar(128) NOT NULL,
  `email` varchar(64) NOT NULL,
  `realname` varchar(128) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

/*Data for the table `ke_users` */

insert  into `ke_users`(`id`,`rtxno`,`username`,`password`,`email`,`realname`) values 
(1,1000,'admin','123456','[email protected]','Administrator');

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;
# 停止服务
bin/ke.sh stop
# 重启服务
bin/ke.sh restart
# 查看服务运行状态
bin/ke.sh status
# 查看服务状态
bin/ke.sh stats
# 动态查看服务输出日志
tail -f /logs/ke_console.out 
#查看进程号及端口号
netstat -ntlp
#通过PID查询出进程位置
ps aux|grep 进程号
#确定进程所在的目录
ll /proc/进程号;

​脚本创建成功,再重新启动服务即可.
启动成功可以直接访问,输入账号密码admin:123456,访问地址:http://192.168.2.40:8048/

注意观察 brokers,topics的数量。brokers为0的话没有连接成功.

可视化工具自然少不了监控,如果你想开启kafka-eagle对Kafka的监控功能的话,需要修改Kafka的启动脚本,暴露JMX的端口.

vi kafka-server-start.sh
#添加  export JMX_PORT="9999"
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
    export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
    export JMX_PORT="9999"
fi

kafka集群图示:

5.SpringBoot整合Kafka.

在pom.xml中添加

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

在application.yml中spring节点下添加

spring
    kafka:
    bootstrap-servers: 192.168.2.40:9092
    producer: # 生产者配置
      retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送
      batch-size: 16384 #16K
      buffer-memory: 33554432 #32M
      acks: 1
      # 指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: zhTestGroup # 消费者组
      enable-auto-commit: false # 关闭自动提交
      auto-offset-reset: earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
      # RECORD
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
      # BATCH
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
      # TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
      # COUNT
      # TIME | COUNT 有一个条件满足时提交
      # COUNT_TIME
      # 当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后, 手动调用Acknowledgment.acknowledge()后提交
      # MANUAL
      # 手动调用Acknowledgment.acknowledge()后立即提交,一般使用这种
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

生产者:

@RestController
public class KafkaProducer {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    // 发送消息
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("quickstart-events", normalMessage);
    }

}

消费者:

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"quickstart-events"})
    public void onMessage1(ConsumerRecord<?, ?> record,Consumer consumer,Acknowledgment ack){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
        //同步提交
        //consumer.commitSync();
        ack.acknowledge();
    }
}

可以用postman进行测试,观察结果.


本文转载自: https://blog.csdn.net/supercrsky/article/details/124570611
版权归原作者 上善若水_厚德载物 所有, 如有侵权,请联系我们删除。

“Kafka安装与配置”的评论:

还没有评论