Pulsar发送Json Schema数据, 消费端丢失字段

Viewed 63

问题描述
Producer发送JSON类型数据到Pular集群, Consumer在消费拿到数据时, 存在字段丢失现象. 具体类定义: 自定义三个类MessageBody, User, Student. MessageBody作为Schema.Type, User是MessageBody其中一个字段, Student是User的子类. 构建MessageBody时, User字段赋值为一个Student实例. 最终消息收到后, 反序列化回来, Student中的字段丢失了

public class RequestBody {
    private String from;
    private User user;
}
public class User {
    private String ID;
    private String name;
    private Integer age;
}
public class Employee extends User {
    private String title;
}
// 发送数据构建
RequestBody body = new RequestBody();
Employee employee = new Employee("001", "Xuwei", 33);
employee.setTitle("Software Engineer");
body.setFrom("test");
body.setUser(employee);
sendMessage(commonConfig.getTestTopic(), body);
// Consumer收到的数据
{
    "from": "test",
    "user": {
        "name": "Xuwei",
        "age": 33,
        "id": "001"
    }
}
1 Answers

Pulsar在订阅Topic并发送JSON类型数据时, 需要指定消息内容及消息体定义的实体类. 在构建SchemaInfo时, 没有构建出子类实例中的字段属性, 但消息体原始数据是完整的. 导致消息写入时, 根据Schema构建消息实际写入内容丢失了字段. 即
image.png

image.png

解决方案
3种方式

  1. 客户端调整业务代码, RequestBody中通过子类做字段声明, 不要使用父类, 即
public class RequestBody {
    private String from;
    private Employee employee;
}
  1. 客户端通过Schema.STRING来发送消息, 发送端将消息实体类转为JSON字符串发送, 消费端拿到数据后再转回实体类.
  2. pulsar-admin 补齐丢失的字段
pulsarAdmin.schemas().createSchema(topic, schema);
pulsarAdmin.topicPolicies().setSchemaCompatibilityStrategy();