diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml
index 8bc840369..ba25a7ea4 100644
--- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml
+++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml
@@ -21,6 +21,9 @@
UTF-8
1.8
2.0.2
+ 1.11.163
+ 1.0.3
+ 1.0.3
@@ -52,6 +55,16 @@
${aws-lambda-events.version}
provided
+
+ com.amazonaws
+ aws-java-sdk-kinesis
+ provided
+
+
+ com.amazonaws
+ amazon-kinesis-deaggregator
+ ${aws-kinesis-deaggregator.version}
+
io.projectreactor
reactor-core
@@ -61,7 +74,24 @@
spring-boot-starter-test
test
+
+ com.amazonaws
+ amazon-kinesis-aggregator
+ ${aws-kinesis-aggregator.version}
+ test
+
+
+
+
+ com.amazonaws
+ aws-java-sdk-bom
+ ${aws-java-sdk.version}
+ pom
+ import
+
+
+
diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandler.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandler.java
index 8e3638a24..c6771c266 100644
--- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandler.java
+++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandler.java
@@ -16,16 +16,34 @@
package org.springframework.cloud.function.adapter.aws;
-import java.util.List;
+import static java.util.stream.Collectors.toList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import com.amazonaws.kinesis.deagg.RecordDeaggregator;
+import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord;
+import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
-import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.cloud.function.context.catalog.FunctionInspector;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.support.GenericMessage;
/**
* @author Mark Fisher
+ * @author Halvdan Hoem Grelland
*/
-public class SpringBootKinesisEventHandler
- extends SpringBootRequestHandler {
+public class SpringBootKinesisEventHandler
+ extends SpringBootRequestHandler {
+
+ @Autowired
+ private ObjectMapper mapper;
+
+ @Autowired
+ private FunctionInspector inspector;
public SpringBootKinesisEventHandler() {
super();
@@ -35,9 +53,47 @@ public class SpringBootKinesisEventHandler
super(configurationClass);
}
+ @SuppressWarnings("unchecked")
@Override
- protected List convertEvent(KinesisEvent event) {
- // TODO: maybe convert to List
- return event.getRecords();
+ public List handleRequest(KinesisEvent event, Context context) {
+ return (List) super.handleRequest(event, context);
+ }
+
+ @Override
+ protected Object convertEvent(KinesisEvent event) {
+ List payloads = deserializePayloads(event.getRecords());
+
+ if (functionAcceptsMessage()) {
+ return wrapInMessages(payloads);
+ } else {
+ return payloads;
+ }
+ }
+
+ private List> wrapInMessages(List payloads) {
+ return payloads.stream()
+ .map(GenericMessage::new)
+ .collect(Collectors.toList());
+ }
+
+ private List deserializePayloads(List records) {
+ return RecordDeaggregator.deaggregate(records).stream()
+ .map(this::deserializeUserRecord)
+ .collect(toList());
+ }
+
+ @SuppressWarnings("unchecked")
+ private E deserializeUserRecord(UserRecord userRecord) {
+ try {
+ byte[] jsonBytes = userRecord.getData().array();
+ return (E) mapper.readValue(jsonBytes, getInputType());
+ }
+ catch (Exception e) {
+ throw new IllegalStateException("Cannot convert event", e);
+ }
+ }
+
+ private boolean functionAcceptsMessage() {
+ return inspector.isMessage(function());
}
}
diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandlerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandlerTests.java
new file mode 100644
index 000000000..15f8e38f1
--- /dev/null
+++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandlerTests.java
@@ -0,0 +1,226 @@
+/*
+ * Copyright 2017 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.function.adapter.aws;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.fail;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+
+import com.amazonaws.kinesis.agg.AggRecord;
+import com.amazonaws.kinesis.agg.RecordAggregator;
+import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Test;
+
+import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration;
+import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Import;
+import org.springframework.messaging.Message;
+
+/**
+ * @author Halvdan Hoem Grelland
+ */
+public class SpringBootKinesisEventHandlerTests {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private SpringBootKinesisEventHandler handler;
+
+ @Test
+ public void functionBeanHandlesKinesisEvent() throws Exception {
+ handler = new SpringBootKinesisEventHandler<>(FunctionConfig.class);
+ handler.initialize();
+
+ KinesisEvent event = asKinesisEvent(singletonList(new Foo("foo")));
+
+ List output = handler.handleRequest(event, null);
+
+ assertThat(output).containsExactly(new Bar("FOO"));
+ }
+
+ @Test
+ public void functionBeanHandlesAggregatedKinesisEvent() throws Exception {
+ handler = new SpringBootKinesisEventHandler<>(FunctionConfig.class);
+ handler.initialize();
+
+ List events = asList(new Foo("foo"), new Foo("bar"), new Foo("baz"));
+ KinesisEvent aggregatedEvent = asAggregatedKinesisEvent(events);
+
+ List output = handler.handleRequest(aggregatedEvent, null);
+
+ assertThat(output)
+ .containsExactly(new Bar("FOO"), new Bar("BAR"), new Bar("BAZ"));
+ }
+
+ @Test
+ public void functionMessageBean() throws Exception {
+ handler = new SpringBootKinesisEventHandler<>(FunctionMessageConfig.class);
+ handler.initialize();
+
+ KinesisEvent event = asKinesisEvent(asList(new Foo("foo"), new Foo("bar")));
+
+ List output = handler.handleRequest(event, null);
+
+ assertThat(output)
+ .containsExactly(new Bar("FOO"), new Bar("BAR"));
+ }
+
+ @Configuration
+ @Import({
+ ContextFunctionCatalogAutoConfiguration.class,
+ JacksonAutoConfiguration.class
+ })
+ protected static class FunctionConfig {
+ @Bean
+ public Function function() {
+ return foo -> new Bar(foo.getValue().toUpperCase());
+ }
+ }
+
+ @Configuration
+ @Import({
+ ContextFunctionCatalogAutoConfiguration.class,
+ JacksonAutoConfiguration.class
+ })
+ protected static class FunctionMessageConfig {
+ @Bean
+ public Function, Bar> function() {
+ return foo -> new Bar(foo.getPayload().getValue().toUpperCase());
+ }
+ }
+
+ protected static class Foo {
+
+ private String value;
+
+ public Foo() {
+ }
+
+ public Foo(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+ }
+
+ protected static class Bar {
+
+ private String value;
+
+ public Bar() {
+ }
+
+ public Bar(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ Bar bar = (Bar) o;
+ return Objects.equals(value, bar.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(value);
+ }
+ }
+
+ private static KinesisEvent asKinesisEvent(List