GH-685 Move Kotlin configurations to s-c-function-context module

Resolves #685
This commit is contained in:
Oleg Zhurakousky
2021-04-12 15:02:56 +02:00
parent ee0bc4e28f
commit ecd9902ced
6 changed files with 65 additions and 15 deletions

View File

@@ -0,0 +1,255 @@
/*
* Copyright 2012-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.function.context.config;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import kotlin.jvm.functions.Function0;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.functions.Function3;
import kotlin.jvm.functions.Function4;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.beans.factory.annotation.AnnotatedBeanDefinition;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.beans.factory.config.BeanFactoryPostProcessor;
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.config.ConstructorArgumentValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.ResolvableType;
import org.springframework.util.ObjectUtils;
/**
* Configuration class which defines the required infrastructure to bootstrap Kotlin
* lambdas as invocable functions within the context of the framework.
*
* @author Oleg Zhurakousky
* @author Adrien Poupard
* @since 2.0
*/
@Configuration
@ConditionalOnClass(name = "kotlin.jvm.functions.Function0")
public class KotlinLambdaToFunctionAutoConfiguration {
protected final Log logger = LogFactory.getLog(getClass());
/**
* Will transform all discovered Kotlin's Function lambdas to java
* Supplier, Function and Consumer, retaining the original Kotlin type
* characteristics.
*
* @return the bean factory post processor
*/
@Bean
public BeanFactoryPostProcessor kotlinToFunctionTransformerOld() {
return new BeanFactoryPostProcessor() {
@Override
public void postProcessBeanFactory(
ConfigurableListableBeanFactory beanFactory) throws BeansException {
String[] beanDefinitionNames = beanFactory.getBeanDefinitionNames();
for (String beanDefinitionName : beanDefinitionNames) {
BeanDefinition beanDefinition = beanFactory.getBeanDefinition(beanDefinitionName);
if (beanDefinition instanceof AnnotatedBeanDefinition && ((AnnotatedBeanDefinition) beanDefinition).getFactoryMethodMetadata() != null) {
String typeName = ((AnnotatedBeanDefinition) beanDefinition).getFactoryMethodMetadata().getReturnTypeName();
if (typeName.startsWith("kotlin.jvm.functions.Function")) {
RootBeanDefinition cbd = new RootBeanDefinition(KotlinFunctionWrapper.class);
ConstructorArgumentValues ca = new ConstructorArgumentValues();
ca.addGenericArgumentValue(beanDefinition);
cbd.setConstructorArgumentValues(ca);
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(beanDefinitionName + FunctionRegistration.REGISTRATION_NAME_SUFFIX, cbd);
}
}
}
}
};
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public static final class KotlinFunctionWrapper implements Function<Object, Object>, Supplier<Object>, Consumer<Object[]>,
Function0<Object>, Function1<Object, Object>, Function2<Object, Object, Object>,
Function3<Object, Object, Object, Object>, Function4<Object, Object, Object, Object, Object>,
FactoryBean<FunctionRegistration>,
BeanNameAware,
BeanFactoryAware {
private final Object kotlinLambdaTarget;
private String name;
private ConfigurableListableBeanFactory beanFactory;
public KotlinFunctionWrapper(Object kotlinLambdaTarget) {
this.kotlinLambdaTarget = kotlinLambdaTarget;
}
@Override
public Object apply(Object input) {
if (ObjectUtils.isEmpty(input)) {
return this.invoke();
}
else if (ObjectUtils.isArray(input)) {
return null;
}
else {
return this.invoke(input);
}
}
@Override
public Object invoke(Object arg0, Object arg1, Object arg2, Object arg3) {
return ((Function4) this.kotlinLambdaTarget).invoke(arg0, arg1, arg2, arg3);
}
@Override
public Object invoke(Object arg0, Object arg1, Object arg2) {
return ((Function3) this.kotlinLambdaTarget).invoke(arg0, arg1, arg2);
}
@Override
public Object invoke(Object arg0, Object arg1) {
return ((Function2) this.kotlinLambdaTarget).invoke(arg0, arg1);
}
@Override
public Object invoke(Object arg0) {
if (CoroutinesUtils.isValidSuspendingFunction(kotlinLambdaTarget, arg0)) {
return CoroutinesUtils.invokeSuspendingFunction(kotlinLambdaTarget, arg0);
}
return ((Function1) this.kotlinLambdaTarget).invoke(arg0);
}
@Override
public Object invoke() {
if (CoroutinesUtils.isValidSuspendingSupplier(kotlinLambdaTarget)) {
return CoroutinesUtils.invokeSuspendingSupplier(kotlinLambdaTarget);
}
return ((Function0) this.kotlinLambdaTarget).invoke();
}
@Override
public void accept(Object[] input) {
this.apply(input);
}
@Override
public Object get() {
return this.apply(null);
}
@Override
public FunctionRegistration getObject() throws Exception {
String name = this.name.endsWith(FunctionRegistration.REGISTRATION_NAME_SUFFIX)
? this.name.replace(FunctionRegistration.REGISTRATION_NAME_SUFFIX, "")
: this.name;
Type functionType = FunctionContextUtils.findType(name, this.beanFactory);
FunctionRegistration<?> registration = new FunctionRegistration<>(this, name);
Type[] types = ((ParameterizedType) functionType).getActualTypeArguments();
if (functionType.getTypeName().contains("Function0")) {
functionType = ResolvableType.forClassWithGenerics(Supplier.class, ResolvableType.forType(types[0]))
.getType();
}
else if (isValidKotlinFunction(functionType, types)) {
functionType = ResolvableType.forClassWithGenerics(Function.class, ResolvableType.forType(types[0]),
ResolvableType.forType(types[1])).getType();
}
else if (isValidKotlinSuspendSupplier(functionType, types)) {
Type continuationReturnType = CoroutinesUtils.getSuspendingFunctionReturnType(types[0]);
functionType = ResolvableType.forClassWithGenerics(
Supplier.class,
ResolvableType.forClassWithGenerics(Flux.class, ResolvableType.forType(continuationReturnType))
).getType();
}
else if (isValidKotlinSuspendFunction(functionType, types)) {
Type continuationArgType = CoroutinesUtils.getSuspendingFunctionArgType(types[0]);
Type continuationReturnType = CoroutinesUtils.getSuspendingFunctionReturnType(types[1]);
functionType = ResolvableType.forClassWithGenerics(
Function.class,
ResolvableType.forClassWithGenerics(Flux.class, ResolvableType.forType(continuationArgType)),
ResolvableType.forClassWithGenerics(Flux.class, ResolvableType.forType(continuationReturnType))
).getType();
}
else if (isValidKotlinSuspendConsumer(functionType, types)) {
Type continuationArgType = CoroutinesUtils.getSuspendingFunctionArgType(types[0]);
functionType = ResolvableType.forClassWithGenerics(
Consumer.class,
ResolvableType.forClassWithGenerics(Flux.class, ResolvableType.forType(continuationArgType))
).getType();
}
else {
throw new UnsupportedOperationException("Multi argument Kotlin functions are not currently supported");
}
registration = registration.type(functionType);
return registration;
}
private boolean isValidKotlinFunction(Type functionType, Type[] type) {
return functionType.getTypeName().contains(Function1.class.getName()) && type.length == 2 && !CoroutinesUtils.isContinuationType(type[0]);
}
private boolean isValidKotlinSuspendSupplier(Type functionType, Type[] type) {
return functionType.getTypeName().contains(Function1.class.getName()) && type.length == 2 && CoroutinesUtils.isContinuationFlowType(type[0]);
}
private boolean isValidKotlinSuspendConsumer(Type functionType, Type[] type) {
return functionType.getTypeName().contains(Function2.class.getName()) && type.length == 3 && CoroutinesUtils.isFlowType(type[0]) && CoroutinesUtils.isContinuationUnitType(type[1]);
}
private boolean isValidKotlinSuspendFunction(Type functionType, Type[] type) {
return functionType.getTypeName().contains(Function2.class.getName()) && type.length == 3 && CoroutinesUtils.isContinuationFlowType(type[1]);
}
@Override
public Class<?> getObjectType() {
return FunctionRegistration.class;
}
@Override
public void setBeanName(String name) {
this.name = name;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
this.beanFactory = (ConfigurableListableBeanFactory) beanFactory;
}
}
}

View File

@@ -0,0 +1,125 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@file:JvmName("CoroutinesUtils")
package org.springframework.cloud.function.context.config
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.reactor.mono
import reactor.core.publisher.Flux
import java.lang.reflect.ParameterizedType
import java.lang.reflect.Type
import java.lang.reflect.WildcardType
import kotlin.coroutines.Continuation
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn
/**
* @author Adrien Poupard
*
*/
fun isValidSuspendingFunction(kotlinLambdaTarget: Any, arg0: Any): Boolean {
return arg0 is Flux<*> && kotlinLambdaTarget is Function2<*, *, *>
}
fun getSuspendingFunctionArgType(type: Type): Type {
return getFlowTypeArguments(type)
}
fun getFlowTypeArguments(type: Type): Type {
if(!isFlowType(type)) {
return type
}
val parameterizedLowerType = type as ParameterizedType
if(parameterizedLowerType.actualTypeArguments.isEmpty()) {
return parameterizedLowerType
}
val actualTypeArgument = parameterizedLowerType.actualTypeArguments[0]
return if(actualTypeArgument is WildcardType) {
val wildcardTypeLower = parameterizedLowerType.actualTypeArguments[0] as WildcardType
wildcardTypeLower.upperBounds[0]
} else {
actualTypeArgument
}
}
fun isFlowType(type: Type): Boolean {
return type.typeName.startsWith(Flow::class.qualifiedName!!)
}
fun getSuspendingFunctionReturnType(type: Type): Type {
val lower = getContinuationTypeArguments(type)
return getFlowTypeArguments(lower)
}
fun isContinuationType(type: Type): Boolean {
return type.typeName.startsWith(Continuation::class.qualifiedName!!)
}
fun isContinuationUnitType(type: Type): Boolean {
return isContinuationType(type) && type.typeName.contains(Unit::class.qualifiedName!!)
}
fun isContinuationFlowType(type: Type): Boolean {
return isContinuationType(type) && type.typeName.contains(Flow::class.qualifiedName!!)
}
private fun getContinuationTypeArguments(type: Type): Type {
if(!isContinuationType(type)) {
return type
}
val parameterizedType = type as ParameterizedType
val wildcardType = parameterizedType.actualTypeArguments[0] as WildcardType
return wildcardType.lowerBounds[0]
}
fun invokeSuspendingFunction(kotlinLambdaTarget: Any, arg0: Any): Flux<Any> {
val function = kotlinLambdaTarget as SuspendFunction
val flux = arg0 as Flux<Any>
return fluxSuspendingFlowFunction(flux, function)
}
fun isValidSuspendingSupplier(kotlinLambdaTarget: Any): Boolean {
return kotlinLambdaTarget is Function1<*, *>
}
fun invokeSuspendingSupplier(kotlinLambdaTarget: Any): Flux<Any> {
val supplier = kotlinLambdaTarget as SuspendSupplier
return mono(Dispatchers.Unconfined) {
suspendCoroutineUninterceptedOrReturn<Flow<Any>> {
supplier.invoke(it)
}
}.flatMapMany {
it.asFlux()
}
}
fun fluxSuspendingFlowFunction(flux: Flux<Any>, target: SuspendFunction): Flux<Any> {
return mono(Dispatchers.Unconfined) {
suspendCoroutineUninterceptedOrReturn<Flow<Any>> {
target.invoke(flux.asFlow(), it)
}
}.flatMapMany {
it.asFlux()
}
}
private typealias SuspendFunction = (Any?, Any?) -> Any?
private typealias SuspendSupplier = (Any?) -> Any?

View File

@@ -1,6 +1,7 @@
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration,\
org.springframework.cloud.function.cloudevent.CloudEventsFunctionExtensionConfiguration
org.springframework.cloud.function.cloudevent.CloudEventsFunctionExtensionConfiguration,\
org.springframework.cloud.function.context.config.KotlinLambdaToFunctionAutoConfiguration
org.springframework.cloud.function.context.WrapperDetector=\
org.springframework.cloud.function.context.config.FluxWrapperDetector
org.springframework.context.ApplicationContextInitializer=\