DSL: Mail Inbound Adapters

Add POP3 Server for Tests

Polishing; PR Comments; IMAP4 Server
This commit is contained in:
Gary Russell
2014-08-23 10:42:04 +03:00
committed by Artem Bilan
parent ba4f52d59c
commit 89cb4e137d
8 changed files with 537 additions and 60 deletions

View File

@@ -94,6 +94,8 @@ dependencies {
}
testRuntime "com.sun.mail:mailapi:$mailVersion"
testRuntime "com.sun.mail:smtp:$mailVersion"
testRuntime "com.sun.mail:pop3:$mailVersion"
testRuntime "com.sun.mail:imap:$mailVersion"
jacoco "org.jacoco:org.jacoco.agent:$jacocoVersion:runtime"
}

View File

@@ -0,0 +1,46 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.mail;
import org.springframework.integration.mail.ImapMailReceiver;
import org.springframework.integration.mail.SearchTermStrategy;
/**
* @author Gary Russell
*
*/
public class ImapMailInboundChannelAdapterSpec
extends MailInboundChannelAdapterSpec<ImapMailInboundChannelAdapterSpec, ImapMailReceiver> {
ImapMailInboundChannelAdapterSpec() {
this.receiver = new ImapMailReceiver();
}
ImapMailInboundChannelAdapterSpec(String url) {
this.receiver = new ImapMailReceiver(url);
}
public ImapMailInboundChannelAdapterSpec searchTermStrategy(SearchTermStrategy searchTermStrategy) {
this.receiver.setSearchTermStrategy(searchTermStrategy);
return this;
}
public ImapMailInboundChannelAdapterSpec shouldMarkMessagesAsRead(boolean shouldMarkMessagesAsRead) {
this.receiver.setShouldMarkMessagesAsRead(shouldMarkMessagesAsRead);
return this;
}
}

View File

@@ -25,4 +25,25 @@ public class Mail {
return new MailSendingMessageHandlerSpec(host);
}
public static Pop3MailInboundChannelAdapterSpec pop3InboundAdapter() {
return new Pop3MailInboundChannelAdapterSpec();
}
public static Pop3MailInboundChannelAdapterSpec pop3InboundAdapter(String url) {
return new Pop3MailInboundChannelAdapterSpec(url);
}
public static Pop3MailInboundChannelAdapterSpec pop3InboundAdapter(String host, int port, String username,
String password) {
return new Pop3MailInboundChannelAdapterSpec(host, port, username, password);
}
public static ImapMailInboundChannelAdapterSpec imapInboundAdapter() {
return new ImapMailInboundChannelAdapterSpec();
}
public static ImapMailInboundChannelAdapterSpec imapInboundAdapter(String url) {
return new ImapMailInboundChannelAdapterSpec(url);
}
}

View File

@@ -0,0 +1,80 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.mail;
import java.util.Properties;
import javax.mail.Authenticator;
import javax.mail.Session;
import org.springframework.integration.dsl.core.MessageSourceSpec;
import org.springframework.integration.dsl.support.PropertiesBuilder;
import org.springframework.integration.dsl.support.PropertiesConfigurer;
import org.springframework.integration.mail.AbstractMailReceiver;
import org.springframework.integration.mail.MailReceivingMessageSource;
/**
* @author Gary Russell
*
*/
public abstract class MailInboundChannelAdapterSpec<S extends MailInboundChannelAdapterSpec<S, R>,
R extends AbstractMailReceiver>
extends MessageSourceSpec<S, MailReceivingMessageSource> {
protected volatile R receiver;
public S selectorExpression(String selectorExpression) {
this.receiver.setSelectorExpression(PARSER.parseExpression(selectorExpression));
return _this();
}
public S session(Session session) {
this.receiver.setSession(session);
return _this();
}
public S javaMailProperties(Properties javaMailProperties) {
this.receiver.setJavaMailProperties(javaMailProperties);
return _this();
}
public S javaMailProperties(PropertiesConfigurer configurer) {
PropertiesBuilder properties = new PropertiesBuilder();
configurer.configure(properties);
return javaMailProperties(properties.get());
}
public S javaMailAuthenticator(Authenticator javaMailAuthenticator) {
this.receiver.setJavaMailAuthenticator(javaMailAuthenticator);
return _this();
}
public S maxFetchSize(int maxFetchSize) {
this.receiver.setMaxFetchSize(maxFetchSize);
return _this();
}
public S shouldDeleteMessages(boolean shouldDeleteMessages) {
this.receiver.setShouldDeleteMessages(shouldDeleteMessages);
return _this();
}
@Override
public MailReceivingMessageSource doGet() {
return new MailReceivingMessageSource(this.receiver);
}
}

View File

