|
|
|
|
@@ -1,5 +1,5 @@
|
|
|
|
|
/*
|
|
|
|
|
* Copyright 2002-2014 the original author or authors.
|
|
|
|
|
* Copyright 2002-2015 the original author or authors.
|
|
|
|
|
*
|
|
|
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
|
* you may not use this file except in compliance with the License.
|
|
|
|
|
@@ -65,6 +65,7 @@ import org.springframework.web.socket.sockjs.frame.SockJsFrame;
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* An XHR transport based on Undertow's {@link io.undertow.client.UndertowClient}.
|
|
|
|
|
* Compatible with Undertow 1.0, 1.1, 1.2.
|
|
|
|
|
*
|
|
|
|
|
* <p>When used for testing purposes (e.g. load testing) or for specific use cases
|
|
|
|
|
* (like HTTPS configuration), a custom OptionMap should be provided:
|
|
|
|
|
@@ -88,13 +89,15 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
|
|
|
|
|
private static final AttachmentKey<String> RESPONSE_BODY = AttachmentKey.create(String.class);
|
|
|
|
|
|
|
|
|
|
private final Pool<ByteBuffer> bufferPool;
|
|
|
|
|
|
|
|
|
|
private final UndertowClient httpClient;
|
|
|
|
|
|
|
|
|
|
private final OptionMap optionMap;
|
|
|
|
|
|
|
|
|
|
private final XnioWorker worker;
|
|
|
|
|
|
|
|
|
|
private final UndertowClient httpClient;
|
|
|
|
|
private final Pool<ByteBuffer> bufferPool;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public UndertowXhrTransport() throws IOException {
|
|
|
|
|
this(OptionMap.builder().parse(Options.WORKER_NAME, "SockJSClient").getMap());
|
|
|
|
|
@@ -102,19 +105,20 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
|
|
|
|
|
public UndertowXhrTransport(OptionMap optionMap) throws IOException {
|
|
|
|
|
Assert.notNull(optionMap, "'optionMap' is required");
|
|
|
|
|
this.bufferPool = new ByteBufferSlicePool(1048, 1048);
|
|
|
|
|
this.httpClient = UndertowClient.getInstance();
|
|
|
|
|
this.optionMap = optionMap;
|
|
|
|
|
this.worker = Xnio.getInstance().createWorker(optionMap);
|
|
|
|
|
this.httpClient = UndertowClient.getInstance();
|
|
|
|
|
this.bufferPool = new ByteBufferSlicePool(1048, 1048);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
private static HttpHeaders toHttpHeaders(HeaderMap headerMap) {
|
|
|
|
|
HttpHeaders responseHeaders = new HttpHeaders();
|
|
|
|
|
Iterator<HttpString> names = headerMap.getHeaderNames().iterator();
|
|
|
|
|
while(names.hasNext()) {
|
|
|
|
|
while (names.hasNext()) {
|
|
|
|
|
HttpString name = names.next();
|
|
|
|
|
Iterator<String> values = headerMap.get(name).iterator();
|
|
|
|
|
while(values.hasNext()) {
|
|
|
|
|
while (values.hasNext()) {
|
|
|
|
|
responseHeaders.add(name.toString(), values.next());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -130,21 +134,24 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Return Undertow's native HTTP client
|
|
|
|
|
*/
|
|
|
|
|
public UndertowClient getHttpClient() {
|
|
|
|
|
return httpClient;
|
|
|
|
|
return this.httpClient;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Return the {@link org.xnio.XnioWorker} backing the I/O operations for Undertow's HTTP client
|
|
|
|
|
* Return the {@link org.xnio.XnioWorker} backing the I/O operations
|
|
|
|
|
* for Undertow's HTTP client.
|
|
|
|
|
* @see org.xnio.Xnio
|
|
|
|
|
*/
|
|
|
|
|
public XnioWorker getWorker() {
|
|
|
|
|
return this.worker;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected ResponseEntity<String> executeInfoRequestInternal(URI infoUrl) {
|
|
|
|
|
return executeRequest(infoUrl, Methods.GET, getRequestHeaders(), null);
|
|
|
|
|
@@ -156,23 +163,23 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected ResponseEntity<String> executeRequest(URI url, HttpString method, HttpHeaders headers, String body) {
|
|
|
|
|
CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
|
List<ClientResponse> responses = new CopyOnWriteArrayList<ClientResponse>();
|
|
|
|
|
|
|
|
|
|
final CountDownLatch latch = new CountDownLatch(1);
|
|
|
|
|
final List<ClientResponse> responses = new CopyOnWriteArrayList<ClientResponse>();
|
|
|
|
|
try {
|
|
|
|
|
final ClientConnection connection = this.httpClient.connect(url, this.worker,
|
|
|
|
|
this.bufferPool, this.optionMap).get();
|
|
|
|
|
ClientConnection connection = this.httpClient.connect(
|
|
|
|
|
url, this.worker, this.bufferPool, this.optionMap).get();
|
|
|
|
|
try {
|
|
|
|
|
final ClientRequest request = new ClientRequest().setMethod(method).setPath(url.getPath());
|
|
|
|
|
ClientRequest request = new ClientRequest().setMethod(method).setPath(url.getPath());
|
|
|
|
|
request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.HOST), url.getHost());
|
|
|
|
|
if (body !=null && !body.isEmpty()) {
|
|
|
|
|
if (body != null && !body.isEmpty()) {
|
|
|
|
|
request.getRequestHeaders().add(HttpString.tryFromString(HttpHeaders.CONTENT_LENGTH), body.length());
|
|
|
|
|
}
|
|
|
|
|
addHttpHeaders(request, headers);
|
|
|
|
|
connection.sendRequest(request, createRequestCallback(body, responses, latch));
|
|
|
|
|
|
|
|
|
|
latch.await();
|
|
|
|
|
final ClientResponse response = responses.iterator().next();
|
|
|
|
|
ClientResponse response = responses.iterator().next();
|
|
|
|
|
HttpStatus status = HttpStatus.valueOf(response.getResponseCode());
|
|
|
|
|
HttpHeaders responseHeaders = toHttpHeaders(response.getResponseHeaders());
|
|
|
|
|
String responseBody = response.getAttachment(RESPONSE_BODY);
|
|
|
|
|
@@ -185,10 +192,10 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
catch (IOException ex) {
|
|
|
|
|
throw new SockJsTransportFailureException("Failed to execute request to " + url, null, ex);
|
|
|
|
|
throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
|
|
|
|
|
}
|
|
|
|
|
catch(InterruptedException ex) {
|
|
|
|
|
throw new SockJsTransportFailureException("Failed to execute request to " + url, null, ex);
|
|
|
|
|
catch (InterruptedException ex) {
|
|
|
|
|
throw new SockJsTransportFailureException("Interrupted while processing request to " + url, ex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
@@ -203,21 +210,18 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
@Override
|
|
|
|
|
public void completed(final ClientExchange result) {
|
|
|
|
|
responses.add(result.getResponse());
|
|
|
|
|
|
|
|
|
|
new StringReadChannelListener(result.getConnection().getBufferPool()) {
|
|
|
|
|
@Override
|
|
|
|
|
protected void stringDone(String string) {
|
|
|
|
|
result.getResponse().putAttachment(RESPONSE_BODY, string);
|
|
|
|
|
latch.countDown();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void error(IOException ex) {
|
|
|
|
|
onFailure(latch, ex);
|
|
|
|
|
}
|
|
|
|
|
}.setup(result.getResponseChannel());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void failed(IOException ex) {
|
|
|
|
|
onFailure(latch, ex);
|
|
|
|
|
@@ -238,28 +242,28 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
onFailure(latch, ex);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void failed(IOException ex) {
|
|
|
|
|
onFailure(latch, ex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void onFailure(final CountDownLatch latch, IOException ex) {
|
|
|
|
|
private void onFailure(CountDownLatch latch, IOException ex) {
|
|
|
|
|
latch.countDown();
|
|
|
|
|
throw new SockJsTransportFailureException("Failed to execute request", null, ex);
|
|
|
|
|
throw new SockJsTransportFailureException("Failed to execute request", ex);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void connectInternal(TransportRequest request, WebSocketHandler handler, URI receiveUrl,
|
|
|
|
|
HttpHeaders handshakeHeaders, XhrClientSockJsSession session, SettableListenableFuture<WebSocketSession> connectFuture) {
|
|
|
|
|
HttpHeaders handshakeHeaders, XhrClientSockJsSession session,
|
|
|
|
|
SettableListenableFuture<WebSocketSession> connectFuture) {
|
|
|
|
|
|
|
|
|
|
executeReceiveRequest(receiveUrl, handshakeHeaders, session, connectFuture);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void executeReceiveRequest(final URI url, final HttpHeaders headers, final XhrClientSockJsSession session,
|
|
|
|
|
final SettableListenableFuture<WebSocketSession> connectFuture) {
|
|
|
|
|
|
|
|
|
|
if (logger.isTraceEnabled()) {
|
|
|
|
|
logger.trace("Starting XHR receive request, url=" + url);
|
|
|
|
|
}
|
|
|
|
|
@@ -273,10 +277,9 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
addHttpHeaders(httpRequest, headers);
|
|
|
|
|
result.sendRequest(httpRequest, createConnectCallback(url, getRequestHeaders(), session, connectFuture));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void failed(IOException ex) {
|
|
|
|
|
throw new SockJsTransportFailureException("Failed to execute request to " + url, null, ex);
|
|
|
|
|
throw new SockJsTransportFailureException("Failed to execute request to " + url, ex);
|
|
|
|
|
}
|
|
|
|
|
},
|
|
|
|
|
url, this.worker, this.bufferPool, this.optionMap);
|
|
|
|
|
@@ -289,11 +292,9 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
return new ClientCallback<ClientExchange>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void completed(final ClientExchange result) {
|
|
|
|
|
|
|
|
|
|
result.setResponseListener(new ClientCallback<ClientExchange>() {
|
|
|
|
|
@Override
|
|
|
|
|
public void completed(final ClientExchange result) {
|
|
|
|
|
|
|
|
|
|
public void completed(ClientExchange result) {
|
|
|
|
|
ClientResponse response = result.getResponse();
|
|
|
|
|
if (response.getResponseCode() != 200) {
|
|
|
|
|
HttpStatus status = HttpStatus.valueOf(response.getResponseCode());
|
|
|
|
|
@@ -320,9 +321,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
IoUtils.safeClose(result.getConnection());
|
|
|
|
|
onFailure(exc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void failed(IOException exc) {
|
|
|
|
|
IoUtils.safeClose(result.getConnection());
|
|
|
|
|
@@ -330,12 +329,10 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
}
|
|
|
|
|
});
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void failed(IOException exc) {
|
|
|
|
|
onFailure(exc);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void onFailure(Throwable failure) {
|
|
|
|
|
if (connectFuture.setException(failure)) {
|
|
|
|
|
return;
|
|
|
|
|
@@ -349,21 +346,26 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
public class SockJsResponseListener implements ChannelListener<StreamSourceChannel> {
|
|
|
|
|
|
|
|
|
|
private final ClientConnection connection;
|
|
|
|
|
|
|
|
|
|
private final URI url;
|
|
|
|
|
|
|
|
|
|
private final HttpHeaders headers;
|
|
|
|
|
|
|
|
|
|
private final XhrClientSockJsSession session;
|
|
|
|
|
|
|
|
|
|
private final SettableListenableFuture<WebSocketSession> connectFuture;
|
|
|
|
|
|
|
|
|
|
private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
|
|
|
|
|
|
|
|
|
|
public SockJsResponseListener(ClientConnection connection, URI url, HttpHeaders headers,
|
|
|
|
|
XhrClientSockJsSession sockJsSession, SettableListenableFuture<WebSocketSession> connectFuture) {
|
|
|
|
|
|
|
|
|
|
this.connection = connection;
|
|
|
|
|
this.url = url;
|
|
|
|
|
this.headers = headers;
|
|
|
|
|
@@ -371,7 +373,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
this.connectFuture = connectFuture;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
public void setup(final StreamSourceChannel channel) {
|
|
|
|
|
public void setup(StreamSourceChannel channel) {
|
|
|
|
|
channel.suspendReads();
|
|
|
|
|
channel.getReadSetter().set(this);
|
|
|
|
|
channel.resumeReads();
|
|
|
|
|
@@ -388,7 +390,6 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
Pooled<ByteBuffer> pooled = this.connection.getBufferPool().allocate();
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
int r;
|
|
|
|
|
do {
|
|
|
|
|
@@ -403,7 +404,7 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
onSuccess();
|
|
|
|
|
}
|
|
|
|
|
else {
|
|
|
|
|
while(buffer.hasRemaining()) {
|
|
|
|
|
while (buffer.hasRemaining()) {
|
|
|
|
|
int b = buffer.get();
|
|
|
|
|
if (b == '\n') {
|
|
|
|
|
handleFrame();
|
|
|
|
|
@@ -413,8 +414,8 @@ public class UndertowXhrTransport extends AbstractXhrTransport implements XhrTra
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} while (r > 0);
|
|
|
|
|
}
|
|
|
|
|
while (r > 0);
|
|
|
|
|
}
|
|
|
|
|
catch (IOException exc) {
|
|
|
|
|
onFailure(exc);
|
|
|
|
|
|