Handle early closure of WebSocket connection

Ensure the error is routed to the graphQlSessionSink when the WebSocket
connection is closed before the GraphQLSession is initialized.

Closes gh-1098
This commit is contained in:
rstoyanchev
2025-01-29 15:20:29 +00:00
parent 8f303e630f
commit d2002bbe84
2 changed files with 33 additions and 6 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -339,7 +339,7 @@ final class WebSocketGraphQlTransport implements GraphQlTransport {
if (logger.isDebugEnabled()) {
logger.debug(closeStatusMessage);
}
graphQlSession.terminateRequests(closeStatusMessage, closeStatus);
terminateGraphQlSession(graphQlSession, closeStatus, closeStatusMessage, null);
})
.doOnError((cause) -> {
CloseStatus closeStatus = CloseStatus.NO_STATUS_CODE;
@@ -347,11 +347,21 @@ final class WebSocketGraphQlTransport implements GraphQlTransport {
if (logger.isErrorEnabled()) {
logger.error(closeStatusMessage);
}
graphQlSession.terminateRequests(closeStatusMessage, closeStatus);
terminateGraphQlSession(graphQlSession, closeStatus, closeStatusMessage, cause);
})
.subscribe();
}
private void terminateGraphQlSession(
GraphQlSession session, CloseStatus closeStatus, String closeStatusMessage, @Nullable Throwable cause) {
if (sessionNotInitialized()) {
this.graphQlSessionSink.tryEmitError(new IllegalStateException(closeStatusMessage, cause));
this.graphQlSessionSink = Sinks.unsafe().one();
}
session.terminateRequests(closeStatusMessage, closeStatus);
}
private String initCloseStatusMessage(CloseStatus status, @Nullable Throwable ex, GraphQlSession session) {
String reason = session + " disconnected";
if (isStopped()) {

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2002-2024 the original author or authors.
* Copyright 2002-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -317,7 +317,7 @@ public class WebSocketGraphQlTransportTests {
}
@Test
void errorBeforeConnectionAck() {
void errorBeforeConnectionAckWithStart() {
// Errors before GraphQL session initialized should be routed, no hanging on start
@@ -325,9 +325,26 @@ public class WebSocketGraphQlTransportTests {
handler.connectionInitHandler(initPayload -> Mono.error(new IllegalStateException("boo")));
TestWebSocketClient client = new TestWebSocketClient(handler);
String expectedMessage = "disconnected with CloseStatus[code=1002, reason=null]";
StepVerifier.create(createTransport(client).start())
.expectErrorMessage("boo")
.expectErrorSatisfies(ex -> assertThat(ex).hasMessageEndingWith(expectedMessage))
.verify(TIMEOUT);
}
@Test // gh-1098
void errorBeforeConnectionAckWithRequest() {
// Errors before GraphQL session initialized should be routed, no hanging on start
MockGraphQlWebSocketServer handler = new MockGraphQlWebSocketServer();
handler.connectionInitHandler(initPayload -> Mono.error(new IllegalStateException("boo")));
TestWebSocketClient client = new TestWebSocketClient(session -> session.close(CloseStatus.POLICY_VIOLATION));
String expectedMessage = "disconnected with CloseStatus[code=1008, reason=null]";
StepVerifier.create(createTransport(client).execute(new DefaultGraphQlRequest("{Query1}")))
.expectErrorSatisfies(ex -> assertThat(ex).hasMessageEndingWith(expectedMessage))
.verify(TIMEOUT);
}