From 19cc0ab0704c5ade4f2aeee183fa002a45dacc9c Mon Sep 17 00:00:00 2001 From: Chris Bono Date: Tue, 21 Mar 2023 20:19:01 -0500 Subject: [PATCH] Simplify defaults type mappings (#377) * Remove redundant SchemaInfo.messageType as the key of the type mappings tells us that info --- .../schema-info/custom-schema-mapping.adoc | 2 -- .../PulsarAutoConfiguration.java | 4 +-- .../autoconfigure/PulsarProperties.java | 7 +++--- .../PulsarAutoConfigurationTests.java | 6 +---- .../autoconfigure/PulsarPropertiesTests.java | 25 ++++++++++++------- .../binder/PulsarBinderIntegrationTests.java | 12 +++------ 6 files changed, 26 insertions(+), 30 deletions(-) diff --git a/spring-pulsar-docs/src/main/asciidoc/schema-info/custom-schema-mapping.adoc b/spring-pulsar-docs/src/main/asciidoc/schema-info/custom-schema-mapping.adoc index 85233ff1..7fee39b9 100644 --- a/spring-pulsar-docs/src/main/asciidoc/schema-info/custom-schema-mapping.adoc +++ b/spring-pulsar-docs/src/main/asciidoc/schema-info/custom-schema-mapping.adoc @@ -10,11 +10,9 @@ spring: - message-type: com.acme.User schema-info: schema-type: AVRO - message-type: com.acme.User - message-type: com.acme.Address schema-info: schema-type: JSON - message-type: com.acme.Address ---- NOTE: The `message-type` is the fully-qualified name of the message class. diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java index adb87fdc..2bf071ba 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfiguration.java @@ -117,8 +117,8 @@ public class PulsarAutoConfiguration { if (pulsarProperties.getDefaults().getTypeMappings() != null) { pulsarProperties.getDefaults().getTypeMappings().stream().filter((tm) -> tm.schemaInfo() != null) .forEach((tm) -> { - var schema = schemaResolver.resolveSchema(tm.schemaInfo().schemaType(), - tm.schemaInfo().messageType(), tm.schemaInfo().messageKeyType()).orElseThrow(); + var schema = schemaResolver.resolveSchema(tm.schemaInfo().schemaType(), tm.messageType(), + tm.schemaInfo().messageKeyType()).orElseThrow(); schemaResolver.addCustomSchemaMapping(tm.messageType(), schema); }); } diff --git a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java index 018aab35..914a8a61 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/main/java/org/springframework/pulsar/autoconfigure/PulsarProperties.java @@ -1365,16 +1365,17 @@ public class PulsarProperties { * Represents a schema - holds enough information to construct an actual schema * instance. * @param schemaType schema type - * @param messageType message type (not required for primitive schema types or key - * value type) * @param messageKeyType message key type (required for key value type) */ - public record SchemaInfo(SchemaType schemaType, @Nullable Class messageType, @Nullable Class messageKeyType) { + public record SchemaInfo(SchemaType schemaType, @Nullable Class messageKeyType) { public SchemaInfo { Objects.requireNonNull(schemaType, "schemaType must not be null"); if (schemaType == SchemaType.NONE) { throw new IllegalArgumentException("schemaType NONE not supported"); } + if (schemaType != SchemaType.KEY_VALUE && messageKeyType != null) { + throw new IllegalArgumentException("messageKeyType can only be set when schemaType is KEY_VALUE"); + } } } diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java index 74529576..3e998edc 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarAutoConfigurationTests.java @@ -321,9 +321,7 @@ class PulsarAutoConfigurationTests { contextRunner .withPropertyValues( "spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(Foo.class.getName()), - "spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=JSON", - "spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s" - .formatted(Foo.class.getName())) + "spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=JSON") .run((context -> assertThat(context).hasNotFailed().getBean(SchemaResolver.class) .asInstanceOf(InstanceOfAssertFactories.type(DefaultSchemaResolver.class)) .extracting(DefaultSchemaResolver::getCustomSchemaMappings, @@ -339,8 +337,6 @@ class PulsarAutoConfigurationTests { "spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(Foo.class.getName()), "spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=%s" .formatted(SchemaType.KEY_VALUE.name()), - "spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s" - .formatted(Foo.class.getName()), "spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type=%s" .formatted(String.class.getName())) .run((context -> assertThat(context).hasNotFailed().getBean(SchemaResolver.class) diff --git a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java index 9ed6e0ff..61242166 100644 --- a/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java +++ b/spring-pulsar-spring-boot-autoconfigure/src/test/java/org/springframework/pulsar/autoconfigure/PulsarPropertiesTests.java @@ -329,10 +329,9 @@ public class PulsarPropertiesTests { Map props = new HashMap<>(); props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON"); - props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName()); bind(props); - assertThat(properties.getDefaults().getTypeMappings()).containsExactly( - new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.JSON, Foo.class, null))); + assertThat(properties.getDefaults().getTypeMappings()) + .containsExactly(new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.JSON, null))); } @Test @@ -341,10 +340,9 @@ public class PulsarPropertiesTests { props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].topic-name", "foo-topic"); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON"); - props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName()); bind(props); - assertThat(properties.getDefaults().getTypeMappings()).containsExactly( - new TypeMapping(Foo.class, "foo-topic", new SchemaInfo(SchemaType.JSON, Foo.class, null))); + assertThat(properties.getDefaults().getTypeMappings()) + .containsExactly(new TypeMapping(Foo.class, "foo-topic", new SchemaInfo(SchemaType.JSON, null))); } @Test @@ -352,18 +350,17 @@ public class PulsarPropertiesTests { Map props = new HashMap<>(); props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "KEY_VALUE"); - props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName()); props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName()); bind(props); assertThat(properties.getDefaults().getTypeMappings()).containsExactly( - new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.KEY_VALUE, Foo.class, String.class))); + new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.KEY_VALUE, String.class))); } @Test void schemaTypeRequired() { Map props = new HashMap<>(); props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); - props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName()); + props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName()); assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause() .withMessageContaining("schemaType must not be null"); } @@ -377,6 +374,16 @@ public class PulsarPropertiesTests { .withMessageContaining("schemaType NONE not supported"); } + @Test + void messageKeyTypeOnlyAllowedForKeyValueSchemaType() { + Map props = new HashMap<>(); + props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName()); + props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON"); + props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName()); + assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause() + .withMessageContaining("messageKeyType can only be set when schemaType is KEY_VALUE"); + } + record Foo(String value) { } diff --git a/spring-pulsar-spring-cloud-stream-binder/src/test/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarBinderIntegrationTests.java b/spring-pulsar-spring-cloud-stream-binder/src/test/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarBinderIntegrationTests.java index 7bfbf025..a4242985 100644 --- a/spring-pulsar-spring-cloud-stream-binder/src/test/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarBinderIntegrationTests.java +++ b/spring-pulsar-spring-cloud-stream-binder/src/test/java/org/springframework/pulsar/spring/cloud/stream/binder/PulsarBinderIntegrationTests.java @@ -258,9 +258,7 @@ class PulsarBinderIntegrationTests implements PulsarTestContainerSupport { + User.class.getName(), "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-user-sub2", "--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()), - "--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO", - "--spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s" - .formatted(User.class.getName()))) { + "--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: User{name='user21', age=21}")); } @@ -312,9 +310,7 @@ class PulsarBinderIntegrationTests implements PulsarTestContainerSupport { + String.class.getName(), "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-kv-sub1", "--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()), - "--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO", - "--spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s" - .formatted(User.class.getName()))) { + "--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: 21->User{name='user21', age=21}")); } @@ -342,9 +338,7 @@ class PulsarBinderIntegrationTests implements PulsarTestContainerSupport { + String.class.getName(), "--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-kv-sub2", "--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()), - "--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO", - "--spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s" - .formatted(User.class.getName()))) { + "--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) { Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION)) .until(() -> output.toString().contains("Hello binder: 21->User{name='user21', age=21}")); }