:batch-asciidoc: ./
:toc: left
:toclevels: 4

[[itemProcessor]]
== Item processing

ifndef::onlyonetoggle[]
include::toggle.adoc[]
endif::onlyonetoggle[]

The <<readersAndWriters.adoc#readersAndWriters,ItemReader and ItemWriter interfaces>> are both very useful for their specific
tasks, but what if you want to insert business logic before writing? One option for both
reading and writing is to use the composite pattern: Create an `ItemWriter` that contains
another `ItemWriter` or an `ItemReader` that contains another `ItemReader`. The following
code shows an example:

[source, java]
----
public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
        itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}
----

The preceding class contains another `ItemWriter` to which it delegates after having
provided some business logic. This pattern could easily be used for an `ItemReader` as
well, perhaps to obtain more reference data based upon the input that was provided by the
main `ItemReader`. It is also useful if you need to control the call to `write` yourself.
However, if you only want to 'transform' the item passed in for writing before it is
actually written, you need not `write` yourself. You can just modify the item. For this
scenario, Spring Batch provides the `ItemProcessor` interface, as shown in the following
interface definition:

[source, java]
----
public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}
----

An `ItemProcessor` is simple. Given one object, transform it and return another. The
provided object may or may not be of the same type. The point is that business logic may
be applied within the process, and it is completely up to the developer to create that
logic. An `ItemProcessor` can be wired directly into a step. For example, assume an
`ItemReader` provides a class of type `Foo` and that it needs to be converted to type `Bar`
before being written out. The following example shows an `ItemProcessor` that performs
the conversion:

[source, java]
----
public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar> {
    public void write(List<? extends Bar> bars) throws Exception {
        //write bars
    }
}
----

In the preceding example, there is a class `Foo`, a class `Bar`, and a class
`FooProcessor` that adheres to the `ItemProcessor` interface. The transformation is
simple, but any type of transformation could be done here. The `BarWriter` writes `Bar`
objects, throwing an exception if any other type is provided. Similarly, the
`FooProcessor` throws an exception if anything but a `Foo` is provided. The
`FooProcessor` can then be injected into a `Step`, as shown in the following example:

.XML Configuration
[source, xml, role="xmlContent"]
----
<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="fooReader" processor="fooProcessor" writer="barWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>
----

.Java Configuration
[source, java, role="javaContent"]
----
@Bean
public Job ioSampleJob() {
    return this.jobBuilderFactory.get("ioSampleJob")
                .start(step1())
                .build();
}

@Bean
public Step step1() {
    return this.stepBuilderFactory.get("step1")
                .<Foo, Bar>chunk(2)
                .reader(fooReader())
                .processor(fooProcessor())
                .writer(barWriter())
                .build();
}
----

A difference between `ItemProcessor` and `ItemReader` or `ItemWriter` is that an `ItemProcessor`
is optional for a `Step`.

[[chainingItemProcessors]]
=== Chaining ItemProcessors

Performing a single transformation is useful in many scenarios, but what if you want to
'chain' together multiple `ItemProcessor` implementations? This can be accomplished using
the composite pattern mentioned previously. To extend the previous single-transformation
example, `Foo` is transformed to `Bar`, which is then transformed to `Foobar`
and written out, as shown in the following example:

[source, java]
----
public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar, Foobar> {
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(List<? extends Foobar> items) throws Exception {
        //write items
    }
}
----

A `FooProcessor` and a `BarProcessor` can be 'chained' together to give the resultant
`Foobar`, as shown in the following example:

[source, java]
----
CompositeItemProcessor<Foo, Foobar> compositeProcessor =
                                      new CompositeItemProcessor<>();
// The delegates run in list order: Foo -> Bar, then Bar -> Foobar
List<ItemProcessor<?, ?>> itemProcessors = new ArrayList<>();
itemProcessors.add(new FooProcessor());
itemProcessors.add(new BarProcessor());
compositeProcessor.setDelegates(itemProcessors);
----

Just as with the previous example, the composite processor can be configured into the
`Step`:

.XML Configuration
[source, xml, role="xmlContent"]
----
<job id="ioSampleJob">
    <step id="step1">
        <tasklet>
            <chunk reader="fooReader" processor="compositeItemProcessor" writer="foobarWriter"
                   commit-interval="2"/>
        </tasklet>
    </step>
</job>

<bean id="compositeItemProcessor"
      class="org.springframework.batch.item.support.CompositeItemProcessor">
    <property name="delegates">
        <list>
            <bean class="..FooProcessor" />
            <bean class="..BarProcessor" />
        </list>
    </property>
</bean>
----

.Java Configuration
[source, java, role="javaContent"]
----
@Bean
public Job ioSampleJob() {
    return this.jobBuilderFactory.get("ioSampleJob")
                .start(step1())
                .build();
}

@Bean
public Step step1() {
    return this.stepBuilderFactory.get("step1")
                .<Foo, Foobar>chunk(2)
                .reader(fooReader())
                .processor(compositeProcessor())
                .writer(foobarWriter())
                .build();
}

@Bean
public CompositeItemProcessor<Foo, Foobar> compositeProcessor() {
    List<ItemProcessor<?, ?>> delegates = new ArrayList<>(2);
    delegates.add(new FooProcessor());
    delegates.add(new BarProcessor());

    CompositeItemProcessor<Foo, Foobar> processor = new CompositeItemProcessor<>();

    processor.setDelegates(delegates);

    return processor;
}
----

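To make the chaining behavior more concrete, the following is a minimal, self-contained
sketch of how a composite processor might hand the output of each delegate to the next.
It is not the Spring Batch implementation: `ItemProcessor` is redeclared locally so that
the example compiles without the framework, `SimpleCompositeProcessor` is a hypothetical
class invented for this illustration, and the `Foo`, `Bar`, and `Foobar` classes are
given fields only so that the result can be inspected. The sketch assumes that a `null`
result from any delegate ends the chain early.

```java
import java.util.List;

// Local stand-in for the Spring Batch interface, declared here only so that
// this sketch compiles without the framework on the classpath.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

class Foo {}

class Bar {
    final Foo foo;
    Bar(Foo foo) { this.foo = foo; }
}

class Foobar {
    final Bar bar;
    Foobar(Bar bar) { this.bar = bar; }
}

class FooProcessor implements ItemProcessor<Foo, Bar> {
    public Bar process(Foo foo) { return new Bar(foo); }
}

class BarProcessor implements ItemProcessor<Bar, Foobar> {
    public Foobar process(Bar bar) { return new Foobar(bar); }
}

// Hypothetical illustration of the composite pattern, not the framework class.
class SimpleCompositeProcessor<I, O> implements ItemProcessor<I, O> {

    private final List<? extends ItemProcessor<?, ?>> delegates;

    SimpleCompositeProcessor(List<? extends ItemProcessor<?, ?>> delegates) {
        this.delegates = delegates;
    }

    @Override
    @SuppressWarnings("unchecked")
    public O process(I item) throws Exception {
        Object result = item;
        for (ItemProcessor<?, ?> delegate : delegates) {
            if (result == null) {
                return null; // an earlier delegate filtered the item
            }
            // The delegate types cannot be checked at compile time, so a
            // mismatched chain fails here with a ClassCastException.
            result = ((ItemProcessor<Object, Object>) delegate).process(result);
        }
        return (O) result;
    }
}
```

Under these assumptions, a `SimpleCompositeProcessor<Foo, Foobar>` built from a
`FooProcessor` followed by a `BarProcessor` turns each `Foo` into a `Foobar`. The order
of the list matters, because each delegate must accept the type produced by the one
before it.
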
[[filteringRecords]]
=== Filtering Records

One typical use for an item processor is to filter out records before they are passed to
the `ItemWriter`. Filtering is an action distinct from skipping. Skipping indicates that
a record is invalid, while filtering simply indicates that a record should not be
written.

For example, consider a batch job that reads a file containing three different types of
records: records to insert, records to update, and records to delete. If record deletion
is not supported by the system, we would not want to send any "delete" records to
the `ItemWriter`. However, because these records are not actually bad records, we would want to
filter them out rather than skip them. As a result, the `ItemWriter` would receive only
"insert" and "update" records.

To filter a record, you can return `null` from the `ItemProcessor`. The framework detects
that the result is `null` and avoids adding that item to the list of records delivered to
the `ItemWriter`. In contrast, an exception thrown from the `ItemProcessor` results in a
skip, provided that the step is configured to skip that exception.

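The insert, update, and delete scenario could be sketched as follows. `ChangeRecord`,
its `Action` enum, `DeleteFilteringProcessor`, and `FilteringDemo` are hypothetical names
invented for this illustration, and `ItemProcessor` is redeclared locally so that the
snippet compiles without Spring Batch. The `processAll` helper only imitates, in a
simplified way, how `null` results are left out of the list handed to the writer.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for the Spring Batch interface, so the sketch is self-contained.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical input record carrying one of three actions.
class ChangeRecord {
    enum Action { INSERT, UPDATE, DELETE }

    final Action action;
    final String payload;

    ChangeRecord(Action action, String payload) {
        this.action = action;
        this.payload = payload;
    }
}

class DeleteFilteringProcessor implements ItemProcessor<ChangeRecord, ChangeRecord> {
    @Override
    public ChangeRecord process(ChangeRecord item) {
        // Returning null filters the record; it is neither an error nor a skip.
        return item.action == ChangeRecord.Action.DELETE ? null : item;
    }
}

class FilteringDemo {
    // Simplified imitation of how processor results are collected for the writer.
    static List<ChangeRecord> processAll(List<ChangeRecord> input,
            ItemProcessor<ChangeRecord, ChangeRecord> processor) throws Exception {
        List<ChangeRecord> toWrite = new ArrayList<>();
        for (ChangeRecord item : input) {
            ChangeRecord result = processor.process(item);
            if (result != null) {
                toWrite.add(result);
            }
        }
        return toWrite;
    }
}
```

Passing one record of each kind through `FilteringDemo.processAll` leaves only the
"insert" and "update" records in the returned list, in their original order.
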
[[validatingInput]]
=== Validating Input

In the <<readersAndWriters.adoc#readersAndWriters,ItemReaders and ItemWriters>> chapter, multiple approaches to parsing input have been
discussed. Each major implementation throws an exception if the input is not 'well-formed'. The
`FixedLengthTokenizer` throws an exception if a range of data is missing. Similarly,
attempting to access an index in a `RowMapper` or `FieldSetMapper` that does not exist or
is in a different format than the one expected causes an exception to be thrown. All of
these types of exceptions are thrown before `read` returns. However, they do not address
the issue of whether or not the returned item is valid. For example, if one of the fields
is an age, it cannot be negative. A negative value still parses correctly, because it exists and
is a number, so no exception is thrown. Because there is already a plethora of
validation frameworks, Spring Batch does not attempt to provide yet another. Rather, it
provides a simple interface, called `Validator`, that can be implemented by any number of
frameworks, as shown in the following interface definition:

[source, java]
----
public interface Validator<T> {

    void validate(T value) throws ValidationException;

}
----

The contract is that the `validate` method throws an exception if the object is invalid
and returns normally if it is valid. Spring Batch provides an out-of-the-box
`ValidatingItemProcessor`, as shown in the following bean definition:

.XML Configuration
[source, xml, role="xmlContent"]
----
<bean class="org.springframework.batch.item.validator.ValidatingItemProcessor">
    <property name="validator" ref="validator" />
</bean>

<bean id="validator" class="org.springframework.batch.item.validator.SpringValidator">
    <property name="validator">
        <bean class="org.springframework.batch.sample.domain.trade.internal.validator.TradeValidator"/>
    </property>
</bean>
----

.Java Configuration
[source, java, role="javaContent"]
----
@Bean
public ValidatingItemProcessor itemProcessor() {
    ValidatingItemProcessor processor = new ValidatingItemProcessor();

    processor.setValidator(validator());

    return processor;
}

@Bean
public SpringValidator validator() {
    SpringValidator validator = new SpringValidator();

    validator.setValidator(new TradeValidator());

    return validator;
}
----

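As an illustration of the `validate` contract, the age check mentioned earlier could be
written as a custom validator along the following lines. `Customer` and `AgeValidator`
are hypothetical names, and `Validator` and `ValidationException` are redeclared locally
(the latter as a plain unchecked exception) so that the sketch compiles on its own. In a
real job, you would implement the Spring Batch interface instead.

```java
// Local stand-ins so that the sketch compiles without Spring Batch.
class ValidationException extends RuntimeException {
    ValidationException(String message) {
        super(message);
    }
}

interface Validator<T> {
    void validate(T value) throws ValidationException;
}

// Hypothetical item type with a field that parses fine but may still be invalid.
class Customer {
    final String name;
    final int age;

    Customer(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

class AgeValidator implements Validator<Customer> {
    @Override
    public void validate(Customer value) throws ValidationException {
        // Return normally for valid items; throw for invalid ones.
        if (value.age < 0) {
            throw new ValidationException("Age must not be negative: " + value.age);
        }
    }
}
```

A validator of this shape returns normally for a customer aged 30 and throws for a
customer aged -1, which is the behavior the contract describes.
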
You can also use the `BeanValidatingItemProcessor` to validate items annotated with
the Bean Validation API (JSR-303) annotations. For example, consider the following type `Person`:

[source, java]
----
class Person {

    @NotEmpty
    private String name;

    public Person(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

}
----

To validate such items, declare a `BeanValidatingItemProcessor` bean in your
application context and register it as a processor in your chunk-oriented step:

[source, java]
----
@Bean
public BeanValidatingItemProcessor<Person> beanValidatingItemProcessor() throws Exception {
    BeanValidatingItemProcessor<Person> beanValidatingItemProcessor = new BeanValidatingItemProcessor<>();
    beanValidatingItemProcessor.setFilter(true);

    return beanValidatingItemProcessor;
}
----

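The `setFilter(true)` call in the preceding bean definition asks the processor to filter
invalid items rather than propagate the `ValidationException`. The following sketch
imitates that switch in a simplified form. `SimpleValidatingProcessor` is a hypothetical
class written for this illustration, and the three supporting types are local stand-ins
so that the snippet compiles without Spring Batch.

```java
// Local stand-ins so that the sketch compiles without Spring Batch.
class ValidationException extends RuntimeException {
    ValidationException(String message) {
        super(message);
    }
}

interface Validator<T> {
    void validate(T value) throws ValidationException;
}

interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical, simplified imitation of a validating processor with a filter switch.
class SimpleValidatingProcessor<T> implements ItemProcessor<T, T> {

    private final Validator<? super T> validator;
    private final boolean filter;

    SimpleValidatingProcessor(Validator<? super T> validator, boolean filter) {
        this.validator = validator;
        this.filter = filter;
    }

    @Override
    public T process(T item) throws ValidationException {
        try {
            validator.validate(item);
        } catch (ValidationException e) {
            if (filter) {
                return null; // invalid item is filtered, so it never reaches the writer
            }
            throw e; // invalid item surfaces as an exception
        }
        return item;
    }
}
```

With the flag set to `true`, an invalid item comes back as `null` and is dropped quietly.
With `false`, the exception reaches the step, where it can be handled by skip
configuration or fail the step.
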
[[faultTolerant]]
=== Fault Tolerance

When a chunk is rolled back, items that have been cached during reading may be
reprocessed. If a step is configured to be fault tolerant (typically by using skip or
retry processing), any `ItemProcessor` used should be implemented in a way that is
idempotent. Typically, that means making no changes to the input item of
the `ItemProcessor` and updating only the instance that is the result.

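A minimal sketch of the difference, assuming a hypothetical mutable `Account` item: the
first processor changes its input, so processing the same cached item again after a
rollback applies the change twice, while the second leaves the input untouched and builds
a new result. `ItemProcessor` is redeclared locally so that the snippet compiles without
Spring Batch, and all class names are invented for this illustration.

```java
// Local stand-in for the Spring Batch interface, so the sketch is self-contained.
interface ItemProcessor<I, O> {
    O process(I item) throws Exception;
}

// Hypothetical mutable item.
class Account {
    private long balanceInCents;

    Account(long balanceInCents) {
        this.balanceInCents = balanceInCents;
    }

    long getBalanceInCents() {
        return balanceInCents;
    }

    void setBalanceInCents(long balanceInCents) {
        this.balanceInCents = balanceInCents;
    }
}

// Not idempotent: mutates the input, so reprocessing deducts the fee again.
class MutatingFeeProcessor implements ItemProcessor<Account, Account> {
    @Override
    public Account process(Account item) {
        item.setBalanceInCents(item.getBalanceInCents() - 100);
        return item;
    }
}

// Idempotent: the input is left unchanged and only the result carries the update.
class CopyingFeeProcessor implements ItemProcessor<Account, Account> {
    @Override
    public Account process(Account item) {
        return new Account(item.getBalanceInCents() - 100);
    }
}
```

Running `MutatingFeeProcessor` twice on the same `Account` deducts the fee twice, whereas
`CopyingFeeProcessor` produces the same result no matter how many times the cached item
is reprocessed.
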