Refactored ByteStreamOutboundChannelAdapter to ByteStreamWritingMessageConsumer.

This commit is contained in:
Mark Fisher
2008-09-24 00:11:22 +00:00
parent 0ed3ba7657
commit ab7d7fa5f8
2 changed files with 25 additions and 26 deletions

View File

@@ -23,27 +23,27 @@ import java.io.OutputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.integration.endpoint.AbstractMessageConsumingEndpoint;
import org.springframework.integration.message.Message;
import org.springframework.integration.message.MessageConsumer;
import org.springframework.integration.message.MessagingException;
/**
* An outbound Channel Adapter that writes a byte array to an {@link OutputStream}.
* A {@link MessageConsumer} that writes a byte array to an {@link OutputStream}.
*
* @author Mark Fisher
*/
public class ByteStreamOutboundChannelAdapter extends AbstractMessageConsumingEndpoint {
public class ByteStreamWritingMessageConsumer implements MessageConsumer {
private final Log logger = LogFactory.getLog(this.getClass());
private final BufferedOutputStream stream;
public ByteStreamOutboundChannelAdapter(OutputStream stream) {
public ByteStreamWritingMessageConsumer(OutputStream stream) {
this(stream, -1);
}
public ByteStreamOutboundChannelAdapter(OutputStream stream, int bufferSize) {
public ByteStreamWritingMessageConsumer(OutputStream stream, int bufferSize) {
if (bufferSize > 0) {
this.stream = new BufferedOutputStream(stream, bufferSize);
}
@@ -53,8 +53,7 @@ public class ByteStreamOutboundChannelAdapter extends AbstractMessageConsumingEn
}
@Override
public void onMessageInternal(Message<?> message) {
public void onMessage(Message<?> message) {
Object payload = message.getPayload();
if (payload == null) {
if (logger.isWarnEnabled()) {

View File

@@ -33,7 +33,7 @@ import org.springframework.integration.scheduling.PollingSchedule;
/**
* @author Mark Fisher
*/
public class ByteStreamOutboundChannelAdapterTests {
public class ByteStreamWritingMessageConsumerTests {
private QueueChannel channel;
@@ -50,8 +50,8 @@ public class ByteStreamOutboundChannelAdapterTests {
@Test
public void testSingleByteArray() {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamOutboundChannelAdapter adapter = new ByteStreamOutboundChannelAdapter(stream);
adapter.onMessage(new GenericMessage<byte[]>(new byte[] {1,2,3}));
ByteStreamWritingMessageConsumer consumer = new ByteStreamWritingMessageConsumer(stream);
consumer.onMessage(new GenericMessage<byte[]>(new byte[] {1,2,3}));
byte[] result = stream.toByteArray();
assertEquals(3, result.length);
assertEquals(1, result[0]);
@@ -62,8 +62,8 @@ public class ByteStreamOutboundChannelAdapterTests {
@Test
public void testSingleString() {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamOutboundChannelAdapter target = new ByteStreamOutboundChannelAdapter(stream);
target.onMessage(new StringMessage("foo"));
ByteStreamWritingMessageConsumer consumer = new ByteStreamWritingMessageConsumer(stream);
consumer.onMessage(new StringMessage("foo"));
byte[] result = stream.toByteArray();
assertEquals(3, result.length);
assertEquals("foo", new String(result));
@@ -72,9 +72,9 @@ public class ByteStreamOutboundChannelAdapterTests {
@Test
public void testMaxMessagesPerTaskSameAsMessageCount() {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamOutboundChannelAdapter target = new ByteStreamOutboundChannelAdapter(stream);
ByteStreamWritingMessageConsumer consumer = new ByteStreamWritingMessageConsumer(stream);
poller.setMaxMessagesPerPoll(3);
poller.subscribe(target);
poller.subscribe(consumer);
channel.send(new GenericMessage<byte[]>(new byte[] {1,2,3}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {4,5,6}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {7,8,9}), 0);
@@ -88,9 +88,9 @@ public class ByteStreamOutboundChannelAdapterTests {
@Test
public void testMaxMessagesPerTaskLessThanMessageCount() {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamOutboundChannelAdapter target = new ByteStreamOutboundChannelAdapter(stream);
ByteStreamWritingMessageConsumer consumer = new ByteStreamWritingMessageConsumer(stream);
poller.setMaxMessagesPerPoll(2);
poller.subscribe(target);
poller.subscribe(consumer);
channel.send(new GenericMessage<byte[]>(new byte[] {1,2,3}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {4,5,6}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {7,8,9}), 0);
@@ -103,10 +103,10 @@ public class ByteStreamOutboundChannelAdapterTests {
@Test
public void testMaxMessagesPerTaskExceedsMessageCount() {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamOutboundChannelAdapter target = new ByteStreamOutboundChannelAdapter(stream);
ByteStreamWritingMessageConsumer consumer = new ByteStreamWritingMessageConsumer(stream);
poller.setMaxMessagesPerPoll(5);
poller.setReceiveTimeout(0);
poller.subscribe(target);
poller.subscribe(consumer);
channel.send(new GenericMessage<byte[]>(new byte[] {1,2,3}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {4,5,6}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {7,8,9}), 0);
@@ -119,10 +119,10 @@ public class ByteStreamOutboundChannelAdapterTests {
@Test
public void testMaxMessagesLessThanMessageCountWithMultipleDispatches() {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamOutboundChannelAdapter target = new ByteStreamOutboundChannelAdapter(stream);
ByteStreamWritingMessageConsumer consumer = new ByteStreamWritingMessageConsumer(stream);
poller.setMaxMessagesPerPoll(2);
poller.setReceiveTimeout(0);
poller.subscribe(target);
poller.subscribe(consumer);
channel.send(new GenericMessage<byte[]>(new byte[] {1,2,3}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {4,5,6}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {7,8,9}), 0);
@@ -140,10 +140,10 @@ public class ByteStreamOutboundChannelAdapterTests {
@Test
public void testMaxMessagesExceedsMessageCountWithMultipleDispatches() {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamOutboundChannelAdapter target = new ByteStreamOutboundChannelAdapter(stream);
ByteStreamWritingMessageConsumer consumer = new ByteStreamWritingMessageConsumer(stream);
poller.setMaxMessagesPerPoll(5);
poller.setReceiveTimeout(0);
poller.subscribe(target);
poller.subscribe(consumer);
channel.send(new GenericMessage<byte[]>(new byte[] {1,2,3}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {4,5,6}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {7,8,9}), 0);
@@ -160,10 +160,10 @@ public class ByteStreamOutboundChannelAdapterTests {
@Test
public void testStreamResetBetweenDispatches() {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamOutboundChannelAdapter target = new ByteStreamOutboundChannelAdapter(stream);
ByteStreamWritingMessageConsumer consumer = new ByteStreamWritingMessageConsumer(stream);
poller.setMaxMessagesPerPoll(2);
poller.setReceiveTimeout(0);
poller.subscribe(target);
poller.subscribe(consumer);
channel.send(new GenericMessage<byte[]>(new byte[] {1,2,3}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {4,5,6}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {7,8,9}), 0);
@@ -180,10 +180,10 @@ public class ByteStreamOutboundChannelAdapterTests {
@Test
public void testStreamWriteBetweenDispatches() throws IOException {
ByteArrayOutputStream stream = new ByteArrayOutputStream();
ByteStreamOutboundChannelAdapter target = new ByteStreamOutboundChannelAdapter(stream);
ByteStreamWritingMessageConsumer consumer = new ByteStreamWritingMessageConsumer(stream);
poller.setMaxMessagesPerPoll(2);
poller.setReceiveTimeout(0);
poller.subscribe(target);
poller.subscribe(consumer);
channel.send(new GenericMessage<byte[]>(new byte[] {1,2,3}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {4,5,6}), 0);
channel.send(new GenericMessage<byte[]>(new byte[] {7,8,9}), 0);