@@ -30,7 +30,6 @@ import reactor.core.publisher.Flux;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.cloud.function.context.AbstractSpringFunctionAdapterInitializer;
|
||||
|
||||
|
||||
/**
|
||||
* @author Dave Syer
|
||||
* @author Oleg Zhurakousky
|
||||
@@ -50,15 +49,13 @@ public class SpringBootStreamHandler extends AbstractSpringFunctionAdapterInitia
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleRequest(InputStream input, OutputStream output, Context context)
|
||||
throws IOException {
|
||||
public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException {
|
||||
initialize(context);
|
||||
Object value = convertStream(input);
|
||||
Publisher<?> flux = apply(extract(value));
|
||||
this.mapper.writeValue(output, result(value, flux));
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
protected void initialize(Context context) {
|
||||
super.initialize(context);
|
||||
@@ -74,13 +71,22 @@ public class SpringBootStreamHandler extends AbstractSpringFunctionAdapterInitia
|
||||
return Flux.just(input);
|
||||
}
|
||||
|
||||
/*
|
||||
* Will convert to POJOP or generic map unless user
|
||||
* explicitly requests InputStream (e.g., Function<InputStream, ?>).
|
||||
*/
|
||||
private Object convertStream(InputStream input) {
|
||||
Object convertedResult = input;
|
||||
try {
|
||||
return this.mapper.readValue(input, getInputType());
|
||||
Class<?> inputType = getInputType();
|
||||
if (!InputStream.class.isAssignableFrom(inputType)) {
|
||||
convertedResult = this.mapper.readValue(input, inputType);
|
||||
}
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IllegalStateException("Cannot convert event", e);
|
||||
throw new IllegalStateException("Cannot convert event stream", e);
|
||||
}
|
||||
return convertedResult;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -18,16 +18,21 @@ package org.springframework.cloud.function.adapter.aws;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.InputStream;
|
||||
import java.util.Map;
|
||||
import java.util.function.Function;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
|
||||
import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.context.annotation.Import;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
@@ -79,6 +84,26 @@ public class SpringBootStreamHandlerTests {
|
||||
assertThat(output.toString()).isEqualTo("{\"value\":\"FOO\"}");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void typelessFunctionConfig() throws Exception {
|
||||
this.handler = new SpringBootStreamHandler(TypelessFunctionConfig.class);
|
||||
this.handler.initialize(null);
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
this.handler.handleRequest(
|
||||
new ByteArrayInputStream("{\"value\":\"foo\"}".getBytes()), output, null);
|
||||
assertThat(output.toString()).isEqualTo("{\"value\":\"foo\"}");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void inputStreamFunctionConfig() throws Exception {
|
||||
this.handler = new SpringBootStreamHandler(InputStreamFunctionConfig.class);
|
||||
this.handler.initialize(null);
|
||||
ByteArrayOutputStream output = new ByteArrayOutputStream();
|
||||
this.handler.handleRequest(
|
||||
new ByteArrayInputStream("{\"value\":\"foo\"}".getBytes()), output, null);
|
||||
assertThat(output.toString()).isEqualTo("{\"value\":\"FOO\"}");
|
||||
}
|
||||
|
||||
@Configuration
|
||||
protected static class NoCatalogNonFluxFunctionConfig {
|
||||
|
||||
@@ -122,6 +147,44 @@ public class SpringBootStreamHandlerTests {
|
||||
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@Import({ ContextFunctionCatalogAutoConfiguration.class,
|
||||
JacksonAutoConfiguration.class })
|
||||
protected static class TypelessFunctionConfig {
|
||||
|
||||
@Bean
|
||||
public Function<?, ?> function() {
|
||||
return value -> {
|
||||
Assert.isTrue(value instanceof Map, "Expected value should be Map");
|
||||
return value;
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Configuration
|
||||
@Import({ ContextFunctionCatalogAutoConfiguration.class,
|
||||
JacksonAutoConfiguration.class })
|
||||
protected static class InputStreamFunctionConfig {
|
||||
|
||||
@Autowired
|
||||
private ObjectMapper mapper;
|
||||
|
||||
@Bean
|
||||
public Function<InputStream, ?> function() {
|
||||
return value -> {
|
||||
try {
|
||||
Foo foo = this.mapper.readValue((InputStream) value, Foo.class);
|
||||
return new Bar(foo.getValue().toUpperCase());
|
||||
}
|
||||
catch (Exception e) {
|
||||
throw new IllegalStateException("Failed test", e);
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
protected static class Foo {
|
||||
|
||||
private String value;
|
||||
|
||||
Reference in New Issue
Block a user