Pulsar 2.9.2: messages occasionally lost when using a delay queue


Version: 2.9.2

Deployment: Kubernetes (k8s)

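Business scenario: a delayed message is sent every 30 s to trigger pulling a vehicle's position. When the previous message is received, another message with a 30 s delay is sent.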
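This scenario forms a chain: for each vehicle, the only thing that schedules the next delayed message is the consumption of the previous one. A minimal, time-free sketch in plain Java (no Pulsar dependency; the class, methods, and the "vehicle@round" loss notation are hypothetical) shows why a single undelivered message stops polling for that vehicle permanently, plus a simple staleness check that could flag such vehicles.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, time-free model of the chained delay pattern: each delivered message
 * for a vehicle triggers exactly one new 30 s delayed message for the same vehicle.
 */
public class DelayChainModel {

    // Returns how many times each vehicle's position is pulled over `rounds` 30 s rounds.
    // `lost` holds "vehicle@round" keys for messages that are never delivered.
    static Map<String, Integer> simulate(List<String> cars, int rounds, Set<String> lost) {
        Map<String, Integer> pulls = new LinkedHashMap<>();
        Set<String> alive = new LinkedHashSet<>(cars); // vehicles whose chain is still running
        for (String car : cars) {
            pulls.put(car, 0);
        }
        for (int round = 0; round < rounds; round++) {
            Iterator<String> it = alive.iterator();
            while (it.hasNext()) {
                String car = it.next();
                if (lost.contains(car + "@" + round)) {
                    // Nobody consumes this message, so nobody sends the next one:
                    // the chain for this vehicle ends here for good.
                    it.remove();
                } else {
                    pulls.merge(car, 1, Integer::sum); // pull position, then re-send delayed msg
                }
            }
        }
        return pulls;
    }

    // Vehicles not seen for longer than thresholdMillis (a possible watchdog check).
    static List<String> staleCars(Map<String, Long> lastSeenMillis, long nowMillis, long thresholdMillis) {
        List<String> stale = new ArrayList<>();
        for (Map.Entry<String, Long> e : lastSeenMillis.entrySet()) {
            if (nowMillis - e.getValue() > thresholdMillis) {
                stale.add(e.getKey());
            }
        }
        return stale;
    }

    public static void main(String[] args) {
        Map<String, Integer> pulls = simulate(List.of("car-A", "car-B"), 10, Set.of("car-B@3"));
        System.out.println(pulls); // {car-A=10, car-B=3}

        // Assumed threshold: three missed 30 s cycles (not a value from the post).
        Map<String, Long> lastSeen = Map.of("car-A", 290_000L, "car-B", 90_000L);
        System.out.println(staleCars(lastSeen, 300_000L, 90_000L)); // [car-B]
    }
}
```

With car-B's round-3 message dropped, car-B is polled only three times in ten rounds while car-A is polled every round. Because the chain has no other way to restart, one lost message is enough to explain a vehicle that is never polled again.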

Message configuration
A partitioned topic is used; consumers use the Shared subscription type.

final ConsumerBuilder<?> consumerBuilder = pulsarClient
                    .newConsumer(SchemaUtils.getSchema(holder.getAnnotation().serialization(),
                            holder.getAnnotation().clazz()))
                    .consumerName(urlBuildService.buildPulsarConsumerName(consumerName, generatedConsumerName))
                    .subscriptionName(urlBuildService.buildPulsarSubscriptionName(subscriptionName, generatedConsumerName))
                    .topic(urlBuildService.buildTopicUrl(topicName, namespace))
                    // Shared subscription mode is used here
                    .subscriptionType(subscriptionType)
                    .subscriptionInitialPosition(holder.getAnnotation().initialPosition())
                    .messageListener((consumer, msg) -> {
                        try {
                            final Method method = holder.getHandler();
                            method.setAccessible(true);
                            // Allows a proxy class to be injected for wrapped processing
                            if (holder.isWrapped()) {
                                method.invoke(holder.getBean(), wrapMessage(msg));
                            } else {
                                method.invoke(holder.getBean(), msg.getValue());
                            }

                            consumer.acknowledge(msg);
                        } catch (Exception e) {
                            LoggerBuilder.busLogger().error(msg.getValue() + "--> consumption failed", e);
                            consumer.negativeAcknowledge(msg);
                            // sink.tryEmitNext(new GpsException(PulsarError.PULSAR_CONSUMER_ERROR,e));
                        }
                    })
                    .enableRetry(Boolean.TRUE);

Consumer code

    @PulsarConsumer(topic = "delayQueueTopic", clazz = String.class, serialization = Serialization.STRING,
            subscriptionType = SubscriptionType.Shared, // subscription mode (defaults to Exclusive)
            subscriptionName = "delayQueueTopicSubscription",
            maxRedeliverCount = 50, // maximum number of retries
            deadLetterTopic = "delayQueueTopic_DLT", // dead-letter topic name
            negativeAckRedeliveryDelay = 30 // retry interval: 30 s
    )
    // If processing fails, throw an exception: the message then goes to the retry queue and is
    // consumed again until the maximum retry count is reached, after which it moves to the dead-letter queue.
    public void received(String carNo) {
        GpsLogger.busLogger().info("Received vehicle delay-queue message, message={}", carNo);

        // Check whether to send the next delayed message, and send it
        try {
            zjExecutor.needSendDelayQueue(carNo);
        } catch (Exception e) {
            // No exception has ever occurred here
            GpsLogger.busLogger().error("Failed to send delayed message, plate number: {}, error: {}", carNo, e.getMessage());
            throw new RuntimeException(e);
        }

        // Process the message: pull the vehicle's position points
        try {
            ZjPosisitonParam zjPosisitonParam = new ZjPosisitonParam();
            zjPosisitonParam.setCarNo(carNo);

            zjBatchExecutor.batchInvokeZj(zjPosisitonParam);
        } catch (Exception e) {
            // Swallow the exception
            GpsLogger.busLogger().error("Delay queue failed to process position points, plate number: {}, error: {}", carNo, e.getMessage());
        }
    }
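The annotation comments describe the retry path: a failing message is negatively acknowledged, redelivered after 30 s, and moved to delayQueueTopic_DLT once the retry limit is hit. From the business side, a message that ends up in the dead-letter topic also looks "lost", so that topic may be worth inspecting. The sketch below is a simplified model, not the Pulsar client's implementation; it assumes a fixed 30 s delay and dead-lettering once the redelivery count reaches maxRedeliverCount (Pulsar's exact counting may differ by one). The class and method names are hypothetical.

```java
import java.time.Duration;

/** Simplified, hypothetical model of the nack/redeliver/dead-letter policy configured above. */
public class RetryPolicyModel {

    enum Outcome { ACKED, DEAD_LETTERED }

    record Result(Outcome outcome, int attempts, Duration elapsed) {}

    // failuresBeforeSuccess: how many consecutive attempts throw before the handler succeeds.
    static Result deliver(int failuresBeforeSuccess, int maxRedeliverCount, Duration nackDelay) {
        int redeliveryCount = 0;
        Duration elapsed = Duration.ZERO;
        while (true) {
            int attempt = redeliveryCount + 1;
            if (attempt > failuresBeforeSuccess) {
                // Handler returned normally -> consumer.acknowledge(msg)
                return new Result(Outcome.ACKED, attempt, elapsed);
            }
            // Handler threw -> consumer.negativeAcknowledge(msg)
            if (redeliveryCount >= maxRedeliverCount) {
                // Assumed rule: give up and route to the dead-letter topic
                return new Result(Outcome.DEAD_LETTERED, attempt, elapsed);
            }
            redeliveryCount++;
            elapsed = elapsed.plus(nackDelay); // wait negativeAckRedeliveryDelay before retrying
        }
    }

    public static void main(String[] args) {
        Result r = deliver(Integer.MAX_VALUE, 50, Duration.ofSeconds(30));
        System.out.println(r.outcome() + " after " + r.attempts() + " attempts, "
                + r.elapsed().toMinutes() + " min");
    }
}
```

Under these assumptions, a message whose handler always fails is tried 51 times over 50 × 30 s = 25 minutes before it is dead-lettered. In the code above, only an exception from needSendDelayQueue reaches the listener, since processing errors are swallowed.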

Stats for partition-1:

# ./pulsar-admin topics stats persistent://gpsTenant/gpsServerNamespace/delayQueuePartitionTopic-partition-1
{
  "msgRateIn" : 27.316733715678055,
  "msgThroughputIn" : 1858.0712273103368,
  "msgRateOut" : 28.13340087625443,
  "msgThroughputOut" : 1913.6379276084924,
  "bytesInCounter" : 6839012258,
  "msgInCounter" : 101637611,
  "bytesOutCounter" : 6864896370,
  "msgOutCounter" : 102022950,
  "averageMsgSize" : 68.0195241001278,
  "msgChunkPublished" : false,
  "storageSize" : 2513563501,
  "backlogSize" : 2513200993,
  "offloadedStorageSize" : 0,
  "lastOffloadLedgerId" : 0,
  "lastOffloadSuccessTimeStamp" : 0,
  "lastOffloadFailureTimeStamp" : 0,
  "publishers" : [ {
    "accessMode" : "Shared",
    "msgRateIn" : 5.633347222914246,
    "msgThroughputIn" : 383.1676114047293,
    "averageMsgSize" : 68.01775147928994,
    "chunkedMessageRate" : 0.0,
    "producerId" : 4,
    "metadata" : { },
    "address" : "/10.233.82.66:46944",
    "connectedSince" : "2024-10-15T12:12:32.147744Z",
    "clientVersion" : "2.9.2",
    "producerName" : "pulsar-14-1851"
  }, {
    "accessMode" : "Shared",
    "msgRateIn" : 5.150012731432307,
    "msgThroughputIn" : 350.31753269247787,
    "averageMsgSize" : 68.02265372168284,
    "chunkedMessageRate" : 0.0,
    "producerId" : 4,
    "metadata" : { },
    "address" : "/10.233.96.181:50378",
    "connectedSince" : "2024-10-15T12:11:59.382382Z",
    "clientVersion" : "2.9.2",
    "producerName" : "pulsar-14-1830"
  }, {
    "accessMode" : "Shared",
    "msgRateIn" : 5.4666803224341125,
    "msgThroughputIn" : 371.83426217532025,
    "averageMsgSize" : 68.01829268292683,
    "chunkedMessageRate" : 0.0,
    "producerId" : 4,
    "metadata" : { },
    "address" : "/10.233.124.30:34420",
    "connectedSince" : "2024-10-15T12:12:12.752621Z",
    "clientVersion" : "2.9.2",
    "producerName" : "pulsar-14-1844"
  }, {
    "accessMode" : "Shared",
    "msgRateIn" : 5.483347182257755,
    "msgThroughputIn" : 372.98427535485195,
    "averageMsgSize" : 68.02127659574468,
    "chunkedMessageRate" : 0.0,
    "producerId" : 4,
    "metadata" : { },
    "address" : "/10.233.121.125:38274",
    "connectedSince" : "2024-10-15T12:11:56.53891Z",
    "clientVersion" : "2.9.2",
    "producerName" : "pulsar-14-1823"
  }, {
    "accessMode" : "Shared",
    "msgRateIn" : 5.583346256639635,
    "msgThroughputIn" : 379.76754568295735,
    "averageMsgSize" : 68.0179104477612,
    "chunkedMessageRate" : 0.0,
    "producerId" : 4,
    "metadata" : { },
    "address" : "/10.233.70.115:55988",
    "connectedSince" : "2024-10-15T12:12:05.004669Z",
    "clientVersion" : "2.9.2",
    "producerName" : "pulsar-14-1837"
  } ],
  "waitingPublishers" : 0,
  "subscriptions" : {
    "delayQueuePartitionTopicSubscription" : {
      "msgRateOut" : 28.13340087625443,
      "msgThroughputOut" : 1913.6379276084924,
      "bytesOutCounter" : 6864896370,
      "msgOutCounter" : 102022950,
      "msgRateRedeliver" : 0.0,
      "chunkedMessageRate" : 0,
      "msgBacklog" : 1655,
      "backlogSize" : 0,
      "msgBacklogNoDelayed" : 830,
      "blockedSubscriptionOnUnackedMsgs" : false,
      "msgDelayed" : 825,
      "unackedMessages" : 0,
      "type" : "Shared",
      "msgRateExpired" : 0.0,
      "totalMsgExpired" : 0,
      "lastExpireTimestamp" : 0,
      "lastConsumedFlowTimestamp" : 1729473874099,
      "lastConsumedTimestamp" : 1729473880190,
      "lastAckedTimestamp" : 1729473880213,
      "lastMarkDeleteAdvancedTimestamp" : 1728375912676,
      "consumers" : [ {
        "msgRateOut" : 5.666679908753166,
        "msgThroughputOut" : 385.46756744012714,
        "bytesOutCounter" : 195588573,
        "msgOutCounter" : 2906224,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "consumercom.qy566.gps.platform.mq.DelayQueuePartitionConsumerListenerreceivedjava.lang.String",
        "availablePermits" : 776,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 1729473879231,
        "lastConsumedTimestamp" : 1729473880190,
        "metadata" : { },
        "address" : "/10.233.121.125:38274",
        "connectedSince" : "2024-10-15T12:11:58.665854Z",
        "clientVersion" : "2.9.2"
      }, {
        "msgRateOut" : 5.400012801630349,
        "msgThroughputOut" : 367.28420404175307,
        "bytesOutCounter" : 196407263,
        "msgOutCounter" : 2918443,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "consumercom.qy566.gps.platform.mq.DelayQueuePartitionConsumerListenerreceivedjava.lang.String",
        "availablePermits" : 557,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 1729473879620,
        "lastConsumedTimestamp" : 1729473879994,
        "metadata" : { },
        "address" : "/10.233.96.181:50378",
        "connectedSince" : "2024-10-15T12:12:01.572133Z",
        "clientVersion" : "2.9.2"
      }, {
        "msgRateOut" : 5.5666800397804606,
        "msgThroughputOut" : 378.7342431855424,
        "bytesOutCounter" : 196244859,
        "msgOutCounter" : 2915927,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "consumercom.qy566.gps.platform.mq.DelayQueuePartitionConsumerListenerreceivedjava.lang.String",
        "availablePermits" : 573,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 1729473879216,
        "lastConsumedTimestamp" : 1729473880060,
        "metadata" : { },
        "address" : "/10.233.70.115:55988",
        "connectedSince" : "2024-10-15T12:12:08.571215Z",
        "clientVersion" : "2.9.2"
      }, {
        "msgRateOut" : 5.300012896786382,
        "msgThroughputOut" : 360.46754381036436,
        "bytesOutCounter" : 195761174,
        "msgOutCounter" : 2908795,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "consumercom.qy566.gps.platform.mq.DelayQueuePartitionConsumerListenerreceivedjava.lang.String",
        "availablePermits" : 705,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 1729473879518,
        "lastConsumedTimestamp" : 1729473880094,
        "metadata" : { },
        "address" : "/10.233.124.30:34420",
        "connectedSince" : "2024-10-15T12:12:14.894816Z",
        "clientVersion" : "2.9.2"
      }, {
        "msgRateOut" : 6.200015229304075,
        "msgThroughputOut" : 421.68436913070536,
        "bytesOutCounter" : 196826861,
        "msgOutCounter" : 2924637,
        "msgRateRedeliver" : 0.0,
        "chunkedMessageRate" : 0.0,
        "consumerName" : "consumercom.qy566.gps.platform.mq.DelayQueuePartitionConsumerListenerreceivedjava.lang.String",
        "availablePermits" : 863,
        "unackedMessages" : 0,
        "avgMessagesPerEntry" : 0,
        "blockedConsumerOnUnackedMsgs" : false,
        "lastAckedTimestamp" : 1729473880213,
        "lastConsumedTimestamp" : 1729473880159,
        "metadata" : { },
        "address" : "/10.233.82.66:46944",
        "connectedSince" : "2024-10-15T12:12:36.491101Z",
        "clientVersion" : "2.9.2"
      } ],
      "isDurable" : true,
      "isReplicated" : false,
      "allowOutOfOrderDelivery" : false,
      "consumersAfterMarkDeletePosition" : { },
      "nonContiguousDeletedMessagesRanges" : 1572,
      "nonContiguousDeletedMessagesRangesSerializedSize" : 32994,
      "durable" : true,
      "replicated" : false
    }
  },
  "replication" : { },
  "deduplicationStatus" : "Disabled",
  "nonContiguousDeletedMessagesRanges" : 1572,
  "nonContiguousDeletedMessagesRangesSerializedSize" : 32994,
  "compaction" : {
    "lastCompactionRemovedEventCount" : 0,
    "lastCompactionSucceedTimestamp" : 0,
    "lastCompactionFailedTimestamp" : 0,
    "lastCompactionDurationTimeInMills" : 0
  }
}
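A few values in these stats can be cross-checked with simple arithmetic. For the subscription, msgBacklog (1655) equals msgBacklogNoDelayed (830) plus msgDelayed (825). lastMarkDeleteAdvancedTimestamp (2024-10-08) trails lastAckedTimestamp (2024-10-21) by roughly 12.7 days, so the mark-delete position last moved before the current consumers connected on 2024-10-15. Combined with unackedMessages = 0 and no blocked consumers, one possible reading (an interpretation, not a diagnosis) is that some non-delayed messages are sitting in the backlog without being dispatched, which would match messages that never arrive. The sketch below only reproduces the arithmetic with java.time; the class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper reproducing arithmetic on values copied from the partition-1 stats. */
public class SubscriptionStatsCheck {

    // msgBacklog should equal messages waiting for their delivery time plus all the others.
    static boolean backlogAddsUp(long msgBacklog, long msgBacklogNoDelayed, long msgDelayed) {
        return msgBacklogNoDelayed + msgDelayed == msgBacklog;
    }

    // Whole days between two epoch-millisecond timestamps taken from the stats.
    static long daysBetween(long earlierMillis, long laterMillis) {
        return Duration.between(Instant.ofEpochMilli(earlierMillis),
                Instant.ofEpochMilli(laterMillis)).toDays();
    }

    public static void main(String[] args) {
        System.out.println("backlog adds up:        " + backlogAddsUp(1655, 830, 825));
        System.out.println("last ack:               " + Instant.ofEpochMilli(1729473880213L));
        System.out.println("mark-delete last moved: " + Instant.ofEpochMilli(1728375912676L));
        System.out.println("days between:           " + daysBetween(1728375912676L, 1729473880213L));
    }
}
```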

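Result: every send succeeds and returns a message ID, but occasionally the consumer never receives the message, so the sent message is effectively lost.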

Has anyone run into this? How can it be fixed?

2 Answers

In Pulsar 2.9.2, delivery of delayed messages can lag significantly. https://github.com/apache/pulsar/pull/16068 fixes this issue, and the fix is included in Pulsar 2.9.4.

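If your delayed-message volume is large, Pulsar 3.0.x is recommended, as it handles large-scale delayed messages well. In earlier versions, delayed messages are tracked in memory, which can cause OOM problems as well as index-rebuild issues. See https://github.com/apache/pulsar/issues/16763.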
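The point about pre-3.0 brokers tracking delayed messages in memory can be pictured with a toy priority-queue index keyed by delivery time. This is a hypothetical illustration, not Pulsar's actual tracker: it only shows that each pending delayed message occupies one in-memory entry (so memory grows with the number of pending delayed messages) and that nothing is persisted, so after a broker restart or topic unload such an index has to be rebuilt from the backlog.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy in-memory delayed-delivery index (hypothetical; not Pulsar's implementation). */
public class InMemoryDelayIndex {

    // Position of a message in the topic plus the time at which it may be delivered.
    record Pending(long deliverAtMillis, long ledgerId, long entryId) {}

    // One heap entry per pending delayed message, ordered by delivery time; lost on restart.
    private final PriorityQueue<Pending> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.deliverAtMillis(), b.deliverAtMillis()));

    void add(long ledgerId, long entryId, long deliverAtMillis) {
        queue.add(new Pending(deliverAtMillis, ledgerId, entryId));
    }

    // Removes and returns every message whose delivery time has passed, earliest first.
    List<Pending> pollDue(long nowMillis) {
        List<Pending> due = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().deliverAtMillis() <= nowMillis) {
            due.add(queue.poll());
        }
        return due;
    }

    int size() {
        return queue.size();
    }

    public static void main(String[] args) {
        InMemoryDelayIndex index = new InMemoryDelayIndex();
        index.add(1, 0, 30_000);
        index.add(1, 1, 10_000);
        index.add(1, 2, 60_000);
        System.out.println(index.pollDue(30_000)); // entry 1, then entry 0 (deliver-at order)
        System.out.println(index.size());          // 1 message still waiting
    }
}
```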

It's not a delay: even a day later the message had not arrived, and it never did.