@@ -29,8 +29,8 @@ import org.springframework.mail.javamail.JavaMailSenderImpl;
* @author Gary Russell
* @author Artem Bilan
*/
public class MailSendingMessageHandlerSpec extends
MessageHandlerSpec<MailSendingMessageHandlerSpec, MailSendingMessageHandler> {
public class MailSendingMessageHandlerSpec
extends MessageHandlerSpec<MailSendingMessageHandlerSpec, MailSendingMessageHandler> {
private final JavaMailSenderImpl sender = new JavaMailSenderImpl();

View File

@@ -0,0 +1,43 @@
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.dsl.mail;
import org.springframework.integration.mail.Pop3MailReceiver;
/**
* @author Gary Russell
*
*/
public class Pop3MailInboundChannelAdapterSpec
extends MailInboundChannelAdapterSpec<Pop3MailInboundChannelAdapterSpec, Pop3MailReceiver> {
Pop3MailInboundChannelAdapterSpec() {
this.receiver = new Pop3MailReceiver();
}
Pop3MailInboundChannelAdapterSpec(String url) {
this.receiver = new Pop3MailReceiver(url);
}
Pop3MailInboundChannelAdapterSpec(String host, String username, String password) {
this.receiver = new Pop3MailReceiver(host, username, password);
}
Pop3MailInboundChannelAdapterSpec(String host, int port, String username, String password) {
this.receiver = new Pop3MailReceiver(host, port, username, password);
}
}

View File

@@ -18,11 +18,15 @@ package org.springframework.integration.dsl.test.mail;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.endsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.util.Properties;
import javax.mail.internet.MimeMessage;
import javax.mail.internet.MimeMessage.RecipientType;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -35,13 +39,19 @@ import org.springframework.context.annotation.Configuration;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.mail.Mail;
import org.springframework.integration.dsl.support.Pollers;
import org.springframework.integration.dsl.test.mail.PoorMansMailServer.ImapServer;
import org.springframework.integration.dsl.test.mail.PoorMansMailServer.Pop3Server;
import org.springframework.integration.dsl.test.mail.PoorMansMailServer.SmtpServer;
import org.springframework.integration.mail.MailHeaders;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
@@ -56,15 +66,23 @@ import org.springframework.util.SocketUtils;
@DirtiesContext
public class MailTests {
private final static int port = SocketUtils.findAvailableTcpPort();
private final static int smtpPort = SocketUtils.findAvailableTcpPort();
private static SmtpServer server = PoorMansMailServer.smtp(port);
private static SmtpServer smtpServer = PoorMansMailServer.smtp(smtpPort);
private final static int pop3Port = SocketUtils.findAvailableTcpPort(smtpPort + 1);
private static Pop3Server pop3Server = PoorMansMailServer.pop3(pop3Port);
private final static int imapPort = SocketUtils.findAvailableTcpPort(pop3Port + 1);
private static ImapServer imapServer = PoorMansMailServer.imap(imapPort);
@BeforeClass
public static void setup() throws InterruptedException {
int n = 0;
while (n++ < 100 && !server.isListening()) {
while (n++ < 100 && (!smtpServer.isListening() || !pop3Server.isListening() || !imapServer.isListening())) {
Thread.sleep(100);
}
assertTrue(n < 100);
@@ -72,17 +90,24 @@ public class MailTests {
@AfterClass
public static void tearDown() {
server.stop();
smtpServer.stop();
pop3Server.stop();
imapServer.stop();
}
@Autowired
@Qualifier("sendMailChannel")
private MessageChannel sendMailChannel;
@Autowired
@Qualifier("sendMailEndpoint.handler")
private MessageHandler sendMailHandler;
@Autowired
private PollableChannel pop3Channel;
@Autowired
private PollableChannel imapChannel;
@Test
public void testOutbound() throws Exception {
assertEquals("localhost", TestUtils.getPropertyValue(this.sendMailHandler, "mailSender.host"));
@@ -98,12 +123,12 @@ public class MailTests {
.build());
int n = 0;
while (n++ < 100 && server.getMessages().size() == 0) {
while (n++ < 100 && smtpServer.getMessages().size() == 0) {
Thread.sleep(100);
}
assertTrue(server.getMessages().size() > 0);
String message = server.getMessages().get(0);
assertTrue(smtpServer.getMessages().size() > 0);
String message = smtpServer.getMessages().get(0);
assertThat(message, endsWith("foo\n"));
assertThat(message, containsString("foo@bar"));
assertThat(message, containsString("bar@baz"));
@@ -112,6 +137,28 @@ public class MailTests {
}
@Test
public void testPop3() throws Exception {
Message<?> message = this.pop3Channel.receive(10000);
assertNotNull(message);
MimeMessage mm = (MimeMessage) message.getPayload();
assertEquals("foo@bar", mm.getRecipients(RecipientType.TO)[0].toString());
assertEquals("bar@baz", mm.getFrom()[0].toString());
assertEquals("Test Email", mm.getSubject());
assertEquals("foo\r\n", mm.getContent());
}
@Test
public void testImap() throws Exception {
Message<?> message = this.imapChannel.receive(10000);
assertNotNull(message);
MimeMessage mm = (MimeMessage) message.getPayload();
assertEquals("foo@bar", mm.getRecipients(RecipientType.TO)[0].toString());
assertEquals("bar@baz", mm.getFrom()[0].toString());
assertEquals("Test Email", mm.getSubject());
assertEquals("foo\r\n", mm.getContent());
}
@Configuration
@EnableIntegration
public static class ContextConfiguration {
@@ -120,7 +167,7 @@ public class MailTests {
public IntegrationFlow sendMailFlow() {
return IntegrationFlows.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(port)
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
@@ -128,6 +175,28 @@ public class MailTests {
.get();
}
@Bean
public IntegrationFlow pop3MailFlow() {
return IntegrationFlows
.from(Mail.pop3InboundAdapter("localhost", pop3Port, "user", "pw")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.autoStartup(true)
.poller(Pollers.fixedDelay(1000)))
.channel(MessageChannels.queue("pop3Channel"))
.get();
}
@Bean
public IntegrationFlow imapMailFlow() {
return IntegrationFlows
.from(Mail.imapInboundAdapter("imap://user:pw@localhost:" + imapPort + "/INBOX")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.autoStartup(true)
.poller(Pollers.fixedDelay(1000)))
.channel(MessageChannels.queue("imapChannel"))
.get();
}
}
}

View File

@@ -46,69 +46,44 @@ public class PoorMansMailServer {
}
}
public static class SmtpServer implements Runnable {
public static Pop3Server pop3(int port) {
try {
return new Pop3Server(port);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private final ServerSocket socket;
public static ImapServer imap(int port) {
try {
return new ImapServer(port);
}
catch (IOException e) {
throw new RuntimeException(e);
}
}
private final ExecutorService exec = Executors.newCachedThreadPool();
private final List<String> messages = new ArrayList<String>();
private volatile boolean listening;
public static class SmtpServer extends MailServer {
public SmtpServer(int port) throws IOException {
this.socket = ServerSocketFactory.getDefault().createServerSocket(port);
this.listening = true;
exec.execute(this);
}
public boolean isListening() {
return listening;
}
public List<String> getMessages() {
return messages;
super(port);
}
@Override
public void run() {
try {
while (!socket.isClosed()) {
Socket socket = this.socket.accept();
exec.execute(new SmtpHandler(socket));
}
}
catch (IOException e) {
this.listening = false;
}
protected MailHandler mailHandler(Socket socket) {
return new SmtpHandler(socket);
}
public void stop() {
try {
this.socket.close();
}
catch (IOException e) {
e.printStackTrace();
}
this.exec.shutdownNow();
}
public class SmtpHandler implements Runnable {
private final Socket socket;
private BufferedWriter writer;
public class SmtpHandler extends MailHandler {
public SmtpHandler(Socket socket) {
this.socket = socket;
super(socket);
}
@Override
public void run() {
void doRun() {
try {
StringBuilder sb = new StringBuilder();
BufferedReader reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
this.writer = new BufferedWriter(new OutputStreamWriter(this.socket.getOutputStream()));
write("220 foo SMTP");
while (!socket.isClosed()) {
String line = reader.readLine();
@@ -160,12 +135,253 @@ public class PoorMansMailServer {
}
}
private void write(String str) throws IOException {
}
}
public static class Pop3Server extends MailServer {
public Pop3Server(int port) throws IOException {
super(port);
}
@Override
protected MailHandler mailHandler(Socket socket) {
return new Pop3Handler(socket);
}
public class Pop3Handler extends MailHandler {
public Pop3Handler(Socket socket) {
super(socket);
}
@Override
void doRun() {
try {
write("+OK POP3");
while (!socket.isClosed()) {
String line = reader.readLine();
switch (line) {
case "CAPA":
write("+OK");
write("USER");
write(".");
break;
case "USER user":
write("+OK");
break;
case "PASS pw":
write("+OK");
break;
case "STAT":
write("+OK 1 3");
break;
case "NOOP":
write("+OK");
break;
case "RETR 1":
write("+OK");
write(MESSAGE);
write(".");
break;
case "QUIT":
write("+OK");
socket.close();
break;
}
}
}
catch (IOException e) {
e.printStackTrace();
}
}
}
}
public static class ImapServer extends MailServer {
public ImapServer(int port) throws IOException {
super(port);
}
@Override
protected MailHandler mailHandler(Socket socket) {
return new Pop3Handler(socket);
}
public class Pop3Handler extends MailHandler {
public Pop3Handler(Socket socket) {
super(socket);
}
@Override
void doRun() {
try {
write("* OK IMAP4rev1 Service Ready");
while (!socket.isClosed()) {
String line = reader.readLine();
if (line == null) {
break;
}
String tag = line.substring(0, line.indexOf(" ") + 1);
System.out.println(line);
if (line.endsWith("CAPABILITY")) {
write("* CAPABILITY IMAP4rev1");
write(tag + "OK CAPABILITY completed");
}
else if (line.endsWith("LOGIN user pw")) {
write(tag + "OK LOGIN completed");
}
else if (line.endsWith("LIST \"\" INBOX")) {
write("* LIST \"/\" \"\"");
write(tag + "OK LIST completed");
}
else if (line.endsWith("EXAMINE \"\"")) {
write("* 1 EXISTS");
write("* 1 RECENT");
write("* OK [UNSEEN 1]");
write(tag + "OK EXAMINE completed");
}
else if (line.endsWith("SEARCH NOT (FLAGGED) ALL")) {
write("* SEARCH 1 1");
write(tag + "OK SEARCH completed");
}
else if (line.contains("FETCH 1,1")) {
write("* 1 FETCH (RFC822.SIZE 6909 INTERNALDATE \"27-May-2013 09:45:41 +0000\" "
+ "FLAGS (\\Seen) "
+ "ENVELOPE (\"Mon, 27 May 2013 15:14:49 +0530\" "
+ "\"Test Email\" ((\"Foo\" NIL \"foo\" \"bar.tv\")) "
+ "((\"Foo\" NIL \"foo\" \"bar.tv\")) "
+ "((\"Foo\" NIL \"foo\" \"bar.tv\")) "
+ "((\"Bar\" NIL \"bar\" \"baz.net\")) NIL NIL "
+ "\"<4DA0A7E4.3010506@baz.net>\" "
+ "\"<CACVnpJkAUUfa3d_-4GNZW2WpxbB39tBCHC=T0gc7hty6dOEHcA@foo.bar.com>\") "
+ "BODYSTRUCTURE (\"TEXT\" \"PLAIN\" (\"CHARSET\" \"ISO-8859-1\") NIL NIL \"7BIT\" 1176 43)))");
write(tag + "OK FETCH completed");
}
else if (line.contains("STORE 1 +FLAGS (\\Flagged)")) {
write("* 1 FETCH (FLAGS (\\Flagged))");
write(tag + "OK STORE completed");
}
else if (line.contains("STORE 1 +FLAGS (\\Seen)")) {
write("* 1 FETCH (FLAGS (\\Flagged \\Seen))");
write(tag + "OK STORE completed");
}
else if (line.contains("FETCH 1 FLAGS")) {
write("* 1 FLAGS(\\Seen)");
write(tag + "OK FETCH completed");
}
else if (line.contains("FETCH 1 (BODY.PEEK")) {
write("* 1 FETCH (BODY[]<0> {" + (MESSAGE.length() + 2) + "}");
write(MESSAGE);
write(")");
write(tag + "OK FETCH completed");
}
else if (line.contains("CLOSE")) {
write(tag + "OK CLOSE completed");
}
else if (line.contains("NOOP")) {
write(tag + "OK NOOP completed");
}
}
}
catch (IOException e) {
e.printStackTrace();
}
}
}
}
public abstract static class MailServer implements Runnable {
private final ServerSocket socket;
private final ExecutorService exec = Executors.newCachedThreadPool();
protected final List<String> messages = new ArrayList<String>();
private volatile boolean listening;
public MailServer(int port) throws IOException {
this.socket = ServerSocketFactory.getDefault().createServerSocket(port);
this.listening = true;
exec.execute(this);
}
public boolean isListening() {
return listening;
}
public List<String> getMessages() {
return messages;
}
@Override
public void run() {
try {
while (!socket.isClosed()) {
Socket socket = this.socket.accept();
exec.execute(mailHandler(socket));
}
}
catch (IOException e) {
this.listening = false;
}
}
protected abstract MailHandler mailHandler(Socket socket);
public void stop() {
try {
this.socket.close();
}
catch (IOException e) {
e.printStackTrace();
}
this.exec.shutdownNow();
}
public abstract class MailHandler implements Runnable {
protected static final String MESSAGE = "To: foo@bar\r\nFrom: bar@baz\r\nSubject: Test Email\r\n\r\nfoo";
protected final Socket socket;
private BufferedWriter writer;
protected StringBuilder sb = new StringBuilder();
protected BufferedReader reader;
public MailHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
try {
this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream()));
this.writer = new BufferedWriter(new OutputStreamWriter(this.socket.getOutputStream()));
}
catch (IOException e) {
e.printStackTrace();
}
doRun();
}
protected void write(String str) throws IOException {
this.writer.write(str);
this.writer.write("\r\n");
this.writer.flush();
}
abstract void doRun();
}