diff --git a/.github/workflows/ci-pr.yml b/.github/workflows/ci-pr.yml index f5fb08d9..8ab4fed1 100644 --- a/.github/workflows/ci-pr.yml +++ b/.github/workflows/ci-pr.yml @@ -36,7 +36,7 @@ jobs: - name: Run Gradle build run: | - ./gradlew clean build -DdownloadRabbitConnector=true --continue --scan + ./gradlew clean build -DdownloadRabbitConnector=false --continue --scan - name: Capture Test Results if: failure() diff --git a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java index 58bb6b23..248238b4 100644 --- a/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java +++ b/spring-pulsar-reactive/src/main/java/org/springframework/pulsar/reactive/config/MethodReactivePulsarListenerEndpoint.java @@ -20,7 +20,6 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.function.Function; import org.apache.commons.logging.LogFactory; import org.apache.pulsar.client.api.Consumer; @@ -28,11 +27,6 @@ import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.schema.AvroSchema; -import org.apache.pulsar.client.impl.schema.JSONSchema; -import org.apache.pulsar.client.impl.schema.ProtobufSchema; -import org.apache.pulsar.common.schema.KeyValue; -import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; import org.springframework.core.MethodParameter; @@ -58,7 +52,6 @@ import org.springframework.pulsar.support.MessageConverter; import org.springframework.pulsar.support.converter.PulsarRecordMessageConverter; import org.springframework.util.Assert; -import com.google.protobuf.GeneratedMessageV3; import reactor.core.publisher.Flux; /** @@ -67,6 +60,7 @@ import reactor.core.publisher.Flux; * * @param Message payload type * @author Christophe Bornet + * @author Chris Bono */ public class MethodReactivePulsarListenerEndpoint extends AbstractReactivePulsarListenerEndpoint { @@ -140,68 +134,20 @@ public class MethodReactivePulsarListenerEndpoint extends AbstractReactivePul messageParameter = parameter.get(); } - // TODO refactor this all out later to SchemaResolver - // ResolvableType messageType = resolvableType(messageParameter); - // Schema schema = schemaResolver.getSchema(schemaType, messageType); - DefaultReactivePulsarMessageListenerContainer containerInstance = (DefaultReactivePulsarMessageListenerContainer) container; ReactivePulsarContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties(); - SchemaType schemaType = pulsarContainerProperties.getSchemaType(); SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver(); - if (schemaType != SchemaType.NONE) { - switch (schemaType) { - case STRING -> pulsarContainerProperties.setSchema((Schema) Schema.STRING); - case BYTES -> pulsarContainerProperties.setSchema((Schema) Schema.BYTES); - case INT8 -> pulsarContainerProperties.setSchema((Schema) Schema.INT8); - case INT16 -> pulsarContainerProperties.setSchema((Schema) Schema.INT16); - case INT32 -> pulsarContainerProperties.setSchema((Schema) Schema.INT32); - case INT64 -> pulsarContainerProperties.setSchema((Schema) Schema.INT64); - case BOOLEAN -> pulsarContainerProperties.setSchema((Schema) Schema.BOOL); - case DATE -> pulsarContainerProperties.setSchema((Schema) Schema.DATE); - case DOUBLE -> pulsarContainerProperties.setSchema((Schema) Schema.DOUBLE); - case FLOAT -> pulsarContainerProperties.setSchema((Schema) Schema.FLOAT); - case INSTANT -> pulsarContainerProperties.setSchema((Schema) Schema.INSTANT); - case LOCAL_DATE -> pulsarContainerProperties.setSchema((Schema) Schema.LOCAL_DATE); - case LOCAL_DATE_TIME -> pulsarContainerProperties.setSchema((Schema) Schema.LOCAL_DATE_TIME); - case LOCAL_TIME -> pulsarContainerProperties.setSchema((Schema) Schema.LOCAL_TIME); - case JSON -> { - Schema messageSchema = getMessageSchema(messageParameter, JSONSchema::of); - pulsarContainerProperties.setSchema((Schema) messageSchema); - } - case AVRO -> { - Schema messageSchema = getMessageSchema(messageParameter, AvroSchema::of); - pulsarContainerProperties.setSchema((Schema) messageSchema); - } - case PROTOBUF -> { - @SuppressWarnings("unchecked") - Schema messageSchema = getMessageSchema(messageParameter, - (c -> ProtobufSchema.of((Class) c))); - pulsarContainerProperties.setSchema((Schema) messageSchema); - } - case KEY_VALUE -> { - Schema messageSchema = getMessageKeyValueSchema(schemaResolver, messageParameter); - pulsarContainerProperties.setSchema((Schema) messageSchema); - } - } + SchemaType schemaType = pulsarContainerProperties.getSchemaType(); + ResolvableType messageType = resolvableType(messageParameter); + Schema schema = schemaResolver.getSchema(schemaType, messageType); + if (schema != null) { + pulsarContainerProperties.setSchema((Schema) schema); } - else { - if (messageParameter != null) { - Schema messageSchema = null; - ResolvableType type = resolvableType(messageParameter); - if (KeyValue.class.isAssignableFrom(type.getRawClass())) { - messageSchema = getMessageKeyValueSchema(schemaResolver, messageParameter); - } - else { - messageSchema = getMessageSchema(messageParameter, - (messageClass) -> schemaResolver.getSchema(messageClass, false)); - } - if (messageSchema != null) { - pulsarContainerProperties.setSchema((Schema) messageSchema); - } - } + // Make sure the schemaType is updated to match the current schema + if (pulsarContainerProperties.getSchema() != null) { + SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType(); + pulsarContainerProperties.setSchemaType(type); } - SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType(); - pulsarContainerProperties.setSchemaType(type); ReactiveMessageConsumerBuilderCustomizer customizer1 = b -> b.deadLetterPolicy(this.deadLetterPolicy); container.setConsumerCustomizer(b -> { @@ -214,21 +160,6 @@ public class MethodReactivePulsarListenerEndpoint extends AbstractReactivePul return messageListener; } - private Schema getMessageSchema(MethodParameter messageParameter, Function, Schema> schemaFactory) { - ResolvableType messageType = resolvableType(messageParameter); - Class messageClass = messageType.getRawClass(); - return schemaFactory.apply(messageClass); - } - - private Schema getMessageKeyValueSchema(SchemaResolver schemaResolver, MethodParameter messageParameter) { - ResolvableType messageType = resolvableType(messageParameter); - Class keyClass = messageType.resolveGeneric(0); - Class valueClass = messageType.resolveGeneric(1); - Schema> keySchema = schemaResolver.getSchema(keyClass); - Schema> valueSchema = schemaResolver.getSchema(valueClass); - return Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE); - } - private ResolvableType resolvableType(MethodParameter methodParameter) { ResolvableType resolvableType = ResolvableType.forMethodParameter(methodParameter); Class rawClass = resolvableType.getRawClass(); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java index 660a0469..346397d5 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/config/MethodPulsarListenerEndpoint.java @@ -20,7 +20,6 @@ import java.lang.reflect.Method; import java.util.Arrays; import java.util.List; import java.util.Optional; -import java.util.function.Function; import org.apache.commons.logging.LogFactory; import org.apache.pulsar.client.api.Consumer; @@ -29,11 +28,6 @@ import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Messages; import org.apache.pulsar.client.api.RedeliveryBackoff; import org.apache.pulsar.client.api.Schema; -import org.apache.pulsar.client.impl.schema.AvroSchema; -import org.apache.pulsar.client.impl.schema.JSONSchema; -import org.apache.pulsar.client.impl.schema.ProtobufSchema; -import org.apache.pulsar.common.schema.KeyValue; -import org.apache.pulsar.common.schema.KeyValueEncodingType; import org.apache.pulsar.common.schema.SchemaType; import org.springframework.core.MethodParameter; @@ -60,8 +54,6 @@ import org.springframework.pulsar.support.converter.PulsarBatchMessageConverter; import org.springframework.pulsar.support.converter.PulsarRecordMessageConverter; import org.springframework.util.Assert; -import com.google.protobuf.GeneratedMessageV3; - /** * A {@link PulsarListenerEndpoint} providing the method to invoke to process an incoming * message for this endpoint. @@ -69,6 +61,7 @@ import com.google.protobuf.GeneratedMessageV3; * @param Message payload type * @author Soby Chacko * @author Alexander Preuß + * @author Chris Bono */ public class MethodPulsarListenerEndpoint extends AbstractPulsarListenerEndpoint { @@ -148,69 +141,18 @@ public class MethodPulsarListenerEndpoint extends AbstractPulsarListenerEndpo ConcurrentPulsarMessageListenerContainer containerInstance = (ConcurrentPulsarMessageListenerContainer) container; PulsarContainerProperties pulsarContainerProperties = containerInstance.getContainerProperties(); - SchemaType schemaType = pulsarContainerProperties.getSchemaType(); SchemaResolver schemaResolver = pulsarContainerProperties.getSchemaResolver(); - - // TODO refactor this all out later to SchemaResolver - // ResolvableType messageType = resolvableType(messageParameter); - // Schema schema = schemaResolver.getSchema(schemaType, messageType); - - if (schemaType != SchemaType.NONE) { - switch (schemaType) { - case STRING -> pulsarContainerProperties.setSchema(Schema.STRING); - case BYTES -> pulsarContainerProperties.setSchema(Schema.BYTES); - case INT8 -> pulsarContainerProperties.setSchema(Schema.INT8); - case INT16 -> pulsarContainerProperties.setSchema(Schema.INT16); - case INT32 -> pulsarContainerProperties.setSchema(Schema.INT32); - case INT64 -> pulsarContainerProperties.setSchema(Schema.INT64); - case BOOLEAN -> pulsarContainerProperties.setSchema(Schema.BOOL); - case DATE -> pulsarContainerProperties.setSchema(Schema.DATE); - case DOUBLE -> pulsarContainerProperties.setSchema(Schema.DOUBLE); - case FLOAT -> pulsarContainerProperties.setSchema(Schema.FLOAT); - case INSTANT -> pulsarContainerProperties.setSchema(Schema.INSTANT); - case LOCAL_DATE -> pulsarContainerProperties.setSchema(Schema.LOCAL_DATE); - case LOCAL_DATE_TIME -> pulsarContainerProperties.setSchema(Schema.LOCAL_DATE_TIME); - case LOCAL_TIME -> pulsarContainerProperties.setSchema(Schema.LOCAL_TIME); - case JSON -> { - Schema messageSchema = getMessageSchema(messageParameter, JSONSchema::of); - pulsarContainerProperties.setSchema(messageSchema); - } - case AVRO -> { - Schema messageSchema = getMessageSchema(messageParameter, AvroSchema::of); - pulsarContainerProperties.setSchema(messageSchema); - } - case PROTOBUF -> { - @SuppressWarnings("unchecked") - Schema messageSchema = getMessageSchema(messageParameter, - (c -> ProtobufSchema.of((Class) c))); - pulsarContainerProperties.setSchema(messageSchema); - } - case KEY_VALUE -> { - Schema messageSchema = getMessageKeyValueSchema(schemaResolver, messageParameter); - pulsarContainerProperties.setSchema(messageSchema); - } - } + SchemaType schemaType = pulsarContainerProperties.getSchemaType(); + ResolvableType messageType = resolvableType(messageParameter); + Schema schema = schemaResolver.getSchema(schemaType, messageType); + if (schema != null) { + pulsarContainerProperties.setSchema(schema); } - else { - if (messageParameter != null) { - - Schema messageSchema = null; - ResolvableType type = resolvableType(messageParameter); - if (KeyValue.class.isAssignableFrom(type.getRawClass())) { - messageSchema = getMessageKeyValueSchema(schemaResolver, messageParameter); - } - else { - messageSchema = getMessageSchema(messageParameter, - (messageClass) -> schemaResolver.getSchema(messageClass, false)); - } - if (messageSchema != null) { - pulsarContainerProperties.setSchema(messageSchema); - } - } + // Make sure the schemaType is updated to match the current schema + if (pulsarContainerProperties.getSchema() != null) { + SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType(); + pulsarContainerProperties.setSchemaType(type); } - SchemaType type = pulsarContainerProperties.getSchema().getSchemaInfo().getType(); - pulsarContainerProperties.setSchemaType(type); - container.setNegativeAckRedeliveryBackoff(this.negativeAckRedeliveryBackoff); container.setAckTimeoutRedeliveryBackoff(this.ackTimeoutRedeliveryBackoff); container.setDeadLetterPolicy(this.deadLetterPolicy); @@ -219,21 +161,6 @@ public class MethodPulsarListenerEndpoint extends AbstractPulsarListenerEndpo return messageListener; } - private Schema getMessageSchema(MethodParameter messageParameter, Function, Schema> schemaFactory) { - ResolvableType messageType = resolvableType(messageParameter); - Class messageClass = messageType.getRawClass(); - return schemaFactory.apply(messageClass); - } - - private Schema getMessageKeyValueSchema(SchemaResolver schemaResolver, MethodParameter messageParameter) { - ResolvableType messageType = resolvableType(messageParameter); - Class keyClass = messageType.resolveGeneric(0); - Class valueClass = messageType.resolveGeneric(1); - Schema> keySchema = schemaResolver.getSchema(keyClass); - Schema> valueSchema = schemaResolver.getSchema(valueClass); - return Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE); - } - private ResolvableType resolvableType(MethodParameter methodParameter) { ResolvableType resolvableType = ResolvableType.forMethodParameter(methodParameter); Class rawClass = resolvableType.getRawClass(); diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java index 13ff6d3a..77bb4464 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/DefaultSchemaResolver.java @@ -17,6 +17,8 @@ package org.springframework.pulsar.core; import java.nio.ByteBuffer; +import java.sql.Time; +import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -28,9 +30,18 @@ import java.util.Map; import java.util.Objects; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.ProtobufSchema; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; +import org.springframework.core.ResolvableType; import org.springframework.lang.Nullable; +import com.google.protobuf.GeneratedMessageV3; + /** * Default schema resolver capable of handling basic message types. * @@ -47,7 +58,12 @@ public class DefaultSchemaResolver implements SchemaResolver { private static final Map, Schema> BASE_SCHEMA_MAPPINGS = new HashMap<>(); static { BASE_SCHEMA_MAPPINGS.put(byte[].class, Schema.BYTES); + BASE_SCHEMA_MAPPINGS.put(ByteBuffer.class, Schema.BYTEBUFFER); + BASE_SCHEMA_MAPPINGS.put(ByteBuffer.allocate(0).getClass(), Schema.BYTEBUFFER); + BASE_SCHEMA_MAPPINGS.put(ByteBuffer.allocateDirect(0).getClass(), Schema.BYTEBUFFER); BASE_SCHEMA_MAPPINGS.put(String.class, Schema.STRING); + BASE_SCHEMA_MAPPINGS.put(Boolean.class, Schema.BOOL); + BASE_SCHEMA_MAPPINGS.put(boolean.class, Schema.BOOL); BASE_SCHEMA_MAPPINGS.put(Byte.class, Schema.INT8); BASE_SCHEMA_MAPPINGS.put(byte.class, Schema.INT8); BASE_SCHEMA_MAPPINGS.put(Short.class, Schema.INT16); @@ -56,16 +72,13 @@ public class DefaultSchemaResolver implements SchemaResolver { BASE_SCHEMA_MAPPINGS.put(int.class, Schema.INT32); BASE_SCHEMA_MAPPINGS.put(Long.class, Schema.INT64); BASE_SCHEMA_MAPPINGS.put(long.class, Schema.INT64); - BASE_SCHEMA_MAPPINGS.put(Boolean.class, Schema.BOOL); - BASE_SCHEMA_MAPPINGS.put(boolean.class, Schema.BOOL); - BASE_SCHEMA_MAPPINGS.put(ByteBuffer.class, Schema.BYTEBUFFER); - BASE_SCHEMA_MAPPINGS.put(ByteBuffer.allocate(0).getClass(), Schema.BYTEBUFFER); - BASE_SCHEMA_MAPPINGS.put(ByteBuffer.allocateDirect(0).getClass(), Schema.BYTEBUFFER); - BASE_SCHEMA_MAPPINGS.put(Date.class, Schema.DATE); - BASE_SCHEMA_MAPPINGS.put(Double.class, Schema.DOUBLE); - BASE_SCHEMA_MAPPINGS.put(double.class, Schema.DOUBLE); BASE_SCHEMA_MAPPINGS.put(Float.class, Schema.FLOAT); BASE_SCHEMA_MAPPINGS.put(float.class, Schema.FLOAT); + BASE_SCHEMA_MAPPINGS.put(Double.class, Schema.DOUBLE); + BASE_SCHEMA_MAPPINGS.put(double.class, Schema.DOUBLE); + BASE_SCHEMA_MAPPINGS.put(Date.class, Schema.DATE); + BASE_SCHEMA_MAPPINGS.put(Time.class, Schema.TIME); + BASE_SCHEMA_MAPPINGS.put(Timestamp.class, Schema.TIMESTAMP); BASE_SCHEMA_MAPPINGS.put(Instant.class, Schema.INSTANT); BASE_SCHEMA_MAPPINGS.put(LocalDate.class, Schema.LOCAL_DATE); BASE_SCHEMA_MAPPINGS.put(LocalDateTime.class, Schema.LOCAL_DATE_TIME); @@ -103,7 +116,64 @@ public class DefaultSchemaResolver implements SchemaResolver { return this.customSchemaMappings.getOrDefault(messageClass, (returnDefault ? Schema.BYTES : null)); } - @SuppressWarnings({ "unchecked" }) + @Override + @SuppressWarnings("unchecked") + public Schema getSchema(SchemaType schemaType, @Nullable ResolvableType messageType) { + Schema schema = switch (schemaType) { + case STRING -> Schema.STRING; + case BOOLEAN -> Schema.BOOL; + case INT8 -> Schema.INT8; + case INT16 -> Schema.INT16; + case INT32 -> Schema.INT32; + case INT64 -> Schema.INT64; + case FLOAT -> Schema.FLOAT; + case DOUBLE -> Schema.DOUBLE; + case DATE -> Schema.DATE; + case TIME -> Schema.TIME; + case TIMESTAMP -> Schema.TIMESTAMP; + case BYTES -> Schema.BYTES; + case INSTANT -> Schema.INSTANT; + case LOCAL_DATE -> Schema.LOCAL_DATE; + case LOCAL_TIME -> Schema.LOCAL_TIME; + case LOCAL_DATE_TIME -> Schema.LOCAL_DATE_TIME; + case JSON -> JSONSchema.of(requireNonNullMessageType(schemaType, messageType)); + case AVRO -> AvroSchema.of(requireNonNullMessageType(schemaType, messageType)); + case PROTOBUF -> { + Class messageClass = requireNonNullMessageType(schemaType, messageType); + yield ProtobufSchema.of((Class) messageClass); + } + case KEY_VALUE -> { + requireNonNullMessageType(schemaType, messageType); + yield getMessageKeyValueSchema(messageType); + } + case NONE -> { + if (messageType == null) { + yield Schema.BYTES; + } + if (KeyValue.class.isAssignableFrom(messageType.getRawClass())) { + yield getMessageKeyValueSchema(messageType); + } + yield getSchema(messageType.getRawClass(), false); + } + default -> throw new IllegalArgumentException("Unsupported schema type: " + schemaType.name()); + }; + return schema != null ? castToType(schema) : null; + } + + private Class requireNonNullMessageType(SchemaType schemaType, ResolvableType messageType) { + return Objects.requireNonNull(messageType, "messageType must be specified for " + schemaType.name()) + .getRawClass(); + } + + private Schema getMessageKeyValueSchema(ResolvableType messageType) { + Class keyClass = messageType.resolveGeneric(0); + Class valueClass = messageType.resolveGeneric(1); + Schema> keySchema = this.getSchema(keyClass); + Schema> valueSchema = this.getSchema(valueClass); + return Schema.KeyValue(keySchema, valueSchema, KeyValueEncodingType.INLINE); + } + + @SuppressWarnings("unchecked") private Schema castToType(Schema rawSchema) { return (Schema) rawSchema; } diff --git a/spring-pulsar/src/main/java/org/springframework/pulsar/core/SchemaResolver.java b/spring-pulsar/src/main/java/org/springframework/pulsar/core/SchemaResolver.java index 6ba6fc0c..f8175e1c 100644 --- a/spring-pulsar/src/main/java/org/springframework/pulsar/core/SchemaResolver.java +++ b/spring-pulsar/src/main/java/org/springframework/pulsar/core/SchemaResolver.java @@ -17,7 +17,9 @@ package org.springframework.pulsar.core; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.schema.SchemaType; +import org.springframework.core.ResolvableType; import org.springframework.lang.Nullable; /** @@ -61,4 +63,14 @@ public interface SchemaResolver { @Nullable Schema getSchema(Class messageType, boolean returnDefault); + /** + * Get the schema to use given a schema type and a message type. + * @param the schema type + * @param schemaType the schema type + * @param messageType the message type + * @return the schema to use + */ + @Nullable + Schema getSchema(SchemaType schemaType, @Nullable ResolvableType messageType); + } diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java index 2121c8ab..faf1497e 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/core/DefaultSchemaResolverTests.java @@ -17,9 +17,13 @@ package org.springframework.pulsar.core; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatExceptionOfType; import static org.junit.jupiter.params.provider.Arguments.arguments; +import static org.mockito.Mockito.mock; import java.nio.ByteBuffer; +import java.sql.Time; +import java.sql.Timestamp; import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; @@ -31,11 +35,25 @@ import java.util.Map; import java.util.stream.Stream; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.KeyValueSchema; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.JSONSchema; +import org.apache.pulsar.client.impl.schema.ProtobufSchema; +import org.apache.pulsar.common.schema.KeyValue; +import org.apache.pulsar.common.schema.KeyValueEncodingType; +import org.apache.pulsar.common.schema.SchemaType; +import org.assertj.core.api.InstanceOfAssertFactories; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.EnumSource; import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.core.ResolvableType; +import org.springframework.pulsar.listener.Proto; +import org.springframework.pulsar.listener.Proto.Person; + /** * Unit tests for {@link DefaultSchemaResolver}. * @@ -43,112 +61,254 @@ import org.junit.jupiter.params.provider.MethodSource; */ class DefaultSchemaResolverTests { - @ParameterizedTest - @MethodSource("schemaByMessageInstanceProvider") - void getSchemaByMessageInstance(T message, Schema expectedSchema) { - DefaultSchemaResolver resolver = new DefaultSchemaResolver(); - assertThat(resolver.getSchema(message)).isEqualTo(expectedSchema); + private DefaultSchemaResolver resolver = new DefaultSchemaResolver(); + + @Nested + class SchemaByMessageInstance { + + @ParameterizedTest + @MethodSource("primitiveTypeMessagesProvider") + void primitiveTypeMessages(T message, Schema expectedSchema) { + assertThat(resolver.getSchema(message)).isEqualTo(expectedSchema); + } + + static Stream primitiveTypeMessagesProvider() { + // @formatter:off + return Stream.of( + arguments("foo".getBytes(), Schema.BYTES), + arguments(ByteBuffer.wrap("foo".getBytes()), Schema.BYTEBUFFER), + arguments(ByteBuffer.allocateDirect(10), Schema.BYTEBUFFER), + arguments("foo", Schema.STRING), + arguments(Boolean.TRUE, Schema.BOOL), + arguments(Byte.valueOf("0"), Schema.INT8), + arguments((byte) 0, Schema.INT8), + arguments(Short.valueOf("0"), Schema.INT16), + arguments((short) 0, Schema.INT16), + arguments(Integer.valueOf("0"), Schema.INT32), + arguments(0, Schema.INT32), + arguments(Long.valueOf("0"), Schema.INT64), + arguments(0L, Schema.INT64), + arguments(Float.valueOf("2.4"), Schema.FLOAT), + arguments(2.5f, Schema.FLOAT), + arguments(Double.valueOf("2.2"), Schema.DOUBLE), + arguments(2.3d, Schema.DOUBLE), + arguments(new Date(), Schema.DATE), + arguments(new Time(System.currentTimeMillis()), Schema.TIME), + arguments(new Timestamp(System.currentTimeMillis()), Schema.TIMESTAMP), + arguments(Instant.now(), Schema.INSTANT), + arguments(LocalDate.now(), Schema.LOCAL_DATE), + arguments(LocalDateTime.now(), Schema.LOCAL_DATE_TIME), + arguments(LocalTime.NOON, Schema.LOCAL_TIME) + ); + // @formatter:on + } + + @Test + void customTypeMessages() { + Schema fooSchema = Schema.AVRO(Foo.class); + Map, Schema> customTypes = new HashMap<>(); + customTypes.put(Foo.class, fooSchema); + customTypes.put(Bar.class, Schema.STRING); + DefaultSchemaResolver resolver = new DefaultSchemaResolver(customTypes); + assertThat(resolver.getSchema(new Foo("foo1"))).isSameAs(fooSchema); + assertThat(resolver.getSchema(new Bar<>("bar1"))).isEqualTo(Schema.STRING); + assertThat(resolver.getSchema(new Zaa("zaa1"))).isEqualTo(Schema.BYTES); // default + } + } - static Stream schemaByMessageInstanceProvider() { - // @formatter:off - return Stream.of( - arguments("foo", Schema.STRING), - arguments("foo".getBytes(), Schema.BYTES), - arguments(Byte.valueOf("0"), Schema.INT8), - arguments((byte) 0, Schema.INT8), - arguments(Short.valueOf("0"), Schema.INT16), - arguments((short) 0, Schema.INT16), - arguments(Integer.valueOf("0"), Schema.INT32), - arguments(0, Schema.INT32), - arguments(Long.valueOf("0"), Schema.INT64), - arguments(0L, Schema.INT64), - arguments(Boolean.TRUE, Schema.BOOL), - arguments(ByteBuffer.wrap("foo".getBytes()), Schema.BYTEBUFFER), - arguments(ByteBuffer.allocateDirect(10), Schema.BYTEBUFFER), - arguments(new Date(), Schema.DATE), - arguments(Double.valueOf("2.2"), Schema.DOUBLE), - arguments(2.3d, Schema.DOUBLE), - arguments(Float.valueOf("2.4"), Schema.FLOAT), - arguments(2.5f, Schema.FLOAT), - arguments(Instant.now(), Schema.INSTANT), - arguments(LocalDate.now(), Schema.LOCAL_DATE), - arguments(LocalDateTime.now(), Schema.LOCAL_DATE_TIME), - arguments(LocalTime.NOON, Schema.LOCAL_TIME) - ); - // @formatter:on + @Nested + class SchemaByMessageType { + + @ParameterizedTest + @MethodSource("primitiveMessageTypesProvider") + void primitiveMessageTypes(Class messageType, Schema expectedSchema) { + assertThat(resolver.getSchema(messageType)).isEqualTo(expectedSchema); + } + + static Stream primitiveMessageTypesProvider() { + // @formatter:off + return Stream.of( + arguments(byte[].class, Schema.BYTES), + arguments(ByteBuffer.class, Schema.BYTEBUFFER), + arguments(ByteBuffer.wrap("foo".getBytes()).getClass(), Schema.BYTEBUFFER), + arguments(ByteBuffer.allocateDirect(10).getClass(), Schema.BYTEBUFFER), + arguments(String.class, Schema.STRING), + arguments(Boolean.class, Schema.BOOL), + arguments(boolean.class, Schema.BOOL), + arguments(Byte.class, Schema.INT8), + arguments(byte.class, Schema.INT8), + arguments(Short.class, Schema.INT16), + arguments(short.class, Schema.INT16), + arguments(Integer.class, Schema.INT32), + arguments(int.class, Schema.INT32), + arguments(Long.class, Schema.INT64), + arguments(long.class, Schema.INT64), + arguments(Date.class, Schema.DATE), + arguments(Time.class, Schema.TIME), + arguments(Timestamp.class, Schema.TIMESTAMP), + arguments(Float.class, Schema.FLOAT), + arguments(float.class, Schema.FLOAT), + arguments(Double.class, Schema.DOUBLE), + arguments(double.class, Schema.DOUBLE), + arguments(Instant.class, Schema.INSTANT), + arguments(LocalDate.class, Schema.LOCAL_DATE), + arguments(LocalDateTime.class, Schema.LOCAL_DATE_TIME), + arguments(LocalTime.class, Schema.LOCAL_TIME) + ); + // @formatter:on + } + + @Test + void customMessageTypes() { + assertThat(resolver.getSchema(Foo.class, false)).isNull(); + assertThat(resolver.getSchema(Foo.class, true)).isEqualTo(Schema.BYTES); + resolver = new DefaultSchemaResolver(Collections.singletonMap(Foo.class, Schema.STRING)); + assertThat(resolver.getSchema(Foo.class, false)).isEqualTo(Schema.STRING); + assertThat(resolver.getSchema(Bar.class, false)).isNull(); + assertThat(resolver.getSchema(Bar.class, true)).isEqualTo(Schema.BYTES); + } + } - @Test - void getSchemaByMessageInstanceCustomTypes() { - Schema fooSchema = Schema.AVRO(Foo.class); - Map, Schema> customTypes = new HashMap<>(); - customTypes.put(Foo.class, fooSchema); - customTypes.put(Bar.class, Schema.STRING); - DefaultSchemaResolver resolver = new DefaultSchemaResolver(customTypes); - assertThat(resolver.getSchema(new Foo("foo1"))).isSameAs(fooSchema); - assertThat(resolver.getSchema(new Bar<>("bar1"))).isEqualTo(Schema.STRING); - assertThat(resolver.getSchema(new Zaa("zaa1"))).isEqualTo(Schema.BYTES); // default - } + @Nested + class SchemaBySchemaTypeAndMessageType { - @ParameterizedTest - @MethodSource("schemaByMessageTypeProvider") - void getSchemaByMessageType(Class messageType, Schema expectedSchema) { - DefaultSchemaResolver resolver = new DefaultSchemaResolver(); - assertThat(resolver.getSchema(messageType)).isEqualTo(expectedSchema); - } + @ParameterizedTest + @MethodSource("primitiveSchemasProvider") + void primitiveSchemas(SchemaType schemaType, Schema expectedSchema) { + assertThat(resolver.getSchema(schemaType, null)).isEqualTo(expectedSchema); + } - static Stream schemaByMessageTypeProvider() { - // @formatter:off - return Stream.of( - arguments(String.class, Schema.STRING), - arguments(byte[].class, Schema.BYTES), - arguments(Byte.class, Schema.INT8), - arguments(byte.class, Schema.INT8), - arguments(Short.class, Schema.INT16), - arguments(short.class, Schema.INT16), - arguments(Integer.class, Schema.INT32), - arguments(int.class, Schema.INT32), - arguments(Long.class, Schema.INT64), - arguments(long.class, Schema.INT64), - arguments(Boolean.class, Schema.BOOL), - arguments(boolean.class, Schema.BOOL), - arguments(ByteBuffer.class, Schema.BYTEBUFFER), - arguments(ByteBuffer.wrap("foo".getBytes()).getClass(), Schema.BYTEBUFFER), - arguments(ByteBuffer.allocateDirect(10).getClass(), Schema.BYTEBUFFER), - arguments(Date.class, Schema.DATE), - arguments(Double.class, Schema.DOUBLE), - arguments(double.class, Schema.DOUBLE), - arguments(Float.class, Schema.FLOAT), - arguments(float.class, Schema.FLOAT), - arguments(Instant.class, Schema.INSTANT), - arguments(LocalDate.class, Schema.LOCAL_DATE), - arguments(LocalDateTime.class, Schema.LOCAL_DATE_TIME), - arguments(LocalTime.class, Schema.LOCAL_TIME) - ); - // @formatter:on - } + static Stream primitiveSchemasProvider() { + // @formatter:off + return Stream.of( + arguments(SchemaType.STRING, Schema.STRING), + arguments(SchemaType.BOOLEAN, Schema.BOOL), + arguments(SchemaType.INT8, Schema.INT8), + arguments(SchemaType.INT16, Schema.INT16), + arguments(SchemaType.INT32, Schema.INT32), + arguments(SchemaType.INT64, Schema.INT64), + arguments(SchemaType.FLOAT, Schema.FLOAT), + arguments(SchemaType.DOUBLE, Schema.DOUBLE), + arguments(SchemaType.DATE, Schema.DATE), + arguments(SchemaType.TIME, Schema.TIME), + arguments(SchemaType.TIMESTAMP, Schema.TIMESTAMP), + arguments(SchemaType.BYTES, Schema.BYTES), + arguments(SchemaType.INSTANT, Schema.INSTANT), + arguments(SchemaType.LOCAL_DATE, Schema.LOCAL_DATE), + arguments(SchemaType.LOCAL_TIME, Schema.LOCAL_TIME), + arguments(SchemaType.LOCAL_DATE_TIME, Schema.LOCAL_DATE_TIME) + ); + // @formatter:on + } - @Test - void getSchemaByMessageTypeCustomTypes() { - Schema fooSchema = Schema.AVRO(Foo.class); - Map, Schema> customTypes = new HashMap<>(); - customTypes.put(Foo.class, fooSchema); - customTypes.put(Bar.class, Schema.STRING); - DefaultSchemaResolver resolver = new DefaultSchemaResolver(customTypes); - assertThat(resolver.getSchema(Foo.class)).isSameAs(fooSchema); - assertThat(resolver.getSchema(Bar.class)).isEqualTo(Schema.STRING); - assertThat(resolver.getSchema(Zaa.class)).isEqualTo(Schema.BYTES); // default - } + @Test + void structSchemas() { + assertThat(resolver.getSchema(SchemaType.JSON, ResolvableType.forType(Foo.class))) + .isInstanceOf(JSONSchema.class) + .hasFieldOrPropertyWithValue("schema.fullName", sanitizedClassName(Foo.class)); + assertThat(resolver.getSchema(SchemaType.AVRO, ResolvableType.forType(Foo.class))) + .isInstanceOf(AvroSchema.class) + .hasFieldOrPropertyWithValue("schema.fullName", sanitizedClassName(Foo.class)); + assertThat(resolver.getSchema(SchemaType.PROTOBUF, ResolvableType.forType(Person.class))) + .isInstanceOf(ProtobufSchema.class) + .hasFieldOrPropertyWithValue("schema.fullName", sanitizedClassName(Proto.Person.class)); + ResolvableType kvType = ResolvableType.forClassWithGenerics(KeyValue.class, String.class, Integer.class); + assertThat(resolver.getSchema(SchemaType.KEY_VALUE, kvType)) + .asInstanceOf(InstanceOfAssertFactories.type(KeyValueSchema.class)).satisfies((keyValueSchema -> { + assertThat(keyValueSchema.getKeySchema()).isEqualTo(Schema.STRING); + assertThat(keyValueSchema.getValueSchema()).isEqualTo(Schema.INT32); + assertThat(keyValueSchema.getKeyValueEncodingType()).isEqualTo(KeyValueEncodingType.INLINE); + })); + } - @Test - void getSchemaByMessageTypeWithoutDefaults() { - DefaultSchemaResolver resolver = new DefaultSchemaResolver(); - assertThat(resolver.getSchema(Foo.class, false)).isNull(); + @ParameterizedTest + @EnumSource(value = SchemaType.class, names = { "JSON", "AVRO", "PROTOBUF", "KEY_VALUE" }) + void structSchemasRequireMessageType(SchemaType schemaType) { + assertThatExceptionOfType(NullPointerException.class).isThrownBy(() -> resolver.getSchema(schemaType, null)) + .withMessage("messageType must be specified for " + schemaType.name()); + } + + @ParameterizedTest + @EnumSource(value = SchemaType.class, names = { "PROTOBUF_NATIVE", "AUTO", "AUTO_CONSUME", "AUTO_PUBLISH" }) + void unsupportedSchemaTypes(SchemaType unsupportedType) { + assertThatExceptionOfType(IllegalArgumentException.class) + .isThrownBy(() -> resolver.getSchema(unsupportedType, null)) + .withMessage("Unsupported schema type: " + unsupportedType.name()); + } + + private String sanitizedClassName(Class clazz) { + return clazz.getName().replace("$", "."); + } + + @Nested + class SchemaTypeNone { + + @Test + void nullMessageType() { + assertThat(resolver.getSchema(SchemaType.NONE, null)).isEqualTo(Schema.BYTES); + } + + @Test + void primitiveMessageType() { + assertThat(resolver.getSchema(SchemaType.NONE, ResolvableType.forType(String.class))) + .isEqualTo(Schema.STRING); + } + + @Test + void customMessageType() { + assertThat(resolver.getSchema(SchemaType.NONE, ResolvableType.forType(Foo.class))).isNull(); + resolver = new DefaultSchemaResolver(Collections.singletonMap(Foo.class, Schema.STRING)); + assertThat(resolver.getSchema(SchemaType.NONE, ResolvableType.forType(Foo.class))) + .isEqualTo(Schema.STRING); + } + + @Test + void primitiveKeyValueMessageType() { + ResolvableType kvType = ResolvableType.forClassWithGenerics(KeyValue.class, String.class, + Integer.class); + assertThat(resolver.getSchema(SchemaType.NONE, kvType)) + .asInstanceOf(InstanceOfAssertFactories.type(KeyValueSchema.class)) + .satisfies((keyValueSchema -> { + assertThat(keyValueSchema.getKeySchema()).isEqualTo(Schema.STRING); + assertThat(keyValueSchema.getValueSchema()).isEqualTo(Schema.INT32); + assertThat(keyValueSchema.getKeyValueEncodingType()).isEqualTo(KeyValueEncodingType.INLINE); + })); + } + + @Test + void customKeyValueMessageTypeDefaultsToBytesSchema() { + ResolvableType kvType = ResolvableType.forClassWithGenerics(KeyValue.class, Foo.class, Bar.class); + assertThat(resolver.getSchema(SchemaType.NONE, kvType)) + .asInstanceOf(InstanceOfAssertFactories.type(KeyValueSchema.class)) + .satisfies((keyValueSchema -> { + assertThat(keyValueSchema.getKeySchema()).isEqualTo(Schema.BYTES); + assertThat(keyValueSchema.getValueSchema()).isEqualTo(Schema.BYTES); + assertThat(keyValueSchema.getKeyValueEncodingType()).isEqualTo(KeyValueEncodingType.INLINE); + })); + } + + @Test + void customKeyValueMessageTypeWithCustomTypeMappings() { + Schema fooSchema = mock(Schema.class); + Schema barSchema = mock(Schema.class); + Map, Schema> customTypes = new HashMap<>(); + customTypes.put(Foo.class, fooSchema); + customTypes.put(Bar.class, barSchema); + resolver = new DefaultSchemaResolver(customTypes); + ResolvableType kvType = ResolvableType.forClassWithGenerics(KeyValue.class, Foo.class, Bar.class); + assertThat(resolver.getSchema(SchemaType.NONE, kvType)) + .asInstanceOf(InstanceOfAssertFactories.type(KeyValueSchema.class)) + .satisfies((keyValueSchema -> { + assertThat(keyValueSchema.getKeySchema()).isSameAs(fooSchema); + assertThat(keyValueSchema.getValueSchema()).isSameAs(barSchema); + assertThat(keyValueSchema.getKeyValueEncodingType()).isEqualTo(KeyValueEncodingType.INLINE); + })); + } + + } - resolver = new DefaultSchemaResolver(Collections.singletonMap(Foo.class, Schema.STRING)); - assertThat(resolver.getSchema(Foo.class, false)).isEqualTo(Schema.STRING); - assertThat(resolver.getSchema(Bar.class, false)).isNull(); } record Foo(String value) { diff --git a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java index 8243a5cf..84406947 100644 --- a/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java +++ b/spring-pulsar/src/test/java/org/springframework/pulsar/listener/PulsarListenerTests.java @@ -86,10 +86,6 @@ import org.springframework.util.backoff.FixedBackOff; @DirtiesContext public class PulsarListenerTests implements PulsarTestContainerSupport { - static CountDownLatch latch = new CountDownLatch(1); - static CountDownLatch latch1 = new CountDownLatch(3); - static CountDownLatch latch2 = new CountDownLatch(3); - @Autowired PulsarTemplate pulsarTemplate; @@ -153,8 +149,12 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { @ContextConfiguration(classes = PulsarListenerBasicTestCases.TestPulsarListenersForBasicScenario.class) class PulsarListenerBasicTestCases { + static CountDownLatch latch = new CountDownLatch(1); + static CountDownLatch latch1 = new CountDownLatch(3); + static CountDownLatch latch2 = new CountDownLatch(3); + @Test - void testPulsarListenerWithTopicsPattern(@Autowired PulsarListenerEndpointRegistry registry) throws Exception { + void pulsarListenerWithTopicsPattern(@Autowired PulsarListenerEndpointRegistry registry) throws Exception { PulsarMessageListenerContainer baz = registry.getListenerContainer("baz"); PulsarContainerProperties containerProperties = baz.getContainerProperties(); assertThat(containerProperties.getTopicsPattern()).isEqualTo("persistent://public/default/pattern.*"); @@ -167,7 +167,7 @@ public class PulsarListenerTests implements PulsarTestContainerSupport { } @Test - void testPulsarListenerProvidedConsumerProperties(@Autowired PulsarListenerEndpointRegistry registry) + void pulsarListenerProvidedConsumerProperties(@Autowired PulsarListenerEndpointRegistry registry) throws Exception { PulsarContainerProperties pulsarContainerProperties = registry.getListenerContainer("foo")