Revert "file-split-ftp Move errorChannel to Poller"

This reverts commit 7b84810388.
This commit is contained in:
Gary Russell
2016-10-13 17:21:49 -04:00
parent 7b84810388
commit b3bab0dea2
8 changed files with 13 additions and 35 deletions

View File

@@ -28,7 +28,6 @@ import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.mail.MailProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.core.Pollers;
@@ -42,7 +41,6 @@ import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;
import org.springframework.integration.http.config.EnableIntegrationGraphController;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
@@ -71,7 +69,8 @@ public class Application {
public IntegrationFlow fromFile() {
return IntegrationFlows.from(Files.inboundAdapter(new File("/tmp/in"))
.preventDuplicates(false)
.patternFilter("*.txt"), e -> e.poller(Pollers.fixedDelay(5000).errorChannel(tfrErrorChannel())))
.patternFilter("*.txt"), e -> e.poller(Pollers.fixedDelay(5000)))
.enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, "tfrErrors.input"))
.handle(Files.splitter(true, true))
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(FileSplitter.FileMarker.class, "markers.input")
@@ -164,19 +163,13 @@ public class Application {
return ftp;
}
@Bean
public MessageChannel tfrErrorChannel() {
return new DirectChannel();
}
/**
* Error flow - email failure
* @return the flow.
*/
@Bean
public IntegrationFlow tfrErrors() {
return IntegrationFlows.from(tfrErrorChannel())
.enrichHeaders(Mail.headers()
return f -> f.enrichHeaders(Mail.headers()
.subject("File split and transfer failed")
.from("foo@bar")
.toFunction(m -> new String[] {"bar@baz"}))
@@ -185,8 +178,7 @@ public class Application {
+ FileHeaders.ORIGINAL_FILE + "']"))
.<MessagingException, String>transform(p ->
p.getFailedMessage().getPayload().toString() + "\n" + getStackTraceAsString(p))
.channel("toMail.input")
.get();
.channel("toMail.input");
}
@Bean

View File

@@ -25,7 +25,7 @@
<!-- From RabbitMQ To STDOUT -->
<int-amqp:inbound-channel-adapter channel="fromRabbit" consumers-per-queue="1"
<int-amqp:inbound-channel-adapter channel="fromRabbit"
queue-names="si.test.queue" connection-factory="connectionFactory" />
<int:channel id="fromRabbit">

View File

@@ -11,10 +11,6 @@
</appender>
<!-- Loggers -->
<logger name="org.springframework.amqp">
<level value="debug" />
</logger>
<logger name="org.springframework.integration">
<level value="warn" />
</logger>
@@ -33,4 +29,4 @@
<appender-ref ref="console" />
</root>
</log4j:configuration>
</log4j:configuration>

View File

@@ -23,14 +23,12 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.http.config.EnableIntegrationGraphController;
/**
* @author Gary Russell
* @since 4.2
*/
@SpringBootApplication
@EnableIntegrationGraphController
@ImportResource("/META-INF/spring/integration/server-context.xml")
public class Application {
@@ -53,9 +51,9 @@ public class Application {
System.out.println("\n\n++++++++++++ Sending: " + request + " ++++++++++++\n");
String reply = requestGateway.echo(request);
System.out.println("\n\n++++++++++++ Replied with: " + reply + " ++++++++++++\n");
// client.close();
// server.close();
// System.exit(0); // AMQP-519
client.close();
server.close();
System.exit(0); // AMQP-519
}

View File

@@ -0,0 +1 @@
spring.output.ansi.enabled=always

View File

@@ -12,16 +12,11 @@
<int-http:inbound-gateway request-channel="receiveChannel"
path="/receiveGateway"
mapped-request-headers="Content-Type"
supported-methods="POST"/>
<int:channel id="receiveChannel"/>
<int:chain input-channel="receiveChannel">
<int:header-enricher>
<int:header name="contentType" expression="headers['content-type']" />
<int:header name="conent-type" expression="null" overwrite="true" />
</int:header-enricher>
<int:header-filter header-names="content-type" />
<int:service-activator expression="payload + ' from the other side'"/>
</int:chain>

View File

@@ -46,7 +46,6 @@ import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;
import org.springframework.kafka.support.KafkaNull;
import org.springframework.kafka.support.TopicPartitionInitialOffset;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
@@ -141,9 +140,7 @@ public class Application {
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
final DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(props);
cf.setValueDeserializer(new JsonDeserializer<>(String.class));
return cf;
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean

View File

@@ -174,7 +174,7 @@ subprojects { subproject ->
groovyVersion = '2.3.0'
hsqldbVersion = '2.3.2'
h2Version = '1.3.175'
jacksonVersion = '2.6.7'
jacksonVersion = '2.3.2'
jasyptVersion = '1.7'
javaxInjectVersion = '1'
javaxMailVersion = '1.5.5'
@@ -201,7 +201,7 @@ subprojects { subproject ->
springIntegrationKafkaVersion = '2.1.0.RELEASE'
springIntegrationSplunkVersion = '1.1.0.RELEASE'
springKafkaVersion = '1.1.0.RELEASE'
springVersion = '4.3.3.RELEASE'
springVersion = '4.3.2.RELEASE'
springSecurityVersion = '4.0.2.RELEASE'
springWebFlowVersion = '2.3.3.RELEASE'
tilesJspVersion = '2.2.1'
@@ -612,7 +612,6 @@ project('kafka') {
compile ("org.springframework.kafka:spring-kafka-test:$springKafkaVersion") {
exclude group: 'org.slf4j'
}
compile "com.fasterxml.jackson.core:jackson-databind:$jacksonVersion"
compile "log4j:log4j:$log4jVersion"