所使用版本:Pulsar 2.9.2
部署环境:k8s部署
业务场景:通过延时消息每 30s 触发一次车辆位置拉取;消费者收到上一条消息后,再发送一条延时 30s 的消息。
消息配置
使用的是分区主题(partitioned topic),消费者使用 Shared 订阅模式。
// Builds the consumer for one annotated handler: the listener reflectively invokes the
// handler method, acknowledges on success and negatively acknowledges on failure.
final ConsumerBuilder<?> consumerBuilder = pulsarClient
        .newConsumer(SchemaUtils.getSchema(holder.getAnnotation().serialization(),
                holder.getAnnotation().clazz()))
        .consumerName(urlBuildService.buildPulsarConsumerName(consumerName, generatedConsumerName))
        .subscriptionName(urlBuildService.buildPulsarSubscriptionName(subscriptionName, generatedConsumerName))
        .topic(urlBuildService.buildTopicUrl(topicName, namespace))
        // The Shared subscription type is used here
        .subscriptionType(subscriptionType)
        .subscriptionInitialPosition(holder.getAnnotation().initialPosition())
        .messageListener((consumer, msg) -> {
            try {
                final Method method = holder.getHandler();
                method.setAccessible(true);
                // Wrapped handlers receive the wrapped message rather than the bare payload
                // (original note: a proxy class can be injected into the wrapped handling)
                if (holder.isWrapped()) {
                    method.invoke(holder.getBean(), wrapMessage(msg));
                } else {
                    method.invoke(holder.getBean(), msg.getValue());
                }
                consumer.acknowledge(msg);
            } catch (Exception e) {
                // Nack FIRST: building the log line below evaluates msg.getValue(), which can
                // throw again when payload decoding was the original failure. If that happened
                // before the nack, the message would be left neither acked nor nacked.
                consumer.negativeAcknowledge(msg);
                Object payload;
                try {
                    payload = msg.getValue();
                } catch (RuntimeException ignored) {
                    // Payload cannot be decoded; identify the message by its id instead.
                    payload = msg.getMessageId();
                }
                LoggerBuilder.busLogger().error(payload + "-->消费失败", e);
                // sink.tryEmitNext(new GpsException(PulsarError.PULSAR_CONSUMER_ERROR,e));
            }
        })
        // NOTE(review): failures here go through negativeAcknowledge, not reconsumeLater;
        // confirm that enableRetry (retry-letter topic) is actually intended for this path.
        .enableRetry(Boolean.TRUE);
消费者代码
/**
 * Handles one delayed-queue message for a vehicle: first schedules the follow-up delayed
 * message, then processes the vehicle's position data.
 *
 * <p>If scheduling the follow-up message fails, the exception is rethrown so the message is
 * treated as a consumption failure (per the original note: it is retried until the maximum
 * redelivery count is reached and then goes to the dead-letter topic). Failures while
 * processing position data are logged and swallowed.
 *
 * @param carNo message payload; the vehicle plate number
 * @throws RuntimeException wrapping any exception raised while scheduling the follow-up message
 */
