Moved BodyExtractor and BodyInserter to http.codec
This commit moves the web.reactive.function.[BodyInserter|BodyExtractor] to http.codec, so that they can be used from the client as well. Furthermore, it parameterized both inserter and extractor over ReactiveHttpOutputMessage and ReactiveHttpInputMessage respectively, so that they can be limited to only be used on the client or server.
This commit is contained in:
@@ -0,0 +1,130 @@
|
||||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.codec;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.codec.ByteBufferDecoder;
|
||||
import org.springframework.core.codec.StringDecoder;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DefaultDataBuffer;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.http.MediaType;
|
||||
import org.springframework.http.ReactiveHttpInputMessage;
|
||||
import org.springframework.http.codec.json.Jackson2JsonDecoder;
|
||||
import org.springframework.http.codec.xml.Jaxb2XmlDecoder;
|
||||
import org.springframework.mock.http.server.reactive.test.MockServerHttpRequest;
|
||||
import org.springframework.tests.TestSubscriber;
|
||||
|
||||
/**
|
||||
* @author Arjen Poutsma
|
||||
*/
|
||||
public class BodyExtractorsTests {
|
||||
|
||||
private BodyExtractor.Context context;
|
||||
|
||||
@Before
|
||||
public void createContext() {
|
||||
final List<HttpMessageReader<?>> messageReaders = new ArrayList<>();
|
||||
messageReaders.add(new DecoderHttpMessageReader<>(new ByteBufferDecoder()));
|
||||
messageReaders.add(new DecoderHttpMessageReader<>(new StringDecoder()));
|
||||
messageReaders.add(new DecoderHttpMessageReader<>(new Jaxb2XmlDecoder()));
|
||||
messageReaders.add(new DecoderHttpMessageReader<>(new Jackson2JsonDecoder()));
|
||||
|
||||
this.context = new BodyExtractor.Context() {
|
||||
@Override
|
||||
public Supplier<Stream<HttpMessageReader<?>>> messageReaders() {
|
||||
return messageReaders::stream;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toMono() throws Exception {
|
||||
BodyExtractor<Mono<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toMono(String.class);
|
||||
|
||||
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
|
||||
DefaultDataBuffer dataBuffer =
|
||||
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
|
||||
Flux<DataBuffer> body = Flux.just(dataBuffer);
|
||||
|
||||
MockServerHttpRequest request = new MockServerHttpRequest();
|
||||
request.setBody(body);
|
||||
|
||||
Mono<String> result = extractor.extract(request, this.context);
|
||||
|
||||
TestSubscriber.subscribe(result)
|
||||
.assertComplete()
|
||||
.assertValues("foo");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toFlux() throws Exception {
|
||||
BodyExtractor<Flux<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(String.class);
|
||||
|
||||
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
|
||||
DefaultDataBuffer dataBuffer =
|
||||
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
|
||||
Flux<DataBuffer> body = Flux.just(dataBuffer);
|
||||
|
||||
MockServerHttpRequest request = new MockServerHttpRequest();
|
||||
request.setBody(body);
|
||||
|
||||
Flux<String> result = extractor.extract(request, this.context);
|
||||
TestSubscriber.subscribe(result)
|
||||
.assertComplete()
|
||||
.assertValues("foo");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void toFluxUnacceptable() throws Exception {
|
||||
BodyExtractor<Flux<String>, ReactiveHttpInputMessage> extractor = BodyExtractors.toFlux(String.class);
|
||||
|
||||
DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
|
||||
DefaultDataBuffer dataBuffer =
|
||||
factory.wrap(ByteBuffer.wrap("foo".getBytes(StandardCharsets.UTF_8)));
|
||||
Flux<DataBuffer> body = Flux.just(dataBuffer);
|
||||
|
||||
MockServerHttpRequest request = new MockServerHttpRequest();
|
||||
request.getHeaders().setContentType(MediaType.APPLICATION_JSON);
|
||||
request.setBody(body);
|
||||
|
||||
BodyExtractor.Context emptyContext = new BodyExtractor.Context() {
|
||||
@Override
|
||||
public Supplier<Stream<HttpMessageReader<?>>> messageReaders() {
|
||||
return () -> Collections.<HttpMessageReader<?>>emptySet().stream();
|
||||
}
|
||||
};
|
||||
|
||||
Flux<String> result = extractor.extract(request, emptyContext);
|
||||
TestSubscriber.subscribe(result)
|
||||
.assertError(UnsupportedMediaTypeException.class);
|
||||
}
|
||||
|
||||
}
|
||||
@@ -0,0 +1,165 @@
|
||||
/*
|
||||
* Copyright 2002-2016 the original author or authors.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.springframework.http.codec;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.file.Files;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.function.Supplier;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
import org.springframework.core.codec.ByteBufferEncoder;
|
||||
import org.springframework.core.codec.CharSequenceEncoder;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.core.io.Resource;
|
||||
import org.springframework.core.io.buffer.DataBuffer;
|
||||
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
|
||||
import org.springframework.http.ReactiveHttpOutputMessage;
|
||||
import org.springframework.http.codec.json.Jackson2JsonEncoder;
|
||||
import org.springframework.http.codec.xml.Jaxb2XmlEncoder;
|
||||
import org.springframework.http.server.reactive.ServerHttpResponse;
|
||||
import org.springframework.mock.http.server.reactive.test.MockServerHttpResponse;
|
||||
import org.springframework.tests.TestSubscriber;
|
||||
|
||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||
import static org.junit.Assert.assertArrayEquals;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
/**
|
||||
* @author Arjen Poutsma
|
||||
*/
|
||||
public class BodyInsertersTests {
|
||||
|
||||
private BodyInserter.Context context;
|
||||
|
||||
@Before
|
||||
public void createContext() {
|
||||
final List<HttpMessageWriter<?>> messageWriters = new ArrayList<>();
|
||||
messageWriters.add(new EncoderHttpMessageWriter<>(new ByteBufferEncoder()));
|
||||
messageWriters.add(new EncoderHttpMessageWriter<>(new CharSequenceEncoder()));
|
||||
messageWriters.add(new EncoderHttpMessageWriter<>(new Jaxb2XmlEncoder()));
|
||||
messageWriters.add(new EncoderHttpMessageWriter<>(new Jackson2JsonEncoder()));
|
||||
|
||||
this.context = new BodyInserter.Context() {
|
||||
@Override
|
||||
public Supplier<Stream<HttpMessageWriter<?>>> messageWriters() {
|
||||
return messageWriters::stream;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void ofObject() throws Exception {
|
||||
String body = "foo";
|
||||
BodyInserter<String, ReactiveHttpOutputMessage> inserter = BodyInserters.fromObject(body);
|
||||
|
||||
assertEquals(body, inserter.t());
|
||||
|
||||
MockServerHttpResponse response = new MockServerHttpResponse();
|
||||
Mono<Void> result = inserter.insert(response, this.context);
|
||||
TestSubscriber.subscribe(result)
|
||||
.assertComplete();
|
||||
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap(body.getBytes(UTF_8));
|
||||
DataBuffer buffer = new DefaultDataBufferFactory().wrap(byteBuffer);
|
||||
TestSubscriber.subscribe(response.getBody())
|
||||
.assertComplete()
|
||||
.assertValues(buffer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ofPublisher() throws Exception {
|
||||
Flux<String> body = Flux.just("foo");
|
||||
BodyInserter<Flux<String>, ReactiveHttpOutputMessage> inserter = BodyInserters.fromPublisher(body, String.class);
|
||||
|
||||
assertEquals(body, inserter.t());
|
||||
|
||||
MockServerHttpResponse response = new MockServerHttpResponse();
|
||||
Mono<Void> result = inserter.insert(response, this.context);
|
||||
TestSubscriber.subscribe(result)
|
||||
.assertComplete();
|
||||
|
||||
ByteBuffer byteBuffer = ByteBuffer.wrap("foo".getBytes(UTF_8));
|
||||
DataBuffer buffer = new DefaultDataBufferFactory().wrap(byteBuffer);
|
||||
TestSubscriber.subscribe(response.getBody())
|
||||
.assertComplete()
|
||||
.assertValues(buffer);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ofResource() throws Exception {
|
||||
Resource body = new ClassPathResource("response.txt", getClass());
|
||||
BodyInserter<Resource, ReactiveHttpOutputMessage> inserter = BodyInserters.fromResource(body);
|
||||
|
||||
assertEquals(body, inserter.t());
|
||||
|
||||
MockServerHttpResponse response = new MockServerHttpResponse();
|
||||
Mono<Void> result = inserter.insert(response, this.context);
|
||||
TestSubscriber.subscribe(result)
|
||||
.assertComplete();
|
||||
|
||||
byte[] expectedBytes = Files.readAllBytes(body.getFile().toPath());
|
||||
|
||||
TestSubscriber.subscribe(response.getBody())
|
||||
.assertComplete()
|
||||
.assertValuesWith(dataBuffer -> {
|
||||
byte[] resultBytes = new byte[dataBuffer.readableByteCount()];
|
||||
dataBuffer.read(resultBytes);
|
||||
assertArrayEquals(expectedBytes, resultBytes);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ofServerSentEventFlux() throws Exception {
|
||||
ServerSentEvent<String> event = ServerSentEvent.builder("foo").build();
|
||||
Flux<ServerSentEvent<String>> body = Flux.just(event);
|
||||
BodyInserter<Flux<ServerSentEvent<String>>, ServerHttpResponse> inserter =
|
||||
BodyInserters.fromServerSentEvents(body);
|
||||
|
||||
assertEquals(body, inserter.t());
|
||||
|
||||
MockServerHttpResponse response = new MockServerHttpResponse();
|
||||
Mono<Void> result = inserter.insert(response, this.context);
|
||||
TestSubscriber.subscribe(result)
|
||||
.assertComplete();
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void ofServerSentEventClass() throws Exception {
|
||||
Flux<String> body = Flux.just("foo");
|
||||
BodyInserter<Flux<String>, ServerHttpResponse> inserter =
|
||||
BodyInserters.fromServerSentEvents(body, String.class);
|
||||
|
||||
assertEquals(body, inserter.t());
|
||||
|
||||
MockServerHttpResponse response = new MockServerHttpResponse();
|
||||
Mono<Void> result = inserter.insert(response, this.context);
|
||||
TestSubscriber.subscribe(result)
|
||||
.assertComplete();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user