前言
在上一篇跟着官方文档使用docker,在云服务器上搭建了一个单节点的Kafka集群,在云服务器上连接成功,当我在笔记本上使用Spark尝试连接的时候,无法消费到数据。
从上图可以看出,我在consumer config中明明将bootstrap.servers设置成了弹性公网IP,而且在笔记本上Telnet连接Kafka网络也是通的。
但是当我启动Spark程序之后,程序应该输出从kakfa中消费的数据,但是程序却卡住了。
从日志最后一行可以看到,这里居然连接的是localhost,这肯定是消费不到数据的。但是我明明在程序中使用的是弹性公网IP,怎么到这里就成了localhost了…
问题分析
遇到这种问题,首先排查Kafka服务端的问题,于是我就去容器中查看Kafka的配置,看那个配置选项使用了localhost,然后再进行反推这个问题。
通过docker exec指令进入到kafka容器,然后就开始从各个目录翻找server.properties,在/etc/kafk/docker目录找到了配置文件。
grep localhost一共找到了两个参数,去官网找了一下配置说明,发现第2个配置是导致此问题出现的”元凶“。
advertised.listeners
首先看advertised.listeners选项说明。
然后买一赠一,与listeners一起学习效果更佳。
listeners和advertised.listeners的大概意思就是,listeners是Kafka服务器用于监听客户端连接的地址,包括外部客户端和其他Kafka服务器之间的连接,用于客户端与kafka之间的数据交互的地址。
而advertised.listeners是Kafka向客户端广播的地址。如果Kafka服务器处于内部网络中,而客户端处于外部网络中,则advertised.listeners中的地址需要配置为外部可访问的地址。
这样客户端在连接时,先在zookeeper或者KRaft中查找advertised.listeners广播的地址,然后再去匹配listeners中配置的地址连接Kafka读取数据。
listeners的常见的写法如下:
PLAINTEXT://:9092
PLAINTEXT://0.0.0.0:9092
PLAINTEXT://ip:9092
前两种写法感觉没什么区别,都是监听所有的地址,包括弹性公网IP,第三种写法是监听固定地址。在docker容器的配置中,默认是第一种写法,所以是不用改的。我们只需要将dvertised.listeners中的localhost修改为弹性公网IP即可。
解决方案
我使用vi编辑server.properties,在保存时提示readOnly。查看没有文件权限,chmod修改权限失败,因为不是root用户,使用su、sudo来获取root权限,直接没有这两个命令。
只能另寻他法,之前不是刚在宿主机上安装了一个kafka客户端吗,我们可以修改客户端里面的配置文件,然后放到配置文件目录下,替换不就ok了吗。如果没有客户端,将kafka容器中的配置文件使用docker cp出来修改也可以。
熟悉docker的朋友都知道,我们有两种方式可以替换容器中的配置文件。
- 镜像内替换:重写Dockerfile,在利用原镜像构建新镜像的过程中完成替换
- 容器内替换:将修改好的配置文件直接拷贝到kafka容器内
替换镜像配置
镜像内替换的话属于一劳永逸,因为我们可能会多次使用这个镜像构建kafka容器,所以这样修改之后,只要在这个云服务器上就可以直接用这个镜像,构建外网可访问的kafka容器。
新建Dockerfile:
FROM apache/kafka:3.7.0
COPY server.properties /etc/kafka/docker
Dockerfile中只有两条命令,将配置文件拷贝到kafka原镜像内,其他的命令,例如CMD继承原镜像即可。然后执行docker build命令,基于DockerFile构建一个名为apache/kakfa_kraft:3.7.0的镜像。
这里提示一点,server.properties文件和Dockerfile必须在同一个目录,否则就会报错。
在构建好镜像之后,我们使用这个镜像启动一个名为kafka1的容器。
容器正常启动,且可以正常连接。
替换容器配置
第二种方式比较简单,我们原本用官方镜像启动了一个kafka的容器,执行下面命令就可以覆盖原有的配置文件。
dockercp server.properties kafka:/etc/kafka/docker/
替换完成,然后重启Kafka容器即可。
结果验证
spark的测试代码如下:
object Monitor {def main(args: Array[String]):Unit={val conf =new SparkConf().setAppName("aqi").setMaster("local[1]")val ssc =new StreamingContext(conf, Seconds(10))val kafkaParams = Map[String, Object]("bootstrap.servers"->"121.91.xx.xx:9092","key.deserializer"-> classOf[StringDeserializer],"value.deserializer"-> classOf[StringDeserializer],"group.id"->"aqi","auto.offset.reset"->"earliest","enable.auto.commit"->(false: java.lang.Boolean))val topics = Array("aqi_test")val stream: DStream[String]= KafkaUtils.createDirectStream[String,String](
ssc,
PreferConsistent,
Subscribe[String,String](topics, kafkaParams)).map(_.value)
stream.foreachRDD(rdd =>{
rdd.foreach(x =>{
println(x)})})
ssc.start
ssc.awaitTermination
}}
相同的程序,在使用上面两种方式修改完配置之后,启动程序都能消费到数据。
其他尝试
在第一步容器内,修改配置文件没有权限之后,我就尝试在创建kafka容器添加一些参数。
从日志看毫无疑问都是失败的,所以是还是我上面的两种方式比较简单。
结语
这就是我云服务器部署kafka时,遇到外网无法访问的问题复现和解决思路。整篇文章表达的技术性不高,主要是对docker的一些基础命令的使用。
版权归原作者 叫我阿柒啊 所有, 如有侵权,请联系我们删除。