Fix Race Condition in Integration Tests Using Redis SessionEventRegistry
Closes gh-3398
This commit is contained in:
@@ -16,24 +16,29 @@
|
||||
|
||||
package org.springframework.session.data;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.jetbrains.annotations.Nullable;
|
||||
|
||||
import org.springframework.context.ApplicationListener;
|
||||
import org.springframework.session.events.AbstractSessionEvent;
|
||||
|
||||
public class SessionEventRegistry implements ApplicationListener<AbstractSessionEvent> {
|
||||
|
||||
private Map<String, AbstractSessionEvent> events = new HashMap<>();
|
||||
private Map<String, List<AbstractSessionEvent>> events = new HashMap<>();
|
||||
|
||||
private ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();
|
||||
|
||||
@Override
|
||||
public void onApplicationEvent(AbstractSessionEvent event) {
|
||||
String sessionId = event.getSessionId();
|
||||
this.events.put(sessionId, event);
|
||||
this.events.computeIfAbsent(sessionId, (key) -> new ArrayList<>()).add(event);
|
||||
Object lock = getLock(sessionId);
|
||||
synchronized (lock) {
|
||||
lock.notifyAll();
|
||||
@@ -45,24 +50,41 @@ public class SessionEventRegistry implements ApplicationListener<AbstractSession
|
||||
this.locks.clear();
|
||||
}
|
||||
|
||||
public boolean receivedEvent(String sessionId) throws InterruptedException {
|
||||
return waitForEvent(sessionId) != null;
|
||||
public <E extends AbstractSessionEvent> boolean receivedEvent(String sessionId, Class<E> type)
|
||||
throws InterruptedException {
|
||||
return waitForEvent(sessionId, type) != null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public <E extends AbstractSessionEvent> E getEvent(String sessionId) throws InterruptedException {
|
||||
return (E) waitForEvent(sessionId);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private <E extends AbstractSessionEvent> E waitForEvent(String sessionId) throws InterruptedException {
|
||||
public <E extends AbstractSessionEvent> E waitForEvent(String sessionId, Class<E> type)
|
||||
throws InterruptedException {
|
||||
Object lock = getLock(sessionId);
|
||||
long waitInMs = TimeUnit.SECONDS.toMillis(10);
|
||||
long start = System.currentTimeMillis();
|
||||
boolean doneWaiting = false;
|
||||
synchronized (lock) {
|
||||
if (!this.events.containsKey(sessionId)) {
|
||||
lock.wait(10000);
|
||||
while (!doneWaiting) {
|
||||
E result = getEvent(sessionId, type);
|
||||
if (result == null) {
|
||||
// wait until timeout or notified
|
||||
// might need to continue trying if the notification
|
||||
// was for a different event
|
||||
lock.wait(waitInMs);
|
||||
}
|
||||
long now = System.currentTimeMillis();
|
||||
doneWaiting = (now - start) >= waitInMs;
|
||||
}
|
||||
return getEvent(sessionId, type);
|
||||
}
|
||||
return (E) this.events.get(sessionId);
|
||||
}
|
||||
|
||||
private <E extends AbstractSessionEvent> @Nullable E getEvent(String sessionId, Class<E> type) {
|
||||
List<AbstractSessionEvent> events = this.events.get(sessionId);
|
||||
E result = (events != null) ? (E) events.stream()
|
||||
.filter((event) -> type.isAssignableFrom(event.getClass()))
|
||||
.findFirst()
|
||||
.orElse(null) : null;
|
||||
return result;
|
||||
}
|
||||
|
||||
private Object getLock(String sessionId) {
|
||||
|
||||
@@ -101,7 +101,7 @@ class ReactiveRedisIndexedSessionRepositoryConfigurationITests {
|
||||
RedisSession session = this.repository.createSession().block();
|
||||
this.repository.save(session).block();
|
||||
SessionEventRegistry registry = this.context.getBean(SessionEventRegistry.class);
|
||||
SessionCreatedEvent event = registry.getEvent(session.getId());
|
||||
SessionCreatedEvent event = registry.waitForEvent(session.getId(), SessionCreatedEvent.class);
|
||||
Session eventSession = event.getSession();
|
||||
assertThat(eventSession).usingRecursiveComparison()
|
||||
.withComparatorForFields(new InstantComparator(), "cached.creationTime", "cached.lastAccessedTime")
|
||||
|
||||
@@ -124,7 +124,7 @@ class ReactiveRedisIndexedSessionRepositoryITests {
|
||||
|
||||
this.repository.save(session).block();
|
||||
|
||||
SessionCreatedEvent event = this.eventRegistry.getEvent(session.getId());
|
||||
SessionCreatedEvent event = this.eventRegistry.waitForEvent(session.getId(), SessionCreatedEvent.class);
|
||||
assertThat(event).isNotNull();
|
||||
RedisSession eventSession = event.getSession();
|
||||
compareSessions(session, eventSession);
|
||||
@@ -168,7 +168,7 @@ class ReactiveRedisIndexedSessionRepositoryITests {
|
||||
assertThat(this.redis.expire(key, Duration.ofSeconds(1)).block()).isTrue();
|
||||
|
||||
await().atMost(Duration.ofSeconds(3)).untilAsserted(() -> {
|
||||
SessionExpiredEvent event = this.eventRegistry.getEvent(toSave.getId());
|
||||
SessionExpiredEvent event = this.eventRegistry.waitForEvent(toSave.getId(), SessionExpiredEvent.class);
|
||||
RedisSession eventSession = event.getSession();
|
||||
Map<String, RedisSession> findByPrincipalName = this.repository
|
||||
.findByIndexNameAndIndexValue(INDEX_NAME, principalName)
|
||||
@@ -206,7 +206,7 @@ class ReactiveRedisIndexedSessionRepositoryITests {
|
||||
.block();
|
||||
assertThat(findByPrincipalName).hasSize(0);
|
||||
assertThat(findByPrincipalName.keySet()).doesNotContain(toSave.getId());
|
||||
SessionDeletedEvent event = this.eventRegistry.getEvent(toSave.getId());
|
||||
SessionDeletedEvent event = this.eventRegistry.waitForEvent(toSave.getId(), SessionDeletedEvent.class);
|
||||
assertThat(event).isNotNull();
|
||||
RedisSession eventSession = event.getSession();
|
||||
compareSessions(toSave, eventSession);
|
||||
|
||||
@@ -111,10 +111,10 @@ class RedisIndexedSessionRepositoryITests extends AbstractRedisITests {
|
||||
|
||||
this.repository.save(toSave);
|
||||
|
||||
assertThat(this.registry.receivedEvent(toSave.getId())).isTrue();
|
||||
assertThat(this.registry.receivedEvent(toSave.getId(), SessionCreatedEvent.class)).isTrue();
|
||||
assertThat(this.redis.boundSetOps(usernameSessionKey).members()).contains(toSave.getId());
|
||||
|
||||
SessionCreatedEvent createdEvent = this.registry.getEvent(toSave.getId());
|
||||
SessionCreatedEvent createdEvent = this.registry.waitForEvent(toSave.getId(), SessionCreatedEvent.class);
|
||||
Session session = createdEvent.getSession();
|
||||
|
||||
assertThat(session.getId()).isEqualTo(toSave.getId());
|
||||
@@ -127,11 +127,10 @@ class RedisIndexedSessionRepositoryITests extends AbstractRedisITests {
|
||||
this.repository.deleteById(toSave.getId());
|
||||
|
||||
assertThat(this.repository.findById(toSave.getId())).isNull();
|
||||
assertThat(this.registry.<SessionDestroyedEvent>getEvent(toSave.getId()))
|
||||
.isInstanceOf(SessionDestroyedEvent.class);
|
||||
assertThat(this.redis.boundSetOps(usernameSessionKey).members()).doesNotContain(toSave.getId());
|
||||
|
||||
assertThat(this.registry.getEvent(toSave.getId()).getSession().<String>getAttribute(expectedAttributeName))
|
||||
SessionDestroyedEvent destroyedEvent = this.registry.waitForEvent(toSave.getId(), SessionDestroyedEvent.class);
|
||||
assertThat(destroyedEvent.getSession().<String>getAttribute(expectedAttributeName))
|
||||
.isEqualTo(expectedAttributeValue);
|
||||
}
|
||||
|
||||
@@ -171,7 +170,8 @@ class RedisIndexedSessionRepositoryITests extends AbstractRedisITests {
|
||||
assertThat(findByPrincipalName.keySet()).containsOnly(toSave.getId());
|
||||
|
||||
this.repository.deleteById(toSave.getId());
|
||||
assertThat(this.registry.receivedEvent(toSave.getId())).isTrue();
|
||||
boolean sessionDestroyed = this.registry.receivedEvent(toSave.getId(), SessionDestroyedEvent.class);
|
||||
assertThat(sessionDestroyed).isTrue();
|
||||
|
||||
findByPrincipalName = this.repository.findByIndexNameAndIndexValue(INDEX_NAME, principalName);
|
||||
|
||||
@@ -334,7 +334,8 @@ class RedisIndexedSessionRepositoryITests extends AbstractRedisITests {
|
||||
assertThat(findByPrincipalName.keySet()).containsOnly(toSave.getId());
|
||||
|
||||
this.repository.deleteById(toSave.getId());
|
||||
assertThat(this.registry.receivedEvent(toSave.getId())).isTrue();
|
||||
boolean sessionDestroyed = this.registry.receivedEvent(toSave.getId(), SessionDestroyedEvent.class);
|
||||
assertThat(sessionDestroyed).isTrue();
|
||||
|
||||
findByPrincipalName = this.repository.findByIndexNameAndIndexValue(INDEX_NAME, getSecurityName());
|
||||
|
||||
|
||||
Reference in New Issue
Block a user