Simplify defaults type mappings (#377)
* Remove redundant SchemaInfo.messageType as the key of the type mappings tells us that info
This commit is contained in:
@@ -10,11 +10,9 @@ spring:
|
||||
- message-type: com.acme.User
|
||||
schema-info:
|
||||
schema-type: AVRO
|
||||
message-type: com.acme.User
|
||||
- message-type: com.acme.Address
|
||||
schema-info:
|
||||
schema-type: JSON
|
||||
message-type: com.acme.Address
|
||||
----
|
||||
|
||||
NOTE: The `message-type` is the fully-qualified name of the message class.
|
||||
|
||||
@@ -117,8 +117,8 @@ public class PulsarAutoConfiguration {
|
||||
if (pulsarProperties.getDefaults().getTypeMappings() != null) {
|
||||
pulsarProperties.getDefaults().getTypeMappings().stream().filter((tm) -> tm.schemaInfo() != null)
|
||||
.forEach((tm) -> {
|
||||
var schema = schemaResolver.resolveSchema(tm.schemaInfo().schemaType(),
|
||||
tm.schemaInfo().messageType(), tm.schemaInfo().messageKeyType()).orElseThrow();
|
||||
var schema = schemaResolver.resolveSchema(tm.schemaInfo().schemaType(), tm.messageType(),
|
||||
tm.schemaInfo().messageKeyType()).orElseThrow();
|
||||
schemaResolver.addCustomSchemaMapping(tm.messageType(), schema);
|
||||
});
|
||||
}
|
||||
|
||||
@@ -1365,16 +1365,17 @@ public class PulsarProperties {
|
||||
* Represents a schema - holds enough information to construct an actual schema
|
||||
* instance.
|
||||
* @param schemaType schema type
|
||||
* @param messageType message type (not required for primitive schema types or key
|
||||
* value type)
|
||||
* @param messageKeyType message key type (required for key value type)
|
||||
*/
|
||||
public record SchemaInfo(SchemaType schemaType, @Nullable Class<?> messageType, @Nullable Class<?> messageKeyType) {
|
||||
public record SchemaInfo(SchemaType schemaType, @Nullable Class<?> messageKeyType) {
|
||||
public SchemaInfo {
|
||||
Objects.requireNonNull(schemaType, "schemaType must not be null");
|
||||
if (schemaType == SchemaType.NONE) {
|
||||
throw new IllegalArgumentException("schemaType NONE not supported");
|
||||
}
|
||||
if (schemaType != SchemaType.KEY_VALUE && messageKeyType != null) {
|
||||
throw new IllegalArgumentException("messageKeyType can only be set when schemaType is KEY_VALUE");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -321,9 +321,7 @@ class PulsarAutoConfigurationTests {
|
||||
contextRunner
|
||||
.withPropertyValues(
|
||||
"spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(Foo.class.getName()),
|
||||
"spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=JSON",
|
||||
"spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s"
|
||||
.formatted(Foo.class.getName()))
|
||||
"spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=JSON")
|
||||
.run((context -> assertThat(context).hasNotFailed().getBean(SchemaResolver.class)
|
||||
.asInstanceOf(InstanceOfAssertFactories.type(DefaultSchemaResolver.class))
|
||||
.extracting(DefaultSchemaResolver::getCustomSchemaMappings,
|
||||
@@ -339,8 +337,6 @@ class PulsarAutoConfigurationTests {
|
||||
"spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(Foo.class.getName()),
|
||||
"spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=%s"
|
||||
.formatted(SchemaType.KEY_VALUE.name()),
|
||||
"spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s"
|
||||
.formatted(Foo.class.getName()),
|
||||
"spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type=%s"
|
||||
.formatted(String.class.getName()))
|
||||
.run((context -> assertThat(context).hasNotFailed().getBean(SchemaResolver.class)
|
||||
|
||||
@@ -329,10 +329,9 @@ public class PulsarPropertiesTests {
|
||||
Map<String, String> props = new HashMap<>();
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON");
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName());
|
||||
bind(props);
|
||||
assertThat(properties.getDefaults().getTypeMappings()).containsExactly(
|
||||
new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.JSON, Foo.class, null)));
|
||||
assertThat(properties.getDefaults().getTypeMappings())
|
||||
.containsExactly(new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.JSON, null)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -341,10 +340,9 @@ public class PulsarPropertiesTests {
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].topic-name", "foo-topic");
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON");
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName());
|
||||
bind(props);
|
||||
assertThat(properties.getDefaults().getTypeMappings()).containsExactly(
|
||||
new TypeMapping(Foo.class, "foo-topic", new SchemaInfo(SchemaType.JSON, Foo.class, null)));
|
||||
assertThat(properties.getDefaults().getTypeMappings())
|
||||
.containsExactly(new TypeMapping(Foo.class, "foo-topic", new SchemaInfo(SchemaType.JSON, null)));
|
||||
}
|
||||
|
||||
@Test
|
||||
@@ -352,18 +350,17 @@ public class PulsarPropertiesTests {
|
||||
Map<String, String> props = new HashMap<>();
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "KEY_VALUE");
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName());
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName());
|
||||
bind(props);
|
||||
assertThat(properties.getDefaults().getTypeMappings()).containsExactly(
|
||||
new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.KEY_VALUE, Foo.class, String.class)));
|
||||
new TypeMapping(Foo.class, null, new SchemaInfo(SchemaType.KEY_VALUE, String.class)));
|
||||
}
|
||||
|
||||
@Test
|
||||
void schemaTypeRequired() {
|
||||
Map<String, String> props = new HashMap<>();
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-type", Foo.class.getName());
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName());
|
||||
assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause()
|
||||
.withMessageContaining("schemaType must not be null");
|
||||
}
|
||||
@@ -377,6 +374,16 @@ public class PulsarPropertiesTests {
|
||||
.withMessageContaining("schemaType NONE not supported");
|
||||
}
|
||||
|
||||
@Test
|
||||
void messageKeyTypeOnlyAllowedForKeyValueSchemaType() {
|
||||
Map<String, String> props = new HashMap<>();
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].message-type", Foo.class.getName());
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.schema-type", "JSON");
|
||||
props.put("spring.pulsar.defaults.type-mappings[0].schema-info.message-key-type", String.class.getName());
|
||||
assertThatExceptionOfType(BindException.class).isThrownBy(() -> bind(props)).havingRootCause()
|
||||
.withMessageContaining("messageKeyType can only be set when schemaType is KEY_VALUE");
|
||||
}
|
||||
|
||||
record Foo(String value) {
|
||||
}
|
||||
|
||||
|
||||
@@ -258,9 +258,7 @@ class PulsarBinderIntegrationTests implements PulsarTestContainerSupport {
|
||||
+ User.class.getName(),
|
||||
"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-user-sub2",
|
||||
"--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()),
|
||||
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO",
|
||||
"--spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s"
|
||||
.formatted(User.class.getName()))) {
|
||||
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) {
|
||||
Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
|
||||
.until(() -> output.toString().contains("Hello binder: User{name='user21', age=21}"));
|
||||
}
|
||||
@@ -312,9 +310,7 @@ class PulsarBinderIntegrationTests implements PulsarTestContainerSupport {
|
||||
+ String.class.getName(),
|
||||
"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-kv-sub1",
|
||||
"--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()),
|
||||
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO",
|
||||
"--spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s"
|
||||
.formatted(User.class.getName()))) {
|
||||
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) {
|
||||
Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
|
||||
.until(() -> output.toString().contains("Hello binder: 21->User{name='user21', age=21}"));
|
||||
}
|
||||
@@ -342,9 +338,7 @@ class PulsarBinderIntegrationTests implements PulsarTestContainerSupport {
|
||||
+ String.class.getName(),
|
||||
"--spring.cloud.stream.pulsar.bindings.userLogger-in-0.consumer.subscription-name=pbit-kv-sub2",
|
||||
"--spring.pulsar.defaults.type-mappings[0].message-type=%s".formatted(User.class.getName()),
|
||||
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO",
|
||||
"--spring.pulsar.defaults.type-mappings[0].schema-info.message-type=%s"
|
||||
.formatted(User.class.getName()))) {
|
||||
"--spring.pulsar.defaults.type-mappings[0].schema-info.schema-type=AVRO")) {
|
||||
Awaitility.await().atMost(Duration.ofSeconds(AWAIT_DURATION))
|
||||
.until(() -> output.toString().contains("Hello binder: 21->User{name='user21', age=21}"));
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user