

Kafka 3.8 + ZooKeeper 3.9: automated cluster deployment, SASL + ACL configuration, and Kafka Connect setup

1. Kafka & ZooKeeper cluster deployment

1.1. Kafka cluster management notes

1. A Kafka topic has multiple partitions, and each partition has multiple replicas. One replica is elected leader (election is an internal Kafka mechanism that weighs factors such as replica sync progress and health) and handles all read and write requests.
2. The consumer-group rebalance mechanism reassigns every partition of the subscribed topics across the group's consumers.
3. A consumer-group rebalance is triggered when:
a. The number of consumers in the group changes (a new consumer joins the group, or an existing consumer stops).
b. The number of subscribed topics changes (a consumer can subscribe to multiple topics; for example, the group subscribes to three topics and one of them is suddenly deleted).
c. The partition count of a subscribed topic changes (new partitions are added to the topic).
4. Producer partition-assignment strategy: round-robin. The producer sends messages to the partitions in order of partition number. For example, with three partitions, message 1 goes to partition 0, message 2 to partition 1, message 3 to partition 2, and the cycle then repeats. Round-robin was the default for keyless records before Kafka 2.4; since then the default is the sticky partitioner, which fills a batch for one partition before moving to the next, so classic round-robin must be configured explicitly.
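The round-robin behaviour described in point 4 can be sketched in a few lines of shell (a toy illustration of the modulo assignment, not Kafka's actual partitioner code):

```shell
# Toy round-robin partition assignment: message i goes to partition (i-1) % N
num_partitions=3
for msg_id in 1 2 3 4 5 6; do
  partition=$(( (msg_id - 1) % num_partitions ))
  echo "message $msg_id -> partition $partition"
done
# message 1 -> partition 0, message 2 -> partition 1, message 3 -> partition 2,
# then the cycle repeats: message 4 -> partition 0, and so on
```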

1.2. Environment

Kafka & ZooKeeper are deployed on kafka01, kafka02, and kafka03, one node per server, in a three-node cluster. This layout is suitable for production; clients must add the host names below to their local /etc/hosts.

Server name    Host IP address
kafka01        192.112.8.4
kafka02        192.112.8.5
kafka03        192.112.8.6

The deployment architecture is shown below.

1.3. Create directories and upload the files

As root, create the package directory on all three servers:

  mkdir -p /test/kafka_soft

The files below just need to end up in /test/kafka_soft.
Since file transfers can only be done as the test user, first create a tempfile folder under /test as the test user, then upload kafka_2.13-3.8.0.tgz, apache-zookeeper-3.9.2-bin.tar.gz, kafka_install.sh, and zk_install.sh to /test/tempfile on each of the three servers, and finally move them to /test/kafka_soft.

zk_install.sh:

  #!/bin/bash
  ZK_HOME="/test/kafka_soft"
  ZK_DATA_DIR="/testlog/zookeeper/data"
  ZK_LOG_DIR="/testlog/zookeeper/logs"
  SERVERS=("kafka01" "kafka02" "kafka03")
  SERVER_ID=$1
  ip1="192.112.8.4"
  ip2="192.112.8.5"
  ip3="192.112.8.6"
  HOSTNAME1="kafka01"
  HOSTNAME2="kafka02"
  HOSTNAME3="kafka03"
  HOSTS_FILE="/etc/hosts"
  if grep -q "$ip1 $HOSTNAME1" "$HOSTS_FILE"; then
      echo "IP $ip1 already exists in $HOSTS_FILE."
  else
      echo "$ip1 $HOSTNAME1" >> "$HOSTS_FILE"
      echo "Added $ip1 to $HOSTS_FILE."
  fi
  if grep -q "$ip2 $HOSTNAME2" "$HOSTS_FILE"; then
      echo "IP $ip2 already exists in $HOSTS_FILE."
  else
      echo "$ip2 $HOSTNAME2" >> "$HOSTS_FILE"
      echo "Added $ip2 to $HOSTS_FILE."
  fi
  if grep -q "$ip3 $HOSTNAME3" "$HOSTS_FILE"; then
      echo "IP $ip3 already exists in $HOSTS_FILE."
  else
      echo "$ip3 $HOSTNAME3" >> "$HOSTS_FILE"
      echo "Added $ip3 to $HOSTS_FILE."
  fi
  mkdir -p $ZK_DATA_DIR
  mkdir -p $ZK_LOG_DIR
  cd $ZK_HOME
  tar -xzvf apache-zookeeper-3.9.2-bin.tar.gz
  cat > $ZK_HOME/apache-zookeeper-3.9.2-bin/conf/zoo.cfg <<EOF
  tickTime=2000
  dataDir=$ZK_DATA_DIR
  dataLogDir=$ZK_LOG_DIR
  clientPort=2181
  maxClientCnxns=60
  initLimit=10
  syncLimit=4
  server.1=kafka01:2888:3888
  server.2=kafka02:2888:3888
  server.3=kafka03:2888:3888
  EOF
  echo $SERVER_ID > $ZK_DATA_DIR/myid
  cd $ZK_HOME/apache-zookeeper-3.9.2-bin/bin
  sed -i 's#ZOO_LOG_DIR="$ZOOKEEPER_PREFIX/logs"#ZOO_LOG_DIR="/testlog/zookeeper/logs"#' zkEnv.sh
  sed -i 's#ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-1000}"#ZK_SERVER_HEAP="${ZK_SERVER_HEAP:-8192}"#' zkEnv.sh
  sed -i 's#ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-256}"#ZK_CLIENT_HEAP="${ZK_CLIENT_HEAP:-4096}"#' zkEnv.sh
  chown -R test:test /test/kafka_soft
  chmod -R 755 /test/kafka_soft
  chown -R test:test /testlog/zookeeper
  chmod -R 755 /testlog/zookeeper
  #su -c "$ZK_HOME/apache-zookeeper-3.9.2-bin/bin/zkServer.sh start" -s /bin/bash test
  echo "ZooKeeper configuration is complete."
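The three repeated if/grep blocks in zk_install.sh could equally be written as a single loop over the host list. A sketch (using a temporary file here instead of the real /etc/hosts so it can be run safely):

```shell
# Idempotently append "ip hostname" entries, one loop instead of three copies
HOSTS_FILE=$(mktemp)        # stand-in for /etc/hosts in this demo
entries="192.112.8.4 kafka01
192.112.8.5 kafka02
192.112.8.6 kafka03"
printf '%s\n' "$entries" | while read -r ip host; do
  if grep -q "$ip $host" "$HOSTS_FILE"; then
    echo "IP $ip already exists in $HOSTS_FILE."
  else
    echo "$ip $host" >> "$HOSTS_FILE"
    echo "Added $ip to $HOSTS_FILE."
  fi
done
cat "$HOSTS_FILE"
rm -f "$HOSTS_FILE"
```

Because the grep check runs before each append, running the loop twice leaves the file unchanged.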

kafka_install.sh:

  #!/bin/bash
  kafka_HOME="/test/kafka_soft"
  kafka_DATA_DIR="/testlog/kafka/data"
  kafka_LOG_DIR="/testlog/kafka/logs"
  SERVERS=("kafka01" "kafka02" "kafka03")
  SERVER_ID=$1
  HOSTNAME=$2
  mkdir -p $kafka_DATA_DIR
  mkdir -p $kafka_LOG_DIR
  cd $kafka_HOME
  tar -xzvf kafka_2.13-3.8.0.tgz
  cd $kafka_HOME/kafka_2.13-3.8.0/config
  mv server.properties server_back.properties
  cat > $kafka_HOME/kafka_2.13-3.8.0/config/server.properties <<EOF
  broker.id=$SERVER_ID
  listeners=SASL_PLAINTEXT://$HOSTNAME:9092
  advertised.listeners=SASL_PLAINTEXT://$HOSTNAME:9092
  security.inter.broker.protocol=SASL_PLAINTEXT
  sasl.enabled.mechanisms=PLAIN
  sasl.mechanism.inter.broker.protocol=PLAIN
  log.dirs=$kafka_DATA_DIR
  max.poll.interval.ms=600000
  session.timeout.ms=120000
  fetch.min.bytes=2048576
  num.partitions=3
  log.retention.hours=2400
  zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
  EOF
  cat > $kafka_HOME/kafka_2.13-3.8.0/config/kafka_server_jaas.conf <<EOF
  KafkaServer {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin"
  user_admin="admin"
  user_producer="test"
  user_consumer="test";
  };
  EOF
  cat > $kafka_HOME/kafka_2.13-3.8.0/config/kafka_client_jaas.conf <<EOF
  KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="admin"
  password="admin";
  };
  EOF
  cat > $kafka_HOME/kafka_2.13-3.8.0/config/client.properties <<EOF
  # client.properties
  security.protocol=SASL_PLAINTEXT
  sasl.mechanism=PLAIN
  sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
  EOF
  sed -i 's#export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"#export KAFKA_HEAP_OPTS="-Xmx10G -Xms10G -Djava.security.auth.login.config=/test/kafka_soft/kafka_2.13-3.8.0/config/kafka_server_jaas.conf"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-server-start.sh
  sed -i 's#export KAFKA_HEAP_OPTS="-Xmx512M"#export KAFKA_HEAP_OPTS="-Xmx2048M -Djava.security.auth.login.config=/test/kafka_soft/kafka_2.13-3.8.0/config/kafka_client_jaas.conf"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-console-producer.sh
  sed -i 's#export KAFKA_HEAP_OPTS="-Xmx512M"#export KAFKA_HEAP_OPTS="-Xmx2048M -Djava.security.auth.login.config=/test/kafka_soft/kafka_2.13-3.8.0/config/kafka_client_jaas.conf"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-console-consumer.sh
  sed -i 's#LOG_DIR="$base_dir/logs"#LOG_DIR="/testlog/kafka/logs"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-run-class.sh
  sed -i 's#KAFKA_HEAP_OPTS="-Xmx256M"#KAFKA_HEAP_OPTS="-Xmx2G -Xms2G"#' /test/kafka_soft/kafka_2.13-3.8.0/bin/kafka-run-class.sh
  chown -R test:test /test/kafka_soft
  chmod -R 755 /test/kafka_soft
  chown -R test:test /testlog/kafka
  chmod -R 755 /testlog/kafka
  echo "Kafka configuration is complete."
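Once kafka_install.sh has been run on a node (section 1.5), it is worth sanity-checking the rendered server.properties: broker.id must be unique per node and the listeners must carry that node's host name. A sketch against a minimal temporary copy of the template, so it can be executed anywhere (the real file lives at /test/kafka_soft/kafka_2.13-3.8.0/config/server.properties):

```shell
# Reproduce the template for node 2 into a temp file and check its consistency
SERVER_ID=2
HOSTNAME=kafka02
conf=$(mktemp)
cat > "$conf" <<EOF
broker.id=$SERVER_ID
listeners=SASL_PLAINTEXT://$HOSTNAME:9092
advertised.listeners=SASL_PLAINTEXT://$HOSTNAME:9092
zookeeper.connect=kafka01:2181,kafka02:2181,kafka03:2181
EOF
# broker.id and the listener host must agree with the arguments the node was installed with
grep -q "^broker.id=2$" "$conf" && grep -q "kafka02:9092" "$conf" \
  && echo "server.properties looks consistent for $HOSTNAME"
rm -f "$conf"
```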

1.4. Install ZooKeeper

1.4.1. Install ZooKeeper

As root, run zk_install.sh on kafka01:

  cd /test
  mv tempfile/* ./kafka_soft
  cd /test/kafka_soft
  chmod 755 zk_install.sh
  ./zk_install.sh 1

As root, run zk_install.sh on kafka02:

  cd /test
  mv tempfile/* ./kafka_soft
  cd /test/kafka_soft
  chmod 755 zk_install.sh
  ./zk_install.sh 2

As root, run zk_install.sh on kafka03:

  cd /test
  mv tempfile/* ./kafka_soft
  cd /test/kafka_soft
  chmod 755 zk_install.sh
  ./zk_install.sh 3

1.4.2. Start and verify ZooKeeper

Run the start command on all three servers:

  su test
  /test/kafka_soft/apache-zookeeper-3.9.2-bin/bin/zkServer.sh start

Check the node status on all three servers:

  su test
  /test/kafka_soft/apache-zookeeper-3.9.2-bin/bin/zkServer.sh status

As shown below, one node is the leader and the other two are followers.
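A quick way to confirm the quorum is to count the `Mode:` lines reported by the three nodes. A sketch over sample output (in practice the lines come from running `zkServer.sh status` on each server):

```shell
# Sample "Mode:" lines collected from the three nodes' status output
status_lines="Mode: follower
Mode: leader
Mode: follower"
leaders=$(printf '%s\n' "$status_lines" | grep -c '^Mode: leader$')
followers=$(printf '%s\n' "$status_lines" | grep -c '^Mode: follower$')
# A healthy 3-node ensemble has exactly one leader and two followers
echo "leaders=$leaders followers=$followers"   # prints leaders=1 followers=2
```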

1.5. Install Kafka

1.5.1. Install Kafka (a JDK must be installed separately)

As root, run kafka_install.sh on kafka01:

  cd /test/kafka_soft
  chmod 755 kafka_install.sh
  ./kafka_install.sh 1 kafka01

As root, run kafka_install.sh on kafka02:

  cd /test/kafka_soft
  chmod 755 kafka_install.sh
  ./kafka_install.sh 2 kafka02

As root, run kafka_install.sh on kafka03:

  cd /test/kafka_soft
  chmod 755 kafka_install.sh
  ./kafka_install.sh 3 kafka03

1.5.2. Start Kafka

Run the start command on all three servers:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-server-start.sh -daemon ../config/server.properties

1.5.3. Verify the Kafka cluster (if your ssh session is already the test user, skip su test)

Log in to any Kafka server via ssh and create a topic against any broker:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-topics.sh --create --bootstrap-server kafka02:9092 --replication-factor 3 --partitions 3 --topic test1 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties

Log in to any Kafka server via ssh and list the topics against any broker:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-topics.sh --list --bootstrap-server kafka01:9092 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties

Log in to any Kafka server via ssh and run a console producer to send messages:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-console-producer.sh --bootstrap-server kafka01:9092 --topic test1 --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN

Log in to any Kafka server via ssh and run a console consumer to receive messages:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test1 --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

Messages sent on the producer side are received by the consumer, so the cluster passes verification.

After testing, delete the test data:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-topics.sh --bootstrap-server kafka02:9092 --delete --topic test1 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties

1.6. Enable Kafka ACLs

1.6.1. Update the configuration

Edit /test/kafka_soft/kafka_2.13-3.8.0/config/server.properties and add the following settings; this must be done on every node:

  authorizer.class.name=kafka.security.authorizer.AclAuthorizer
  allow.everyone.if.no.acl.found=false
  super.users=User:admin

Restart each node:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-server-stop.sh
  ./kafka-server-start.sh -daemon ../config/server.properties

Before ACLs are enabled, every user can create and list topics and produce and consume messages.
Once ACLs are enabled, only the admin super user keeps full access; ordinary users have no permissions until explicit rules are granted.

1.6.2. Verify the permissions

Before any rules are added, the producer user cannot produce or consume messages:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-console-producer.sh --bootstrap-server kafka03:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
  ./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

Grant the producer user a Write ACL on topic test:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --add --allow-principal User:producer --operation Write --topic test

Grant the producer user a Read ACL on topic test:

  ./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --add --allow-principal User:producer --operation Read --topic test --group '*'

List the ACL rules on topic test:

  ./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --list --topic test

After the ACL rules are added, produce and consume on topic test:

  ./kafka-console-producer.sh --bootstrap-server kafka02:9092 --topic test --producer-property security.protocol=SASL_PLAINTEXT --producer-property sasl.mechanism=PLAIN
  ./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

Remove an ACL rule:

  ./kafka-acls.sh --authorizer-properties zookeeper.connect=kafka02:2181 --remove --allow-principal User:producer --operation Write --topic test

1.7. Other Kafka commands

Create a topic with a custom message retention time (retention.ms is in milliseconds):

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-topics.sh --create --bootstrap-server kafka02:9092 --replication-factor 3 --partitions 3 --topic test-half --config retention.ms=1800000 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties
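Since retention.ms is in milliseconds, the value is easier to audit when derived rather than hard-coded; for example, the 1800000 used above is 30 minutes:

```shell
# retention.ms is milliseconds: minutes * 60 s/min * 1000 ms/s
minutes=30
retention_ms=$(( minutes * 60 * 1000 ))
echo "retention.ms=$retention_ms"   # prints retention.ms=1800000
```

The same arithmetic applies to the broker-level log.retention.hours=2400 set earlier, which corresponds to 100 days.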

Describe the topic configuration:

  ./kafka-configs.sh --describe --entity-type topics --entity-name test-half --bootstrap-server kafka02:9092 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties

Change the topic's message retention time:

  ./kafka-configs.sh --alter --entity-type topics --entity-name test-half --add-config retention.ms=1800000 --bootstrap-server kafka02:9092 --command-config /test/kafka_soft/kafka_2.13-3.8.0/config/client.properties

1.8. Install Kafka Connect

Everything below is done as the test user by default.

1.8.1. Standalone configuration

  su test
  mkdir -p /test/kafka_soft/plugins/Fileconnector
  touch /test/kafka_soft/test.txt
  touch /test/kafka_soft/test.sink.txt
  cp /test/kafka_soft/kafka_2.13-3.8.0/libs/connect-file-3.8.0.jar /test/kafka_soft/plugins/Fileconnector/

1. On kafka01, edit the connect-standalone.properties configuration.
In /test/kafka_soft/kafka_2.13-3.8.0/config/connect-standalone.properties, change

  bootstrap.servers=localhost:9092

to

  bootstrap.servers=kafka01:9092

and append the following to /test/kafka_soft/kafka_2.13-3.8.0/config/connect-standalone.properties (the password must match the one defined in kafka_server_jaas.conf):

  security.protocol=SASL_PLAINTEXT
  sasl.mechanism=PLAIN
  sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
  producer.bootstrap.servers=kafka01:9092
  producer.security.protocol=SASL_PLAINTEXT
  producer.sasl.mechanism=PLAIN
  producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
  consumer.bootstrap.servers=kafka01:9092
  consumer.security.protocol=SASL_PLAINTEXT
  consumer.sasl.mechanism=PLAIN
  consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
  offset.flush.interval.ms=10000
  listeners=HTTP://:8083
  plugin.path=/test/kafka_soft/plugins

2. On kafka01, edit the connect-file-source.properties configuration.
In /test/kafka_soft/kafka_2.13-3.8.0/config/connect-file-source.properties, change

  file=test.txt
  topic=connect-test

to

  file=/test/kafka_soft/test.txt
  topic=test

3. On kafka01, edit the connect-file-sink.properties configuration.
In /test/kafka_soft/kafka_2.13-3.8.0/config/connect-file-sink.properties, change

  file=test.sink.txt
  topics=test

to

  file=/test/kafka_soft/test.sink.txt
  topics=test

4. Start Kafka Connect in standalone mode:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0
  bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

5. Verify.
If the verifying user is not admin, grant it read and write ACLs first.
Open a new ssh window on any of the three servers:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./kafka-console-consumer.sh --bootstrap-server kafka02:9092 --topic test --from-beginning --consumer-property security.protocol=SASL_PLAINTEXT --consumer-property sasl.mechanism=PLAIN

Open a new ssh window on kafka01:

  cd /test/kafka_soft
  echo "20241030-01" >>test.txt
  echo "20241030-02" >>test.txt

Check whether the console consumer receives the messages.

Check whether the messages arrive in test.sink.txt:

  cd /test/kafka_soft
  cat test.sink.txt

1.8.2. Distributed configuration

1. Update the configuration; this must be done on all three nodes:

  su test
  mkdir -p /test/kafka_soft/plugins/Fileconnector
  touch /test/kafka_soft/test.txt
  touch /test/kafka_soft/test.sink.txt
  cp /test/kafka_soft/kafka_2.13-3.8.0/libs/connect-file-3.8.0.jar /test/kafka_soft/plugins/Fileconnector/
  cd /test/kafka_soft/kafka_2.13-3.8.0/config
  mv connect-distributed.properties connect-distributed_20241030.properties
  cat > /test/kafka_soft/kafka_2.13-3.8.0/config/connect-distributed.properties <<EOF
  bootstrap.servers=192.112.8.4:9092,192.112.8.5:9092,192.112.8.6:9092
  group.id=connect-cluster
  security.protocol=SASL_PLAINTEXT
  sasl.mechanism=PLAIN
  sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
  producer.bootstrap.servers=192.112.8.4:9092,192.112.8.5:9092,192.112.8.6:9092
  producer.security.protocol=SASL_PLAINTEXT
  producer.sasl.mechanism=PLAIN
  producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
  consumer.bootstrap.servers=192.112.8.4:9092,192.112.8.5:9092,192.112.8.6:9092
  consumer.security.protocol=SASL_PLAINTEXT
  consumer.sasl.mechanism=PLAIN
  consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="admin";
  key.converter=org.apache.kafka.connect.json.JsonConverter
  value.converter=org.apache.kafka.connect.json.JsonConverter
  key.converter.schemas.enable=true
  value.converter.schemas.enable=true
  offset.storage.topic=connect-offsets
  offset.storage.replication.factor=1
  config.storage.topic=connect-configs
  config.storage.replication.factor=1
  status.storage.topic=connect-status
  status.storage.replication.factor=1
  offset.flush.interval.ms=10000
  listeners=HTTP://:8083
  plugin.path=/test/kafka_soft/plugins
  EOF
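Note that the internal Connect topics above use a replication factor of 1, so losing a single broker can lose connector offsets, configs, and status. On a three-broker cluster intended for production, these are typically raised; an alternative fragment (a suggestion, not part of the original script):

```properties
# Replicate Connect's internal topics across all three brokers
offset.storage.replication.factor=3
config.storage.replication.factor=3
status.storage.replication.factor=3
```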

2. Start the Kafka Connect cluster.
Run on all three nodes:

  su test
  cd /test/kafka_soft/kafka_2.13-3.8.0/bin
  ./connect-distributed.sh -daemon ../config/connect-distributed.properties

3. Verify:

  curl http://192.112.8.4:8083
  curl http://192.112.8.5:8083
  curl http://192.112.8.6:8083

4. Create a connector:

  cat > /test/kafka_soft/file-source-connector-test.json <<EOF
  {
  "name": "file-source-connector",
  "config": {
  "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
  "tasks.max": "1",
  "file": "/test/kafka_soft/xiaoli.txt",
  "topic": "test"
  }
  }
  EOF
  cd /test/kafka_soft
  curl -X POST -H "Content-Type: application/json" --data @file-source-connector-test.json http://192.112.8.4:8083/connectors
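The connector JSON is worth validating locally before it is POSTed. A sketch that rebuilds the payload and checks it with Python's json.tool (the file path and connector name follow the example above):

```shell
payload=$(mktemp)
cat > "$payload" <<'EOF'
{
  "name": "file-source-connector",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "tasks.max": "1",
    "file": "/test/kafka_soft/xiaoli.txt",
    "topic": "test"
  }
}
EOF
# json.tool exits non-zero on malformed JSON, so this catches missing commas
# and quotes; note a stray trailing space in the topic name is still legal
# JSON and would silently create a differently named topic
python3 -m json.tool "$payload" > /dev/null && echo "connector JSON is valid"
rm -f "$payload"
```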

5. List the connectors:

  curl -X GET http://192.112.8.4:8083/connectors

6. Check a connector's status:

  curl -X GET http://192.112.8.4:8083/connectors/file-source-connector/status

7. Delete a connector:

  curl -X DELETE http://192.112.8.4:8083/connectors/file-source-connector

Reposted from: https://blog.csdn.net/pamela394829274/article/details/143734542
Copyright belongs to the original author 逆风而行ゝ; in case of infringement, please contact us for removal.
