[[sub-elements]] = Sub-elements When this `Gateway` is receiving messages from a `PollableChannel`, you must either provide a global default `Poller` or provide a `Poller` sub-element to the `Job Launching Gateway`. [tabs] ==== Java:: + The following example shows how to provide a poller in Java: + .Java Configuration [source, java] ---- @Bean @ServiceActivator(inputChannel = "queueChannel", poller = @Poller(fixedRate="1000")) public JobLaunchingGateway sampleJobLaunchingGateway() { JobLaunchingGateway jobLaunchingGateway = new JobLaunchingGateway(jobLauncher()); jobLaunchingGateway.setOutputChannel(replyChannel()); return jobLaunchingGateway; } ---- XML:: + The following example shows how to provide a poller in XML: + .XML Configuration [source, xml] ---- ---- ==== [[providing-feedback-with-informational-messages]] == Providing Feedback with Informational Messages As Spring Batch jobs can run for long times, providing progress information is often critical. For example, stakeholders may want to be notified if some or all parts of a batch job have failed. Spring Batch provides support for this information being gathered through: * Active polling * Event-driven listeners When starting a Spring Batch job asynchronously (for example, by using the Job Launching Gateway), a `JobExecution` instance is returned. Thus, you can use `JobExecution.getJobId()` to continuously poll for status updates by retrieving updated instances of the `JobExecution` from the `JobRepository` by using the `JobExplorer`. However, this is considered sub-optimal, and an event-driven approach is preferred. Therefore, Spring Batch provides listeners, including the three most commonly used listeners: * `StepListener` * `ChunkListener` * `JobExecutionListener` In the example shown in the following image, a Spring Batch job has been configured with a `StepExecutionListener`. Thus, Spring Integration receives and processes any step before or after events. For example, you can inspect the received `StepExecution` by using a `Router`. Based on the results of that inspection, various things can occur (such as routing a message to a mail outbound channel adapter), so that an email notification can be sent out based on some condition. .Handling Informational Messages image::handling-informational-messages.png[Handling Informational Messages, scaledwidth="60%"] The following two-part example shows how a listener is configured to send a message to a `Gateway` for a `StepExecution` events and log its output to a `logging-channel-adapter`. First, create the notification integration beans. [tabs] ==== Java:: + The following example shows the how to create the notification integration beans in Java: + .Java Configuration [source, java] ---- @Bean @ServiceActivator(inputChannel = "stepExecutionsChannel") public LoggingHandler loggingHandler() { LoggingHandler adapter = new LoggingHandler(LoggingHandler.Level.WARN); adapter.setLoggerName("TEST_LOGGER"); adapter.setLogExpressionString("headers.id + ': ' + payload"); return adapter; } @MessagingGateway(name = "notificationExecutionsListener", defaultRequestChannel = "stepExecutionsChannel") public interface NotificationExecutionListener extends StepExecutionListener {} ---- + NOTE: You need to add the `@IntegrationComponentScan` annotation to your configuration. XML:: + The following example shows the how to create the notification integration beans in XML: + .XML Configuration [source, xml] ---- ---- ==== [[message-gateway-entry-list]] Second, modify your job to add a step-level listener. [tabs] ==== Java:: + The following example shows the how to add a step-level listener in Java: + .Java Configuration [source, java] ---- public Job importPaymentsJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new JobBuilder("importPayments", jobRepository) .start(new StepBuilder("step1", jobRepository) .chunk(200, transactionManager) .listener(notificationExecutionsListener()) // ... .build(); ) .build(); } ---- XML:: + The following example shows the how to add a step-level listener in XML: + .XML Configuration [source, xml] ---- ... ---- ==== [[asynchronous-processors]] == Asynchronous Processors Asynchronous Processors help you scale the processing of items. In the asynchronous processor use case, an `AsyncItemProcessor` serves as a dispatcher, executing the logic of the `ItemProcessor` for an item on a new thread. Once the item completes, the `Future` is passed to the `AsyncItemWriter` to be written. Therefore, you can increase performance by using asynchronous item processing, basically letting you implement fork-join scenarios. The `AsyncItemWriter` gathers the results and writes back the chunk as soon as all the results become available. [tabs] ==== Java:: + The following example shows how to configuration the `AsyncItemProcessor` in Java: + .Java Configuration [source, java] ---- @Bean public AsyncItemProcessor processor(ItemProcessor itemProcessor, TaskExecutor taskExecutor) { AsyncItemProcessor asyncItemProcessor = new AsyncItemProcessor(); asyncItemProcessor.setTaskExecutor(taskExecutor); asyncItemProcessor.setDelegate(itemProcessor); return asyncItemProcessor; } ---- XML:: + The following example shows how to configuration the `AsyncItemProcessor` in XML: + .XML Configuration [source, xml] ---- ---- ==== The `delegate` property refers to your `ItemProcessor` bean, and the `taskExecutor` property refers to the `TaskExecutor` of your choice. [tabs] ==== Java:: + The following example shows how to configure the `AsyncItemWriter` in Java: + .Java Configuration [source, java] ---- @Bean public AsyncItemWriter writer(ItemWriter itemWriter) { AsyncItemWriter asyncItemWriter = new AsyncItemWriter(); asyncItemWriter.setDelegate(itemWriter); return asyncItemWriter; } ---- XML:: + The following example shows how to configure the `AsyncItemWriter` in XML: + .XML Configuration [source, xml] ---- ---- ==== Again, the `delegate` property is actually a reference to your `ItemWriter` bean. [[externalizing-batch-process-execution]] == Externalizing Batch Process Execution The integration approaches discussed so far suggest use cases where Spring Integration wraps Spring Batch like an outer shell. However, Spring Batch can also use Spring Integration internally. By using this approach, Spring Batch users can delegate the processing of items or even chunks to outside processes. This lets you offload complex processing. Spring Batch Integration provides dedicated support for: * Remote Chunking * Remote Partitioning [[remote-chunking]] === Remote Chunking The following image shows one way that remote chunking works when you use Spring Batch together with Spring Integration: .Remote Chunking image::remote-chunking-sbi.png[Remote Chunking, scaledwidth="60%"] Taking things one step further, you can also externalize the chunk processing by using the `ChunkMessageChannelItemWriter` (provided by Spring Batch Integration), which sends items out and collects the result. Once sent, Spring Batch continues the process of reading and grouping items, without waiting for the results. Rather, it is the responsibility of the `ChunkMessageChannelItemWriter` to gather the results and integrate them back into the Spring Batch process. With Spring Integration, you have full control over the concurrency of your processes (for instance, by using a `QueueChannel` instead of a `DirectChannel`). Furthermore, by relying on Spring Integration's rich collection of channel adapters (such as JMS and AMQP), you can distribute chunks of a batch job to external systems for processing. [tabs] ==== Java:: + A job with a step to be remotely chunked might have a configuration similar to the following in Java: + .Java Configuration [source, java] ---- public Job chunkJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) { return new JobBuilder("personJob", jobRepository) .start(new StepBuilder("step1", jobRepository) .chunk(200, transactionManager) .reader(itemReader()) .writer(itemWriter()) .build()) .build(); } ---- XML:: + A job with a step to be remotely chunked might have a configuration similar to the following in XML: + .XML Configuration [source, xml] ---- ... ---- ==== The `ItemReader` reference points to the bean you want to use for reading data on the manager. The `ItemWriter` reference points to a special `ItemWriter` (called `ChunkMessageChannelItemWriter`), as described earlier. The processor (if any) is left off the manager configuration, as it is configured on the worker. You should check any additional component properties, such as throttle limits and so on, when implementing your use case. [tabs] ==== Java:: + The following Java configuration provides a basic manager setup: + .Java Configuration [source, java] ---- @Bean public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); factory.setBrokerURL("tcp://localhost:61616"); return factory; } /* * Configure outbound flow (requests going to workers) */ @Bean public DirectChannel requests() { return new DirectChannel(); } @Bean public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlow .from(requests()) .handle(Jms.outboundAdapter(connectionFactory).destination("requests")) .get(); } /* * Configure inbound flow (replies coming from workers) */ @Bean public QueueChannel replies() { return new QueueChannel(); } @Bean public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlow .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies")) .channel(replies()) .get(); } /* * Configure the ChunkMessageChannelItemWriter */ @Bean public ItemWriter itemWriter() { MessagingTemplate messagingTemplate = new MessagingTemplate(); messagingTemplate.setDefaultChannel(requests()); messagingTemplate.setReceiveTimeout(2000); ChunkMessageChannelItemWriter chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>(); chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate); chunkMessageChannelItemWriter.setReplyChannel(replies()); return chunkMessageChannelItemWriter; } ---- XML:: + The following XML configuration provides a basic manager setup: + .XML Configuration [source, xml] ---- ---- ==== The preceding configuration provides us with a number of beans. We configure our messaging middleware by using ActiveMQ and the inbound and outbound JMS adapters provided by Spring Integration. As shown, our `itemWriter` bean, which is referenced by our job step, uses the `ChunkMessageChannelItemWriter` to write chunks over the configured middleware. Now we can move on to the worker configuration, as the following example shows: [tabs] ==== Java:: + The following example shows the worker configuration in Java: + .Java Configuration [source, java] ---- @Bean public org.apache.activemq.ActiveMQConnectionFactory connectionFactory() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); factory.setBrokerURL("tcp://localhost:61616"); return factory; } /* * Configure inbound flow (requests coming from the manager) */ @Bean public DirectChannel requests() { return new DirectChannel(); } @Bean public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlow .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("requests")) .channel(requests()) .get(); } /* * Configure outbound flow (replies going to the manager) */ @Bean public DirectChannel replies() { return new DirectChannel(); } @Bean public IntegrationFlow outboundFlow(ActiveMQConnectionFactory connectionFactory) { return IntegrationFlow .from(replies()) .handle(Jms.outboundAdapter(connectionFactory).destination("replies")) .get(); } /* * Configure the ChunkProcessorChunkHandler */ @Bean @ServiceActivator(inputChannel = "requests", outputChannel = "replies") public ChunkProcessorChunkHandler chunkProcessorChunkHandler() { ChunkProcessor chunkProcessor = new SimpleChunkProcessor<>(itemProcessor(), itemWriter()); ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>(); chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor); return chunkProcessorChunkHandler; } ---- XML:: + The following example shows the worker configuration in XML: + .XML Configuration [source, xml] ---- ---- ==== Most of these configuration items should look familiar from the manager configuration. Workers do not need access to the Spring Batch `JobRepository` nor to the actual job configuration file. The main bean of interest is the `chunkProcessorChunkHandler`. The `chunkProcessor` property of `ChunkProcessorChunkHandler` takes a configured `SimpleChunkProcessor`, which is where you would provide a reference to your `ItemWriter` (and, optionally, your `ItemProcessor`) that will run on the worker when it receives chunks from the manager. For more information, see the section of the "`Scalability`" chapter on link:$$https://docs.spring.io/spring-batch/docs/current/reference/html/scalability.html#remoteChunking$$[Remote Chunking]. Starting from version 4.1, Spring Batch Integration introduces the `@EnableBatchIntegration` annotation that can be used to simplify a remote chunking setup. This annotation provides two beans that you can autowire in your application context: * `RemoteChunkingManagerStepBuilderFactory`: Configures the manager step * `RemoteChunkingWorkerBuilder`: Configures the remote worker integration flow These APIs take care of configuring a number of components, as the following diagram shows: .Remote Chunking Configuration image::remote-chunking-config.png[Remote Chunking Configuration, scaledwidth="80%"] On the manager side, the `RemoteChunkingManagerStepBuilderFactory` lets you configure a manager step by declaring: * The item reader to read items and send them to workers * The output channel ("Outgoing requests") to send requests to workers * The input channel ("Incoming replies") to receive replies from workers You need not explicitly configure `ChunkMessageChannelItemWriter` and the `MessagingTemplate`. (You can still explicitly configure them if find a reason to do so). On the worker side, the `RemoteChunkingWorkerBuilder` lets you configure a worker to: * Listen to requests sent by the manager on the input channel ("`Incoming requests`") * Call the `handleChunk` method of `ChunkProcessorChunkHandler` for each request with the configured `ItemProcessor` and `ItemWriter` * Send replies on the output channel ("`Outgoing replies`") to the manager You need not explicitly configure the `SimpleChunkProcessor` and the `ChunkProcessorChunkHandler`. (You can still explicitly configure them if you find a reason to do so). The following example shows how to use these APIs: [source, java] ---- @EnableBatchIntegration @EnableBatchProcessing public class RemoteChunkingJobConfiguration { @Configuration public static class ManagerConfiguration { @Autowired private RemoteChunkingManagerStepBuilderFactory managerStepBuilderFactory; @Bean public TaskletStep managerStep() { return this.managerStepBuilderFactory.get("managerStep") .chunk(100) .reader(itemReader()) .outputChannel(requests()) // requests sent to workers .inputChannel(replies()) // replies received from workers .build(); } // Middleware beans setup omitted } @Configuration public static class WorkerConfiguration { @Autowired private RemoteChunkingWorkerBuilder workerBuilder; @Bean public IntegrationFlow workerFlow() { return this.workerBuilder .itemProcessor(itemProcessor()) .itemWriter(itemWriter()) .inputChannel(requests()) // requests received from the manager .outputChannel(replies()) // replies sent to the manager .build(); } // Middleware beans setup omitted } } ---- You can find a complete example of a remote chunking job link:$$https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#remote-chunking-sample$$[here]. [[remote-partitioning]] === Remote Partitioning The following image shows a typical remote partitioning situation: .Remote Partitioning image::remote-partitioning.png[Remote Partitioning, scaledwidth="60%"] Remote Partitioning, on the other hand, is useful when it is not the processing of items but rather the associated I/O that causes the bottleneck. With remote partitioning, you can send work to workers that execute complete Spring Batch steps. Thus, each worker has its own `ItemReader`, `ItemProcessor`, and `ItemWriter`. For this purpose, Spring Batch Integration provides the `MessageChannelPartitionHandler`. This implementation of the `PartitionHandler` interface uses `MessageChannel` instances to send instructions to remote workers and receive their responses. This provides a nice abstraction from the transports (such as JMS and AMQP) being used to communicate with the remote workers. The section of the "`Scalability`" chapter that addresses xref:scalability.adoc#partitioning[remote partitioning] provides an overview of the concepts and components needed to configure remote partitioning and shows an example of using the default `TaskExecutorPartitionHandler` to partition in separate local threads of execution. For remote partitioning to multiple JVMs, two additional components are required: * A remoting fabric or grid environment * A `PartitionHandler` implementation that supports the desired remoting fabric or grid environment Similar to remote chunking, you can use JMS as the "`remoting fabric`". In that case, use a `MessageChannelPartitionHandler` instance as the `PartitionHandler` implementation, as described earlier. [tabs] ==== Java:: + The following example assumes an existing partitioned job and focuses on the `MessageChannelPartitionHandler` and JMS configuration in Java: + .Java Configuration [source, java] ---- /* * Configuration of the manager side */ @Bean public PartitionHandler partitionHandler() { MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler(); partitionHandler.setStepName("step1"); partitionHandler.setGridSize(3); partitionHandler.setReplyChannel(outboundReplies()); MessagingTemplate template = new MessagingTemplate(); template.setDefaultChannel(outboundRequests()); template.setReceiveTimeout(100000); partitionHandler.setMessagingOperations(template); return partitionHandler; } @Bean public QueueChannel outboundReplies() { return new QueueChannel(); } @Bean public DirectChannel outboundRequests() { return new DirectChannel(); } @Bean public IntegrationFlow outboundJmsRequests() { return IntegrationFlow.from("outboundRequests") .handle(Jms.outboundGateway(connectionFactory()) .requestDestination("requestsQueue")) .get(); } @Bean @ServiceActivator(inputChannel = "inboundStaging") public AggregatorFactoryBean partitioningMessageHandler() throws Exception { AggregatorFactoryBean aggregatorFactoryBean = new AggregatorFactoryBean(); aggregatorFactoryBean.setProcessorBean(partitionHandler()); aggregatorFactoryBean.setOutputChannel(outboundReplies()); // configure other propeties of the aggregatorFactoryBean return aggregatorFactoryBean; } @Bean public DirectChannel inboundStaging() { return new DirectChannel(); } @Bean public IntegrationFlow inboundJmsStaging() { return IntegrationFlow .from(Jms.messageDrivenChannelAdapter(connectionFactory()) .configureListenerContainer(c -> c.subscriptionDurable(false)) .destination("stagingQueue")) .channel(inboundStaging()) .get(); } /* * Configuration of the worker side */ @Bean public StepExecutionRequestHandler stepExecutionRequestHandler() { StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler(); stepExecutionRequestHandler.setJobExplorer(jobExplorer); stepExecutionRequestHandler.setStepLocator(stepLocator()); return stepExecutionRequestHandler; } @Bean @ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging") public StepExecutionRequestHandler serviceActivator() throws Exception { return stepExecutionRequestHandler(); } @Bean public DirectChannel inboundRequests() { return new DirectChannel(); } public IntegrationFlow inboundJmsRequests() { return IntegrationFlow .from(Jms.messageDrivenChannelAdapter(connectionFactory()) .configureListenerContainer(c -> c.subscriptionDurable(false)) .destination("requestsQueue")) .channel(inboundRequests()) .get(); } @Bean public DirectChannel outboundStaging() { return new DirectChannel(); } @Bean public IntegrationFlow outboundJmsStaging() { return IntegrationFlow.from("outboundStaging") .handle(Jms.outboundGateway(connectionFactory()) .requestDestination("stagingQueue")) .get(); } ---- XML:: + The following example assumes an existing partitioned job and focuses on the `MessageChannelPartitionHandler` and JMS configuration in XML: + .XML Configuration [source, xml] ---- ---- ==== You must also ensure that the partition `handler` attribute maps to the `partitionHandler` bean. [tabs] ==== Java:: + The following example maps the partition `handler` attribute to the `partitionHandler` in Java: + .Java Configuration [source, java] ---- public Job personJob(JobRepository jobRepository) { return new JobBuilder("personJob", jobRepository) .start(new StepBuilder("step1.manager", jobRepository) .partitioner("step1.worker", partitioner()) .partitionHandler(partitionHandler()) .build()) .build(); } ---- XML:: + The following example maps the partition `handler` attribute to the `partitionHandler` in XML: + .XML Configuration [source, xml] ---- ... ---- ==== You can find a complete example of a remote partitioning job link:$$https://github.com/spring-projects/spring-batch/tree/main/spring-batch-samples#remote-partitioning-sample$$[here]. You can use the `@EnableBatchIntegration` annotation to simplify a remote partitioning setup. This annotation provides two beans that are useful for remote partitioning: * `RemotePartitioningManagerStepBuilderFactory`: Configures the manager step * `RemotePartitioningWorkerStepBuilderFactory`: Configures the worker step These APIs take care of configuring a number of components, as the following diagrams show: .Remote Partitioning Configuration (with job repository polling) image::remote-partitioning-polling-config.png[Remote Partitioning Configuration (with job repository polling), scaledwidth="80%"] .Remote Partitioning Configuration (with replies aggregation) image::remote-partitioning-aggregation-config.png[Remote Partitioning Configuration (with replies aggregation), scaledwidth="80%"] On the manager side, the `RemotePartitioningManagerStepBuilderFactory` lets you configure a manager step by declaring: * The `Partitioner` used to partition data * The output channel ("`Outgoing requests`") on which to send requests to workers * The input channel ("`Incoming replies`") on which to receive replies from workers (when configuring replies aggregation) * The poll interval and timeout parameters (when configuring job repository polling) You need not explicitly configure The `MessageChannelPartitionHandler` and the `MessagingTemplate`. (You can still explicitly configured them if you find a reason to do so). On the worker side, the `RemotePartitioningWorkerStepBuilderFactory` lets you configure a worker to: * Listen to requests sent by the manager on the input channel ("`Incoming requests`") * Call the `handle` method of `StepExecutionRequestHandler` for each request * Send replies on the output channel ("`Outgoing replies`") to the manager You need not explicitly configure the `StepExecutionRequestHandler`. (You can explicitly configure it if you find a reason to do so). The following example shows how to use these APIs: [source, java] ---- @Configuration @EnableBatchProcessing @EnableBatchIntegration public class RemotePartitioningJobConfiguration { @Configuration public static class ManagerConfiguration { @Autowired private RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory; @Bean public Step managerStep() { return this.managerStepBuilderFactory .get("managerStep") .partitioner("workerStep", partitioner()) .gridSize(10) .outputChannel(outgoingRequestsToWorkers()) .inputChannel(incomingRepliesFromWorkers()) .build(); } // Middleware beans setup omitted } @Configuration public static class WorkerConfiguration { @Autowired private RemotePartitioningWorkerStepBuilderFactory workerStepBuilderFactory; @Bean public Step workerStep() { return this.workerStepBuilderFactory .get("workerStep") .inputChannel(incomingRequestsFromManager()) .outputChannel(outgoingRepliesToManager()) .chunk(100) .reader(itemReader()) .processor(itemProcessor()) .writer(itemWriter()) .build(); } // Middleware beans setup omitted } } ----