Java技术栈 —— Kafka入门(二)
一、如何在Java项目中使用Kafka?
让我们先从简单的往topic存message与取message开始[1],也就是先实现文章[1]的那种效果,文章[1]的代码没有完整告诉我们他是怎么做到这一切的。
一、参考文章或视频链接[1] kafka的topic分区后partion中的数据是一致的么? - 汪进的回答 - 知乎
1.1 导入依赖
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>1.0.2</version></dependency>
1.2 执行Java代码
执行文章[1]里的代码,这里我对代码稍作改动。
importjava.util.Properties;importjava.util.Random;importorg.apache.kafka.clients.producer.KafkaProducer;importorg.apache.kafka.clients.producer.ProducerConfig;importorg.apache.kafka.clients.producer.ProducerRecord;importorg.apache.kafka.common.serialization.StringSerializer;publicclassProducer{publicstaticString topic ="kafka_test";//定义主题publicstaticvoidmain(String[] args)throwsInterruptedException{Properties p =newProperties();
p.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"127.0.0.1:9092");//kafka地址,多个地址用逗号分割
p.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
p.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);KafkaProducer<String,String> kafkaProducer =newKafkaProducer<>(p);try{while(true){String msg ="Hello,"+newRandom().nextInt(100);ProducerRecord<String,String>record=newProducerRecord<String,String>(topic, msg);
kafkaProducer.send(record);System.out.println("消息发送成功:"+ msg);Thread.sleep(500);}}finally{
kafkaProducer.close();}}}
1.2 参考文章或视频链接[1] 《Java操作Kafka(最简单的使用) 》 - 掘金[2] 《二十分钟快速上手Kafka开发(Java示例) 》 - 博客园
1.3 在Kafka中查看数据
为方便使用,我们将Kafka的安装路径添加到系统路径PATH中去。
# 或者vim /etc/profile
$ vim ~/.bashrc
exportKAFKA_HOME=/home/programmer/DevelopEnvironment/kafka_2.13-3.6.1
exportPATH=$KAFKA_HOME/bin:$PATH
$ source ~/.bashrc
# (1)实时查看对应topic的历史数据# kafka-console-consumer.sh --bootstrap-server <broker地址> --topic <主题名称> [--from-beginning]
$ kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic "kafka_test" --from-beginning
# (2)kafka-topics.sh可以查看以前发送给topic的数据# 查看所有topic
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --list
# 查看指定topic
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 -describe -topic "kafka_test"# (3)删除指定topic
$ kafka-topics.sh --bootstrap-server 127.0.0.1:9092 --delete --topic "kafka_test"
1.3 参考文章或视频链接[1] 《怎么查看kafka里的数据》 - 火山引擎[2] 《大数据:开发运维必备常用的Kafka操作命令整理,可当速查手册》 - 今日头条
二、Kafka辨析
2.1 术语
按照从小到大的关系进行排列。
#mermaid-svg-354duGyJTEDtUHUV {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-354duGyJTEDtUHUV .error-icon{fill:#552222;}#mermaid-svg-354duGyJTEDtUHUV .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-354duGyJTEDtUHUV .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-354duGyJTEDtUHUV .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-354duGyJTEDtUHUV .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-354duGyJTEDtUHUV .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-354duGyJTEDtUHUV .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-354duGyJTEDtUHUV .marker{fill:#333333;stroke:#333333;}#mermaid-svg-354duGyJTEDtUHUV .marker.cross{stroke:#333333;}#mermaid-svg-354duGyJTEDtUHUV svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-354duGyJTEDtUHUV .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-354duGyJTEDtUHUV .cluster-label text{fill:#333;}#mermaid-svg-354duGyJTEDtUHUV .cluster-label span{color:#333;}#mermaid-svg-354duGyJTEDtUHUV .label text,#mermaid-svg-354duGyJTEDtUHUV span{fill:#333;color:#333;}#mermaid-svg-354duGyJTEDtUHUV .node rect,#mermaid-svg-354duGyJTEDtUHUV .node circle,#mermaid-svg-354duGyJTEDtUHUV .node ellipse,#mermaid-svg-354duGyJTEDtUHUV .node polygon,#mermaid-svg-354duGyJTEDtUHUV .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-354duGyJTEDtUHUV .node .label{text-align:center;}#mermaid-svg-354duGyJTEDtUHUV .node.clickable{cursor:pointer;}#mermaid-svg-354duGyJTEDtUHUV .arrowheadPath{fill:#333333;}#mermaid-svg-354duGyJTEDtUHUV .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-354duGyJTEDtUHUV .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-354duGyJTEDtUHUV .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-354duGyJTEDtUHUV .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-354duGyJTEDtUHUV .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-354duGyJTEDtUHUV .cluster text{fill:#333;}#mermaid-svg-354duGyJTEDtUHUV .cluster span{color:#333;}#mermaid-svg-354duGyJTEDtUHUV div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-354duGyJTEDtUHUV :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;}
partition
topic
Broker
Cluster
MultipleClusters
2.1 参考文章或视频链接[1] 《看完这篇Kafka,你也许就会了Kafka》- CSDN[2] 《Kafka详解(包括kafka集群搭建)》- CSDN
2.2 Kafka与数据库的区别?
虽然都有查询与存储数据的功能,但Kafka更适合处理大规模的实时数据流,而数据库更适合存储和管理结构化数据。
三、业务提炼与总结
我发现现在设计的这类业务系统,不管是分布式、高并发,还是什么Hadoop、Redis,亦或是现实中的组织架构也好,都有这两个重要特点:
- 水平易扩展(提高易用性)。 提高并发度,可以多使唤一些线程。
- 垂直可备份(提高可用性)。 多备份一点没错,有备无患。
现实中的组织架构也有这种特点,同级别的官员可以相互调动,这叫水平可扩展,下级官员干出成绩,经过考察认定具备了一定能力后,就可以得到提拔,这叫垂直易备份,从这个角度上来说,下级也可以是上级的备份,一个省的组织架构和国家机关的组织架构就是垂直关系,从命名上就可以看出这种关系,国家级 –> 省部 –> 厅局 –> 县处 –> 乡科。 因此,无论是从计算机角度出发,还是从现实出发,具备上述两种特点的系统,绝对是一个兼具可用与易用性的系统。我想,需要在生活中带着好奇的眼光去观察,观察哪些地方需要水平可扩展,垂直易备份的,首先是分析物的关系,然后是分析人的关系,只有做到这两点,才是一个好的系统。
版权归原作者 键盘国治理专家 所有, 如有侵权,请联系我们删除。