Modified multi-in/out deployer test to work with POJOs

This commit is contained in:
Oleg Zhurakousky
2019-12-05 19:43:18 +01:00
parent ef8d03645d
commit fde045a7c7
2 changed files with 71 additions and 29 deletions

View File

@@ -11,14 +11,70 @@ import java.util.function.Function;
@SpringBootApplication
public class RepeaterApplication {
@Bean
public Function<Tuple2<Flux<String>, Flux<Integer>>,
Tuple2<Flux<Double>, Flux<String>>
> fn() {
return new MyFn();
}
@Bean
public Function<Tuple2<Flux<CartEvent>, Flux<CheckoutEvent>>, Flux<OrderEvent>> fn() {
return tuple -> {
Flux<CartEvent> cartEventStream = tuple.getT1();
Flux<CheckoutEvent> checkoutEventStream = tuple.getT2();
return Flux.zip(cartEventStream, checkoutEventStream, (cartEvent, checkoutEvent) -> {
OrderEvent oe = new OrderEvent();
oe.setOrderEvent(cartEvent.toString() + "- " + checkoutEvent.toString());
return oe;
});
};
}
public static void main(String[] args) {
SpringApplication.run(RepeaterApplication.class, args);
}
public static class CartEvent {
private String carEvent;
public String getCarEvent() {
return carEvent;
}
public void setCarEvent(String carEvent) {
this.carEvent = carEvent;
}
public String toString() {
return "CartEvent: " + carEvent;
}
}
public static class CheckoutEvent {
private String checkoutEvent;
public String getCheckoutEvent() {
return checkoutEvent;
}
public void setCheckoutEvent(String checkoutEvent) {
this.checkoutEvent = checkoutEvent;
}
public String toString() {
return "CheckoutEvent: " + checkoutEvent;
}
}
public static class OrderEvent {
private String orderEvent;
public String getOrderEvent() {
return orderEvent;
}
public void setOrderEvent(String orderEvent) {
this.orderEvent = orderEvent;
}
public String toString() {
return "OrderEvent: " + orderEvent;
}
}
}

View File

@@ -262,32 +262,18 @@ public class FunctionDeployerTests {
ApplicationContext context = SpringApplication.run(DeployerApplication.class, args);
FunctionCatalog catalog = context.getBean(FunctionCatalog.class);
Function<Tuple2<Flux<Message<byte[]>>, Flux<Message<byte[]>>>, Tuple2<Flux<Message<byte[]>>, Flux<Message<byte[]>>>> function =
catalog.lookup("fn", "application/json", "application/json");
Function<Tuple2<Flux<Message<byte[]>>, Flux<Message<byte[]>>>, Flux<Message<byte[]>>> multiInputFunction = catalog
.lookup("fn", "application/json");
Message<byte[]> msg1 = MessageBuilder.withPayload("\"one\"".getBytes()).build();
Message<byte[]> msg2 = MessageBuilder.withPayload("\"two\"".getBytes()).build();
Flux<Message<byte[]>> inputOne = Flux.just(msg1, msg2);
Message<byte[]> carEventMessage = MessageBuilder.withPayload("{\"carEvent\":\"CAR IS BUILT\"}".getBytes()).build();
Message<byte[]> checkoutEventMessage = MessageBuilder.withPayload("{\"checkoutEvent\":\"CAR IS CHECKED OUT\"}".getBytes()).build();
Flux<Message<byte[]>> carEventStream = Flux.just(carEventMessage);
Flux<Message<byte[]>> checkoutEventStream = Flux.just(checkoutEventMessage);
Message<byte[]> msgInt1 = MessageBuilder.withPayload("\"1\"".getBytes()).build();
Message<byte[]> msgInt2 = MessageBuilder.withPayload("\"2\"".getBytes()).build();
Flux<Message<byte[]>> inputTwo = Flux.just(msgInt1, msgInt2);
Flux<Message<byte[]>> result = multiInputFunction.apply(Tuples.of(carEventStream, checkoutEventStream));
Tuple2<Flux<Message<byte[]>>, Flux<Message<byte[]>>> result = function.apply(Tuples.of(inputOne, inputTwo));
List<String> result1 = new ArrayList<>();
List<String> result2 = new ArrayList<>();
result.getT1().subscribe(message -> {
result1.add(new String(message.getPayload()));
});
result.getT2().subscribe(message -> {
result2.add(new String(message.getPayload()));
});
assertThat(result1.get(0)).isEqualTo("1.5");
assertThat(result1.get(1)).isEqualTo("2.0");
assertThat(result2.get(0)).isEqualTo("\"one\"");
assertThat(result2.get(1)).isEqualTo("\"two\"");
byte[] resutBytes = result.blockFirst().getPayload();
assertThat(resutBytes).isEqualTo("{\"orderEvent\":\"CartEvent: CAR IS BUILT- CheckoutEvent: CAR IS CHECKED OUT\"}".getBytes());
}
/*