diff --git a/spring-cloud-function-deployer/src/it/bootapp-multi/src/main/java/function/example/RepeaterApplication.java b/spring-cloud-function-deployer/src/it/bootapp-multi/src/main/java/function/example/RepeaterApplication.java index 026a47a78..73f4d9c94 100644 --- a/spring-cloud-function-deployer/src/it/bootapp-multi/src/main/java/function/example/RepeaterApplication.java +++ b/spring-cloud-function-deployer/src/it/bootapp-multi/src/main/java/function/example/RepeaterApplication.java @@ -11,14 +11,70 @@ import java.util.function.Function; @SpringBootApplication public class RepeaterApplication { - @Bean - public Function, Flux>, - Tuple2, Flux> - > fn() { - return new MyFn(); - } + @Bean + public Function, Flux>, Flux> fn() { + return tuple -> { + Flux cartEventStream = tuple.getT1(); + Flux checkoutEventStream = tuple.getT2(); + + return Flux.zip(cartEventStream, checkoutEventStream, (cartEvent, checkoutEvent) -> { + OrderEvent oe = new OrderEvent(); + oe.setOrderEvent(cartEvent.toString() + "- " + checkoutEvent.toString()); + return oe; + }); + }; + } public static void main(String[] args) { SpringApplication.run(RepeaterApplication.class, args); } + + public static class CartEvent { + private String carEvent; + + public String getCarEvent() { + return carEvent; + } + + public void setCarEvent(String carEvent) { + this.carEvent = carEvent; + } + + public String toString() { + return "CartEvent: " + carEvent; + } + } + + public static class CheckoutEvent { + private String checkoutEvent; + + public String getCheckoutEvent() { + return checkoutEvent; + } + + public void setCheckoutEvent(String checkoutEvent) { + this.checkoutEvent = checkoutEvent; + } + + public String toString() { + return "CheckoutEvent: " + checkoutEvent; + } + } + + public static class OrderEvent { + private String orderEvent; + + public String getOrderEvent() { + return orderEvent; + } + + public void setOrderEvent(String orderEvent) { + this.orderEvent = orderEvent; + } + + public String toString() { + return "OrderEvent: " + orderEvent; + } + } + } diff --git a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java index 7a7418db1..6a0000a05 100644 --- a/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java +++ b/spring-cloud-function-deployer/src/test/java/org/springframework/cloud/function/deployer/FunctionDeployerTests.java @@ -262,32 +262,18 @@ public class FunctionDeployerTests { ApplicationContext context = SpringApplication.run(DeployerApplication.class, args); FunctionCatalog catalog = context.getBean(FunctionCatalog.class); - Function>, Flux>>, Tuple2>, Flux>>> function = - catalog.lookup("fn", "application/json", "application/json"); + Function>, Flux>>, Flux>> multiInputFunction = catalog + .lookup("fn", "application/json"); - Message msg1 = MessageBuilder.withPayload("\"one\"".getBytes()).build(); - Message msg2 = MessageBuilder.withPayload("\"two\"".getBytes()).build(); - Flux> inputOne = Flux.just(msg1, msg2); + Message carEventMessage = MessageBuilder.withPayload("{\"carEvent\":\"CAR IS BUILT\"}".getBytes()).build(); + Message checkoutEventMessage = MessageBuilder.withPayload("{\"checkoutEvent\":\"CAR IS CHECKED OUT\"}".getBytes()).build(); + Flux> carEventStream = Flux.just(carEventMessage); + Flux> checkoutEventStream = Flux.just(checkoutEventMessage); - Message msgInt1 = MessageBuilder.withPayload("\"1\"".getBytes()).build(); - Message msgInt2 = MessageBuilder.withPayload("\"2\"".getBytes()).build(); - Flux> inputTwo = Flux.just(msgInt1, msgInt2); + Flux> result = multiInputFunction.apply(Tuples.of(carEventStream, checkoutEventStream)); - Tuple2>, Flux>> result = function.apply(Tuples.of(inputOne, inputTwo)); - List result1 = new ArrayList<>(); - List result2 = new ArrayList<>(); - result.getT1().subscribe(message -> { - result1.add(new String(message.getPayload())); - }); - result.getT2().subscribe(message -> { - result2.add(new String(message.getPayload())); - }); - - assertThat(result1.get(0)).isEqualTo("1.5"); - assertThat(result1.get(1)).isEqualTo("2.0"); - - assertThat(result2.get(0)).isEqualTo("\"one\""); - assertThat(result2.get(1)).isEqualTo("\"two\""); + byte[] resutBytes = result.blockFirst().getPayload(); + assertThat(resutBytes).isEqualTo("{\"orderEvent\":\"CartEvent: CAR IS BUILT- CheckoutEvent: CAR IS CHECKED OUT\"}".getBytes()); } /*