From 2c6f71d2750c294daa796ad6063f3f5a05ab125f Mon Sep 17 00:00:00 2001 From: Halvdan Hoem Grelland Date: Sun, 18 Mar 2018 13:42:44 +0100 Subject: [PATCH] Add support for aggregated Kinesis records in SpringBootKinesisEventHandler The current implementation of SpringBootKinesisEventHandler only handles non-aggregated events. Attempting to process aggregated events caused ungraceful failure. This commit fixes that by de-aggregating any events before performing conversion. Non-aggregated events are still handled transparently. In addition, detection of Message input is performed, and output messages are wrapped accordingly. --- .../spring-cloud-function-adapter-aws/pom.xml | 30 +++ .../aws/SpringBootKinesisEventHandler.java | 70 +++++- .../SpringBootKinesisEventHandlerTests.java | 226 ++++++++++++++++++ 3 files changed, 319 insertions(+), 7 deletions(-) create mode 100644 spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandlerTests.java diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml index 8bc840369..ba25a7ea4 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/pom.xml @@ -21,6 +21,9 @@ UTF-8 1.8 2.0.2 + 1.11.163 + 1.0.3 + 1.0.3 @@ -52,6 +55,16 @@ ${aws-lambda-events.version} provided + + com.amazonaws + aws-java-sdk-kinesis + provided + + + com.amazonaws + amazon-kinesis-deaggregator + ${aws-kinesis-deaggregator.version} + io.projectreactor reactor-core @@ -61,7 +74,24 @@ spring-boot-starter-test test + + com.amazonaws + amazon-kinesis-aggregator + ${aws-kinesis-aggregator.version} + test + + + + + com.amazonaws + aws-java-sdk-bom + ${aws-java-sdk.version} + pom + import + + + diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandler.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandler.java index 8e3638a24..c6771c266 100644 --- a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandler.java +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/main/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandler.java @@ -16,16 +16,34 @@ package org.springframework.cloud.function.adapter.aws; -import java.util.List; +import static java.util.stream.Collectors.toList; +import java.util.List; +import java.util.stream.Collectors; + +import com.amazonaws.kinesis.deagg.RecordDeaggregator; +import com.amazonaws.services.kinesis.clientlibrary.types.UserRecord; +import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; -import com.amazonaws.services.lambda.runtime.events.KinesisEvent.KinesisEventRecord; +import com.fasterxml.jackson.databind.ObjectMapper; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.function.context.catalog.FunctionInspector; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.GenericMessage; /** * @author Mark Fisher + * @author Halvdan Hoem Grelland */ -public class SpringBootKinesisEventHandler - extends SpringBootRequestHandler { +public class SpringBootKinesisEventHandler + extends SpringBootRequestHandler { + + @Autowired + private ObjectMapper mapper; + + @Autowired + private FunctionInspector inspector; public SpringBootKinesisEventHandler() { super(); @@ -35,9 +53,47 @@ public class SpringBootKinesisEventHandler super(configurationClass); } + @SuppressWarnings("unchecked") @Override - protected List convertEvent(KinesisEvent event) { - // TODO: maybe convert to List - return event.getRecords(); + public List handleRequest(KinesisEvent event, Context context) { + return (List) super.handleRequest(event, context); + } + + @Override + protected Object convertEvent(KinesisEvent event) { + List payloads = deserializePayloads(event.getRecords()); + + if (functionAcceptsMessage()) { + return wrapInMessages(payloads); + } else { + return payloads; + } + } + + private List> wrapInMessages(List payloads) { + return payloads.stream() + .map(GenericMessage::new) + .collect(Collectors.toList()); + } + + private List deserializePayloads(List records) { + return RecordDeaggregator.deaggregate(records).stream() + .map(this::deserializeUserRecord) + .collect(toList()); + } + + @SuppressWarnings("unchecked") + private E deserializeUserRecord(UserRecord userRecord) { + try { + byte[] jsonBytes = userRecord.getData().array(); + return (E) mapper.readValue(jsonBytes, getInputType()); + } + catch (Exception e) { + throw new IllegalStateException("Cannot convert event", e); + } + } + + private boolean functionAcceptsMessage() { + return inspector.isMessage(function()); } } diff --git a/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandlerTests.java b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandlerTests.java new file mode 100644 index 000000000..15f8e38f1 --- /dev/null +++ b/spring-cloud-function-adapters/spring-cloud-function-adapter-aws/src/test/java/org/springframework/cloud/function/adapter/aws/SpringBootKinesisEventHandlerTests.java @@ -0,0 +1,226 @@ +/* + * Copyright 2017 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.function.adapter.aws; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.function.Function; + +import com.amazonaws.kinesis.agg.AggRecord; +import com.amazonaws.kinesis.agg.RecordAggregator; +import com.amazonaws.services.lambda.runtime.events.KinesisEvent; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.junit.Test; + +import org.springframework.boot.autoconfigure.jackson.JacksonAutoConfiguration; +import org.springframework.cloud.function.context.config.ContextFunctionCatalogAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Import; +import org.springframework.messaging.Message; + +/** + * @author Halvdan Hoem Grelland + */ +public class SpringBootKinesisEventHandlerTests { + + private static final ObjectMapper mapper = new ObjectMapper(); + + private SpringBootKinesisEventHandler handler; + + @Test + public void functionBeanHandlesKinesisEvent() throws Exception { + handler = new SpringBootKinesisEventHandler<>(FunctionConfig.class); + handler.initialize(); + + KinesisEvent event = asKinesisEvent(singletonList(new Foo("foo"))); + + List output = handler.handleRequest(event, null); + + assertThat(output).containsExactly(new Bar("FOO")); + } + + @Test + public void functionBeanHandlesAggregatedKinesisEvent() throws Exception { + handler = new SpringBootKinesisEventHandler<>(FunctionConfig.class); + handler.initialize(); + + List events = asList(new Foo("foo"), new Foo("bar"), new Foo("baz")); + KinesisEvent aggregatedEvent = asAggregatedKinesisEvent(events); + + List output = handler.handleRequest(aggregatedEvent, null); + + assertThat(output) + .containsExactly(new Bar("FOO"), new Bar("BAR"), new Bar("BAZ")); + } + + @Test + public void functionMessageBean() throws Exception { + handler = new SpringBootKinesisEventHandler<>(FunctionMessageConfig.class); + handler.initialize(); + + KinesisEvent event = asKinesisEvent(asList(new Foo("foo"), new Foo("bar"))); + + List output = handler.handleRequest(event, null); + + assertThat(output) + .containsExactly(new Bar("FOO"), new Bar("BAR")); + } + + @Configuration + @Import({ + ContextFunctionCatalogAutoConfiguration.class, + JacksonAutoConfiguration.class + }) + protected static class FunctionConfig { + @Bean + public Function function() { + return foo -> new Bar(foo.getValue().toUpperCase()); + } + } + + @Configuration + @Import({ + ContextFunctionCatalogAutoConfiguration.class, + JacksonAutoConfiguration.class + }) + protected static class FunctionMessageConfig { + @Bean + public Function, Bar> function() { + return foo -> new Bar(foo.getPayload().getValue().toUpperCase()); + } + } + + protected static class Foo { + + private String value; + + public Foo() { + } + + public Foo(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + } + + protected static class Bar { + + private String value; + + public Bar() { + } + + public Bar(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Bar bar = (Bar) o; + return Objects.equals(value, bar.value); + } + + @Override + public int hashCode() { + return Objects.hash(value); + } + } + + private static KinesisEvent asKinesisEvent(List payloads) { + KinesisEvent kinesisEvent = new KinesisEvent(); + + List kinesisEventRecords = new ArrayList<>(); + + for (Object payload : payloads) { + KinesisEvent.Record record = new KinesisEvent.Record(); + record.setData(asJsonByteBuffer(payload)); + + KinesisEvent.KinesisEventRecord kinesisEventRecord = + new KinesisEvent.KinesisEventRecord(); + kinesisEventRecord.setKinesis(record); + + kinesisEventRecords.add(kinesisEventRecord); + } + + kinesisEvent.setRecords(kinesisEventRecords); + + return kinesisEvent; + } + + private static KinesisEvent asAggregatedKinesisEvent(List payloads) { + RecordAggregator aggregator = new RecordAggregator(); + + payloads.stream() + .map(SpringBootKinesisEventHandlerTests::asJsonByteBuffer) + .forEach(buffer -> { + try { + aggregator.addUserRecord("fakePartitionKey", buffer.array()); + } catch (Exception e) { + fail("Creating aggregated record failed"); + } + }); + + AggRecord aggRecord = aggregator.clearAndGet(); + + KinesisEvent.Record record = new KinesisEvent.Record(); + record.setData(ByteBuffer.wrap(aggRecord.toRecordBytes())); + + KinesisEvent.KinesisEventRecord wrappingRecord = new KinesisEvent.KinesisEventRecord(); + wrappingRecord.setKinesis(record); + wrappingRecord.setEventVersion("1.0"); + + KinesisEvent event = new KinesisEvent(); + event.setRecords(singletonList(wrappingRecord)); + + return event; + } + + private static ByteBuffer asJsonByteBuffer(Object object) { + try { + return ByteBuffer.wrap(mapper.writeValueAsBytes(object)); + } catch (JsonProcessingException e) { + fail("Setting up test data failed", e); + throw new RuntimeException(e); + } + } +}