Attempt to close all delegate stream readers even when some fail

Signed-off-by: Elimelec Burghelea <elimelec1@protonmail.com>
This commit is contained in:
Elimelec Burghelea
2025-02-20 21:52:35 +02:00
committed by Mahmoud Ben Hassine
parent 1eac9e9fee
commit f40ab202a0
4 changed files with 69 additions and 15 deletions

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2008-2022 the original author or authors.
* Copyright 2008-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -63,6 +63,7 @@ import static org.springframework.batch.core.BatchStatus.UNKNOWN;
* @author David Turanski
* @author Mahmoud Ben Hassine
* @author Parikshit Dutta
* @author Elimelec Burghelea
*/
class TaskletStepExceptionTests {
@@ -212,8 +213,8 @@ class TaskletStepExceptionTests {
taskletStep.execute(stepExecution);
assertEquals(FAILED, stepExecution.getStatus());
assertTrue(stepExecution.getFailureExceptions().contains(taskletException));
assertTrue(stepExecution.getFailureExceptions().contains(exception));
assertEquals(stepExecution.getFailureExceptions().get(0), taskletException);
assertEquals(stepExecution.getFailureExceptions().get(1).getSuppressed()[0], exception);
assertEquals(2, jobRepository.getUpdateCount());
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2006-2023 the original author or authors.
* Copyright 2006-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -761,7 +761,7 @@ class TaskletStepTests {
Throwable ex = stepExecution.getFailureExceptions().get(0);
// The original rollback was caused by this one:
assertEquals("Bar", ex.getMessage());
assertEquals("Bar", ex.getSuppressed()[0].getMessage());
}
@Test
@@ -791,7 +791,7 @@ class TaskletStepTests {
assertEquals("", msg);
Throwable ex = stepExecution.getFailureExceptions().get(0);
// The original rollback was caused by this one:
assertEquals("Bar", ex.getMessage());
assertEquals("Bar", ex.getSuppressed()[0].getMessage());
}
/**

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2006-2022 the original author or authors.
* Copyright 2006-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -28,7 +28,7 @@ import org.springframework.batch.item.ItemStreamException;
*
* @author Dave Syer
* @author Mahmoud Ben Hassine
*
* @author Elimelec Burghelea
*/
public class CompositeItemStream implements ItemStream {
@@ -102,13 +102,26 @@ public class CompositeItemStream implements ItemStream {
/**
* Broadcast the call to close.
* @throws ItemStreamException thrown if one of the {@link ItemStream}s in the list
* fails to close. This is a sequential operation so all itemStreams in the list after
* the one that failed to close will remain open.
* fails to close.
*/
@Override
public void close() throws ItemStreamException {
List<Exception> exceptions = new ArrayList<>();
for (ItemStream itemStream : streams) {
itemStream.close();
try {
itemStream.close();
}
catch (Exception e) {
exceptions.add(e);
}
}
if (!exceptions.isEmpty()) {
String message = String.format("Failed to close %d delegate(s) due to exceptions", exceptions.size());
ItemStreamException holder = new ItemStreamException(message);
exceptions.forEach(holder::addSuppressed);
throw holder;
}
}

View File

@@ -1,5 +1,5 @@
/*
* Copyright 2006-2022 the original author or authors.
* Copyright 2006-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -15,19 +15,25 @@
*/
package org.springframework.batch.item.support;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ItemStreamSupport;
import static org.junit.jupiter.api.Assertions.assertEquals;
/**
* @author Dave Syer
*
* @author Elimelec Burghelea
*/
class CompositeItemStreamTests {
@@ -90,6 +96,40 @@ class CompositeItemStreamTests {
assertEquals(1, list.size());
}
@Test
void testClose2Delegates() {
ItemStream reader1 = Mockito.mock(ItemStream.class);
ItemStream reader2 = Mockito.mock(ItemStream.class);
manager.register(reader1);
manager.register(reader2);
manager.close();
verify(reader1, times(1)).close();
verify(reader2, times(1)).close();
}
@Test
void testClose2DelegatesThatThrowsException() {
ItemStream reader1 = Mockito.mock(ItemStream.class);
ItemStream reader2 = Mockito.mock(ItemStream.class);
manager.register(reader1);
manager.register(reader2);
doThrow(new ItemStreamException("A failure")).when(reader1).close();
try {
manager.close();
Assertions.fail("Expected an ItemStreamException");
}
catch (ItemStreamException ignored) {
}
verify(reader1, times(1)).close();
verify(reader2, times(1)).close();
}
@Test
void testCloseDoesNotUnregister() {
manager.setStreams(new ItemStream[] { new ItemStreamSupport() {