pulsar版本: 3.3.0
问题描述:
开启限流下,消费topic数据时偶现消费端不消费数据问题,消费组对应的msgBacklog和backlogSize都大于0,重启broker或者重新创建订阅组时才会消费到数据;
相关设置:
namespace设置了以下策略:
pulsar-admin namespaces set-message-ttl -ttl 3d paas-msg-sub/xxxxxxx
pulsar-admin namespaces set-retention -s 10G -t 3d paas-msg-sub/xxxxxxx
pulsar-admin namespaces set-subscription-dispatch-rate -dt 1 -bd 1310720 paas-msg-sub/xxxxxxx
目前测试过程中数据流量较低,没有达到set-subscription-dispatch-rate设置的值
其中出现不消费现象时,remove-subscription-dispatch-rate 解除限流,消费端会重新消费到数据
消费方式:
循环调用consumer batchReceive方法
相关代码:
Consumer<byte[]> consumer = pulsarClient.newConsumer()
.topic(topic)
.subscriptionType(SubscriptionType.Shared)
.subscriptionName("defaultSubscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.batchReceivePolicy(BatchReceivePolicy.DEFAULT_POLICY)
.consumerName(UUID.randomUUID().toString())
.ackTimeout(10, TimeUnit.SECONDS)
.receiverQueueSize(1000)
.subscribe();
this.consumeBackoff = new BackoffBuilder()
.setInitialTime(10, TimeUnit.MILLISECONDS)
.setMax(1, TimeUnit.SECONDS)
.setMandatoryStop(0, TimeUnit.SECONDS)
.create();
执行以下代码循环拉取数据
try {
Messages<byte[]> messages = this.consumer.batchReceive();
if (messages == null || messages.size() == 0) {
this.executorService.schedule(this::consume, consumeBackoff.next(), TimeUnit.MILLISECONDS);
return;
}
// xxxxxx 业务处理逻辑,执行完成后ack
this.executorService.schedule(this::consume, consumeBackoff.next(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
long backoff = consumeBackoff.next();
log.error("Failed to receive message and send to client, retry in {} ms.", backoff, e);
this.executorService.schedule(this::consume, backoff, TimeUnit.MILLISECONDS);
}