1.生产者推送数据
常用参数
bootstrap.servers:Kafka集群中的Broker列表,格式为host1:port1,host2:port2,…。生产者会从这些Broker中选择一个可用的Broker作为消息发送的目标Broker。
acks:Broker对消息的确认模式。可选值为0、1、all。0表示生产者不会等待Broker的任何确认消息;1表示生产者会等待Broker的Leader副本确认消息;all表示生产者会等待所有副本都确认消息。确认模式越高,可靠性越高,但延迟也越大。
retries:消息发送失败时的重试次数。默认值为0,表示不进行重试。可以将其设置为大于0的值,例如3,表示最多重试3次。
batch.size:消息批量发送的大小。当生产者累积到一定数量的消息时,会将其打包成一个批次一次性发送给Broker。默认值为16384字节,即16KB。
linger.ms:消息发送的延迟时间。生产者会等待一定的时间,以便将更多的消息打包成一个批次一次性发送给Broker。默认值为0,表示立即发送。设置较大的值可以提高吞吐量,但可能会增加消息的延迟。
buffer.memory:生产者可用于缓存消息的内存大小。默认值为33554432字节,即32MB。如果生产者生产消息的速度快于发送消息的速度,可能会导致缓存溢出。可以调整该参数来适应生产者的生产速度。
key.serializer:Key的序列化器。Kafka消息可以包含Key和Value,Key和Value都需要进行序列化。该参数指定Key的序列化器。
value.serializer:Value的序列化器。该参数指定Value的序列化器。
max.block.ms:生产者在发送消息之前等待Broker元数据信息的最长时间。如果在该时间内无法获取到Broker元数据信息,则会抛出TimeoutException异常。默认值为60000毫秒,即60秒。
compression.type:消息压缩类型。可选值为none、gzip、snappy、lz4。默认值为none,表示不进行压缩。压缩可以减少消息的传输大小,提高网络带宽的利用率,但会增加CPU的消耗。
interceptor.classes:消息拦截器列表。可以指定多个消息拦截器对消息进行加工处理。例如,可以在消息中添加时间戳、添加消息来源等信息。
以上参数只是一部分,Kafka生产者还有更多参数可以进行配置。需要根据实际情况选择合适的参数进行配置。
例子
下面是一个单例模式配置 kafka生产者的例子(避免多次创建实例,减少资源的消耗)
publicclassSingletonKafkaProducerExample{privatestaticSingletonKafkaProducerExample instance;privatestaticProducer<String,String> producer;privateSingletonKafkaProducerExample(){//参数设置Properties props =newProperties();
props.put("bootstrap.servers","ip:端口");
props.put("acks","all");
props.put("max.block.ms",120000);//默认60s
props.put("retries",3)//默认0;
props.put("batch.size",16384);
props.put("linger.ms",1);
props.put("buffer.memory",33554432);
props.put("request.timeout.ms",60*1000);
props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");//sasl认证 (根据实际情况看是否配置)
props.put("security.protocol","SASL_PLAINTEXT");
props.put("sasl.mechanism","PLAIN");
props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';");
producer =newKafkaProducer<>(props);
logger.info("kafka连接成功");}publicstaticSingletonKafkaProducerExamplegetInstance(){if(instance ==null){synchronized(SingletonKafkaProducerExample.class){if(instance ==null){
instance =newSingletonKafkaProducerExample();}}}return instance;}publicvoidsendMessage(String topic,String key,String value){try{//这里也可以不用设置key和partition,例如不设置分区 系统会使用轮询算法自动匹配partitionProducerRecord<String,String>record=newProducerRecord<>(topic, key, value);Future<RecordMetadata> future = producer.send(record,(metadata, exception)->{if(exception !=null){System.err.println("发送消息到"+ metadata.topic()+"失败:"+ exception.getMessage());}else{System.out.println("发送消息到"+ metadata.topic()+"成功:partition="+ metadata.partition()+", offset="+ metadata.offset());}});
future.get();// 等待返回数据}catch(InterruptedException|ExecutionException e){System.err.println("发送消息失败:"+ e.getMessage());}}publicvoidcloseProducer(){
producer.close();}}
以上参数配置只是案例,实际参数配置需要根据业务情况自己设置
可能遇见的问题
1.多个topic发送消息的时候总有1.2发送失败 报Failed to update metadata after 60000ms
这种情况出现的原因可能是Kafka集群中Broker的元数据信息还没有被更新到Kafka客户端中,导致Kafka客户端无法连接到指定的Broker。
解决
增加等待时间:可以通过设置max.block.ms属性来增加等待时间
提高重试次数:可以通过设置retries属性来提高重试次数
检查Broker配置
检查网络连接
检查Kafka版本
如果下面3个都没问题,就增加等待时间和重试次数。本人遇到这样的问题解决了
消费者 推送数据
importorg.apache.kafka.clients.consumer.ConsumerRecords;importorg.apache.kafka.clients.consumer.ConsumerRecord;importorg.apache.kafka.clients.consumer.KafkaConsumer;importjava.util.Arrays;importjava.util.Properties;importjava.util.concurrent.ExecutorService;importjava.util.concurrent.Executors;publicclassKafkaConsumerExample{publicstaticvoidmain(String[] args){// 配置消费者参数Properties props =newProperties();/*
bootstrap.servers
Kafka集群中Broker的地址列表,格式为"hostname:port",例如:"localhost:9092"。可以配置多个Broker,用逗号分隔。
*/
props.put("bootstrap.servers","ip:port");/*
group.id
消费者组的名称,同一个消费者组中的消费者会共享消费消息的责任。例如:"test"。
*/
props.put("group.id","test");/*
enable.auto.commit
是否自动提交偏移量,默认为true。如果为false,则需要手动提交偏移量。
*/
props.put("enable.auto.commit","true");/*
session.timeout.ms
消费者会话超时时间(毫秒),如果消费者在该时间内没有向Kafka Broker发送心跳,则会被认为已经失效。默认10000毫秒。
*/
props.put("session.timeout.ms","30000");/*
auto.offset.reset
如果消费者在初始化时没有指定偏移量或指定的偏移量不存在,则从哪个位置开始消费,默认latest,即从最新的消息开始消费。其他可选值为earliest和none。
*/
props.put("auto.offset.reset","earliest");/*
key.deserializer
key的反序列化方式,例如:"org.apache.kafka.common.serialization.StringDeserializer"。
*/
props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");/*
value.deserializer
value的反序列化方式,例如:"org.apache.kafka.common.serialization.StringDeserializer"。
*/
props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");/*
max.poll.records
每次拉取消息的最大记录数,默认500条。
*/
props.put("max.poll.records","10000");/*
fetch.min.bytes
每次拉取的最小字节数,默认1字节。
fetch.max.bytes
每次拉取的最大字节数,默认52428800字节,即50MB。
fetch.max.wait.ms
最长等待时间(毫秒),如果在该时间内没有拉取到任何消息,则返回空结果。默认500毫秒。
*/
props.put("fetch.min.bytes","1024");
props.put("fetch.max.bytes","1048576");
props.put("fetch.max.wait.ms","500");/*
max.partition.fetch.bytes
每个分区最大拉取字节数,默认1048576字节,即1MB。
*/
props.put("max.partition.fetch.bytes","1024");/*
connections.max.idle.ms
最大空闲连接时间(毫秒),超过该时间则连接被认为已经过期并关闭。默认540000毫秒,即9分钟。
*/
props.put("connections.max.idle.ms","540000");/*
request.timeout.ms
请求超时时间(毫秒),如果在该时间内没有收到Broker的响应,则认为请求失败。默认30000毫秒。
*/
props.put("request.timeout.ms","40000");/*
retry.backoff.ms
重试等待时间(毫秒),如果请求失败,则等待一段时间后再次重试。默认500毫秒。
*/
props.put("retry.backoff.ms","500");/*
security.protocol
安全协议类型,例如SSL或SASL_SSL。
ssl.keystore.location
SSL证书的路径和名称。
ssl.keystore.password
SSL证书的密码。
ssl.truststore.location
SSL信任证书库的路径和名称。
ssl.truststore.password
SSL信任证书库的密码。
*/
props.put("security.protocol","SSL");
props.put("ssl.keystore.location","/path/to/keystore");
props.put("ssl.keystore.password","password");
props.put("ssl.truststore.location","/path/to/truststore");
props.put("ssl.truststore.password","password");// 创建Kafka消费者实例KafkaConsumer<String,String> consumer =newKafkaConsumer<>(props);// 订阅主题
consumer.subscribe(Arrays.asList("my-topic"));// 创建线程池ExecutorService executor =Executors.newFixedThreadPool(6);// 消费消息while(true){ConsumerRecords<String,String> records = consumer.poll(100);for(ConsumerRecord<String,String>record: records){// 获取消息所在分区的编号int partition =record.partition();// 将消息提交给对应的线程进行处理
executor.submit(newMessageHandler(record.value(), partition));}}}// 消息处理器staticclassMessageHandlerimplementsRunnable{privatefinalString message;privatefinalint partition;publicMessageHandler(String message,int partition){this.message = message;this.partition = partition;}@Overridepublicvoidrun(){// 对消息进行处理System.out.printf("Partition %d: Message received: %s%n", partition, message);}}}
以上参数根据自己需求填写
可以根据分区 使用多线程执行
版权归原作者 Leaf吧 所有, 如有侵权,请联系我们删除。