diff --git a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/connection/jedis/JedisConnection.java b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/connection/jedis/JedisConnection.java index 6e3433c17..e8846edee 100644 --- a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/connection/jedis/JedisConnection.java +++ b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/connection/jedis/JedisConnection.java @@ -1585,6 +1585,7 @@ public class JedisConnection implements RedisConnection { throw new UnsupportedOperationException(); } + // FIXME: DATAKV-24 once Jedis adds support for binary messages String msg = new String(message); String chn = new String(channel); diff --git a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/core/RedisTemplate.java b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/core/RedisTemplate.java index 7fecbf3f1..d58ab847d 100644 --- a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/core/RedisTemplate.java +++ b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/core/RedisTemplate.java @@ -182,6 +182,15 @@ public class RedisTemplate extends RedisAccessor implements RedisOperation this.keySerializer = serializer; } + /** + * Returns the key serializer used by this template. + * + * @return + */ + public RedisSerializer getKeySerializer() { + return keySerializer; + } + /** * Sets the value serializer to be used by this template. Defaults to {@link JdkSerializationRedisSerializer}. * @@ -191,6 +200,15 @@ public class RedisTemplate extends RedisAccessor implements RedisOperation this.valueSerializer = serializer; } + /** + * Returns the value serializer used by this template. + * + * @return + */ + public RedisSerializer getValueSerializer() { + return valueSerializer; + } + /** * Sets the hash key (or field) serializer to be used by this template. Defaults to {@link JdkSerializationRedisSerializer}. * diff --git a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisListenerContainer.java b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisListenerContainer.java index 7c3071370..522e3073d 100644 --- a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisListenerContainer.java +++ b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/RedisListenerContainer.java @@ -98,7 +98,7 @@ public class RedisListenerContainer implements InitializingBean, DisposableBean, private final SubscriptionTask subscriptionTask = new SubscriptionTask(); private final MessageListener multiplexer = new DispatchMessageListener(); - private RedisSerializer serializer = new StringRedisSerializer(); + private volatile RedisSerializer serializer = new StringRedisSerializer(); @Override diff --git a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/adapter/MessageListenerAdapter.java b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/adapter/MessageListenerAdapter.java index 9f6161fb6..366961451 100644 --- a/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/adapter/MessageListenerAdapter.java +++ b/spring-data-redis/src/main/java/org/springframework/data/keyvalue/redis/listener/adapter/MessageListenerAdapter.java @@ -206,19 +206,15 @@ public class MessageListenerAdapter implements MessageListener { /** * Handle the given exception that arose during listener execution. * The default implementation logs the exception at error level. - *

This method only applies when used as standard JMS {@link MessageListener}. - * In case of the Spring {@link SessionAwareMessageListener} mechanism, - * exceptions get handled by the caller instead. * @param ex the exception to handle - * @see #onMessage(javax.jms.Message) */ protected void handleListenerException(Throwable ex) { logger.error("Listener execution failed", ex); } /** - * Extract the message body from the given JMS message. - * @param message the JMS Message + * Extract the message body from the given Redis message. + * @param message the Redis Message * @return the content of the message, to be passed into the * listener method as argument */ diff --git a/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTestParams.java b/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTestParams.java index 750487509..fd98a0993 100644 --- a/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTestParams.java +++ b/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTestParams.java @@ -45,8 +45,12 @@ public class PubSubTestParams { jedisConnFactory.afterPropertiesSet(); RedisTemplate stringTemplate = new StringRedisTemplate(jedisConnFactory); - RedisTemplate personTemplate = new RedisTemplate(jedisConnFactory); + //RedisTemplate personTemplate = new RedisTemplate(jedisConnFactory); - return Arrays.asList(new Object[][] { { stringFactory, stringTemplate }, { personFactory, personTemplate } }); + + // FIXME: DATAKV-24 + return Arrays.asList(new Object[][] { { stringFactory, stringTemplate } + //, { personFactory, personTemplate } + }); } } diff --git a/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTests.java b/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTests.java index 0f359d326..5b2768a5e 100644 --- a/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTests.java +++ b/spring-data-redis/src/test/java/org/springframework/data/keyvalue/redis/listener/PubSubTests.java @@ -15,12 +15,14 @@ */ package org.springframework.data.keyvalue.redis.listener; +import static org.junit.Assert.*; + import java.util.Arrays; import java.util.Collection; import java.util.LinkedHashSet; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.BlockingDeque; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import org.junit.After; @@ -31,10 +33,9 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import org.springframework.beans.factory.DisposableBean; -import org.springframework.data.keyvalue.redis.connection.Message; -import org.springframework.data.keyvalue.redis.connection.MessageListener; import org.springframework.data.keyvalue.redis.connection.RedisConnectionFactory; import org.springframework.data.keyvalue.redis.core.RedisTemplate; +import org.springframework.data.keyvalue.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.keyvalue.redis.support.collections.ObjectFactory; /** @@ -52,14 +53,26 @@ public class PubSubTests { protected RedisTemplate template; private static Set connFactories = new LinkedHashSet(); - private MessageListener testListener; + private final BlockingDeque bag = new LinkedBlockingDeque(4); + + private final Object handler = new Object() { + void handleMessage(String message) { + System.out.println("Received message " + message); + bag.add(message); + } + }; + + private final MessageListenerAdapter adapter = new MessageListenerAdapter(handler); @Before public void setUp() throws Exception { + adapter.setSerializer(template.getValueSerializer()); + container = new RedisListenerContainer(); container.setConnectionFactory(template.getConnectionFactory()); container.setBeanName("container"); container.afterPropertiesSet(); + } @After @@ -102,21 +115,18 @@ public class PubSubTests { @Test public void testContainerSubscribe() throws Exception { - final BlockingQueue bag = new ArrayBlockingQueue(4); - container.addMessageListener(new MessageListener() { + container.addMessageListener(adapter, Arrays.asList(new ChannelTopic(CHANNEL))); - @Override - public void onMessage(Message message, byte[] pattern) { - System.out.println("Received message " + message + " and pattern=" + pattern); - bag.add(message); - } - }, Arrays.asList(new ChannelTopic(CHANNEL))); + // wait for the container to start the registration Thread.sleep(500); - template.convertAndSend(CHANNEL, "bar"); - template.convertAndSend(CHANNEL, "bar1"); - System.out.println("Found in bag " + bag.poll(1, TimeUnit.SECONDS)); - System.out.println("Found in bag " + bag.poll(1, TimeUnit.SECONDS)); + String payload1 = "do"; + String payload2 = "re mi"; + template.convertAndSend(CHANNEL, payload1); + template.convertAndSend(CHANNEL, payload2); + + assertEquals(payload1, bag.pollFirst(1, TimeUnit.SECONDS)); + assertEquals(payload2, bag.pollFirst(1, TimeUnit.SECONDS)); } } \ No newline at end of file