diff --git a/spring-graphql/src/main/java/org/springframework/graphql/client/RSocketGraphQlTransport.java b/spring-graphql/src/main/java/org/springframework/graphql/client/RSocketGraphQlTransport.java index 479f1b63..324c1fed 100644 --- a/spring-graphql/src/main/java/org/springframework/graphql/client/RSocketGraphQlTransport.java +++ b/spring-graphql/src/main/java/org/springframework/graphql/client/RSocketGraphQlTransport.java @@ -83,7 +83,6 @@ final class RSocketGraphQlTransport implements GraphQlTransport { public Flux executeSubscription(GraphQlRequest request) { return this.rsocketRequester.route(this.route).data(request.toMap()) .retrieveFlux(MAP_TYPE) - .doOnError(ex -> System.out.println(ex)) .onErrorResume(RejectedException.class, ex -> Flux.error(decodeErrors(request, ex))) .map(ResponseMapGraphQlResponse::new); } diff --git a/spring-graphql/src/main/java/org/springframework/graphql/server/DefaultWebGraphQlHandlerBuilder.java b/spring-graphql/src/main/java/org/springframework/graphql/server/DefaultWebGraphQlHandlerBuilder.java index 9c7b6dd5..e1fa7353 100644 --- a/spring-graphql/src/main/java/org/springframework/graphql/server/DefaultWebGraphQlHandlerBuilder.java +++ b/spring-graphql/src/main/java/org/springframework/graphql/server/DefaultWebGraphQlHandlerBuilder.java @@ -93,7 +93,7 @@ class DefaultWebGraphQlHandlerBuilder implements WebGraphQlHandler.Builder { Chain executionChain = this.interceptors.stream() .reduce(WebGraphQlInterceptor::andThen) - .map(interceptor -> (Chain) (request) -> interceptor.intercept(request, endOfChain)) + .map(interceptor -> interceptor.apply(endOfChain)) .orElse(endOfChain); return new WebGraphQlHandler() { diff --git a/spring-graphql/src/main/java/org/springframework/graphql/server/GraphQlRSocketHandler.java b/spring-graphql/src/main/java/org/springframework/graphql/server/GraphQlRSocketHandler.java index de691b4b..9f0fc9e8 100644 --- a/spring-graphql/src/main/java/org/springframework/graphql/server/GraphQlRSocketHandler.java +++ b/spring-graphql/src/main/java/org/springframework/graphql/server/GraphQlRSocketHandler.java @@ -102,20 +102,16 @@ public class GraphQlRSocketHandler { Assert.notNull(graphQlService, "ExecutionGraphQlService is required"); Assert.notNull(jsonEncoder, "JSON Encoder is required"); - this.executionChain = initExecutionChain(graphQlService, interceptors); + this.executionChain = initChain(graphQlService, interceptors); this.jsonEncoder = jsonEncoder; } - private static Chain initExecutionChain( - ExecutionGraphQlService graphQlService, List interceptors) { - - Chain endOfChain = request -> - graphQlService.execute(request).map(RSocketGraphQlResponse::new); - + private static Chain initChain(ExecutionGraphQlService service, List interceptors) { + Chain endOfChain = request -> service.execute(request).map(RSocketGraphQlResponse::new); return interceptors.isEmpty() ? endOfChain : interceptors.stream() .reduce(RSocketGraphQlInterceptor::andThen) - .map(interceptor -> (Chain) request -> interceptor.intercept(request, endOfChain)) + .map(interceptor -> interceptor.apply(endOfChain)) .orElse(endOfChain); } diff --git a/spring-graphql/src/main/java/org/springframework/graphql/server/RSocketGraphQlInterceptor.java b/spring-graphql/src/main/java/org/springframework/graphql/server/RSocketGraphQlInterceptor.java index 0ef9d321..cdfeb9e0 100644 --- a/spring-graphql/src/main/java/org/springframework/graphql/server/RSocketGraphQlInterceptor.java +++ b/spring-graphql/src/main/java/org/springframework/graphql/server/RSocketGraphQlInterceptor.java @@ -56,6 +56,15 @@ public interface RSocketGraphQlInterceptor { return (request, chain) -> intercept(request, nextRequest -> nextInterceptor.intercept(nextRequest, chain)); } + /** + * Apply this interceptor to the given {@code Chain} resulting in an intercepted chain. + * @param chain the chain to add interception around + * @return a new chain instance + */ + default Chain apply(Chain chain) { + return request -> intercept(request, chain); + } + /** * Contract for delegation to the rest of the chain. diff --git a/spring-graphql/src/main/java/org/springframework/graphql/server/WebGraphQlInterceptor.java b/spring-graphql/src/main/java/org/springframework/graphql/server/WebGraphQlInterceptor.java index 1a24bb92..31333aa8 100644 --- a/spring-graphql/src/main/java/org/springframework/graphql/server/WebGraphQlInterceptor.java +++ b/spring-graphql/src/main/java/org/springframework/graphql/server/WebGraphQlInterceptor.java @@ -60,6 +60,15 @@ public interface WebGraphQlInterceptor { return (request, chain) -> intercept(request, nextRequest -> nextInterceptor.intercept(nextRequest, chain)); } + /** + * Apply this interceptor to the given {@code Chain} resulting in an intercepted chain. + * @param chain the chain to add interception around + * @return a new chain instance + */ + default Chain apply(Chain chain) { + return request -> intercept(request, chain); + } + /** * Contract for delegation to the rest of the chain. diff --git a/spring-graphql/src/test/java/org/springframework/graphql/client/RSocketGraphQlClientTests.java b/spring-graphql/src/test/java/org/springframework/graphql/client/RSocketGraphQlClientBuilderTests.java similarity index 85% rename from spring-graphql/src/test/java/org/springframework/graphql/client/RSocketGraphQlClientTests.java rename to spring-graphql/src/test/java/org/springframework/graphql/client/RSocketGraphQlClientBuilderTests.java index bdb6917c..9bdb48b4 100644 --- a/spring-graphql/src/test/java/org/springframework/graphql/client/RSocketGraphQlClientTests.java +++ b/spring-graphql/src/test/java/org/springframework/graphql/client/RSocketGraphQlClientBuilderTests.java @@ -19,14 +19,11 @@ package org.springframework.graphql.client; import java.time.Duration; import java.util.Collections; import java.util.HashMap; -import java.util.List; import java.util.Map; import graphql.ExecutionInput; import graphql.ExecutionResult; import graphql.ExecutionResultImpl; -import graphql.GraphQLError; -import graphql.GraphqlErrorBuilder; import io.rsocket.Closeable; import io.rsocket.SocketAcceptor; import io.rsocket.core.RSocketServer; @@ -36,14 +33,12 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import reactor.test.StepVerifier; import org.springframework.graphql.ExecutionGraphQlResponse; import org.springframework.graphql.ExecutionGraphQlService; import org.springframework.graphql.GraphQlRequest; -import org.springframework.graphql.ResponseError; -import org.springframework.graphql.support.DefaultExecutionGraphQlResponse; import org.springframework.graphql.server.GraphQlRSocketHandler; +import org.springframework.graphql.support.DefaultExecutionGraphQlResponse; import org.springframework.http.codec.json.Jackson2JsonDecoder; import org.springframework.http.codec.json.Jackson2JsonEncoder; import org.springframework.lang.Nullable; @@ -63,7 +58,7 @@ import static org.assertj.core.api.Assertions.assertThat; * * @author Rossen Stoyanchev */ -public class RSocketGraphQlClientTests { +public class RSocketGraphQlClientBuilderTests { private static final String DOCUMENT = "{ Query }"; @@ -97,27 +92,6 @@ public class RSocketGraphQlClientTests { assertThat(request).isNotNull(); } - @Test - void subscriptionError() { - - String document = "subscription { greetings }"; - GraphQLError error = GraphqlErrorBuilder.newError().message("boo").build(); - ExecutionResult result = ExecutionResultImpl.newExecutionResult().addError(error).build(); - this.builderSetup.setMockResponse(document, result); - - Flux responseFlux = this.builderSetup.initBuilder().build() - .document(document).executeSubscription(); - - StepVerifier.create(responseFlux) - .expectErrorSatisfies(ex -> { - assertThat(ex).isInstanceOf(SubscriptionErrorException.class); - List errors = ((SubscriptionErrorException) ex).getErrors(); - assertThat(errors).hasSize(1); - assertThat(errors.get(0).getMessage()).isEqualTo("boo"); - }) - .verify(TIMEOUT); - } - private static class BuilderSetup { diff --git a/spring-graphql/src/test/java/org/springframework/graphql/client/RSocketGraphQlTransportTests.java b/spring-graphql/src/test/java/org/springframework/graphql/client/RSocketGraphQlTransportTests.java new file mode 100644 index 00000000..466e7b06 --- /dev/null +++ b/spring-graphql/src/test/java/org/springframework/graphql/client/RSocketGraphQlTransportTests.java @@ -0,0 +1,107 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.graphql.client; + +import java.time.Duration; +import java.util.List; + +import io.rsocket.Closeable; +import io.rsocket.SocketAcceptor; +import io.rsocket.core.RSocketServer; +import io.rsocket.exceptions.RejectedException; +import io.rsocket.transport.local.LocalClientTransport; +import io.rsocket.transport.local.LocalServerTransport; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import org.springframework.graphql.GraphQlResponse; +import org.springframework.graphql.ResponseError; +import org.springframework.graphql.support.DefaultGraphQlRequest; +import org.springframework.http.codec.json.Jackson2JsonDecoder; +import org.springframework.http.codec.json.Jackson2JsonEncoder; +import org.springframework.lang.Nullable; +import org.springframework.messaging.rsocket.RSocketRequester; +import org.springframework.messaging.rsocket.RSocketStrategies; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Tests for {@link RSocketGraphQlTransport} connecting to a + * {@link LocalServerTransport} and receiving stubbed responses via {@link SocketAcceptor}. + * + * @author Rossen Stoyanchev + */ +public class RSocketGraphQlTransportTests { + + private static final Jackson2JsonEncoder jsonEncoder = new Jackson2JsonEncoder(); + + private static final Jackson2JsonDecoder jsonDecoder = new Jackson2JsonDecoder(); + + + @Nullable + private Closeable server; + + + @AfterEach + void tearDown() { + if (this.server != null) { + this.server.dispose(); + } + } + + + @Test + void subscriptionError() { + + RSocketGraphQlTransport transport = createTransport(SocketAcceptor.forRequestStream(payload -> + Flux.error(new RejectedException( + "[{\"message\":\"boo\"," + + "\"locations\":[]," + + "\"errorType\":\"DataFetchingException\"," + + "\"path\":null," + + "\"extensions\":null}]")))); + + Flux responseFlux = + transport.executeSubscription(new DefaultGraphQlRequest("subscription { greetings }")); + + StepVerifier.create(responseFlux) + .expectErrorSatisfies(ex -> { + assertThat(ex).isInstanceOf(SubscriptionErrorException.class); + List errors = ((SubscriptionErrorException) ex).getErrors(); + assertThat(errors).hasSize(1); + assertThat(errors.get(0).getMessage()).isEqualTo("boo"); + }) + .verify(Duration.ofSeconds(5)); + } + + private RSocketGraphQlTransport createTransport(SocketAcceptor acceptor) { + + this.server = RSocketServer.create() + .acceptor(acceptor) + .bind(LocalServerTransport.create("local")) + .block(); + + RSocketRequester requester = RSocketRequester.builder() + .rsocketStrategies(RSocketStrategies.builder().encoder(jsonEncoder).decoder(jsonDecoder).build()) + .transport(LocalClientTransport.create("local")); + + return new RSocketGraphQlTransport("route", requester, jsonDecoder); + } + +} diff --git a/spring-graphql/src/test/java/org/springframework/graphql/server/GraphQlRSocketHandlerTests.java b/spring-graphql/src/test/java/org/springframework/graphql/server/GraphQlRSocketHandlerTests.java new file mode 100644 index 00000000..db0d5a21 --- /dev/null +++ b/spring-graphql/src/test/java/org/springframework/graphql/server/GraphQlRSocketHandlerTests.java @@ -0,0 +1,106 @@ +/* + * Copyright 2002-2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.graphql.server; + +import java.time.Duration; +import java.util.Collections; +import java.util.Map; + +import graphql.ExecutionInput; +import graphql.ExecutionResult; +import graphql.ExecutionResultImpl; +import graphql.GraphQLError; +import graphql.GraphqlErrorBuilder; +import io.rsocket.exceptions.InvalidException; +import io.rsocket.exceptions.RejectedException; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import org.springframework.core.codec.Encoder; +import org.springframework.graphql.ExecutionGraphQlService; +import org.springframework.graphql.server.webflux.GraphQlWebSocketHandler; +import org.springframework.graphql.support.DefaultExecutionGraphQlResponse; +import org.springframework.http.codec.json.Jackson2JsonEncoder; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Unit tests for {@link GraphQlWebSocketHandler}. + * @author Rossen Stoyanchev + */ +public class GraphQlRSocketHandlerTests { + + private static final Duration TIMEOUT = Duration.ofSeconds(5); + + private final Encoder encoder = new Jackson2JsonEncoder(); + + + @Test + void subscriptionWithFailedResponse() { + + String document = "subscription { greetings }"; + GraphQLError error = GraphqlErrorBuilder.newError().message("boo").build(); + ExecutionResult result = ExecutionResultImpl.newExecutionResult().addError(error).build(); + + Flux> responseFlux = handleSubscription(document, result); + + StepVerifier.create(responseFlux) + .expectErrorSatisfies(ex -> { + assertThat(ex).isInstanceOf(RejectedException.class); + assertThat(ex.getMessage()).isEqualTo( + "[{\"message\":\"boo\"," + + "\"locations\":[]," + + "\"errorType\":\"DataFetchingException\"," + + "\"path\":null," + + "\"extensions\":null}]"); + }) + .verify(TIMEOUT); + } + + @Test + void subscriptionWithValidResponseButNotPublisher() { + + String document = "subscription { greetings }"; + ExecutionResult result = ExecutionResultImpl.newExecutionResult().data(Collections.emptyMap()).build(); + + Flux> responseFlux = handleSubscription(document, result); + + StepVerifier.create(responseFlux) + .expectErrorSatisfies(ex -> { + assertThat(ex).isInstanceOf(InvalidException.class); + assertThat(ex.getMessage()).startsWith("Expected a Publisher for a subscription operation."); + }) + .verify(TIMEOUT); + } + + private Flux> handleSubscription(String document, ExecutionResult executionResult) { + ExecutionGraphQlService service = stubService(document, executionResult); + GraphQlRSocketHandler handler = new GraphQlRSocketHandler(service, Collections.emptyList(), this.encoder); + return handler.handleSubscription(Collections.singletonMap("query", document)); + } + + private ExecutionGraphQlService stubService(String document, ExecutionResult result) { + return request -> { + assertThat(request.getDocument()).isEqualTo(document); + ExecutionInput executionInput = ExecutionInput.newExecutionInput().query(document).build(); + return Mono.just(new DefaultExecutionGraphQlResponse(executionInput, result)); + }; + } + +}