Pulsar 提供了三种消费模式:独立消费者模式、共享订阅模式和发布订阅模式。
1: 独立消费者模式:每个消费者实例都会独立地消费消息,并且每个消息只会被一个消费者消费。这种模式适合于需要完全独立处理消息的场景,例如数据采集和日志处理。
import org.apache.pulsar.client.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PulsarConsumerDemo {
private static final Logger log = LoggerFactory.getLogger(PulsarConsumerDemo.class);
public static void main(String[] args) throws PulsarClientException {
String pulsarServiceUrl = "pulsar://localhost:6650";
String topicName = "persistent://public/default/test-topic";
String subscriptionName = "test-subscription";
PulsarClient pulsarClient = PulsarClient.builder()
.serviceUrl(pulsarServiceUrl)
.build();
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
.topic(topicName)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
while (true) {
Message<String> msg = consumer.receive();
log.info("Received message: {}", msg.getValue());
consumer.acknowledge(msg);
}
// 如果不再需要消费消息,可以关闭消费者和 Pulsar 客户端
// consumer.close();
// pulsarClient.close();
}
}
在这个示例中,我们创建了一个独立的消费者实例,它会订阅名为 "test-topic" 的主题,并以独占模式订阅该主题。该消费者实例将接收来自 Pulsar 服务的所有消息,并将每条消息打印到日志中。注意,在处理完每条消息后,我们调用
consumer.acknowledge(msg)
来告诉 Pulsar 服务该消息已被处理,以便 Pulsar 服务可以将该消息从订阅的队列中删除。
2: 共享订阅模式:多个消费者实例可以订阅同一个主题,每个消息会被分发给其中一个消费者实例进行处理。这种模式适合于需要多个消费者处理同一个主题的场景,例如任务分发和负载均衡。
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
public class SharedSubscriptionDemo {
private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "my-topic";
private static final String SUBSCRIPTION_NAME = "my-subscription";
public static void main(String[] args) throws PulsarClientException {
// 创建 Pulsar 客户端
PulsarClient client = PulsarClient.builder()
.serviceUrl(SERVICE_URL)
.build();
// 创建共享订阅模式的消费者
ConsumerBuilder<byte[]> consumerBuilder = client.newConsumer()
.topic(TOPIC_NAME)
.subscriptionName(SUBSCRIPTION_NAME)
.subscriptionType(SubscriptionType.Shared)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
// 创建多个消费者实例
int numConsumers = 3;
Consumer<byte[]>[] consumers = new Consumer[numConsumers];
for (int i = 0; i < numConsumers; i++) {
consumers[i] = consumerBuilder.clone()
.consumerName("consumer-" + i)
.subscribe();
}
// 消费消息
while (true) {
for (Consumer<byte[]> consumer : consumers) {
// 每个消费者实例都会收到一部分消息
byte[] message = consumer.receive().getData();
System.out.printf("Consumer %s received message: %s%n", consumer.getConsumerName(), new String(message));
consumer.acknowledge(message);
}
}
}
}
在这个示例中,我们创建了一个名为 "my-topic" 的主题,使用共享订阅模式创建了三个消费者实例。当有消息发布到主题时,每个消费者实例都会收到其中一部分消息进行处理。在这里,我们简单地打印每个消费者实例收到的消息内容。
3: 发布订阅模式:每个消费者实例都会订阅一个或多个主题,并接收所有发布到这些主题的消息。这种模式适合于需要将消息广播给多个消费者实例的场景,例如实时推送和广告投放。
from pulsar import Client, Message
# 创建 Pulsar 客户端
client = Client('pulsar://localhost:6650')
# 创建生产者并发送消息到主题
producer = client.create_producer('my-topic')
producer.send(Message('Hello, Pulsar!'.encode('utf-8')))
# 创建两个消费者并订阅主题
consumer1 = client.subscribe('my-topic', 'my-subscription-1')
consumer2 = client.subscribe('my-topic', 'my-subscription-2')
# 从消费者中接收消息
while True:
msg1 = consumer1.receive()
msg2 = consumer2.receive()
print('Consumer 1 received: {}'.format(msg1.data().decode('utf-8')))
print('Consumer 2 received: {}'.format(msg2.data().decode('utf-8')))
consumer1.acknowledge(msg1)
consumer2.acknowledge(msg2)
这个例子创建了一个生产者,将消息发送到主题“my-topic”。然后,创建两个消费者并订阅相同的主题“my-topic”,并从每个消费者中接收消息。最后,确认消费者已成功处理消息并将其从队列中删除。
版权归原作者 修猫写代码 所有, 如有侵权,请联系我们删除。