RESOLVED - BATCH-522: abstract common concerns from BatchSqlUpdateItemWriter and HibernateAwareItemWriter

duplicate logic pulled up to AbstractTransactionalResourceItemWriter
This commit is contained in:
robokaso
2008-04-09 09:39:53 +00:00
parent 58448aa23b
commit 5ff2e320bc
5 changed files with 225 additions and 250 deletions

View File

@@ -0,0 +1,167 @@
/*
* Copyright 2006-2007 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.batch.item.database;
import java.util.HashSet;
import java.util.Set;
import org.springframework.batch.item.ClearFailedException;
import org.springframework.batch.item.FlushFailedException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.support.RepeatSynchronizationManager;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
/**
* Stores items in transactional resource and flushes aggressively in case of
* failure. This is useful for batch update writers which need to identify the
* failed item after failed flush.
*
* @see BatchSqlUpdateItemWriter
* @see HibernateAwareItemWriter
*
* @author Dave Syer
* @author Robert Kasanicky
*/
public abstract class AbstractTransactionalResourceItemWriter implements ItemWriter {
private Set failed = new HashSet();
/**
* Flushing delegated to subclass surrounded by binding and unbinding of
* transactional resources.
*
* @see org.springframework.batch.item.ItemWriter#flush()
*/
public void flush() throws FlushFailedException {
bindTransactionResources();
try {
doFlush();
}
catch (RuntimeException e) {
synchronized (failed) {
failed.addAll(getProcessed());
}
// This used to contain a call to onError, however, I think this
// should be handled within the step.
throw e;
}
finally {
unbindTransactionResources();
}
}
/**
* Delegate to subclass to actually do the writing, but flushes aggressively
* if the item was previously part of a failed chunk.
*
* @throws Exception
*
* @see org.springframework.batch.io.OutputSource#write(java.lang.Object)
*/
public void write(Object output) throws Exception {
bindTransactionResources();
getProcessed().add(output);
doWrite(output);
flushIfNecessary(output);
}
private void flushIfNecessary(Object output) {
boolean flush;
synchronized (failed) {
flush = failed.contains(output);
}
if (flush) {
// Force early completion to commit aggressively if we encounter a
// failed item (from a failed chunk but we don't know which one was
// the problem).
RepeatSynchronizationManager.setCompleteOnly();
// Flush now, so that if there is a failure this record can be
// skipped.
flush();
}
}
/**
* Delegate to subclass and unbind transactional resources, effectively
* clearing the item buffer.
*/
public void clear() throws ClearFailedException {
try {
doClear();
}
finally {
unbindTransactionResources();
}
}
/**
* Callback method of {@link #flush()}.
*/
protected abstract void doFlush() throws FlushFailedException;
/**
* Callback method of {@link #clear()}.
*/
protected abstract void doClear() throws ClearFailedException;
/**
* Callback method of {@link #write(Object)}.
*/
protected abstract void doWrite(Object output) throws Exception;
/**
* @return Key for items processed in the current transaction
* {@link RepeatContext}.
*/
protected abstract String getResourceKey();
/**
* Set up the {@link RepeatContext} as a transaction resource.
*
* @param context the context to set
*/
private void bindTransactionResources() {
if (TransactionSynchronizationManager.hasResource(getResourceKey())) {
return;
}
TransactionSynchronizationManager.bindResource(getResourceKey(), new HashSet());
}
/**
* Remove the transaction resource associated with this context.
*/
private void unbindTransactionResources() {
if (!TransactionSynchronizationManager.hasResource(getResourceKey())) {
return;
}
TransactionSynchronizationManager.unbindResource(getResourceKey());
}
/**
* Accessor for the list of processed items in this transaction.
*
* @return the processed
*/
protected Set getProcessed() {
Assert.state(TransactionSynchronizationManager.hasResource(getResourceKey()),
"Processed items not bound to transaction.");
Set processed = (Set) TransactionSynchronizationManager.getResource(getResourceKey());
return processed;
}
}

View File

