diff --git a/spring-graphql/src/main/java/org/springframework/graphql/client/DefaultRSocketGraphQlClientBuilder.java b/spring-graphql/src/main/java/org/springframework/graphql/client/DefaultRSocketGraphQlClientBuilder.java index 1d4933e4..0dc645d4 100644 --- a/spring-graphql/src/main/java/org/springframework/graphql/client/DefaultRSocketGraphQlClientBuilder.java +++ b/spring-graphql/src/main/java/org/springframework/graphql/client/DefaultRSocketGraphQlClientBuilder.java @@ -49,8 +49,10 @@ final class DefaultRSocketGraphQlClientBuilder private final RSocketRequester.Builder requesterBuilder; + @Nullable private Publisher> targetPublisher; + @Nullable private LoadbalanceStrategy loadbalanceStrategy; @Nullable @@ -95,8 +97,17 @@ final class DefaultRSocketGraphQlClientBuilder } @Override - public DefaultRSocketGraphQlClientBuilder clientTransport(ClientTransport clientTransport) { - this.clientTransport = clientTransport; + public DefaultRSocketGraphQlClientBuilder clientTransport(ClientTransport transport) { + this.clientTransport = transport; + return this; + } + + @Override + public DefaultRSocketGraphQlClientBuilder clientTransports( + Publisher> publisher, LoadbalanceStrategy strategy) { + + this.targetPublisher = publisher; + this.loadbalanceStrategy = strategy; return this; } @@ -114,15 +125,8 @@ final class DefaultRSocketGraphQlClientBuilder } @Override - public DefaultRSocketGraphQlClientBuilder rsocketRequester(Consumer requesterConsumer) { - requesterConsumer.accept(this.requesterBuilder); - return this; - } - - @Override - public DefaultRSocketGraphQlClientBuilder transports(Publisher> targetPublisher, LoadbalanceStrategy loadbalanceStrategy) { - this.targetPublisher = targetPublisher; - this.loadbalanceStrategy = loadbalanceStrategy; + public DefaultRSocketGraphQlClientBuilder rsocketRequester(Consumer consumer) { + consumer.accept(this.requesterBuilder); return this; } @@ -137,13 +141,18 @@ final class DefaultRSocketGraphQlClientBuilder RSocketRequester requester; - if (this.targetPublisher != null && this.loadbalanceStrategy != null) { - requester = this.requesterBuilder.transports(this.targetPublisher, this.loadbalanceStrategy); - } else { - Assert.state(this.clientTransport != null, "Neither WebSocket nor TCP networking configured"); + if (this.clientTransport != null) { requester = this.requesterBuilder.transport(this.clientTransport); } - RSocketGraphQlTransport graphQlTransport = new RSocketGraphQlTransport(this.route, requester, getJsonDecoder()); + else if (this.targetPublisher != null && this.loadbalanceStrategy != null) { + requester = this.requesterBuilder.transports(this.targetPublisher, this.loadbalanceStrategy); + } + else { + throw new IllegalStateException("Neither ClientTransport, nor Loadbalance targets and strategy"); + } + + RSocketGraphQlTransport graphQlTransport = + new RSocketGraphQlTransport(this.route, requester, getJsonDecoder()); return new DefaultRSocketGraphQlClient( super.buildGraphQlClient(graphQlTransport), requester, @@ -161,10 +170,13 @@ final class DefaultRSocketGraphQlClientBuilder private final RSocketRequester.Builder requesterBuilder; + @Nullable private final ClientTransport clientTransport; + @Nullable private final Publisher> targetPublisher; + @Nullable private final LoadbalanceStrategy loadbalanceStrategy; private final String route; @@ -172,8 +184,10 @@ final class DefaultRSocketGraphQlClientBuilder private final Consumer> builderInitializer; DefaultRSocketGraphQlClient( - GraphQlClient graphQlClient, RSocketRequester requester, RSocketRequester.Builder requesterBuilder, - ClientTransport clientTransport, Publisher> targetPublisher, LoadbalanceStrategy loadbalanceStrategy, + GraphQlClient graphQlClient, + RSocketRequester requester, RSocketRequester.Builder requesterBuilder, + @Nullable ClientTransport clientTransport, + @Nullable Publisher> targetPublisher, @Nullable LoadbalanceStrategy strategy, String route, Consumer> builderInitializer) { super(graphQlClient); @@ -182,7 +196,7 @@ final class DefaultRSocketGraphQlClientBuilder this.requesterBuilder = requesterBuilder; this.clientTransport = clientTransport; this.targetPublisher = targetPublisher; - this.loadbalanceStrategy = loadbalanceStrategy; + this.loadbalanceStrategy = strategy; this.route = route; this.builderInitializer = builderInitializer; } @@ -202,8 +216,12 @@ final class DefaultRSocketGraphQlClientBuilder @Override public RSocketGraphQlClient.Builder mutate() { DefaultRSocketGraphQlClientBuilder builder = new DefaultRSocketGraphQlClientBuilder(this.requesterBuilder); - builder.clientTransport(this.clientTransport); - builder.transports(this.targetPublisher, this.loadbalanceStrategy); + if (this.clientTransport != null) { + builder.clientTransport(this.clientTransport); + } + if (this.targetPublisher != null && this.loadbalanceStrategy != null) { + builder.clientTransports(this.targetPublisher, this.loadbalanceStrategy); + } builder.route(this.route); this.builderInitializer.accept(builder); return builder; diff --git a/spring-graphql/src/main/java/org/springframework/graphql/client/RSocketGraphQlClient.java b/spring-graphql/src/main/java/org/springframework/graphql/client/RSocketGraphQlClient.java index 6c15c0b7..133975d8 100644 --- a/spring-graphql/src/main/java/org/springframework/graphql/client/RSocketGraphQlClient.java +++ b/spring-graphql/src/main/java/org/springframework/graphql/client/RSocketGraphQlClient.java @@ -82,7 +82,9 @@ public interface RSocketGraphQlClient extends GraphQlClient { interface Builder> extends GraphQlClient.Builder { /** - * Select TCP as the underlying network protocol. + * Select TCP as the underlying network protocol. This delegates to + * {@link RSocketRequester.Builder#tcp(String, int)} to create the + * {@code RSocketRequester} instance. * @param host the remote host to connect to * @param port the remote port to connect to * @return the same builder instance @@ -90,19 +92,38 @@ public interface RSocketGraphQlClient extends GraphQlClient { B tcp(String host, int port); /** - * Select WebSocket as the underlying network protocol. + * Select WebSocket as the underlying network protocol. This delegates to + * {@link RSocketRequester.Builder#websocket(URI)} to create the + * {@code RSocketRequester} instance. * @param uri the URL for the WebSocket handshake * @return the same builder instance */ B webSocket(URI uri); /** - * Use a given {@link ClientTransport} to communicate with the remote server. + * Use a given {@link ClientTransport} to communicate with the remote + * server. This delegates to + * {@link RSocketRequester.Builder#transport(ClientTransport)} to create + * the {@code RSocketRequester} instance. * @param clientTransport the transport to use * @return the same builder instance */ B clientTransport(ClientTransport clientTransport); + /** + * Use a {@link Publisher} of {@link LoadbalanceTarget}s, each of which + * contains a {@link ClientTransport}. This delegates to + * {@link RSocketRequester.Builder#transports(Publisher, LoadbalanceStrategy)} + * to create the {@code RSocketRequester} instance. + * @param targetPublisher supplies list of targets to loadbalance against; + * the targets are replaced when the given {@code Publisher} emits again. + * @param loadbalanceStrategy the strategy to use for selecting from + * the list of targets. + * @return the same builder instance + * @since 1.0.3 + */ + B clientTransports(Publisher> targetPublisher, LoadbalanceStrategy loadbalanceStrategy); + /** * Customize the format of data payloads for the connection. *

By default, this is set to {@code "application/graphql+json"} but @@ -133,20 +154,6 @@ public interface RSocketGraphQlClient extends GraphQlClient { */ B rsocketRequester(Consumer requester); - /** - * Build an {@link RSocketRequester} with an - * {@link io.rsocket.loadbalance.LoadbalanceRSocketClient} that will - * connect to one of the given targets selected through the given - * {@link io.rsocket.loadbalance.LoadbalanceRSocketClient}. - * @param targetPublisher a {@code Publisher} that supplies a list of - * target transports to loadbalance against; the given list may be - * periodically updated by the {@code Publisher}. - * @param loadbalanceStrategy the strategy to use for selecting from - * the list of loadbalance targets. - * @return the same builder instance - */ - B transports(Publisher> targetPublisher, LoadbalanceStrategy loadbalanceStrategy); - /** * Build the {@code RSocketGraphQlClient} instance. */