Purge warnings in IDE
This commit is contained in:
@@ -35,7 +35,8 @@ import org.springframework.util.StringUtils;
|
||||
*/
|
||||
public abstract class AbstractFunctionCompiler<F> {
|
||||
|
||||
private static Logger logger = LoggerFactory.getLogger(AbstractFunctionCompiler.class);
|
||||
private static Logger logger = LoggerFactory
|
||||
.getLogger(AbstractFunctionCompiler.class);
|
||||
|
||||
// Newlines in the property are escaped
|
||||
private static final String NEWLINE_ESCAPE = Matcher.quoteReplacement("\\n");
|
||||
@@ -44,20 +45,25 @@ public abstract class AbstractFunctionCompiler<F> {
|
||||
private static final String DOUBLE_DOUBLE_QUOTE = Matcher.quoteReplacement("\"\"");
|
||||
|
||||
/**
|
||||
* The user supplied code snippet is inserted into the template and then the result is compiled
|
||||
* The user supplied code snippet is inserted into the template and then the result is
|
||||
* compiled
|
||||
*/
|
||||
private static String SOURCE_CODE_TEMPLATE =
|
||||
"package " + AbstractFunctionCompiler.class.getPackage().getName() + ";\n" +
|
||||
"import java.util.*;\n" + // Helpful to include this
|
||||
"import java.util.function.*;\n" +
|
||||
"import reactor.core.publisher.Flux;\n" +
|
||||
"public class %s implements %sFactory {\n" +
|
||||
" public %s<%s> getResult() {\n" +
|
||||
" %s\n" +
|
||||
" }\n" +
|
||||
"}\n";
|
||||
// @formatter:off
|
||||
private static String SOURCE_CODE_TEMPLATE = "package "
|
||||
+ AbstractFunctionCompiler.class.getPackage().getName() + ";\n"
|
||||
+ "import java.util.*;\n" // Helpful to include this
|
||||
+ "import java.util.function.*;\n"
|
||||
+ "import reactor.core.publisher.Flux;\n"
|
||||
+ "public class %s implements %sFactory {\n"
|
||||
+ " public %s<%s> getResult() {\n"
|
||||
+ " %s\n"
|
||||
+ " }\n"
|
||||
+ "}\n";
|
||||
// @formatter:on
|
||||
|
||||
static enum ResultType { Consumer, Function, Supplier }
|
||||
static enum ResultType {
|
||||
Consumer, Function, Supplier
|
||||
}
|
||||
|
||||
private final ResultType resultType;
|
||||
|
||||
@@ -65,82 +71,98 @@ public abstract class AbstractFunctionCompiler<F> {
|
||||
|
||||
private final RuntimeJavaCompiler compiler = new RuntimeJavaCompiler();
|
||||
|
||||
AbstractFunctionCompiler(ResultType type, String... defaultResultTypeParameterizations) {
|
||||
AbstractFunctionCompiler(ResultType type,
|
||||
String... defaultResultTypeParameterizations) {
|
||||
this.resultType = type;
|
||||
this.defaultResultTypeParameterizations = defaultResultTypeParameterizations;
|
||||
}
|
||||
|
||||
/**
|
||||
* Produce a factory instance by:<ul>
|
||||
* Produce a factory instance by:
|
||||
* <ul>
|
||||
* <li>Decoding the code String to process any newlines/double-double-quotes
|
||||
* <li>Insert the code into the source code template for a class
|
||||
* <li>Compiling the class using the JDK provided Java Compiler
|
||||
* <li>Loading the compiled class
|
||||
* <li>Invoking a well known method on the factory class to produce a Consumer, Function, or Supplier instance
|
||||
* <li>Invoking a well known method on the factory class to produce a Consumer,
|
||||
* Function, or Supplier instance
|
||||
* <li>Returning that instance.
|
||||
* </ul>
|
||||
*
|
||||
* @return a factory instance
|
||||
*/
|
||||
public CompiledFunctionFactory<F> compile(String name, String code, String... resultTypeParameterizations) {
|
||||
public CompiledFunctionFactory<F> compile(String name, String code,
|
||||
String... resultTypeParameterizations) {
|
||||
if (name == null || name.length() == 0) {
|
||||
throw new IllegalArgumentException("name must not be empty");
|
||||
}
|
||||
logger.info("Initial code property value :'{}'", code);
|
||||
String parameterizations = StringUtils.arrayToCommaDelimitedString(
|
||||
(!ObjectUtils.isEmpty(resultTypeParameterizations)) ? resultTypeParameterizations : this.defaultResultTypeParameterizations);
|
||||
code = decode(code);
|
||||
if (code.startsWith("\"") && code.endsWith("\"")) {
|
||||
code = code.substring(1,code.length()-1);
|
||||
}
|
||||
if (!code.startsWith("return ") && !code.endsWith(";")) {
|
||||
code = String.format("return (%s<%s> & java.io.Serializable) %s;", resultType, parameterizations, code);
|
||||
String parameterizations = StringUtils.arrayToCommaDelimitedString(
|
||||
(!ObjectUtils.isEmpty(resultTypeParameterizations))
|
||||
? resultTypeParameterizations
|
||||
: this.defaultResultTypeParameterizations);
|
||||
code = decode(code);
|
||||
if (code.startsWith("\"") && code.endsWith("\"")) {
|
||||
code = code.substring(1, code.length() - 1);
|
||||
}
|
||||
if (!code.startsWith("return ") && !code.endsWith(";")) {
|
||||
code = String.format("return (%s<%s> & java.io.Serializable) %s;", resultType,
|
||||
parameterizations, code);
|
||||
}
|
||||
logger.info("Processed code property value :\n{}\n", code);
|
||||
String firstLetter = name.substring(0, 1).toUpperCase();
|
||||
name = (name.length() > 1) ? firstLetter + name.substring(1) : firstLetter;
|
||||
String className = String.format("%s.%s%sFactory", this.getClass().getPackage().getName(), name, resultType);
|
||||
CompilationResult compilationResult = buildAndCompileSourceCode(className, code, parameterizations);
|
||||
String className = String.format("%s.%s%sFactory",
|
||||
this.getClass().getPackage().getName(), name, resultType);
|
||||
CompilationResult compilationResult = buildAndCompileSourceCode(className, code,
|
||||
parameterizations);
|
||||
if (compilationResult.wasSuccessful()) {
|
||||
return new CompiledFunctionFactory(className, compilationResult);
|
||||
return new CompiledFunctionFactory<F>(className, compilationResult);
|
||||
}
|
||||
List<CompilationMessage> compilationMessages = compilationResult.getCompilationMessages();
|
||||
List<CompilationMessage> compilationMessages = compilationResult
|
||||
.getCompilationMessages();
|
||||
throw new CompilationFailedException(compilationMessages);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the source for and then compile and load a class that embodies
|
||||
* the supplied methodBody. The methodBody is inserted into a class template that
|
||||
* returns the specified parameterized type.
|
||||
* This method can return more than one class if the method body includes local class
|
||||
* declarations. An example methodBody would be <tt>return input -> input.buffer(5).map(list->list.get(0));</tt>.
|
||||
* Create the source for and then compile and load a class that embodies the supplied
|
||||
* methodBody. The methodBody is inserted into a class template that returns the
|
||||
* specified parameterized type. This method can return more than one class if the
|
||||
* method body includes local class declarations. An example methodBody would be
|
||||
* <tt>return input -> input.buffer(5).map(list->list.get(0));</tt>.
|
||||
*
|
||||
* @param className the name of the class
|
||||
* @param methodBody the source code for a method
|
||||
* @param parameterTypeString the String representation for the parameterized return type, e.g.:
|
||||
* <tt><Flux<Object>,Flux<Object></tt>
|
||||
* @return the list of Classes produced by compiling and then loading the snippet of code
|
||||
* @param parameterTypeString the String representation for the parameterized return
|
||||
* type, e.g.: <tt><Flux<Object>,Flux<Object></tt>
|
||||
* @return the list of Classes produced by compiling and then loading the snippet of
|
||||
* code
|
||||
*/
|
||||
private CompilationResult buildAndCompileSourceCode(String className, String methodBody, String parameterTypeString) {
|
||||
String sourceCode = makeSourceClassDefinition(className, methodBody, parameterTypeString);
|
||||
private CompilationResult buildAndCompileSourceCode(String className,
|
||||
String methodBody, String parameterTypeString) {
|
||||
String sourceCode = makeSourceClassDefinition(className, methodBody,
|
||||
parameterTypeString);
|
||||
return compiler.compile(className, sourceCode);
|
||||
}
|
||||
|
||||
private static String decode(String input) {
|
||||
return input.replaceAll(NEWLINE_ESCAPE, "\n").replaceAll(DOUBLE_DOUBLE_QUOTE, "\"");
|
||||
return input.replaceAll(NEWLINE_ESCAPE, "\n").replaceAll(DOUBLE_DOUBLE_QUOTE,
|
||||
"\"");
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a full source code definition for a class by applying the specified method body
|
||||
* to the Reactive template.
|
||||
* Make a full source code definition for a class by applying the specified method
|
||||
* body to the Reactive template.
|
||||
*
|
||||
* @param className the name of the class
|
||||
* @param methodBody the code to insert into the Reactive source class template
|
||||
* @return a complete Java Class definition
|
||||
*/
|
||||
private String makeSourceClassDefinition(String className, String methodBody, String types) {
|
||||
private String makeSourceClassDefinition(String className, String methodBody,
|
||||
String types) {
|
||||
String shortClassName = className.substring(className.lastIndexOf('.') + 1);
|
||||
String s = String.format(SOURCE_CODE_TEMPLATE, shortClassName, resultType, resultType, types, methodBody);
|
||||
String s = String.format(SOURCE_CODE_TEMPLATE, shortClassName, resultType,
|
||||
resultType, types, methodBody);
|
||||
System.out.println(s);
|
||||
return s;
|
||||
}
|
||||
|
||||
@@ -53,21 +53,27 @@ public class LocalFunctionGateway implements FunctionGateway {
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, R> void schedule(String functionName, Trigger trigger, Supplier<T> supplier, Consumer<R> consumer) {
|
||||
public <T, R> void schedule(String functionName, Trigger trigger,
|
||||
Supplier<T> supplier, Consumer<R> consumer) {
|
||||
Function<T, R> function = this.catalog.lookupFunction(functionName);
|
||||
this.scheduler.schedule(new FunctionInvokingRunnable(supplier, function, consumer), trigger);
|
||||
this.scheduler.schedule(
|
||||
new FunctionInvokingRunnable<T, R>(supplier, function, consumer),
|
||||
trigger);
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T, R> void subscribe(Publisher<T> publisher, String functionName, final Consumer<R> consumer) {
|
||||
public <T, R> void subscribe(Publisher<T> publisher, String functionName,
|
||||
final Consumer<R> consumer) {
|
||||
final Function<T, R> function = this.catalog.lookupFunction(functionName);
|
||||
publisher.subscribe(new Subscriber<T>() {
|
||||
|
||||
@Override
|
||||
public void onComplete() {}
|
||||
public void onComplete() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable error) {}
|
||||
public void onError(Throwable error) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNext(T next) {
|
||||
|
||||
@@ -19,11 +19,6 @@
|
||||
</properties>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.maven</groupId>
|
||||
<artifactId>maven-aether-provider</artifactId>
|
||||
<version>3.3.9</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-function-core</artifactId>
|
||||
@@ -58,6 +53,11 @@
|
||||
|
||||
<dependencyManagement>
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>org.apache.maven</groupId>
|
||||
<artifactId>maven-aether-provider</artifactId>
|
||||
<version>3.3.9</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.cloud</groupId>
|
||||
<artifactId>spring-cloud-function-parent</artifactId>
|
||||
|
||||
@@ -33,8 +33,6 @@ import org.springframework.boot.autoconfigure.condition.SpringBootCondition;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.cloud.function.invoker.AbstractFunctionInvoker;
|
||||
import org.springframework.cloud.function.registry.FunctionCatalog;
|
||||
import org.springframework.cloud.function.support.FluxFunction;
|
||||
import org.springframework.cloud.function.support.FunctionUtils;
|
||||
import org.springframework.cloud.stream.annotation.EnableBinding;
|
||||
import org.springframework.cloud.stream.binder.Binder;
|
||||
import org.springframework.cloud.stream.messaging.Processor;
|
||||
@@ -72,7 +70,7 @@ public class StreamConfiguration {
|
||||
long interval = properties.getInterval();
|
||||
Supplier<Flux<Object>> supplier = registry.lookupSupplier(name);
|
||||
return new SupplierInvokingMessageProducer<Object>(supplier, interval);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ConditionalOnFunction
|
||||
@@ -89,7 +87,7 @@ public class StreamConfiguration {
|
||||
Function<Object, Object> function = registry.lookupFunction(name);
|
||||
Assert.notNull(function, "no such function: " + name);
|
||||
return new StreamListeningFunctionInvoker(function);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ConditionalOnConsumer
|
||||
@@ -105,7 +103,7 @@ public class StreamConfiguration {
|
||||
String name = properties.getEndpoint();
|
||||
Consumer<Object> consumer = registry.lookupConsumer(name);
|
||||
return new StreamListeningConsumerInvoker<Object>(consumer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Conditional(SupplierCondition.class)
|
||||
@@ -129,7 +127,8 @@ public class StreamConfiguration {
|
||||
private @interface ConditionalOnConsumer {
|
||||
}
|
||||
|
||||
private static abstract class AbstractFunctionCondition extends SpringBootCondition implements ConfigurationCondition {
|
||||
private static abstract class AbstractFunctionCondition extends SpringBootCondition
|
||||
implements ConfigurationCondition {
|
||||
|
||||
private final Class<?> type;
|
||||
|
||||
@@ -138,8 +137,10 @@ public class StreamConfiguration {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConditionOutcome getMatchOutcome(ConditionContext context, AnnotatedTypeMetadata metadata) {
|
||||
String functionName = context.getEnvironment().getProperty("spring.cloud.function.stream.endpoint");
|
||||
public ConditionOutcome getMatchOutcome(ConditionContext context,
|
||||
AnnotatedTypeMetadata metadata) {
|
||||
String functionName = context.getEnvironment()
|
||||
.getProperty("spring.cloud.function.stream.endpoint");
|
||||
if (!StringUtils.hasText(functionName)) {
|
||||
return ConditionOutcome.noMatch("no endpoint function name available");
|
||||
}
|
||||
@@ -150,15 +151,17 @@ public class StreamConfiguration {
|
||||
}
|
||||
Class<?> beanType = context.getBeanFactory().getType(functionName);
|
||||
if (type.isAssignableFrom(beanType)) {
|
||||
return ConditionOutcome.match(String.format("bean '%s' is a %s", functionName, type));
|
||||
return ConditionOutcome
|
||||
.match(String.format("bean '%s' is a %s", functionName, type));
|
||||
}
|
||||
return ConditionOutcome.noMatch(String.format("bean '%s' is not a %s", functionName, type));
|
||||
return ConditionOutcome
|
||||
.noMatch(String.format("bean '%s' is not a %s", functionName, type));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigurationPhase getConfigurationPhase() {
|
||||
return ConfigurationPhase.REGISTER_BEAN;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class SupplierCondition extends AbstractFunctionCondition {
|
||||
|
||||
@@ -32,7 +32,8 @@ import reactor.core.publisher.Flux;
|
||||
/**
|
||||
* @author Mark Fisher
|
||||
*/
|
||||
public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker<Flux<?>, Flux<?>> {
|
||||
public class StreamListeningFunctionInvoker
|
||||
extends AbstractFunctionInvoker<Flux<?>, Flux<?>> {
|
||||
|
||||
public StreamListeningFunctionInvoker(Function<?, ?> function) {
|
||||
super(wrapIfNecessary(function));
|
||||
@@ -44,6 +45,7 @@ public class StreamListeningFunctionInvoker extends AbstractFunctionInvoker<Flux
|
||||
return this.doInvoke(input);
|
||||
}
|
||||
|
||||
@SuppressWarnings({ "rawtypes", "unchecked" })
|
||||
private static Function<Flux<?>, Flux<?>> wrapIfNecessary(Function function) {
|
||||
Assert.notNull(function, "Function must not be null");
|
||||
if (!FunctionUtils.isFluxFunction(function)) {
|
||||
|
||||
@@ -42,12 +42,15 @@ public class SupplierInvokingMessageProducer<T> extends MessageProducerSupport {
|
||||
? new FluxSupplier<>(supplier, Duration.ofMillis(interval))
|
||||
: new FluxSupplier<>(supplier);
|
||||
}
|
||||
this.supplier = (Supplier<Flux<T>>) supplier;
|
||||
@SuppressWarnings("unchecked")
|
||||
Supplier<Flux<T>> unchecked = (Supplier<Flux<T>>) supplier;
|
||||
this.supplier = unchecked;
|
||||
this.setOutputChannelName(Source.OUTPUT);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStart() {
|
||||
this.supplier.get().subscribe(m -> this.sendMessage(MessageBuilder.withPayload(m).build()));
|
||||
this.supplier.get()
|
||||
.subscribe(m -> this.sendMessage(MessageBuilder.withPayload(m).build()));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -32,6 +32,11 @@
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-starter-logging</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.springframework.boot</groupId>
|
||||
<artifactId>spring-boot-configuration-processor</artifactId>
|
||||
<optional>true</optional>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
||||
@@ -30,7 +30,6 @@ import org.springframework.cloud.function.registry.FunctionCatalog;
|
||||
import org.springframework.cloud.task.configuration.EnableTask;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user