Listener endpoints use schema resolver (#280)

* Listener endpoints use schema resolver

See #269

* Disabled PulsarFunctions IT
This commit is contained in:
Chris Bono
2023-01-20 09:57:25 -06:00
committed by GitHub
parent 4a10825cfa
commit e52ef5dea3
7 changed files with 375 additions and 275 deletions

View File

@@ -20,7 +20,6 @@ import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import org.apache.commons.logging.LogFactory;
import org.apache.pulsar.client.api.Consumer;
@@ -28,11 +27,6 @@ import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.ProtobufSchema;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.core.MethodParameter;
@@ -58,7 +52,6 @@ import org.springframework.pulsar.support.MessageConverter;
import org.springframework.pulsar.support.converter.PulsarRecordMessageConverter;
import org.springframework.util.Assert;
import com.google.protobuf.GeneratedMessageV3;
import reactor.core.publisher.Flux;
/**
@@ -67,6 +60,7 @@ import reactor.core.publisher.Flux;
*
* @param <V> Message payload type
* @author Christophe Bornet
* @author Chris Bono
*/
public class MethodReactivePulsarListenerEndpoint<V> extends AbstractReactivePulsarListenerEndpoint<V> {
@@ -140,68 +134,20 @@ public class MethodReactivePulsarListenerEndpoint<V> extends AbstractReactivePul
messageParameter = parameter.get();
}
// TODO refactor this all out later to SchemaResolver
// ResolvableType messageType = resolvableType(messageParameter);
// Schema<?> schema = schemaResolver.getSchema(schemaType, messageType);
DefaultReactivePulsarMessageListenerContainer<?> containerInstance = (DefaultReactivePulsarMessageListenerContainer<?>) container;
ReactivePulsarContainerProperties<?> pulsarContainerProperties = containerInstance.getContainerProperties();
SchemaType schemaType = pulsarContainerProperties.getSchemaType();
SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver();
if (schemaType != SchemaType.NONE) {
switch (schemaType) {
case STRING -> pulsarContainerProperties.setSchema((Schema) Schema.STRING);
case BYTES -> pulsarContainerProperties.setSchema((Schema) Schema.BYTES);
case INT8 -> pulsarContainerProperties.setSchema((Schema) Schema.INT8);
case INT16 -> pulsarContainerProperties.setSchema((Schema) Schema.INT16);
case INT32 -> pulsarContainerProperties.setSchema((Schema) Schema.INT32);
case INT64 -> pulsarContainerProperties.setSchema((Schema) Schema.INT64);
case BOOLEAN -> pulsarContainerProperties.setSchema((Schema) Schema.BOOL);
case DATE -> pulsarContainerProperties.setSchema((Schema) Schema.DATE);
case DOUBLE -> pulsarContainerProperties.setSchema((Schema) Schema.DOUBLE);
case FLOAT -> pulsarContainerProperties.setSchema((Schema) Schema.FLOAT);
case INSTANT -> pulsarContainerProperties.setSchema((Schema) Schema.INSTANT);
case LOCAL_DATE -> pulsarContainerProperties.setSchema((Schema) Schema.LOCAL_DATE);
case LOCAL_DATE_TIME -> pulsarContainerProperties.setSchema((Schema) Schema.LOCAL_DATE_TIME);
case LOCAL_TIME -> pulsarContainerProperties.setSchema((Schema) Schema.LOCAL_TIME);
case JSON -> {
Schema<?> messageSchema = getMessageSchema(messageParameter, JSONSchema::of);
pulsarContainerProperties.setSchema((Schema) messageSchema);
}
case AVRO -> {
Schema<?> messageSchema = getMessageSchema(messageParameter, AvroSchema::of);
pulsarContainerProperties.setSchema((Schema) messageSchema);
}
case PROTOBUF -> {
@SuppressWarnings("unchecked")
Schema<?> messageSchema = getMessageSchema(messageParameter,
(c -> ProtobufSchema.of((Class<? extends GeneratedMessageV3>) c)));
pulsarContainerProperties.setSchema((Schema) messageSchema);
}
case KEY_VALUE -> {
Schema<?> messageSchema = getMessageKeyValueSchema(schemaResolver, messageParameter);
pulsarContainerProperties.setSchema((Schema) messageSchema);
}
}
SchemaType schemaType = pulsarContainerProperties.getSchemaType();
ResolvableType messageType = resolvableType(messageParameter);
Schema<?> schema = schemaResolver.getSchema(schemaType, messageType);
if (schema != null) {
pulsarContainerProperties.setSchema((Schema) schema);
}
else {
if (messageParameter != null) {
Schema<?> messageSchema = null;
ResolvableType type = resolvableType(messageParameter);
if (KeyValue.class.isAssignableFrom(type.getRawClass())) {
messageSchema = getMessageKeyValueSchema(schemaResolver, messageParameter);
}
else {
messageSchema = getMessageSchema(messageParameter,
(messageClass) -> schemaResolver.getSchema(messageClass, false));
}
if (messageSchema != null) {
pulsarContainerProperties.setSchema((Schema) messageSchema);
}
}
// Make sure the schemaType is updated to match the current schema
if (pulsarContainerProperties.getSchema() != null) {
SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType();
pulsarContainerProperties.setSchemaType(type);
}
SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType();
pulsarContainerProperties.setSchemaType(type);
ReactiveMessageConsumerBuilderCustomizer<V> customizer1 = b -> b.deadLetterPolicy(this.deadLetterPolicy);
container.setConsumerCustomizer(b -> {
@@ -214,21 +160,6 @@ public class MethodReactivePulsarListenerEndpoint<V> extends AbstractReactivePul
return messageListener;
}
private Schema<?> getMessageSchema(MethodParameter messageParameter, Function<Class<?>, Schema<?>> schemaFactory) {
ResolvableType messageType = resolvableType(messageParameter);
Class<?> messageClass = messageType.getRawClass();
return schemaFactory.apply(messageClass);
}
private Schema<?> getMessageKeyValueSchema(SchemaResolver schemaResolver, MethodParameter messageParameter) {
ResolvableType messageType = resolvableType(messageParameter);
Class<?> keyClass = messageType.resolveGeneric(0);
Class<?> valueClass = messageType.resolveGeneric(1);
Schema<? extends Class<?>> keySchema = schemaResolver.getSchema(keyClass);
Schema<? extends Class<?>> valueSchema = schemaResolver.getSchema(valueClass);
return Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE);
}
private ResolvableType resolvableType(MethodParameter methodParameter) {
ResolvableType resolvableType = ResolvableType.forMethodParameter(methodParameter);
Class<?> rawClass = resolvableType.getRawClass();