Array reification error with KStream[] in Kotlin

Exclude Kafka Streams functions with KStream[] return type as this
causes some array reification errors in Kotlin.

See this issue for more details: https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1044

Resolves https://github.com/spring-cloud/spring-cloud-function/issues/669

Resolves #670
This commit is contained in:
Soby Chacko
2021-03-17 14:41:19 -04:00
committed by Oleg Zhurakousky
parent 1e42e82063
commit 6401697f18
3 changed files with 14 additions and 2 deletions

View File

@@ -125,6 +125,9 @@ public class FunctionRegistration<T> implements BeanNameAware {
public FunctionRegistration<T> type(FunctionType type) {
Type t = FunctionTypeUtils.discoverFunctionTypeFromClass(this.target.getClass());
if (t == null) { // only valid for Kafka Stream KStream[] return type.
return null;
}
FunctionType discoveredFunctionType = FunctionType.of(t);
Class<?> inputType = TypeResolver.resolveRawClass(discoveredFunctionType.getInputType(), null);
Class<?> outputType = TypeResolver.resolveRawClass(discoveredFunctionType.getOutputType(), null);

View File

@@ -141,8 +141,10 @@ public class BeanFactoryAwareFunctionRegistry extends SimpleFunctionRegistry imp
if (functionRegistration == null) {
functionRegistration = new FunctionRegistration(functionCandidate, functionName).type(functionType);
}
this.register(functionRegistration);
// Certain Kafka Streams functions such as KStream[] return types could be null (esp when using Kotlin).
if (functionRegistration != null) {
this.register(functionRegistration);
}
}
else {
if (logger.isDebugEnabled()) {

View File

@@ -149,6 +149,13 @@ public final class FunctionTypeUtils {
Assert.isTrue(isFunctional(functionalClass), "Type must be one of Supplier, Function or Consumer");
if (Function.class.isAssignableFrom(functionalClass)) {
for (Type superInterface : functionalClass.getGenericInterfaces()) {
if (superInterface != null && !superInterface.equals(Object.class)) {
if (superInterface.toString().contains("KStream") && ResolvableType.forType(superInterface).getGeneric(1).isArray()) {
return null;
}
}
}
return TypeResolver.reify(Function.class, (Class<Function<?, ?>>) functionalClass);
}
else if (Consumer.class.isAssignableFrom(functionalClass)) {