@PulsarConsumer(topic = "delayQueueTopic", clazz = String.class, serialization = Serialization.STRING, subscriptionType = SubscriptionType.Shared, // subscription type; defaults to Exclusive
        subscriptionName = "delayQueueTopicSubscription", maxRedeliverCount = 50, // maximum number of redeliveries
        deadLetterTopic = "delayQueueTopic_DLT", // dead-letter topic name
        negativeAckRedeliveryDelay = 30 // retry interval: 30s
)
public void received(String carNo) {
    GpsLogger.busLogger().info("收到车辆延时队列消息,消息内容message={}", carNo);
    // Validate and send the next delayed-queue message
    try {
        zjExecutor.needSendDelayQueue(carNo);
    } catch (Exception e) {
        // Reporter's note: no exception has ever been observed here
        GpsLogger.busLogger().error("发送延时队列失败,车牌号:{},异常信息:{}", carNo, e.getMessage());
        throw new RuntimeException(e);
    }
    // Process the message: handle position points
    try {
        ZjPosisitonParam zjPosisitonParam = new ZjPosisitonParam();
        zjPosisitonParam.setCarNo(carNo);
        zjBatchExecutor.batchInvokeZj(zjPosisitonParam);
    } catch (Exception e) {
        // Deliberately swallowed so a processing failure does not trigger redelivery.
        // The exception is passed as the trailing argument so the stack trace is kept
        // (assumes an SLF4J/Log4j2-style logger — TODO confirm for GpsLogger).
        GpsLogger.busLogger().error("延时队列处理点位信息失败,车牌号:{},异常信息:{}", carNo, e.getMessage(), e);
    }
}
监控partition-1情况如下
# ./pulsar-admin topics stats persistent://gpsTenant/gpsServerNamespace/delayQueuePartitionTopic-partition-1
{
"msgRateIn" : 27.316733715678055,
"msgThroughputIn" : 1858.0712273103368,
"msgRateOut" : 28.13340087625443,
"msgThroughputOut" : 1913.6379276084924,
"bytesInCounter" : 6839012258,
"msgInCounter" : 101637611,
"bytesOutCounter" : 6864896370,
"msgOutCounter" : 102022950,
"averageMsgSize" : 68.0195241001278,
"msgChunkPublished" : false,
"storageSize" : 2513563501,
"backlogSize" : 2513200993,
"offloadedStorageSize" : 0,
"lastOffloadLedgerId" : 0,
"lastOffloadSuccessTimeStamp" : 0,
"lastOffloadFailureTimeStamp" : 0,
"publishers" : [ {
"accessMode" : "Shared",
"msgRateIn" : 5.633347222914246,
"msgThroughputIn" : 383.1676114047293,
"averageMsgSize" : 68.01775147928994,
"chunkedMessageRate" : 0.0,
"producerId" : 4,
"metadata" : { },
"address" : "/10.233.82.66:46944",
"connectedSince" : "2024-10-15T12:12:32.147744Z",
"clientVersion" : "2.9.2",
"producerName" : "pulsar-14-1851"
}, {
"accessMode" : "Shared",
"msgRateIn" : 5.150012731432307,
"msgThroughputIn" : 350.31753269247787,
"averageMsgSize" : 68.02265372168284,
"chunkedMessageRate" : 0.0,
"producerId" : 4,
"metadata" : { },
"address" : "/10.233.96.181:50378",
"connectedSince" : "2024-10-15T12:11:59.382382Z",
"clientVersion" : "2.9.2",
"producerName" : "pulsar-14-1830"
}, {
"accessMode" : "Shared",
"msgRateIn" : 5.4666803224341125,
"msgThroughputIn" : 371.83426217532025,
"averageMsgSize" : 68.01829268292683,
"chunkedMessageRate" : 0.0,
"producerId" : 4,
"metadata" : { },
"address" : "/10.233.124.30:34420",
"connectedSince" : "2024-10-15T12:12:12.752621Z",
"clientVersion" : "2.9.2",
"producerName" : "pulsar-14-1844"
}, {
"accessMode" : "Shared",
"msgRateIn" : 5.483347182257755,
"msgThroughputIn" : 372.98427535485195,
"averageMsgSize" : 68.02127659574468,
"chunkedMessageRate" : 0.0,
"producerId" : 4,
"metadata" : { },
"address" : "/10.233.121.125:38274",
"connectedSince" : "2024-10-15T12:11:56.53891Z",
"clientVersion" : "2.9.2",
"producerName" : "pulsar-14-1823"
}, {
"accessMode" : "Shared",
"msgRateIn" : 5.583346256639635,
"msgThroughputIn" : 379.76754568295735,
"averageMsgSize" : 68.0179104477612,
"chunkedMessageRate" : 0.0,
"producerId" : 4,
"metadata" : { },
"address" : "/10.233.70.115:55988",
"connectedSince" : "2024-10-15T12:12:05.004669Z",
"clientVersion" : "2.9.2",
"producerName" : "pulsar-14-1837"
} ],
"waitingPublishers" : 0,
"subscriptions" : {
"delayQueuePartitionTopicSubscription" : {
"msgRateOut" : 28.13340087625443,
"msgThroughputOut" : 1913.6379276084924,
"bytesOutCounter" : 6864896370,
"msgOutCounter" : 102022950,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0,
"msgBacklog" : 1655,
"backlogSize" : 0,
"msgBacklogNoDelayed" : 830,
"blockedSubscriptionOnUnackedMsgs" : false,
"msgDelayed" : 825,
"unackedMessages" : 0,
"type" : "Shared",
"msgRateExpired" : 0.0,
"totalMsgExpired" : 0,
"lastExpireTimestamp" : 0,
"lastConsumedFlowTimestamp" : 1729473874099,
"lastConsumedTimestamp" : 1729473880190,
"lastAckedTimestamp" : 1729473880213,
"lastMarkDeleteAdvancedTimestamp" : 1728375912676,
"consumers" : [ {
"msgRateOut" : 5.666679908753166,
"msgThroughputOut" : 385.46756744012714,
"bytesOutCounter" : 195588573,
"msgOutCounter" : 2906224,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "consumercom.qy566.gps.platform.mq.DelayQueuePartitionConsumerListenerreceivedjava.lang.String",
"availablePermits" : 776,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1729473879231,
"lastConsumedTimestamp" : 1729473880190,
"metadata" : { },
"address" : "/10.233.121.125:38274",
"connectedSince" : "2024-10-15T12:11:58.665854Z",
"clientVersion" : "2.9.2"
}, {
"msgRateOut" : 5.400012801630349,
"msgThroughputOut" : 367.28420404175307,
"bytesOutCounter" : 196407263,
"msgOutCounter" : 2918443,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "consumercom.qy566.gps.platform.mq.DelayQueuePartitionConsumerListenerreceivedjava.lang.String",
"availablePermits" : 557,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1729473879620,
"lastConsumedTimestamp" : 1729473879994,
"metadata" : { },
"address" : "/10.233.96.181:50378",
"connectedSince" : "2024-10-15T12:12:01.572133Z",
"clientVersion" : "2.9.2"
}, {
"msgRateOut" : 5.5666800397804606,
"msgThroughputOut" : 378.7342431855424,
"bytesOutCounter" : 196244859,
"msgOutCounter" : 2915927,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "consumercom.qy566.gps.platform.mq.DelayQueuePartitionConsumerListenerreceivedjava.lang.String",
"availablePermits" : 573,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1729473879216,
"lastConsumedTimestamp" : 1729473880060,
"metadata" : { },
"address" : "/10.233.70.115:55988",
"connectedSince" : "2024-10-15T12:12:08.571215Z",
"clientVersion" : "2.9.2"
}, {
"msgRateOut" : 5.300012896786382,
"msgThroughputOut" : 360.46754381036436,
"bytesOutCounter" : 195761174,
"msgOutCounter" : 2908795,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "consumercom.qy566.gps.platform.mq.DelayQueuePartitionConsumerListenerreceivedjava.lang.String",
"availablePermits" : 705,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1729473879518,
"lastConsumedTimestamp" : 1729473880094,
"metadata" : { },
"address" : "/10.233.124.30:34420",
"connectedSince" : "2024-10-15T12:12:14.894816Z",
"clientVersion" : "2.9.2"
}, {
"msgRateOut" : 6.200015229304075,
"msgThroughputOut" : 421.68436913070536,
"bytesOutCounter" : 196826861,
"msgOutCounter" : 2924637,
"msgRateRedeliver" : 0.0,
"chunkedMessageRate" : 0.0,
"consumerName" : "consumercom.qy566.gps.platform.mq.DelayQueuePartitionConsumerListenerreceivedjava.lang.String",
"availablePermits" : 863,
"unackedMessages" : 0,
"avgMessagesPerEntry" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"lastAckedTimestamp" : 1729473880213,
"lastConsumedTimestamp" : 1729473880159,
"metadata" : { },
"address" : "/10.233.82.66:46944",
"connectedSince" : "2024-10-15T12:12:36.491101Z",
"clientVersion" : "2.9.2"
} ],
"isDurable" : true,
"isReplicated" : false,
"allowOutOfOrderDelivery" : false,
"consumersAfterMarkDeletePosition" : { },
"nonContiguousDeletedMessagesRanges" : 1572,
"nonContiguousDeletedMessagesRangesSerializedSize" : 32994,
"durable" : true,
"replicated" : false
}
},
"replication" : { },
"deduplicationStatus" : "Disabled",
"nonContiguousDeletedMessagesRanges" : 1572,
"nonContiguousDeletedMessagesRangesSerializedSize" : 32994,
"compaction" : {
"lastCompactionRemovedEventCount" : 0,
"lastCompactionSucceedTimestamp" : 0,
"lastCompactionFailedTimestamp" : 0,
"lastCompactionDurationTimeInMills" : 0
}
}
结果:消息每次都能发送成功并返回 msgId,但消费者偶尔收不到消息,导致已发送的消息丢失。
这种情况遇到过么?应该怎么解决?