diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/AbstractTransactionalResourceItemWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/AbstractTransactionalResourceItemWriter.java new file mode 100644 index 000000000..f806f8158 --- /dev/null +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/AbstractTransactionalResourceItemWriter.java @@ -0,0 +1,167 @@ +/* + * Copyright 2006-2007 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.batch.item.database; + +import java.util.HashSet; +import java.util.Set; + +import org.springframework.batch.item.ClearFailedException; +import org.springframework.batch.item.FlushFailedException; +import org.springframework.batch.item.ItemWriter; +import org.springframework.batch.repeat.RepeatContext; +import org.springframework.batch.repeat.support.RepeatSynchronizationManager; +import org.springframework.transaction.support.TransactionSynchronizationManager; +import org.springframework.util.Assert; + +/** + * Stores items in transactional resource and flushes aggressively in case of + * failure. This is useful for batch update writers which need to identify the + * failed item after failed flush. + * + * @see BatchSqlUpdateItemWriter + * @see HibernateAwareItemWriter + * + * @author Dave Syer + * @author Robert Kasanicky + */ +public abstract class AbstractTransactionalResourceItemWriter implements ItemWriter { + + private Set failed = new HashSet(); + + /** + * Flushing delegated to subclass surrounded by binding and unbinding of + * transactional resources. + * + * @see org.springframework.batch.item.ItemWriter#flush() + */ + public void flush() throws FlushFailedException { + bindTransactionResources(); + try { + doFlush(); + } + catch (RuntimeException e) { + synchronized (failed) { + failed.addAll(getProcessed()); + } + // This used to contain a call to onError, however, I think this + // should be handled within the step. + throw e; + } + finally { + unbindTransactionResources(); + } + } + + /** + * Delegate to subclass to actually do the writing, but flushes aggressively + * if the item was previously part of a failed chunk. + * + * @throws Exception + * + * @see org.springframework.batch.io.OutputSource#write(java.lang.Object) + */ + public void write(Object output) throws Exception { + bindTransactionResources(); + getProcessed().add(output); + doWrite(output); + flushIfNecessary(output); + } + + private void flushIfNecessary(Object output) { + boolean flush; + synchronized (failed) { + flush = failed.contains(output); + } + if (flush) { + // Force early completion to commit aggressively if we encounter a + // failed item (from a failed chunk but we don't know which one was + // the problem). + RepeatSynchronizationManager.setCompleteOnly(); + // Flush now, so that if there is a failure this record can be + // skipped. + flush(); + } + + } + + /** + * Delegate to subclass and unbind transactional resources, effectively + * clearing the item buffer. + */ + public void clear() throws ClearFailedException { + try { + doClear(); + } + finally { + unbindTransactionResources(); + } + } + + /** + * Callback method of {@link #flush()}. + */ + protected abstract void doFlush() throws FlushFailedException; + + /** + * Callback method of {@link #clear()}. + */ + protected abstract void doClear() throws ClearFailedException; + + /** + * Callback method of {@link #write(Object)}. + */ + protected abstract void doWrite(Object output) throws Exception; + + /** + * @return Key for items processed in the current transaction + * {@link RepeatContext}. + */ + protected abstract String getResourceKey(); + + /** + * Set up the {@link RepeatContext} as a transaction resource. + * + * @param context the context to set + */ + private void bindTransactionResources() { + if (TransactionSynchronizationManager.hasResource(getResourceKey())) { + return; + } + TransactionSynchronizationManager.bindResource(getResourceKey(), new HashSet()); + } + + /** + * Remove the transaction resource associated with this context. + */ + private void unbindTransactionResources() { + if (!TransactionSynchronizationManager.hasResource(getResourceKey())) { + return; + } + TransactionSynchronizationManager.unbindResource(getResourceKey()); + } + + /** + * Accessor for the list of processed items in this transaction. + * + * @return the processed + */ + protected Set getProcessed() { + Assert.state(TransactionSynchronizationManager.hasResource(getResourceKey()), + "Processed items not bound to transaction."); + Set processed = (Set) TransactionSynchronizationManager.getResource(getResourceKey()); + return processed; + } +} diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/BatchSqlUpdateItemWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/BatchSqlUpdateItemWriter.java index 8a1a759ef..8fc7179e3 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/BatchSqlUpdateItemWriter.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/BatchSqlUpdateItemWriter.java @@ -17,21 +17,16 @@ package org.springframework.batch.item.database; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.Set; import org.springframework.batch.item.ClearFailedException; -import org.springframework.batch.item.FlushFailedException; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatContext; -import org.springframework.batch.repeat.support.RepeatSynchronizationManager; import org.springframework.beans.factory.InitializingBean; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.JdbcOperations; import org.springframework.jdbc.core.PreparedStatementCallback; -import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; /** @@ -59,14 +54,12 @@ import org.springframework.util.Assert; * @author Dave Syer * */ -public class BatchSqlUpdateItemWriter implements ItemWriter, InitializingBean { +public class BatchSqlUpdateItemWriter extends AbstractTransactionalResourceItemWriter implements InitializingBean { /** * Key for items processed in the current transaction {@link RepeatContext}. */ - protected static final String ITEMS_PROCESSED = BatchSqlUpdateItemWriter.class.getName() + ".ITEMS_PROCESSED"; - - private Set failed = new HashSet(); + private static final String ITEMS_PROCESSED = BatchSqlUpdateItemWriter.class.getName() + ".ITEMS_PROCESSED"; private JdbcOperations jdbcTemplate; @@ -112,129 +105,38 @@ public class BatchSqlUpdateItemWriter implements ItemWriter, InitializingBean { } /** - * Buffer the item in a transaction resource, but flush aggressively if the - * item was previously part of a failed chunk. - * - * @throws Exception - * - * @see org.springframework.batch.io.OutputSource#write(java.lang.Object) + * Create and execute batch prepared statement. */ - public void write(Object output) throws Exception { - bindTransactionResources(); - getProcessed().add(output); - flushIfNecessary(output); - } - - /** - * Accessor for the list of processed items in this transaction. - * - * @return the processed - */ - private Set getProcessed() { - Set processed = (Set) TransactionSynchronizationManager.getResource(ITEMS_PROCESSED); - if (processed == null) { - processed = Collections.EMPTY_SET; - } - return processed; - } - - /** - * Set up the {@link RepeatContext} as a transaction resource. - * - * @param context the context to set - */ - private void bindTransactionResources() { - if (TransactionSynchronizationManager.hasResource(ITEMS_PROCESSED)) { - return; - } - TransactionSynchronizationManager.bindResource(ITEMS_PROCESSED, new HashSet()); - } - - /** - * Remove the transaction resource associated with this context. - */ - private void unbindTransactionResources() { - if (!TransactionSynchronizationManager.hasResource(ITEMS_PROCESSED)) { - return; - } - TransactionSynchronizationManager.unbindResource(ITEMS_PROCESSED); - } - - /** - * Accessor for the context property. - * - * @param output - * - * @return the context - */ - private void flushIfNecessary(Object output) throws Exception { - boolean flush; - synchronized (failed) { - flush = failed.contains(output); - } - if (flush) { - RepeatContext context = RepeatSynchronizationManager.getContext(); - // Force early completion to commit aggressively if we encounter a - // failed item (from a failed chunk but we don't know which one was - // the problem). - context.setCompleteOnly(); - // Flush now, so that if there is a failure this record can be - // skipped. - doFlush(); - } - } - - /** - * Flush the hibernate session from within a repeat context. - */ - private void doFlush() { + protected void doFlush() { final Set processed = getProcessed(); - try { - if (!processed.isEmpty()) { - jdbcTemplate.execute(sql, new PreparedStatementCallback() { - public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException { - for (Iterator iterator = processed.iterator(); iterator.hasNext();) { - Object item = (Object) iterator.next(); - preparedStatementSetter.setValues(item, ps); - ps.addBatch(); - } - return ps.executeBatch(); + if (!processed.isEmpty()) { + jdbcTemplate.execute(sql, new PreparedStatementCallback() { + public Object doInPreparedStatement(PreparedStatement ps) throws SQLException, DataAccessException { + for (Iterator iterator = processed.iterator(); iterator.hasNext();) { + Object item = (Object) iterator.next(); + preparedStatementSetter.setValues(item, ps); + ps.addBatch(); } - }); - } - } - catch (RuntimeException e) { - synchronized (failed) { - failed.addAll(processed); - } - throw e; - } - finally { - getProcessed().clear(); + return ps.executeBatch(); + } + }); } } - /** - * Unbind transaction resources, effectively clearing the item buffer. - * - * @see org.springframework.batch.item.ItemWriter#clear() - */ - public void clear() throws ClearFailedException { - unbindTransactionResources(); + protected String getResourceKey() { + return ITEMS_PROCESSED; } /** - * Flush the internal item buffer and record failures if there are any. - * - * @see org.springframework.batch.item.ItemWriter#flush() + * No-op. */ - public void flush() throws FlushFailedException { - try { - doFlush(); - } - finally { - unbindTransactionResources(); - } + protected void doWrite(Object output) { + } + + /** + * No-op. + */ + protected void doClear() throws ClearFailedException { } } diff --git a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/HibernateAwareItemWriter.java b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/HibernateAwareItemWriter.java index 0f89a74c2..f3e1a8ffc 100644 --- a/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/HibernateAwareItemWriter.java +++ b/spring-batch-infrastructure/src/main/java/org/springframework/batch/item/database/HibernateAwareItemWriter.java @@ -15,19 +15,13 @@ */ package org.springframework.batch.item.database; -import java.util.HashSet; -import java.util.Set; - import org.hibernate.SessionFactory; import org.springframework.batch.item.ClearFailedException; -import org.springframework.batch.item.FlushFailedException; import org.springframework.batch.item.ItemWriter; import org.springframework.batch.repeat.RepeatContext; -import org.springframework.batch.repeat.support.RepeatSynchronizationManager; import org.springframework.beans.factory.InitializingBean; import org.springframework.orm.hibernate3.HibernateOperations; import org.springframework.orm.hibernate3.HibernateTemplate; -import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; /** @@ -50,15 +44,13 @@ import org.springframework.util.Assert; * @author Dave Syer * */ -public class HibernateAwareItemWriter implements ItemWriter, InitializingBean { +public class HibernateAwareItemWriter extends AbstractTransactionalResourceItemWriter implements InitializingBean { /** * Key for items processed in the current transaction {@link RepeatContext}. */ private static final String ITEMS_PROCESSED = HibernateAwareItemWriter.class.getName() + ".ITEMS_PROCESSED"; - private Set failed = new HashSet(); - private ItemWriter delegate; private HibernateOperations hibernateTemplate; @@ -92,7 +84,7 @@ public class HibernateAwareItemWriter implements ItemWriter, InitializingBean { } /** - * Check mandatory properties - there must be a delegate. + * Check mandatory properties - there must be a delegate and hibernateTemplate. * * @see org.springframework.dao.support.DaoSupport#initDao() */ @@ -102,119 +94,30 @@ public class HibernateAwareItemWriter implements ItemWriter, InitializingBean { } /** - * Use the delegate to actually do the writing, but flush aggressively if - * the item was previously part of a failed chunk. - * - * @throws Exception - * - * @see org.springframework.batch.io.OutputSource#write(java.lang.Object) + * Delegate to subclass and flush the hibernate session. */ - public void write(Object output) throws Exception { - bindTransactionResources(); - getProcessed().add(output); - delegate.write(output); - flushIfNecessary(output); - } - - /** - * Accessor for the list of processed items in this transaction. - * - * @return the processed - */ - private Set getProcessed() { - Assert.state(TransactionSynchronizationManager.hasResource(ITEMS_PROCESSED), - "Processed items not bound to transaction."); - Set processed = (Set) TransactionSynchronizationManager.getResource(ITEMS_PROCESSED); - return processed; - } - - /** - * Set up the {@link RepeatContext} as a transaction resource. - * - * @param context the context to set - */ - private void bindTransactionResources() { - if (TransactionSynchronizationManager.hasResource(ITEMS_PROCESSED)) { - return; - } - TransactionSynchronizationManager.bindResource(ITEMS_PROCESSED, new HashSet()); - } - - /** - * Remove the transaction resource associated with this context. - */ - private void unbindTransactionResources() { - if (!TransactionSynchronizationManager.hasResource(ITEMS_PROCESSED)) { - return; - } - TransactionSynchronizationManager.unbindResource(ITEMS_PROCESSED); - } - - /** - * Accessor for the context property. - * - * @param output - * - * @return the context - */ - private void flushIfNecessary(Object output) throws Exception { - boolean flush; - synchronized (failed) { - flush = failed.contains(output); - } - if (flush) { - // Force early completion to commit aggressively if we encounter a - // failed item (from a failed chunk but we don't know which one was - // the problem). - RepeatSynchronizationManager.setCompleteOnly(); - // Flush now, so that if there is a failure this record can be - // skipped. - doHibernateFlush(); - } - } - - /** - * Flush the hibernate session from within a repeat context. - */ - private void doHibernateFlush() { - try { - hibernateTemplate.flush(); - // This should happen when the transaction commits anyway, but to be - // sure... - hibernateTemplate.clear(); - } - catch (RuntimeException e) { - synchronized (failed) { - failed.addAll(getProcessed()); - } - // This used to contain a call to onError, however, I think this - // should be handled within the step. - throw e; - } + protected void doFlush() { + delegate.flush(); + hibernateTemplate.flush(); + // This should happen when the transaction commits anyway, but to be + // sure... + hibernateTemplate.clear(); } /** * Call the delegate clear() method, and then clear the hibernate session. - * - * @see org.springframework.batch.item.ItemWriter#clear() */ - public void clear() throws ClearFailedException { - unbindTransactionResources(); - hibernateTemplate.clear(); + protected void doClear() throws ClearFailedException { delegate.clear(); + hibernateTemplate.clear(); } - /** - * Flush the Hibernate session and record failures if there are any. The - * delegate flush will also be called. - * - * @see org.springframework.batch.item.ItemWriter#flush() - */ - public void flush() throws FlushFailedException { - bindTransactionResources(); - doHibernateFlush(); - unbindTransactionResources(); - delegate.flush(); + protected String getResourceKey() { + return ITEMS_PROCESSED; + } + + protected void doWrite(Object output) throws Exception { + delegate.write(output); } } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/BatchSqlUpdateItemWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/BatchSqlUpdateItemWriterTests.java index 7e2943c75..c2d950fe9 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/BatchSqlUpdateItemWriterTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/BatchSqlUpdateItemWriterTests.java @@ -76,7 +76,7 @@ public class BatchSqlUpdateItemWriterTests extends TestCase { list.add(item); } }); - TransactionSynchronizationManager.bindResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED, new HashSet( + TransactionSynchronizationManager.bindResource(writer.getResourceKey(), new HashSet( Collections.singleton("spam"))); RepeatSynchronizationManager.register(context); } @@ -86,8 +86,8 @@ public class BatchSqlUpdateItemWriterTests extends TestCase { * @see junit.framework.TestCase#tearDown() */ protected void tearDown() throws Exception { - if (TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED)) { - TransactionSynchronizationManager.unbindResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED); + if (TransactionSynchronizationManager.hasResource(writer.getResourceKey())) { + TransactionSynchronizationManager.unbindResource(writer.getResourceKey()); } RepeatSynchronizationManager.clear(); } @@ -125,9 +125,9 @@ public class BatchSqlUpdateItemWriterTests extends TestCase { * {@link org.springframework.batch.item.database.BatchSqlUpdateItemWriter#clear()}. */ public void testClear() { - assertTrue(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED)); + assertTrue(TransactionSynchronizationManager.hasResource(writer.getResourceKey())); writer.clear(); - assertFalse(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED)); + assertFalse(TransactionSynchronizationManager.hasResource(writer.getResourceKey())); } /** @@ -135,9 +135,9 @@ public class BatchSqlUpdateItemWriterTests extends TestCase { * {@link org.springframework.batch.item.database.BatchSqlUpdateItemWriter#flush()}. */ public void testFlush() { - assertTrue(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED)); + assertTrue(TransactionSynchronizationManager.hasResource(writer.getResourceKey())); writer.flush(); - assertFalse(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED)); + assertFalse(TransactionSynchronizationManager.hasResource(writer.getResourceKey())); assertEquals(2, list.size()); assertTrue(list.contains("SQL")); } @@ -148,10 +148,10 @@ public class BatchSqlUpdateItemWriterTests extends TestCase { * @throws Exception */ public void testWriteAndFlush() throws Exception { - assertTrue(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED)); + assertTrue(TransactionSynchronizationManager.hasResource(writer.getResourceKey())); writer.write("bar"); writer.flush(); - assertFalse(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED)); + assertFalse(TransactionSynchronizationManager.hasResource(writer.getResourceKey())); assertEquals(3, list.size()); assertTrue(list.contains("SQL")); } @@ -176,7 +176,7 @@ public class BatchSqlUpdateItemWriterTests extends TestCase { catch (RuntimeException e) { assertEquals("bar", e.getMessage()); } - assertFalse(TransactionSynchronizationManager.hasResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED)); + assertFalse(TransactionSynchronizationManager.hasResource(writer.getResourceKey())); assertEquals(2, list.size()); writer.setItemPreparedStatementSetter(new ItemPreparedStatementSetter() { public void setValues(Object item, PreparedStatement ps) throws SQLException { @@ -197,7 +197,7 @@ public class BatchSqlUpdateItemWriterTests extends TestCase { */ public void testEmptyFlush() { // items are bound on write, so we unbind them first - TransactionSynchronizationManager.unbindResource(BatchSqlUpdateItemWriter.ITEMS_PROCESSED); + TransactionSynchronizationManager.unbindResource(writer.getResourceKey()); writer.flush(); } diff --git a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/HibernateAwareItemWriterTests.java b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/HibernateAwareItemWriterTests.java index eed2db17b..a012f1bdb 100644 --- a/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/HibernateAwareItemWriterTests.java +++ b/spring-batch-infrastructure/src/test/java/org/springframework/batch/item/database/HibernateAwareItemWriterTests.java @@ -81,7 +81,7 @@ public class HibernateAwareItemWriterTests extends TestCase { * @see junit.framework.TestCase#tearDown() */ protected void tearDown() throws Exception { - String key = HibernateAwareItemWriter.class.getName() + ".ITEMS_PROCESSED"; + String key = writer.getResourceKey(); if (TransactionSynchronizationManager.hasResource(key)) { TransactionSynchronizationManager.unbindResource(key); } @@ -165,16 +165,19 @@ public class HibernateAwareItemWriterTests extends TestCase { } catch (RuntimeException e) { assertEquals("bar", e.getMessage()); } - assertEquals(1, list.size()); + assertEquals(2, list.size()); + assertTrue(list.contains("foo")); + assertTrue(list.contains("delegateFlush")); writer.setHibernateTemplate(new HibernateTemplateWrapper() { public void flush() throws DataAccessException { list.add("flush"); } }); writer.write("foo"); - assertEquals(4, list.size()); + assertEquals(6, list.size()); assertTrue(list.contains("flush")); assertTrue(list.contains("clear")); + assertTrue(list.contains("delegateFlush")); assertTrue(context.isCompleteOnly()); }