@@ -17,21 +17,16 @@ package org.springframework.batch.item.database;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.springframework.batch.item.ClearFailedException;
import org.springframework.batch.item.FlushFailedException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.support.RepeatSynchronizationManager;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcOperations;
import org.springframework.jdbc.core.PreparedStatementCallback;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
/**
@@ -59,14 +54,12 @@ import org.springframework.util.Assert;
* @author Dave Syer
*
*/
public class BatchSqlUpdateItemWriter implements ItemWriter, InitializingBean {
public class BatchSqlUpdateItemWriter extends AbstractTransactionalResourceItemWriter implements InitializingBean {
/**
* Key for items processed in the current transaction {@link RepeatContext}.
*/
protected static final String ITEMS_PROCESSED = BatchSqlUpdateItemWriter.class.getName() + ".ITEMS_PROCESSED";
private Set failed = new HashSet();
private static final String ITEMS_PROCESSED = BatchSqlUpdateItemWriter.class.getName() + ".ITEMS_PROCESSED";
private JdbcOperations jdbcTemplate;
@@ -112,129 +105,38 @@ public class BatchSqlUpdateItemWriter implements ItemWriter, InitializingBean {
}
/**
* Buffer the item in a transaction resource, but flush aggressively if the
* item was previously part of a failed chunk.
*
* @throws Exception
*
* @see org.springframework.batch.io.OutputSource#write(java.lang.Object)
* Create and execute batch prepared statement.
*/
public void write(Object output) throws Exception {
bindTransactionResources();
getProcessed().add(output);
flushIfNecessary(output);
}
/**
* Accessor for the list of processed items in this transaction.
*
* @return the processed
*/
private Set getProcessed() {
Set processed = (Set) TransactionSynchronizationManager.getResource(ITEMS_PROCESSED);
if (processed == null) {
processed = Collections.EMPTY_SET;
}
return processed;
}
/**
* Set up the {@link RepeatContext} as a transaction resource.
*
* @param context the context to set
*/
private void bindTransactionResources() {
if (TransactionSynchronizationManager.hasResource(ITEMS_PROCESSED)) {
return;
}
TransactionSynchronizationManager.bindResource(ITEMS_PROCESSED, new HashSet());
}
/**
* Remove the transaction resource associated with this context.
*/
private void unbindTransactionResources() {
if (!TransactionSynchronizationManager.hasResource(ITEMS_PROCESSED)) {
return;
}
TransactionSynchronizationManager.unbindResource(ITEMS_PROCESSED);
}
/**
* Accessor for the context property.
*
* @param output
*
* @return the context
*/
private void flushIfNecessary(Object output) throws Exception {
boolean flush;
synchronized (failed) {
flush = failed.contains(output);
}
if (flush) {
RepeatContext context = RepeatSynchronizationManager.getContext();
// Force early completion to commit aggressively if we encounter a
// failed item (from a failed chunk but we don't know which one was
// the problem).
context.setCompleteOnly();
// Flush now, so that if there is a failure this record can be
// skipped.
doFlush();
}
}
/**
* Flush the hibernate session from within a repeat context.
*/
private void doFlush() {
protected void doFlush() {
final Set processed = getProcessed();
try {
if (!processed.isEmpty()) {
jdbcTemplate.execute(sql, new PreparedStatementCallback() {
public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
for (Iterator iterator = processed.iterator(); iterator.hasNext();) {
Object item = (Object) iterator.next();
preparedStatementSetter.setValues(item, ps);
ps.addBatch();
}
return ps.executeBatch();
if (!processed.isEmpty()) {
jdbcTemplate.execute(sql, new PreparedStatementCallback() {
public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException {
for (Iterator iterator = processed.iterator(); iterator.hasNext();) {
Object item = (Object) iterator.next();
preparedStatementSetter.setValues(item, ps);
ps.addBatch();
}
});
}
}
catch (RuntimeException e) {
synchronized (failed) {
failed.addAll(processed);
}
throw e;
}
finally {
getProcessed().clear();
return ps.executeBatch();
}
});
}
}
/**
* Unbind transaction resources, effectively clearing the item buffer.
*
* @see org.springframework.batch.item.ItemWriter#clear()
*/
public void clear() throws ClearFailedException {
unbindTransactionResources();
protected String getResourceKey() {
return ITEMS_PROCESSED;
}
/**
* Flush the internal item buffer and record failures if there are any.
*
* @see org.springframework.batch.item.ItemWriter#flush()
* No-op.
*/
public void flush() throws FlushFailedException {
try {
doFlush();
}
finally {
unbindTransactionResources();
}
protected void doWrite(Object output) {
}
/**
* No-op.
*/
protected void doClear() throws ClearFailedException {
}
}

View File

@@ -15,19 +15,13 @@
*/
package org.springframework.batch.item.database;
import java.util.HashSet;
import java.util.Set;
import org.hibernate.SessionFactory;
import org.springframework.batch.item.ClearFailedException;
import org.springframework.batch.item.FlushFailedException;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.RepeatContext;
import org.springframework.batch.repeat.support.RepeatSynchronizationManager;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.orm.hibernate3.HibernateOperations;
import org.springframework.orm.hibernate3.HibernateTemplate;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.Assert;
/**
@@ -50,15 +44,13 @@ import org.springframework.util.Assert;
* @author Dave Syer
*
*/
public class HibernateAwareItemWriter implements ItemWriter, InitializingBean {
public class HibernateAwareItemWriter extends AbstractTransactionalResourceItemWriter implements InitializingBean {
/**
* Key for items processed in the current transaction {@link RepeatContext}.
*/
private static final String ITEMS_PROCESSED = HibernateAwareItemWriter.class.getName() + ".ITEMS_PROCESSED";
private Set failed = new HashSet();
private ItemWriter delegate;
private HibernateOperations hibernateTemplate;
@@ -92,7 +84,7 @@ public class HibernateAwareItemWriter implements ItemWriter, InitializingBean {
}
/**
* Check mandatory properties - there must be a delegate.
* Check mandatory properties - there must be a delegate and hibernateTemplate.
*
* @see org.springframework.dao.support.DaoSupport#initDao()
*/
@@ -102,119 +94,30 @@ public class HibernateAwareItemWriter implements ItemWriter, InitializingBean {
}
/**
* Use the delegate to actually do the writing, but flush aggressively if
* the item was previously part of a failed chunk.
*
* @throws Exception
*
* @see org.springframework.batch.io.OutputSource#write(java.lang.Object)
* Delegate to subclass and flush the hibernate session.
*/
public void write(Object output) throws Exception {
bindTransactionResources();
getProcessed().add(output);
delegate.write(output);
flushIfNecessary(output);
}
/**
* Accessor for the list of processed items in this transaction.
*
* @return the processed
*/
private Set getProcessed() {
Assert.state(TransactionSynchronizationManager.hasResource(ITEMS_PROCESSED),
"Processed items not bound to transaction.");
Set processed = (Set) TransactionSynchronizationManager.getResource(ITEMS_PROCESSED);
return processed;
}
/**
* Set up the {@link RepeatContext} as a transaction resource.
*
* @param context the context to set
*/
private void bindTransactionResources() {
if (TransactionSynchronizationManager.hasResource(ITEMS_PROCESSED)) {
return;
}
TransactionSynchronizationManager.bindResource(ITEMS_PROCESSED, new HashSet());
}
/**
* Remove the transaction resource associated with this context.
*/
private void unbindTransactionResources() {
if (!TransactionSynchronizationManager.hasResource(ITEMS_PROCESSED)) {
return;
}
TransactionSynchronizationManager.unbindResource(ITEMS_PROCESSED);
}
/**
* Accessor for the context property.
*
* @param output
*
* @return the context
*/
private void flushIfNecessary(Object output) throws Exception {
boolean flush;
synchronized (failed) {
flush = failed.contains(output);
}
if (flush) {
// Force early completion to commit aggressively if we encounter a
// failed item (from a failed chunk but we don't know which one was
// the problem).
RepeatSynchronizationManager.setCompleteOnly();
// Flush now, so that if there is a failure this record can be
// skipped.
doHibernateFlush();
}
}
/**
* Flush the hibernate session from within a repeat context.
*/
private void doHibernateFlush() {
try {
hibernateTemplate.flush();
// This should happen when the transaction commits anyway, but to be
// sure...
hibernateTemplate.clear();
}
catch (RuntimeException e) {
synchronized (failed) {
failed.addAll(getProcessed());
}
// This used to contain a call to onError, however, I think this
// should be handled within the step.
throw e;
}
protected void doFlush() {
delegate.flush();
hibernateTemplate.flush();
// This should happen when the transaction commits anyway, but to be
// sure...
hibernateTemplate.clear();
}
/**
* Call the delegate clear() method, and then clear the hibernate session.
*
* @see org.springframework.batch.item.ItemWriter#clear()
*/
public void clear() throws ClearFailedException {
unbindTransactionResources();
hibernateTemplate.clear();
protected void doClear() throws ClearFailedException {
delegate.clear();
hibernateTemplate.clear();
}
/**
* Flush the Hibernate session and record failures if there are any. The
* delegate flush will also be called.
*
* @see org.springframework.batch.item.ItemWriter#flush()
*/
public void flush() throws FlushFailedException {
bindTransactionResources();
doHibernateFlush();
unbindTransactionResources();
delegate.flush();
protected String getResourceKey() {
return ITEMS_PROCESSED;
}
protected void doWrite(Object output) throws Exception {
delegate.write(output);
}
}

View File

@@ -76,7 +76,7 @@ public class BatchSqlUpdateItemWriterTests extends TestCase {
list.add(item);
}
});
TransactionSynchronizationManager.bindResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED, new HashSet(
TransactionSynchronizationManager.bindResource(writer.getResourceKey(), new HashSet(
Collections.singleton("spam")));
RepeatSynchronizationManager.register(context);
}
@@ -86,8 +86,8 @@ public class BatchSqlUpdateItemWriterTests extends TestCase {
* @see junit.framework.TestCase#tearDown()
*/
protected void tearDown() throws Exception {
if (TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED)) {
TransactionSynchronizationManager.unbindResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED);
if (TransactionSynchronizationManager.hasResource(writer.getResourceKey())) {
TransactionSynchronizationManager.unbindResource(writer.getResourceKey());
}
RepeatSynchronizationManager.clear();
}
@@ -125,9 +125,9 @@ public class BatchSqlUpdateItemWriterTests extends TestCase {
* {@link org.springframework.batch.item.database.BatchSqlUpdateItemWriter#clear()}.
*/
public void testClear() {
assertTrue(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED));
assertTrue(TransactionSynchronizationManager.hasResource(writer.getResourceKey()));
writer.clear();
assertFalse(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED));
assertFalse(TransactionSynchronizationManager.hasResource(writer.getResourceKey()));
}
/**
@@ -135,9 +135,9 @@ public class BatchSqlUpdateItemWriterTests extends TestCase {
* {@link org.springframework.batch.item.database.BatchSqlUpdateItemWriter#flush()}.
*/
public void testFlush() {
assertTrue(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED));
assertTrue(TransactionSynchronizationManager.hasResource(writer.getResourceKey()));
writer.flush();
assertFalse(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED));
assertFalse(TransactionSynchronizationManager.hasResource(writer.getResourceKey()));
assertEquals(2, list.size());
assertTrue(list.contains("SQL"));
}
@@ -148,10 +148,10 @@ public class BatchSqlUpdateItemWriterTests extends TestCase {
* @throws Exception
*/
public void testWriteAndFlush() throws Exception {
assertTrue(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED));
assertTrue(TransactionSynchronizationManager.hasResource(writer.getResourceKey()));
writer.write("bar");
writer.flush();
assertFalse(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED));
assertFalse(TransactionSynchronizationManager.hasResource(writer.getResourceKey()));
assertEquals(3, list.size());
assertTrue(list.contains("SQL"));
}
@@ -176,7 +176,7 @@ public class BatchSqlUpdateItemWriterTests extends TestCase {
catch (RuntimeException e) {
assertEquals("bar", e.getMessage());
}
assertFalse(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED));
assertFalse(TransactionSynchronizationManager.hasResource(writer.getResourceKey()));
assertEquals(2, list.size());
writer.setItemPreparedStatementSetter(new ItemPreparedStatementSetter() {
public void setValues(Object item, PreparedStatement ps) throws SQLException {
@@ -197,7 +197,7 @@ public class BatchSqlUpdateItemWriterTests extends TestCase {
*/
public void testEmptyFlush() {
// items are bound on write, so we unbind them first
TransactionSynchronizationManager.unbindResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED);
TransactionSynchronizationManager.unbindResource(writer.getResourceKey());
writer.flush();
}

View File

@@ -81,7 +81,7 @@ public class HibernateAwareItemWriterTests extends TestCase {
* @see junit.framework.TestCase#tearDown()
*/
protected void tearDown() throws Exception {
String key = HibernateAwareItemWriter.class.getName() + ".ITEMS_PROCESSED";
String key = writer.getResourceKey();
if (TransactionSynchronizationManager.hasResource(key)) {
TransactionSynchronizationManager.unbindResource(key);
}
@@ -165,16 +165,19 @@ public class HibernateAwareItemWriterTests extends TestCase {
} catch (RuntimeException e) {
assertEquals("bar", e.getMessage());
}
assertEquals(1, list.size());
assertEquals(2, list.size());
assertTrue(list.contains("foo"));
assertTrue(list.contains("delegateFlush"));
writer.setHibernateTemplate(new HibernateTemplateWrapper() {
public void flush() throws DataAccessException {
list.add("flush");
}
});
writer.write("foo");
assertEquals(4, list.size());
assertEquals(6, list.size());
assertTrue(list.contains("flush"));
assertTrue(list.contains("clear"));
assertTrue(list.contains("delegateFlush"));
assertTrue(context.isCompleteOnly());
}