0


14-部署Kafkasource和KafkaChannel

  • 部署KafkaSource- KafkaSource负责将Kafka中的消息记录转为CloudEvents- 仅在需要从Kafka中加载消息并提供给Knative Eventing上的应用程序使用时才需要KafkaSource- 命令:kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.1/eventing-kafka-controller.yamlkubectl apply -f https://mirror.ghproxy.com/https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.1/eventing-kafka-source.yaml
  • 部署KafkaChannel- 负责在Knative Eventing上提供基于Kafka集群的Channel实现,后端基于Kafka Topic- https://knative.dev/docs/install/yaml-install/eventing/install-eventing-with-yaml/#verify-the-installation```kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.1/eventing-kafka-controller.yamlkubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.12.1/eventing-kafka-channel.yaml```- 如果所使用的Kafka集群是外置的,那需要修改kafka-channel-config的configmap中boostrap server的地址.kubectl get cm kafka-channel-config -n knative-eventing -o yaml在这里插入图片描述- 修改Default Channel的default-ch-webhook相关配置: 可以在全局修改,也可以在某个名称空间修改apiVersion: v1kind: ConfigMapmetadata:name: default-ch-webhook namespace: knative-eventingdata:default-ch-config:| clusterDefault: apiVersion: messaging.knative.dev/v1 kind: InMemoryChannel namespaceDefaults: event-demo: apiVersion: messaging.knative.dev/v1beta1 kind: KafkaChannel spec: numPartitions: 5 replicationFactor: 1 # Kafka集群的节点数量,决定了其复制因子的可用值上限;但使用的partition数量可按需定义;
  • 测试KafkaChannel- 创建KafkaChannel~$ kn channel create kc01 --type messaging.knative.dev:v1beta1:KafkaChannel``````~$ kn channel list NAME TYPE URL AGE READY REASON kc01 KafkaChannel http://kc01-kn-channel.default.svc.cluster.local 37s True 提示:在没有设置KafkaChannel为默认Channel类型的名称空间下创建KafkaChannel时需要显式指定其类型。- 测试KafkaChannel- 创建订阅至KafkaChannel/kc01的Subscriptionkn subscription create sub-to-kc01 --channel messaging.knative.dev:v1beta1:KafkaChannel:kc01 --sink ksvc:event-display说明:此处事先存在一个可用的名为event-display的Knative Service。- 创建用于模拟Source的Podkubectl run client-$RANDOM--image=ikubernetes/admin-box:v1.2 --rm--restart=Never -it--command -- /bin/bash- 在Pod的交互式接口发送CloudEventsroot@client-18448 ~# curl -v "http://kc01-kn-channel.default.svc.cluster.local" -X POST -H "Content-Type: application/cloudevents+json" \-d'{"id": "100001", "specversion": "1.0", "type": "com.icloud2native.flow.seq", "source": "Curl", "data": {"message":"Hello from Knative Eventing KafkaChannel"}}'- 在最后的Sink上查看生成的CloudEventsPOD=$(kubectl get pods -l serving.knative.dev/configuration=event-display -ojsonpath='{.items[0].metadata.name}')~$ kubectl logs $PODDefaulted container "user-container" out of: user-container, queue-proxy☁️ cloudevents.EventContext Attributes, specversion: 1.0 type: com.icloud2native.flow.seq source: Curl id: 100001Data, {"message":"Hello from Knative Eventing KafkaChannel"}
标签: 云原生 knative kafka

本文转载自: https://blog.csdn.net/weixin_38753143/article/details/135569959
版权归原作者 爱写代码的小男孩 所有, 如有侵权,请联系我们删除。

“14-部署Kafkasource和KafkaChannel”的评论:

还没有评论