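This document is kept mainly as a record for future operations work and for sharing with colleagues. The content is fairly rough and the commands are not complete, so it is not suitable for beginners. If something is unclear, send me a private message; I am available during working hours.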
1. Configuration file
- YAML configuration
spring:
  kafka:
    bootstrap-servers:
    consumer:
      group-id: iot-testaaaaaaaaaa11aaaaaaaaa
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    properties:
      security:
        protocol: SASL_SSL
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required
            username=""
            password="";
      ssl:
        truststore:
          type: JKS
          location: src/main/resources/client.truststore.jks
          password:
        endpoint.identification.algorithm:
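For the plain KafkaConsumer approach in section 2, the same settings go into a flat properties file (mqs.sdk.consumer.properties) using the dotted Kafka client key names. The following is a minimal sketch of that mapping, not the original project's code: the class name KafkaClientProps, the build method, and every argument value in main are hypothetical placeholders, because the source leaves the broker address, username, and passwords blank.

```java
import java.util.Properties;

// Hypothetical helper: builds the flat Kafka client properties that
// correspond to the YAML keys above. Uses only java.util.Properties.
class KafkaClientProps {

    static Properties build(String bootstrapServers, String groupId,
                            String username, String password,
                            String truststorePath, String truststorePassword) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("auto.offset.reset", "earliest");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // SASL/PLAIN authentication over TLS
        props.setProperty("security.protocol", "SASL_SSL");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"%s\";", username, password));

        // Truststore used to verify the broker certificate
        props.setProperty("ssl.truststore.type", "JKS");
        props.setProperty("ssl.truststore.location", truststorePath);
        props.setProperty("ssl.truststore.password", truststorePassword);
        // An empty value turns off hostname verification, as in the YAML above
        props.setProperty("ssl.endpoint.identification.algorithm", "");
        return props;
    }

    public static void main(String[] args) {
        // All values below are placeholders for illustration only
        Properties props = build("broker.example.invalid:9093", "demo-group",
                "demo-user", "demo-pass", "client.truststore.jks", "changeit");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting Properties object can be passed directly to the KafkaConsumer constructor, or its key/value pairs can be written into the properties file that MqsConsumer loads.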
2. Consumer
1. Annotation-based
@KafkaListener(topics = {"abcd"})
public void listen(ConsumerRecord<?, ?> record) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    if (kafkaMessage.isPresent()) {
        Object message = kafkaMessage.get();
        System.out.println("---->" + record);
        System.out.println("---->" + message);
    }
}
2. KafkaConsumer
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * @author XHao
 */
public class MqsConsumer {

    public static final String CONFIG_CONSUMER_FILE_NAME = "mqs.sdk.consumer.properties";

    private KafkaConsumer<Object, Object> consumer;

    /** Loads the consumer configuration from a file on disk. */
    MqsConsumer(String path) {
        Properties props = new Properties();
        // try-with-resources closes the stream even if load() fails
        try (InputStream in = new BufferedInputStream(new FileInputStream(path))) {
            props.load(in);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    /** Loads the consumer configuration from the classpath. */
    public MqsConsumer() {
        Properties props;
        try {
            props = loadFromClasspath(CONFIG_CONSUMER_FILE_NAME);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }
        consumer = new KafkaConsumer<Object, Object>(props);
    }

    public void consume(List<String> topics) {
        consumer.subscribe(topics);
    }

    public ConsumerRecords<Object, Object> poll(long timeout) {
        return consumer.poll(timeout);
    }

    public void close() {
        consumer.close();
    }

    /**
     * get classloader from thread context if no classloader found in thread
     * context return the classloader which has loaded this class
     *
     * @return classloader
     */
    public static ClassLoader getCurrentClassLoader() {
        ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
        if (classLoader == null) {
            classLoader = MqsConsumer.class.getClassLoader();
        }
        return classLoader;
    }

    /**
     * Load configuration from the classpath.
     *
     * @param configFileName name of the configuration file
     * @return the loaded configuration
     * @throws IOException if a resource cannot be read
     */
    public static Properties loadFromClasspath(String configFileName) throws IOException {
        ClassLoader classLoader = getCurrentClassLoader();
        Properties config = new Properties();
        List<URL> properties = new ArrayList<URL>();
        Enumeration<URL> propertyResources = classLoader.getResources(configFileName);
        while (propertyResources.hasMoreElements()) {
            properties.add(propertyResources.nextElement());
        }
        for (URL url : properties) {
            try (InputStream is = url.openStream()) {
                config.load(is);
            }
        }
        return config;
    }
}
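One detail of loadFromClasspath is worth spelling out: it calls load on the same Properties object for every classpath resource with the given name, so if the file exists in more than one jar or directory, keys from a later resource overwrite those from an earlier one. The sketch below illustrates that merge behaviour with the JDK only; the class PropertiesMergeDemo and the in-memory strings are hypothetical stand-ins for the classpath resources, not part of the original code.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical demo: loads several property sources into one Properties
// object in order, the same way loadFromClasspath loads each URL.
class PropertiesMergeDemo {

    static Properties loadAll(List<String> sources) throws IOException {
        Properties config = new Properties();
        for (String source : sources) {
            // Properties.load(InputStream) reads ISO-8859-1 encoded text
            try (InputStream is = new ByteArrayInputStream(
                    source.getBytes(StandardCharsets.ISO_8859_1))) {
                config.load(is);
            }
        }
        return config;
    }

    public static void main(String[] args) throws IOException {
        Properties merged = loadAll(Arrays.asList(
                "group.id=first\nauto.offset.reset=earliest\n",
                "group.id=second\n"));
        System.out.println(merged.getProperty("group.id"));          // prints "second"
        System.out.println(merged.getProperty("auto.offset.reset")); // prints "earliest"
    }
}
```

In practice this means the order in which the class loader returns the resources decides which value wins, so it is safer to keep a single copy of the properties file on the classpath.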
3. Dependencies
1. Dependency for the annotation approach
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2. Dependency for KafkaConsumer
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.1.0</version>
</dependency>
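Data-visualization dashboard projects often need message conversion, real-time status updates, and similar features, so I am noting this down here.
If this post gets enough likes and comments, I will follow up with a more detailed tutorial. To be continued.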
This article is reposted from: https://blog.csdn.net/xh365647/article/details/141961629
Copyright belongs to the original author, 重生之豪哥. In case of infringement, please contact us for removal.