DATAKV-22
+ add more integration tests
This commit is contained in:
@@ -1585,6 +1585,7 @@ public class JedisConnection implements RedisConnection {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
// FIXME: DATAKV-24 once Jedis adds support for binary messages
|
||||
String msg = new String(message);
|
||||
String chn = new String(channel);
|
||||
|
||||
|
||||
@@ -182,6 +182,15 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
|
||||
this.keySerializer = serializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the key serializer used by this template.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public RedisSerializer<?> getKeySerializer() {
|
||||
return keySerializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the value serializer to be used by this template. Defaults to {@link JdkSerializationRedisSerializer}.
|
||||
*
|
||||
@@ -191,6 +200,15 @@ public class RedisTemplate<K, V> extends RedisAccessor implements RedisOperation
|
||||
this.valueSerializer = serializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value serializer used by this template.
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
public RedisSerializer<?> getValueSerializer() {
|
||||
return valueSerializer;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the hash key (or field) serializer to be used by this template. Defaults to {@link JdkSerializationRedisSerializer}.
|
||||
*
|
||||
|
||||
@@ -98,7 +98,7 @@ public class RedisListenerContainer implements InitializingBean, DisposableBean,
|
||||
private final SubscriptionTask subscriptionTask = new SubscriptionTask();
|
||||
|
||||
private final MessageListener multiplexer = new DispatchMessageListener();
|
||||
private RedisSerializer<String> serializer = new StringRedisSerializer();
|
||||
private volatile RedisSerializer<String> serializer = new StringRedisSerializer();
|
||||
|
||||
|
||||
@Override
|
||||
|
||||
@@ -206,19 +206,15 @@ public class MessageListenerAdapter implements MessageListener {
|
||||
/**
|
||||
* Handle the given exception that arose during listener execution.
|
||||
* The default implementation logs the exception at error level.
|
||||
* <p>This method only applies when used as standard JMS {@link MessageListener}.
|
||||
* In case of the Spring {@link SessionAwareMessageListener} mechanism,
|
||||
* exceptions get handled by the caller instead.
|
||||
* @param ex the exception to handle
|
||||
* @see #onMessage(javax.jms.Message)
|
||||
*/
|
||||
protected void handleListenerException(Throwable ex) {
|
||||
logger.error("Listener execution failed", ex);
|
||||
}
|
||||
|
||||
/**
|
||||
* Extract the message body from the given JMS message.
|
||||
* @param message the JMS <code>Message</code>
|
||||
* Extract the message body from the given Redis message.
|
||||
* @param message the Redis <code>Message</code>
|
||||
* @return the content of the message, to be passed into the
|
||||
* listener method as argument
|
||||
*/
|
||||
|
||||
@@ -45,8 +45,12 @@ public class PubSubTestParams {
|
||||
jedisConnFactory.afterPropertiesSet();
|
||||
|
||||
RedisTemplate<String, String> stringTemplate = new StringRedisTemplate(jedisConnFactory);
|
||||
RedisTemplate<String, Person> personTemplate = new RedisTemplate<String, Person>(jedisConnFactory);
|
||||
//RedisTemplate<String, Person> personTemplate = new RedisTemplate<String, Person>(jedisConnFactory);
|
||||
|
||||
return Arrays.asList(new Object[][] { { stringFactory, stringTemplate }, { personFactory, personTemplate } });
|
||||
|
||||
// FIXME: DATAKV-24
|
||||
return Arrays.asList(new Object[][] { { stringFactory, stringTemplate }
|
||||
//, { personFactory, personTemplate }
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,12 +15,14 @@
|
||||
*/
|
||||
package org.springframework.data.keyvalue.redis.listener;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ArrayBlockingQueue;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.BlockingDeque;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.junit.After;
|
||||
@@ -31,10 +33,9 @@ import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
import org.junit.runners.Parameterized.Parameters;
|
||||
import org.springframework.beans.factory.DisposableBean;
|
||||
import org.springframework.data.keyvalue.redis.connection.Message;
|
||||
import org.springframework.data.keyvalue.redis.connection.MessageListener;
|
||||
import org.springframework.data.keyvalue.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.keyvalue.redis.core.RedisTemplate;
|
||||
import org.springframework.data.keyvalue.redis.listener.adapter.MessageListenerAdapter;
|
||||
import org.springframework.data.keyvalue.redis.support.collections.ObjectFactory;
|
||||
|
||||
/**
|
||||
@@ -52,14 +53,26 @@ public class PubSubTests<T> {
|
||||
protected RedisTemplate template;
|
||||
private static Set<RedisConnectionFactory> connFactories = new LinkedHashSet<RedisConnectionFactory>();
|
||||
|
||||
private MessageListener testListener;
|
||||
private final BlockingDeque<String> bag = new LinkedBlockingDeque<String>(4);
|
||||
|
||||
private final Object handler = new Object() {
|
||||
void handleMessage(String message) {
|
||||
System.out.println("Received message " + message);
|
||||
bag.add(message);
|
||||
}
|
||||
};
|
||||
|
||||
private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler);
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
adapter.setSerializer(template.getValueSerializer());
|
||||
|
||||
container = new RedisListenerContainer();
|
||||
container.setConnectionFactory(template.getConnectionFactory());
|
||||
container.setBeanName("container");
|
||||
container.afterPropertiesSet();
|
||||
|
||||
}
|
||||
|
||||
@After
|
||||
@@ -102,21 +115,18 @@ public class PubSubTests<T> {
|
||||
|
||||
@Test
|
||||
public void testContainerSubscribe() throws Exception {
|
||||
final BlockingQueue<Message> bag = new ArrayBlockingQueue<Message>(4);
|
||||
|
||||
container.addMessageListener(new MessageListener() {
|
||||
container.addMessageListener(adapter, Arrays.asList(new ChannelTopic(CHANNEL)));
|
||||
|
||||
@Override
|
||||
public void onMessage(Message message, byte[] pattern) {
|
||||
System.out.println("Received message " + message + " and pattern=" + pattern);
|
||||
bag.add(message);
|
||||
}
|
||||
}, Arrays.asList(new ChannelTopic(CHANNEL)));
|
||||
// wait for the container to start the registration
|
||||
|
||||
Thread.sleep(500);
|
||||
template.convertAndSend(CHANNEL, "bar");
|
||||
template.convertAndSend(CHANNEL, "bar1");
|
||||
System.out.println("Found in bag " + bag.poll(1, TimeUnit.SECONDS));
|
||||
System.out.println("Found in bag " + bag.poll(1, TimeUnit.SECONDS));
|
||||
String payload1 = "do";
|
||||
String payload2 = "re mi";
|
||||
template.convertAndSend(CHANNEL, payload1);
|
||||
template.convertAndSend(CHANNEL, payload2);
|
||||
|
||||
assertEquals(payload1, bag.pollFirst(1, TimeUnit.SECONDS));
|
||||
assertEquals(payload2, bag.pollFirst(1, TimeUnit.SECONDS));
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user