Implement task-launch-request-function
Removed task-launch-request from stream-applications-core Fix Checkstyle errors
This commit is contained in:
committed by
Soby Chacko
parent
96880eaf84
commit
92b2fc2f39
@@ -0,0 +1,24 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import java.util.Collection;
|
||||
|
||||
import org.springframework.integration.handler.MessageProcessor;
|
||||
|
||||
public interface CommandLineArgumentsMessageMapper extends MessageProcessor<Collection<String>> {
|
||||
}
|
||||
@@ -0,0 +1,66 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Parses a comma delimited list of key value pairs in which the values can contain commas as well.
|
||||
*
|
||||
* @author Chris Schaeffer
|
||||
* @author David Turanski
|
||||
**/
|
||||
abstract class KeyValueListParser {
|
||||
|
||||
static Map<String, String> parseCommaDelimitedKeyValuePairs(String value) {
|
||||
Map<String, String> keyValuePairs = new HashMap<>();
|
||||
|
||||
if (StringUtils.isEmpty(value)) {
|
||||
return keyValuePairs;
|
||||
}
|
||||
|
||||
ArrayList<String> pairs = new ArrayList<>();
|
||||
|
||||
String[] candidates = StringUtils.commaDelimitedListToStringArray(value);
|
||||
|
||||
for (int i = 0; i < candidates.length; i++) {
|
||||
if (i > 0 && !candidates[i].contains("=")) {
|
||||
pairs.add(pairs.get(pairs.size() - 1) + "," + candidates[i]);
|
||||
}
|
||||
else {
|
||||
pairs.add(candidates[i]);
|
||||
}
|
||||
}
|
||||
|
||||
for (String pair : pairs) {
|
||||
addKeyValuePair(pair, keyValuePairs);
|
||||
}
|
||||
|
||||
return keyValuePairs;
|
||||
}
|
||||
|
||||
private static void addKeyValuePair(String pair, Map<String, String> properties) {
|
||||
int firstEquals = pair.indexOf('=');
|
||||
if (firstEquals != -1) {
|
||||
properties.put(pair.substring(0, firstEquals).trim(), pair.substring(firstEquals + 1).trim());
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,65 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonProperty;
|
||||
|
||||
public class TaskLaunchRequest {
|
||||
@JsonProperty("args")
|
||||
private List<String> commandlineArguments = new ArrayList<>();
|
||||
|
||||
@JsonProperty("deploymentProps")
|
||||
private Map<String, String> deploymentProperties = new HashMap<>();
|
||||
|
||||
@JsonProperty("name")
|
||||
private String taskName;
|
||||
|
||||
public void setCommandlineArguments(List<String> commandlineArguments) {
|
||||
this.commandlineArguments = new ArrayList<>(commandlineArguments);
|
||||
}
|
||||
|
||||
public List<String> getCommandlineArguments() {
|
||||
return this.commandlineArguments;
|
||||
}
|
||||
|
||||
public void setDeploymentProperties(Map<String, String> deploymentProperties) {
|
||||
this.deploymentProperties = deploymentProperties;
|
||||
}
|
||||
|
||||
public Map<String, String> getDeploymentProperties() {
|
||||
return this.deploymentProperties;
|
||||
}
|
||||
|
||||
public void setTaskName(String taskName) {
|
||||
this.taskName = taskName;
|
||||
}
|
||||
|
||||
public String getTaskName() {
|
||||
return this.taskName;
|
||||
}
|
||||
|
||||
public TaskLaunchRequest addCommmandLineArguments(Collection<String> args) {
|
||||
this.commandlineArguments.addAll(args);
|
||||
return this;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,31 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
|
||||
/**
|
||||
* A marker interface useful for unambiguous dependency injection of this Function.
|
||||
*
|
||||
* @author David Turanski
|
||||
**/
|
||||
@FunctionalInterface
|
||||
public interface TaskLaunchRequestFunction extends Function<Message<?>, Message<TaskLaunchRequest>> {
|
||||
|
||||
}
|
||||
@@ -0,0 +1,184 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.springframework.beans.factory.BeanFactory;
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.expression.EvaluationContext;
|
||||
import org.springframework.expression.Expression;
|
||||
import org.springframework.expression.spel.standard.SpelExpressionParser;
|
||||
import org.springframework.integration.expression.ExpressionUtils;
|
||||
import org.springframework.lang.Nullable;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
/**
|
||||
* Configuration for a {@link TaskLaunchRequestFunction}, provided as a common function that can be composed with other Suppliers or
|
||||
* Functions to transform any {@link Message} to a {@link TaskLaunchRequest} which may be used as input to the {@code TaskLauncherFunction} to launch a task.
|
||||
*
|
||||
* Command line arguments used by the task, as well as the task name itself may be statically configured or extracted from
|
||||
* the message contents, using SpEL. See {@link TaskLaunchRequestFunctionProperties} for details.
|
||||
*
|
||||
* It is also possible to provide your own implementations of {@link CommandLineArgumentsMessageMapper} and {@link TaskNameMessageMapper}.
|
||||
*
|
||||
* @author David Turanski
|
||||
**/
|
||||
@Configuration
|
||||
@EnableConfigurationProperties(TaskLaunchRequestFunctionProperties.class)
|
||||
public class TaskLaunchRequestFunctionConfiguration {
|
||||
|
||||
/**
|
||||
* The function name.
|
||||
*/
|
||||
public final static String TASK_LAUNCH_REQUEST_FUNCTION_NAME = "taskLaunchRequestFunction";
|
||||
|
||||
/**
|
||||
* A {@link java.util.function.Function} to transform a {@link Message} payload to a
|
||||
* {@link TaskLaunchRequest}.
|
||||
*
|
||||
* @param taskLaunchRequestMessageProcessor a {@link TaskLaunchRequestMessageProcessor}.
|
||||
*
|
||||
* @return a {@code TaskLaunchRequest} Message.
|
||||
*/
|
||||
@Bean(name = TASK_LAUNCH_REQUEST_FUNCTION_NAME)
|
||||
public TaskLaunchRequestFunction taskLaunchRequest(
|
||||
TaskLaunchRequestMessageProcessor taskLaunchRequestMessageProcessor) {
|
||||
return message -> taskLaunchRequestMessageProcessor.postProcessMessage(message);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TaskLaunchRequestSupplier taskLaunchRequestInitializer(
|
||||
TaskLaunchRequestFunctionProperties taskLaunchRequestProperties) {
|
||||
return new TaskLaunchRequestPropertiesInitializer(taskLaunchRequestProperties);
|
||||
}
|
||||
|
||||
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
|
||||
@Bean
|
||||
public TaskLaunchRequestMessageProcessor taskLaunchRequestMessageProcessor(
|
||||
TaskLaunchRequestSupplier taskLaunchRequestInitializer,
|
||||
TaskLaunchRequestFunctionProperties properties,
|
||||
EvaluationContext evaluationContext,
|
||||
@Nullable TaskNameMessageMapper taskNameMessageMapper,
|
||||
@Nullable CommandLineArgumentsMessageMapper commandLineArgumentsMessageMapper) {
|
||||
|
||||
if (taskNameMessageMapper == null) {
|
||||
taskNameMessageMapper = taskNameMessageMapper(properties, evaluationContext);
|
||||
}
|
||||
|
||||
if (commandLineArgumentsMessageMapper == null) {
|
||||
commandLineArgumentsMessageMapper = commandLineArgumentsMessageMapper(properties, evaluationContext);
|
||||
}
|
||||
|
||||
return new TaskLaunchRequestMessageProcessor(taskLaunchRequestInitializer,
|
||||
taskNameMessageMapper,
|
||||
commandLineArgumentsMessageMapper);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public EvaluationContext evaluationContext(BeanFactory beanFactory) {
|
||||
return ExpressionUtils.createStandardEvaluationContext(beanFactory);
|
||||
}
|
||||
|
||||
private TaskNameMessageMapper taskNameMessageMapper(TaskLaunchRequestFunctionProperties taskLaunchRequestProperties,
|
||||
EvaluationContext evaluationContext) {
|
||||
if (StringUtils.hasText(taskLaunchRequestProperties.getTaskNameExpression())) {
|
||||
SpelExpressionParser expressionParser = new SpelExpressionParser();
|
||||
Expression taskNameExpression = expressionParser
|
||||
.parseExpression(taskLaunchRequestProperties.getTaskNameExpression());
|
||||
return new ExpressionEvaluatingTaskNameMessageMapper(taskNameExpression, evaluationContext);
|
||||
}
|
||||
|
||||
return message -> taskLaunchRequestProperties.getTaskName();
|
||||
}
|
||||
|
||||
private CommandLineArgumentsMessageMapper commandLineArgumentsMessageMapper(
|
||||
TaskLaunchRequestFunctionProperties taskLaunchRequestFunctionProperties,
|
||||
EvaluationContext evaluationContext) {
|
||||
return new ExpressionEvaluatingCommandLineArgsMapper(taskLaunchRequestFunctionProperties.getArgExpressions(),
|
||||
evaluationContext);
|
||||
}
|
||||
|
||||
private static class TaskLaunchRequestPropertiesInitializer extends TaskLaunchRequestSupplier {
|
||||
TaskLaunchRequestPropertiesInitializer(
|
||||
TaskLaunchRequestFunctionProperties taskLaunchRequestProperties) {
|
||||
|
||||
this.commandLineArgumentSupplier(
|
||||
() -> new ArrayList<>(taskLaunchRequestProperties.getArgs()));
|
||||
|
||||
this.deploymentPropertiesSupplier(
|
||||
() -> KeyValueListParser.parseCommaDelimitedKeyValuePairs(
|
||||
taskLaunchRequestProperties.getDeploymentProperties()));
|
||||
|
||||
this.taskNameSupplier(() -> taskLaunchRequestProperties.getTaskName());
|
||||
}
|
||||
}
|
||||
|
||||
private static class ExpressionEvaluatingTaskNameMessageMapper implements TaskNameMessageMapper {
|
||||
|
||||
private final Expression expression;
|
||||
private final EvaluationContext evaluationContext;
|
||||
|
||||
ExpressionEvaluatingTaskNameMessageMapper(Expression expression, EvaluationContext evaluationContext) {
|
||||
this.evaluationContext = evaluationContext;
|
||||
this.expression = expression;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String processMessage(Message<?> message) {
|
||||
return expression.getValue(evaluationContext, message).toString();
|
||||
}
|
||||
}
|
||||
|
||||
private static class ExpressionEvaluatingCommandLineArgsMapper implements CommandLineArgumentsMessageMapper {
|
||||
private final Map<String, Expression> argExpressionsMap;
|
||||
|
||||
private final EvaluationContext evaluationContext;
|
||||
|
||||
ExpressionEvaluatingCommandLineArgsMapper(String argExpressions, EvaluationContext evaluationContext) {
|
||||
this.evaluationContext = evaluationContext;
|
||||
this.argExpressionsMap = new HashMap<>();
|
||||
if (StringUtils.hasText(argExpressions)) {
|
||||
SpelExpressionParser expressionParser = new SpelExpressionParser();
|
||||
|
||||
KeyValueListParser.parseCommaDelimitedKeyValuePairs(argExpressions).forEach(
|
||||
(k, v) -> argExpressionsMap.put(k, expressionParser.parseExpression(v)));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Collection<String> processMessage(Message<?> message) {
|
||||
return evaluateArgExpressions(message);
|
||||
}
|
||||
|
||||
private Collection<String> evaluateArgExpressions(Message<?> message) {
|
||||
List<String> results = new LinkedList<>();
|
||||
this.argExpressionsMap.forEach((k, expression) -> results
|
||||
.add(String.format("%s=%s", k, expression.getValue(this.evaluationContext, message))));
|
||||
return results;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,113 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import javax.validation.constraints.AssertFalse;
|
||||
import javax.validation.constraints.NotNull;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.util.StringUtils;
|
||||
import org.springframework.validation.annotation.Validated;
|
||||
|
||||
/**
|
||||
* Base Properties to create a {@link TaskLaunchRequest}.
|
||||
*
|
||||
* @author Chris Schaefer
|
||||
* @author David Turanski
|
||||
*/
|
||||
@Validated
|
||||
@ConfigurationProperties("task.launch.request")
|
||||
public class TaskLaunchRequestFunctionProperties {
|
||||
|
||||
/**
|
||||
* Comma separated list of optional args in key=value format.
|
||||
*/
|
||||
private List<String> args = new ArrayList<>();
|
||||
|
||||
/**
|
||||
* Comma separated list of option args as SpEL expressions in key=value format.
|
||||
*/
|
||||
private String argExpressions = "";
|
||||
|
||||
/**
|
||||
* Comma delimited list of deployment properties to be applied to the
|
||||
* TaskLaunchRequest.
|
||||
*/
|
||||
private String deploymentProperties = "";
|
||||
|
||||
/**
|
||||
* The Data Flow task name.
|
||||
*/
|
||||
private String taskName;
|
||||
|
||||
|
||||
/**
|
||||
* A SpEL expression to extract the task name from each Message, using the Message as the evaluation context.
|
||||
*/
|
||||
private String taskNameExpression;
|
||||
|
||||
@NotNull
|
||||
public List<String> getArgs() {
|
||||
return this.args;
|
||||
}
|
||||
|
||||
public void setArgs(List<String> args) {
|
||||
this.args = new ArrayList<>(args);
|
||||
}
|
||||
|
||||
@NotNull
|
||||
public String getDeploymentProperties() {
|
||||
return this.deploymentProperties;
|
||||
}
|
||||
|
||||
public void setDeploymentProperties(String deploymentProperties) {
|
||||
this.deploymentProperties = deploymentProperties;
|
||||
}
|
||||
|
||||
public String getTaskName() {
|
||||
return taskName;
|
||||
}
|
||||
|
||||
public void setTaskName(String taskName) {
|
||||
this.taskName = taskName;
|
||||
}
|
||||
|
||||
public String getTaskNameExpression() {
|
||||
return taskNameExpression;
|
||||
}
|
||||
|
||||
public void setTaskNameExpression(String taskNameExpression) {
|
||||
this.taskNameExpression = taskNameExpression;
|
||||
}
|
||||
|
||||
public String getArgExpressions() {
|
||||
return argExpressions;
|
||||
}
|
||||
|
||||
public void setArgExpressions(String argExpressions) {
|
||||
this.argExpressions = argExpressions;
|
||||
}
|
||||
|
||||
@AssertFalse(message = "Cannot specify both 'taskName' and 'taskNameExpression'.")
|
||||
public boolean isTaskNameAndTaskNameExpressionSet() {
|
||||
return StringUtils.hasText(this.taskName) && StringUtils.hasText(this.taskNameExpression);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,68 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.MessageHeaders;
|
||||
import org.springframework.messaging.core.MessagePostProcessor;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.util.Assert;
|
||||
import org.springframework.util.MimeTypeUtils;
|
||||
import org.springframework.util.StringUtils;
|
||||
|
||||
class TaskLaunchRequestMessageProcessor implements MessagePostProcessor {
|
||||
|
||||
private final TaskNameMessageMapper taskNameMessageMapper;
|
||||
|
||||
private final CommandLineArgumentsMessageMapper commandLineArgumentsMessageMapper;
|
||||
|
||||
private final TaskLaunchRequestSupplier taskLaunchRequestInitializer;
|
||||
|
||||
TaskLaunchRequestMessageProcessor(TaskLaunchRequestSupplier taskLaunchRequestInitializer,
|
||||
TaskNameMessageMapper taskNameMessageMapper,
|
||||
CommandLineArgumentsMessageMapper commandLIneArgumentsMessageMapper) {
|
||||
|
||||
this.taskLaunchRequestInitializer = taskLaunchRequestInitializer;
|
||||
|
||||
this.taskNameMessageMapper = taskNameMessageMapper;
|
||||
|
||||
this.commandLineArgumentsMessageMapper = commandLIneArgumentsMessageMapper;
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public Message<TaskLaunchRequest> postProcessMessage(Message<?> message) {
|
||||
TaskLaunchRequest taskLaunchRequest = taskLaunchRequestInitializer.get();
|
||||
|
||||
if (!StringUtils.hasText(taskLaunchRequest.getTaskName())) {
|
||||
taskLaunchRequest.setTaskName(taskNameMessageMapper.processMessage(message));
|
||||
Assert.hasText(taskLaunchRequest.getTaskName(),
|
||||
() -> "'taskName' is required in " + TaskLaunchRequest.class.getName());
|
||||
}
|
||||
|
||||
taskLaunchRequest.addCommmandLineArguments(commandLineArgumentsMessageMapper.processMessage(message));
|
||||
|
||||
MessageBuilder<TaskLaunchRequest> builder = MessageBuilder.withPayload(taskLaunchRequest)
|
||||
.copyHeaders(message.getHeaders());
|
||||
return adjustHeaders(builder).build();
|
||||
}
|
||||
|
||||
private MessageBuilder<TaskLaunchRequest> adjustHeaders(MessageBuilder<TaskLaunchRequest> builder) {
|
||||
builder.setHeader(MessageHeaders.CONTENT_TYPE, MimeTypeUtils.APPLICATION_JSON);
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,67 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.function.Supplier;
|
||||
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
class TaskLaunchRequestSupplier implements Supplier<TaskLaunchRequest> {
|
||||
|
||||
private Supplier<String> taskNameSupplier;
|
||||
|
||||
private Supplier<List<String>> commandLineArgumentsSupplier;
|
||||
|
||||
private Supplier<Map<String, String>> deploymentPropertiesSupplier;
|
||||
|
||||
public TaskLaunchRequestSupplier taskNameSupplier(Supplier<String> taskNameSupplier) {
|
||||
this.taskNameSupplier = taskNameSupplier;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TaskLaunchRequestSupplier commandLineArgumentSupplier(Supplier<List<String>> commandLineArgumentsSupplier) {
|
||||
this.commandLineArgumentsSupplier = commandLineArgumentsSupplier;
|
||||
return this;
|
||||
}
|
||||
|
||||
public TaskLaunchRequestSupplier deploymentPropertiesSupplier(
|
||||
Supplier<Map<String, String>> deploymentPropertiesSupplier) {
|
||||
this.deploymentPropertiesSupplier = deploymentPropertiesSupplier;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public TaskLaunchRequest get() {
|
||||
|
||||
Assert.notNull(this.taskNameSupplier, "'taskNameSupplier' is required.");
|
||||
|
||||
TaskLaunchRequest taskLaunchRequest = new TaskLaunchRequest();
|
||||
taskLaunchRequest.setTaskName(this.taskNameSupplier.get());
|
||||
|
||||
if (this.commandLineArgumentsSupplier != null) {
|
||||
taskLaunchRequest.setCommandlineArguments(this.commandLineArgumentsSupplier.get());
|
||||
}
|
||||
|
||||
if (this.deploymentPropertiesSupplier != null) {
|
||||
taskLaunchRequest.setDeploymentProperties(this.deploymentPropertiesSupplier.get());
|
||||
}
|
||||
|
||||
return taskLaunchRequest;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,23 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import org.springframework.integration.handler.MessageProcessor;
|
||||
|
||||
@FunctionalInterface
|
||||
public interface TaskNameMessageMapper extends MessageProcessor<String> {
|
||||
}
|
||||
@@ -0,0 +1,97 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static junit.framework.TestCase.assertEquals;
|
||||
import static junit.framework.TestCase.assertTrue;
|
||||
|
||||
/**
|
||||
* @author Chris Schaefer
|
||||
* @author David Turanski
|
||||
*/
|
||||
public class KeyValueListParserTests {
|
||||
|
||||
@Test
|
||||
public void testParseSimpleDeploymentProperty() {
|
||||
Map<String, String> deploymentProperties = KeyValueListParser.parseCommaDelimitedKeyValuePairs(
|
||||
"app.sftp.param=value");
|
||||
assertTrue("Invalid number of deployment properties: " + deploymentProperties.size(),
|
||||
deploymentProperties.size() == 1);
|
||||
assertTrue("Expected deployment key not found", deploymentProperties.containsKey("app.sftp.param"));
|
||||
assertEquals("Invalid deployment value", "value", deploymentProperties.get("app.sftp.param"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseSimpleDeploymentPropertyMultipleValues() {
|
||||
Map<String, String> deploymentProperties = KeyValueListParser.parseCommaDelimitedKeyValuePairs(
|
||||
"app.sftp.param=value1,value2,value3");
|
||||
|
||||
assertTrue("Invalid number of deployment properties: " + deploymentProperties.size(),
|
||||
deploymentProperties.size() == 1);
|
||||
assertTrue("Expected deployment key not found", deploymentProperties.containsKey("app.sftp.param"));
|
||||
assertEquals("Invalid deployment value", "value1,value2,value3", deploymentProperties.get("app.sftp.param"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseSpelExpressionMultipleValues() {
|
||||
Map<String, String> argExpressions = KeyValueListParser.parseCommaDelimitedKeyValuePairs(
|
||||
"arg1=payload.substr(0,2),arg2=headers['foo'],arg3=headers['bar']==false");
|
||||
|
||||
assertTrue("Invalid number of deployment properties: " + argExpressions.size(),
|
||||
argExpressions.size() == 3);
|
||||
assertTrue("Expected deployment key not found", argExpressions.containsKey("arg1"));
|
||||
assertEquals("Invalid deployment value", "payload.substr(0,2)", argExpressions.get("arg1"));
|
||||
|
||||
assertTrue("Expected deployment key not found", argExpressions.containsKey("arg2"));
|
||||
assertEquals("Invalid deployment value", "headers['foo']", argExpressions.get("arg2"));
|
||||
|
||||
assertTrue("Expected deployment key not found", argExpressions.containsKey("arg3"));
|
||||
assertEquals("Invalid deployment value", "headers['bar']==false", argExpressions.get("arg3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseMultipleDeploymentPropertiesSingleValue() {
|
||||
Map<String, String> deploymentProperties = KeyValueListParser.parseCommaDelimitedKeyValuePairs(
|
||||
"app.sftp.param=value1,app.sftp.other.param=value2");
|
||||
|
||||
assertTrue("Invalid number of deployment properties: " + deploymentProperties.size(),
|
||||
deploymentProperties.size() == 2);
|
||||
assertTrue("Expected deployment key not found", deploymentProperties.containsKey("app.sftp.param"));
|
||||
assertEquals("Invalid deployment value", "value1", deploymentProperties.get("app.sftp.param"));
|
||||
assertTrue("Expected deployment key not found", deploymentProperties.containsKey("app.sftp.other.param"));
|
||||
assertEquals("Invalid deployment value", "value2", deploymentProperties.get("app.sftp.other.param"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testParseMultipleDeploymentPropertiesMultipleValues() {
|
||||
TaskLaunchRequestFunctionProperties taskLaunchRequestProperties = new TaskLaunchRequestFunctionProperties();
|
||||
|
||||
Map<String, String> deploymentProperties = KeyValueListParser.parseCommaDelimitedKeyValuePairs(
|
||||
"app.sftp.param=value1,value2,app.sftp.other.param=other1,other2");
|
||||
|
||||
assertTrue("Invalid number of deployment properties: " + deploymentProperties.size(),
|
||||
deploymentProperties.size() == 2);
|
||||
assertTrue("Expected deployment key not found", deploymentProperties.containsKey("app.sftp.param"));
|
||||
assertEquals("Invalid deployment value", "value1,value2", deploymentProperties.get("app.sftp.param"));
|
||||
assertTrue("Expected deployment key not found", deploymentProperties.containsKey("app.sftp.other.param"));
|
||||
assertEquals("Invalid deployment value", "other1,other2", deploymentProperties.get("app.sftp.other.param"));
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,211 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
|
||||
import org.junit.jupiter.api.BeforeEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.boot.WebApplicationType;
|
||||
import org.springframework.boot.autoconfigure.SpringBootApplication;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.boot.builder.SpringApplicationBuilder;
|
||||
import org.springframework.context.ApplicationContext;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.messaging.Message;
|
||||
import org.springframework.messaging.support.MessageBuilder;
|
||||
import org.springframework.test.annotation.DirtiesContext;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
import static org.assertj.core.api.Assertions.entry;
|
||||
|
||||
/**
|
||||
* @author David Turanski
|
||||
**/
|
||||
public class TaskLaunchRequestFunctionApplicationTests {
|
||||
|
||||
private SpringApplicationBuilder springApplicationBuilder;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
springApplicationBuilder = new SpringApplicationBuilder(TestApplication.class)
|
||||
.web(WebApplicationType.NONE);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DirtiesContext
|
||||
public void simpleDataflowTaskLaunchRequest() throws IOException {
|
||||
|
||||
ApplicationContext context = springApplicationBuilder.properties(
|
||||
"spring.jmx.enabled=false",
|
||||
"spring.cloud.stream.function.definition=taskLaunchRequestFunction",
|
||||
"task.launch.request.task-name=foo")
|
||||
.run();
|
||||
|
||||
TaskLaunchRequest taskLaunchRequest = verifyAndreceiveTaskLaunchRequest(context);
|
||||
|
||||
assertThat(taskLaunchRequest.getTaskName()).isEqualTo("foo");
|
||||
assertThat(taskLaunchRequest.getCommandlineArguments()).hasSize(0);
|
||||
assertThat(taskLaunchRequest.getDeploymentProperties()).hasSize(0);
|
||||
}
|
||||
|
||||
@Test
|
||||
@DirtiesContext
|
||||
public void dataflowTaskLaunchRequestWithArgsAndDeploymentProperties() throws IOException {
|
||||
|
||||
ApplicationContext context = springApplicationBuilder.properties(
|
||||
"spring.jmx.enabled=false", "spring.cloud.stream.function.definition=taskLaunchRequestFunction",
|
||||
"task.launch.request.task-name=foo", "task.launch.request.args=foo=bar,baz=boo",
|
||||
"task.launch.request.deploymentProperties=count=3")
|
||||
.run();
|
||||
TaskLaunchRequest taskLaunchRequest = verifyAndreceiveTaskLaunchRequest(context);
|
||||
|
||||
assertThat(taskLaunchRequest.getTaskName()).isEqualTo("foo");
|
||||
assertThat(taskLaunchRequest.getCommandlineArguments()).containsExactlyInAnyOrder("foo=bar",
|
||||
"baz=boo");
|
||||
assertThat(taskLaunchRequest.getDeploymentProperties()).containsOnly(entry("count", "3"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@DirtiesContext
|
||||
public void taskLaunchRequestWithCommandLineArgsMessageMapper() throws IOException {
|
||||
|
||||
ApplicationContext context = springApplicationBuilder.properties(
|
||||
"spring.jmx.enabled=false", "spring.cloud.stream.function.definition=taskLaunchRequestFunction",
|
||||
"task.launch.request.task-name=foo", "enhanceTLRArgs=true")
|
||||
.run();
|
||||
|
||||
TaskLaunchRequest taskLaunchRequest = verifyAndreceiveTaskLaunchRequest(context);
|
||||
|
||||
assertThat(taskLaunchRequest.getTaskName()).isEqualTo("foo");
|
||||
assertThat(taskLaunchRequest.getCommandlineArguments()).hasSize(1);
|
||||
assertThat(taskLaunchRequest.getCommandlineArguments()).containsExactly("runtimeArg");
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@DirtiesContext
|
||||
public void taskLaunchRequestWithArgExpressions() throws IOException {
|
||||
ApplicationContext context = springApplicationBuilder.properties(
|
||||
"spring.jmx.enabled=false",
|
||||
"spring.cloud.stream.function.definition=taskLaunchRequestFunction",
|
||||
"task.launch.request.task-name=foo",
|
||||
"task.launch.request.arg-expressions=foo=payload.toUpperCase(),bar=payload.substring(0,2)")
|
||||
.run();
|
||||
|
||||
Message<String> message = MessageBuilder.withPayload("hello").build();
|
||||
|
||||
TaskLaunchRequestFunction taskLaunchRequestFunction = context.getBean(TaskLaunchRequestFunction.class);
|
||||
|
||||
Message<TaskLaunchRequest> response = taskLaunchRequestFunction.apply(message);
|
||||
|
||||
assertThat(response).isNotNull();
|
||||
TaskLaunchRequest request = response.getPayload();
|
||||
assertThat(request.getCommandlineArguments()).containsExactlyInAnyOrder("foo=HELLO", "bar=he");
|
||||
}
|
||||
|
||||
@Test
|
||||
@DirtiesContext
|
||||
public void taskLaunchRequestWithIntPayload() throws IOException {
|
||||
ApplicationContext context = springApplicationBuilder.properties(
|
||||
"spring.jmx.enabled=false", "spring.cloud.stream.function.definition=taskLaunchRequestFunction",
|
||||
"task.launch.request.task-name=foo",
|
||||
"task.launch.request.arg-expressions=i=payload")
|
||||
.run();
|
||||
|
||||
TaskLaunchRequestFunction taskLaunchRequestFunction = context.getBean(TaskLaunchRequestFunction.class);
|
||||
|
||||
Message<Integer> message = MessageBuilder.withPayload(123).build();
|
||||
|
||||
Message<TaskLaunchRequest> response = taskLaunchRequestFunction.apply(message);
|
||||
|
||||
assertThat(response).isNotNull();
|
||||
|
||||
TaskLaunchRequest request = response.getPayload();
|
||||
assertThat(request.getCommandlineArguments()).containsExactly("i=123");
|
||||
}
|
||||
|
||||
@Test
|
||||
@DirtiesContext
|
||||
public void taskNameExpression() throws IOException {
|
||||
ApplicationContext context = springApplicationBuilder.properties(
|
||||
"spring.jmx.enabled=false", "spring.cloud.stream.function.definition=taskLaunchRequestFunction",
|
||||
"task.launch.request.task-name-expression=payload+'_task'")
|
||||
.run();
|
||||
|
||||
TaskLaunchRequestFunction taskLaunchRequestFunction = context.getBean(TaskLaunchRequestFunction.class);
|
||||
|
||||
Message<?> message = MessageBuilder.withPayload("foo").build();
|
||||
|
||||
Message<TaskLaunchRequest> response = taskLaunchRequestFunction.apply(message);
|
||||
assertThat(response).isNotNull();
|
||||
|
||||
TaskLaunchRequest request = response.getPayload();
|
||||
assertThat(request.getTaskName()).isEqualTo("foo_task");
|
||||
}
|
||||
|
||||
@Test
|
||||
@DirtiesContext
|
||||
public void customTaskNameExtractor() throws IOException {
|
||||
ApplicationContext context = springApplicationBuilder.properties(
|
||||
"spring.jmx.enabled=false", "spring.cloud.stream.function.definition=taskLaunchRequestFunction",
|
||||
"customTaskNameExtractor=true")
|
||||
.run();
|
||||
TaskLaunchRequestFunction taskLaunchRequestFunction = context.getBean(TaskLaunchRequestFunction.class);
|
||||
|
||||
Message<String> message = MessageBuilder.withPayload("foo").build();
|
||||
|
||||
Message<TaskLaunchRequest> response = taskLaunchRequestFunction.apply(message);
|
||||
assertThat(response).isNotNull();
|
||||
|
||||
TaskLaunchRequest request = response.getPayload();
|
||||
assertThat(request.getTaskName()).isEqualTo("fooTask");
|
||||
|
||||
message = MessageBuilder.withPayload("bar").build();
|
||||
response = taskLaunchRequestFunction.apply(message);
|
||||
request = response.getPayload();
|
||||
|
||||
assertThat(request.getTaskName()).isEqualTo("defaultTask");
|
||||
}
|
||||
|
||||
private TaskLaunchRequest verifyAndreceiveTaskLaunchRequest(ApplicationContext context)
|
||||
throws IOException {
|
||||
TaskLaunchRequestFunction taskLaunchRequestFunction = context.getBean(TaskLaunchRequestFunction.class);
|
||||
Message<TaskLaunchRequest> message = taskLaunchRequestFunction
|
||||
.apply(MessageBuilder.withPayload(new byte[] {}).build());
|
||||
assertThat(message).isNotNull();
|
||||
return message.getPayload();
|
||||
}
|
||||
|
||||
@SpringBootApplication
|
||||
protected static class TestApplication {
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty("customTaskNameExtractor")
|
||||
TaskNameMessageMapper taskNameExtractor() {
|
||||
return message -> ((String) (message.getPayload())).equalsIgnoreCase("foo") ? "fooTask" : "defaultTask";
|
||||
}
|
||||
|
||||
@Bean
|
||||
@ConditionalOnProperty("enhanceTLRArgs")
|
||||
CommandLineArgumentsMessageMapper commandLineArgumentsProvider() {
|
||||
return message -> Collections.singletonList("runtimeArg");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,76 @@
|
||||
/*
|
||||
* Copyright 2020-2020 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* https://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.cloud.fn.task.launch.request;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
||||
import org.springframework.boot.test.util.TestPropertyValues;
|
||||
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.integration.config.EnableIntegration;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
/**
|
||||
* @author David Turanski
|
||||
**/
|
||||
public class TaskLaunchRequestFunctionPropertiesTests {
|
||||
|
||||
@Test
|
||||
public void deploymentPropertiesCanBeCustomized() {
|
||||
TaskLaunchRequestFunctionProperties properties = getBatchProperties(
|
||||
"task.launch.request.deploymentProperties:prop1=val1,prop2=val2");
|
||||
assertThat(properties.getDeploymentProperties()).isEqualTo("prop1=val1,prop2=val2");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void parametersCanBeCustomized() {
|
||||
TaskLaunchRequestFunctionProperties properties = getBatchProperties(
|
||||
"task.launch.request.args:jp1=jpv1,jp2=jpv2");
|
||||
List<String> args = properties.getArgs();
|
||||
|
||||
assertThat(args).isNotNull();
|
||||
assertThat(args).hasSize(2);
|
||||
assertThat(args.get(0)).isEqualTo("jp1=jpv1");
|
||||
assertThat(args.get(1)).isEqualTo("jp2=jpv2");
|
||||
}
|
||||
|
||||
private TaskLaunchRequestFunctionProperties getBatchProperties(String... var) {
|
||||
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext();
|
||||
|
||||
if (var != null) {
|
||||
TestPropertyValues.of(var).applyTo(context);
|
||||
}
|
||||
|
||||
context.register(Conf.class);
|
||||
context.refresh();
|
||||
|
||||
return context.getBean(TaskLaunchRequestFunctionProperties.class);
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@EnableIntegration
|
||||
@EnableConfigurationProperties(TaskLaunchRequestFunctionProperties.class)
|
||||
@Import(TaskLaunchRequestFunctionConfiguration.class)
|
||||
static class Conf {
|
||||
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user