From 66b582c3969d56fefb59d4ed176a098cb0bab042 Mon Sep 17 00:00:00 2001 From: Scott Frederick Date: Wed, 20 May 2015 14:52:30 -0500 Subject: [PATCH] Add support for multiple URIs in credentials for AMQP/RabbitMQ. --- .../cloudfoundry/AmqpServiceInfoCreator.java | 8 ++ .../CloudFoundryConnectorAmqpServiceTest.java | 43 +++++++++++ .../test-rabbit-info-multiple-uris.json | 12 +++ .../cloud/service/common/AmqpServiceInfo.java | 27 ++++++- .../RabbitConnectionFactoryCreator.java | 55 ++++++++++++-- .../RabbitConnectionFactoryCreatorTest.java | 73 +++++++++++++++---- 6 files changed, 195 insertions(+), 23 deletions(-) create mode 100644 spring-cloud-cloudfoundry-connector/src/test/resources/org/springframework/cloud/cloudfoundry/test-rabbit-info-multiple-uris.json diff --git a/spring-cloud-cloudfoundry-connector/src/main/java/org/springframework/cloud/cloudfoundry/AmqpServiceInfoCreator.java b/spring-cloud-cloudfoundry-connector/src/main/java/org/springframework/cloud/cloudfoundry/AmqpServiceInfoCreator.java index b1d36a5..11bd81a 100644 --- a/spring-cloud-cloudfoundry-connector/src/main/java/org/springframework/cloud/cloudfoundry/AmqpServiceInfoCreator.java +++ b/spring-cloud-cloudfoundry-connector/src/main/java/org/springframework/cloud/cloudfoundry/AmqpServiceInfoCreator.java @@ -1,5 +1,6 @@ package org.springframework.cloud.cloudfoundry; +import java.util.List; import java.util.Map; import org.springframework.cloud.service.common.AmqpServiceInfo; @@ -15,6 +16,7 @@ public class AmqpServiceInfoCreator extends CloudFoundryServiceInfoCreator serviceData) { Map credentials = getCredentials(serviceData); @@ -23,6 +25,12 @@ public class AmqpServiceInfoCreator extends CloudFoundryServiceInfoCreator uris = (List) credentials.get("uris"); + List managementUris = (List) credentials.get("http_api_uris"); + return new AmqpServiceInfo(id, uri, managementUri, uris, managementUris); + } + return new AmqpServiceInfo(id, uri, managementUri); } diff --git a/spring-cloud-cloudfoundry-connector/src/test/java/org/springframework/cloud/cloudfoundry/CloudFoundryConnectorAmqpServiceTest.java b/spring-cloud-cloudfoundry-connector/src/test/java/org/springframework/cloud/cloudfoundry/CloudFoundryConnectorAmqpServiceTest.java index c8cf6e1..5542810 100644 --- a/spring-cloud-cloudfoundry-connector/src/test/java/org/springframework/cloud/cloudfoundry/CloudFoundryConnectorAmqpServiceTest.java +++ b/spring-cloud-cloudfoundry-connector/src/test/java/org/springframework/cloud/cloudfoundry/CloudFoundryConnectorAmqpServiceTest.java @@ -1,6 +1,7 @@ package org.springframework.cloud.cloudfoundry; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.when; @@ -18,6 +19,8 @@ import org.springframework.cloud.service.common.AmqpServiceInfo; * */ public class CloudFoundryConnectorAmqpServiceTest extends AbstractCloudFoundryConnectorTest { + protected static final String hostname2 = "11.21.31.41"; + @Test public void rabbitServiceCreationWithTags() { when(mockEnvironment.getEnvValue("VCAP_SERVICES")) @@ -68,6 +71,34 @@ public class CloudFoundryConnectorAmqpServiceTest extends AbstractCloudFoundryCo assertServiceFoundOfType(serviceInfos, "rabbit-2", AmqpServiceInfo.class); } + @Test + public void rabbitServiceCreationMultipleUris() { + when(mockEnvironment.getEnvValue("VCAP_SERVICES")) + .thenReturn(getServicesPayload( + getRabbitServicePayloadMultipleUris("rabbit-1", hostname, hostname2, port, username, password, "q-1", "vhost1"), + getRabbitServicePayloadMultipleUris("rabbit-2", hostname, hostname2, port, username, password, "q-2", "vhost2"))); + + List serviceInfos = testCloudConnector.getServiceInfos(); + assertServiceFoundOfType(serviceInfos, "rabbit-1", AmqpServiceInfo.class); + assertServiceFoundOfType(serviceInfos, "rabbit-2", AmqpServiceInfo.class); + + AmqpServiceInfo amqpServiceInfo = (AmqpServiceInfo) serviceInfos.get(0); + assertNotNull(amqpServiceInfo.getUri()); + assertTrue(amqpServiceInfo.getUri().contains(hostname)); + assertNotNull(amqpServiceInfo.getManagementUri()); + assertTrue(amqpServiceInfo.getManagementUri().contains(hostname)); + + assertNotNull(amqpServiceInfo.getUris()); + assertEquals(2, amqpServiceInfo.getUris().size()); + assertTrue(amqpServiceInfo.getUris().get(0).contains(hostname)); + assertTrue(amqpServiceInfo.getUris().get(1).contains(hostname2)); + + assertNotNull(amqpServiceInfo.getManagementUris()); + assertEquals(2, amqpServiceInfo.getManagementUris().size()); + assertTrue(amqpServiceInfo.getManagementUris().get(0).contains(hostname)); + assertTrue(amqpServiceInfo.getManagementUris().get(1).contains(hostname2)); + } + @Test public void rabbitServiceCreationNoLabelNoTags() { when(mockEnvironment.getEnvValue("VCAP_SERVICES")) @@ -103,6 +134,7 @@ public class CloudFoundryConnectorAmqpServiceTest extends AbstractCloudFoundryCo assertServiceFoundOfType(serviceInfos, "qpid-1", AmqpServiceInfo.class); assertServiceFoundOfType(serviceInfos, "qpid-2", AmqpServiceInfo.class); AmqpServiceInfo serviceInfo = (AmqpServiceInfo) getServiceInfo(serviceInfos, "qpid-1"); + assertNotNull(serviceInfo); assertEquals(username, serviceInfo.getUserName()); assertEquals(password, serviceInfo.getPassword()); assertEquals("vhost1", serviceInfo.getVirtualHost()); @@ -142,6 +174,17 @@ public class CloudFoundryConnectorAmqpServiceTest extends AbstractCloudFoundryCo hostname, port, user, password, name, vHost); } + private String getRabbitServicePayloadMultipleUris(String serviceName, + String hostname, String hostname2, int port, + String user, String password, String name, + String vHost) { + String payload = getAmqpServicePayload("test-rabbit-info-multiple-uris.json", serviceName, + hostname, port, user, password, name, vHost); + payload = payload.replace("$host1", hostname); + payload = payload.replace("$host2", hostname2); + return payload; + } + private String getQpidServicePayloadNoLabelNoTags(String serviceName, String hostname, int port, String user, String password, String name, diff --git a/spring-cloud-cloudfoundry-connector/src/test/resources/org/springframework/cloud/cloudfoundry/test-rabbit-info-multiple-uris.json b/spring-cloud-cloudfoundry-connector/src/test/resources/org/springframework/cloud/cloudfoundry/test-rabbit-info-multiple-uris.json new file mode 100644 index 0000000..0daefc2 --- /dev/null +++ b/spring-cloud-cloudfoundry-connector/src/test/resources/org/springframework/cloud/cloudfoundry/test-rabbit-info-multiple-uris.json @@ -0,0 +1,12 @@ +{ + "name":"$serviceName", + "label":"rabbitmq", + "plan":"free", + "tags":["amqp","rabbitmq"], + "credentials":{ + "uri": "amqp://$username:$password@$hostname/$virtualHost", + "http_api_uri": "http://$user:$pass@$hostname/api", + "uris": [ "amqp://$username:$password@$host1/$virtualHost", "amqp://$username:$password@$host2/$virtualHost" ], + "http_api_uris": [ "http://$user:$pass@$host1/api", "http://$user:$pass@$host2/api" ] + } +} diff --git a/spring-cloud-core/src/main/java/org/springframework/cloud/service/common/AmqpServiceInfo.java b/spring-cloud-core/src/main/java/org/springframework/cloud/service/common/AmqpServiceInfo.java index a2895aa..c7f7ef3 100644 --- a/spring-cloud-core/src/main/java/org/springframework/cloud/service/common/AmqpServiceInfo.java +++ b/spring-cloud-core/src/main/java/org/springframework/cloud/service/common/AmqpServiceInfo.java @@ -5,8 +5,10 @@ import org.springframework.cloud.service.ServiceInfo.ServiceLabel; import org.springframework.cloud.service.UriBasedServiceInfo; import org.springframework.cloud.util.UriInfo; +import java.util.List; + /** - * Information to access RabbitMQ service. + * Information to access an AMQP service. * * @author Ramnivas Laddad * @author Scott Frederick @@ -20,6 +22,9 @@ public class AmqpServiceInfo extends UriBasedServiceInfo { private String managementUri; + private List uris; + private List managementUris; + public AmqpServiceInfo(String id, String host, int port, String username, String password, String virtualHost) { this(id, host, port, username, password, virtualHost, null); } @@ -29,6 +34,12 @@ public class AmqpServiceInfo extends UriBasedServiceInfo { this.managementUri = managementUri; } + public AmqpServiceInfo(String id, String uri, String managementUri, List uris, List managementUris) { + this(id, uri, managementUri); + this.uris = uris; + this.managementUris = managementUris; + } + public AmqpServiceInfo(String id, String uri) throws CloudException { this(id, uri, null); } @@ -44,7 +55,19 @@ public class AmqpServiceInfo extends UriBasedServiceInfo { } @ServiceProperty(category="connection") - public String getManagementUri() { return managementUri; } + public String getManagementUri() { + return managementUri; + } + + @ServiceProperty(category="connection") + public List getUris() { + return uris; + } + + @ServiceProperty(category="connection") + public List getManagementUris() { + return managementUris; + } @Override protected UriInfo validateAndCleanUriInfo(UriInfo uriInfo) { diff --git a/spring-cloud-spring-service-connector/src/main/java/org/springframework/cloud/service/messaging/RabbitConnectionFactoryCreator.java b/spring-cloud-spring-service-connector/src/main/java/org/springframework/cloud/service/messaging/RabbitConnectionFactoryCreator.java index 441f6c4..029f2f9 100644 --- a/spring-cloud-spring-service-connector/src/main/java/org/springframework/cloud/service/messaging/RabbitConnectionFactoryCreator.java +++ b/spring-cloud-spring-service-connector/src/main/java/org/springframework/cloud/service/messaging/RabbitConnectionFactoryCreator.java @@ -6,6 +6,9 @@ import org.springframework.cloud.service.AbstractServiceConnectorCreator; import org.springframework.cloud.service.ServiceConnectorConfig; import org.springframework.cloud.service.common.AmqpServiceInfo; +import java.net.URI; +import java.net.URISyntaxException; + /** * Simplified access to creating RabbitMQ service objects. * @@ -20,17 +23,40 @@ public class RabbitConnectionFactoryCreator extends AbstractServiceConnectorCrea @Override public ConnectionFactory create(AmqpServiceInfo serviceInfo, ServiceConnectorConfig serviceConnectorConfiguration) { - com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory(); - try { - connectionFactory.setUri(serviceInfo.getUri()); - } - catch (Exception e) { - throw new IllegalArgumentException("Failed to create ConnectionFactory", e); - } + com.rabbitmq.client.ConnectionFactory connectionFactory = createRabbitConnectionFactory(serviceInfo); + configurer.configure(connectionFactory, (RabbitConnectionFactoryConfig) serviceConnectorConfiguration); + return createSpringConnectionFactory(serviceInfo, serviceConnectorConfiguration, connectionFactory); + } + + private com.rabbitmq.client.ConnectionFactory createRabbitConnectionFactory(AmqpServiceInfo serviceInfo) { + com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory(); + if (serviceInfo.getUris() != null && serviceInfo.getUris().size() > 0) { + setConnectionFactoryUri(connectionFactory, serviceInfo.getUris().get(0)); + } else { + setConnectionFactoryUri(connectionFactory, serviceInfo.getUri()); + } + return connectionFactory; + } + + private void setConnectionFactoryUri(com.rabbitmq.client.ConnectionFactory connectionFactory, String uri) { + try { + connectionFactory.setUri(uri); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid AMQP URI", e); + } + } + + private CachingConnectionFactory createSpringConnectionFactory(AmqpServiceInfo serviceInfo, + ServiceConnectorConfig serviceConnectorConfiguration, + com.rabbitmq.client.ConnectionFactory connectionFactory) { CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory); + if (serviceInfo.getUris() != null) { + cachingConnectionFactory.setAddresses(getAddresses(serviceInfo)); + } + if (serviceConnectorConfiguration != null) { Integer channelCacheSize = ((RabbitConnectionFactoryConfig) serviceConnectorConfiguration).getChannelCacheSize(); if (channelCacheSize != null) { @@ -41,4 +67,19 @@ public class RabbitConnectionFactoryCreator extends AbstractServiceConnectorCrea return cachingConnectionFactory; } + private String getAddresses(AmqpServiceInfo serviceInfo) { + try { + StringBuilder addresses = new StringBuilder(); + for (String uriString : serviceInfo.getUris()) { + URI uri = new URI(uriString); + if (addresses.length() > 0) { + addresses.append(','); + } + addresses.append(uri.getHost()).append(':').append(uri.getPort()); + } + return addresses.toString(); + } catch (URISyntaxException e) { + throw new IllegalArgumentException("Invalid AMQP URI", e); + } + } } diff --git a/spring-cloud-spring-service-connector/src/test/java/org/springframework/cloud/service/rabbit/RabbitConnectionFactoryCreatorTest.java b/spring-cloud-spring-service-connector/src/test/java/org/springframework/cloud/service/rabbit/RabbitConnectionFactoryCreatorTest.java index e085c23..95a5f1b 100644 --- a/spring-cloud-spring-service-connector/src/test/java/org/springframework/cloud/service/rabbit/RabbitConnectionFactoryCreatorTest.java +++ b/spring-cloud-spring-service-connector/src/test/java/org/springframework/cloud/service/rabbit/RabbitConnectionFactoryCreatorTest.java @@ -2,50 +2,95 @@ package org.springframework.cloud.service.rabbit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import com.rabbitmq.client.Address; import org.junit.Test; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.cloud.service.common.AmqpServiceInfo; import org.springframework.cloud.service.messaging.RabbitConnectionFactoryCreator; import org.springframework.test.util.ReflectionTestUtils; +import java.net.URI; +import java.util.Arrays; +import java.util.List; + /** * * @author Ramnivas Laddad + * @author Scott Frederick * */ public class RabbitConnectionFactoryCreatorTest { private static final String TEST_HOST = "10.20.30.40"; + private static final String TEST_HOST2 = "11.21.31.41"; private static final int TEST_PORT = 1234; + private static final int TEST_PORT2 = 5678; private static final String TEST_USERNAME = "myuser"; private static final String TEST_PASSWORD = "mypass"; private static final String TEST_VH = "myVirtualHost"; - private RabbitConnectionFactoryCreator testCreator = new RabbitConnectionFactoryCreator(); @Test - public void cloudRabbitCreationNoConfig() throws Exception { - AmqpServiceInfo serviceInfo = createServiceInfo(); + public void cloudRabbitCreationNoUri() throws Exception { + AmqpServiceInfo serviceInfo = new AmqpServiceInfo("id", TEST_HOST, TEST_PORT, TEST_USERNAME, TEST_PASSWORD, TEST_VH); ConnectionFactory connector = testCreator.create(serviceInfo, null); - assertConnectorProperties(serviceInfo, connector); + assertConnectorPropertiesMatchUri(connector, serviceInfo.getUri()); } - public AmqpServiceInfo createServiceInfo() { - return new AmqpServiceInfo("id", TEST_HOST, TEST_PORT, TEST_USERNAME, TEST_PASSWORD, TEST_VH); + @Test + public void cloudRabbitCreationWithUri() throws Exception { + String userinfo = String.format("%s:%s", TEST_USERNAME, TEST_PASSWORD); + URI uri = new URI("amqp", userinfo, TEST_HOST, TEST_PORT, "/" + TEST_VH, null, null); + AmqpServiceInfo serviceInfo = new AmqpServiceInfo("id", uri.toString()); + + ConnectionFactory connector = testCreator.create(serviceInfo, null); + + assertConnectorPropertiesMatchUri(connector, serviceInfo.getUri()); } - private void assertConnectorProperties(AmqpServiceInfo serviceInfo, ConnectionFactory connector) { + @Test + public void cloudRabbitCreationWithUris() throws Exception { + String userinfo = String.format("%s:%s", TEST_USERNAME, TEST_PASSWORD); + URI uri = new URI("amqp", userinfo, "0.0.0.0", 0, "/" + TEST_VH, null, null); + URI uri1 = new URI("amqp", userinfo, TEST_HOST, TEST_PORT, "/" + TEST_VH, null, null); + URI uri2 = new URI("amqp", userinfo, TEST_HOST2, TEST_PORT2, "/" + TEST_VH, null, null); + List uris = Arrays.asList(uri1.toString(), uri2.toString()); + AmqpServiceInfo serviceInfo = new AmqpServiceInfo("id", uri.toString(), null, uris, null); + + ConnectionFactory connector = testCreator.create(serviceInfo, null); + + assertConnectorPropertiesMatchUri(connector, uri1.toString()); + assertConnectorPropertiesMatchHosts(connector, uris); + } + + private void assertConnectorPropertiesMatchUri(ConnectionFactory connector, String uriString) throws Exception { assertNotNull(connector); - - assertEquals(serviceInfo.getHost(), connector.getHost()); - assertEquals(serviceInfo.getPort(), connector.getPort()); - com.rabbitmq.client.ConnectionFactory underlying = (com.rabbitmq.client.ConnectionFactory) ReflectionTestUtils.getField(connector, "rabbitConnectionFactory"); - assertEquals(serviceInfo.getUserName(), ReflectionTestUtils.getField(underlying, "username")); - assertEquals(serviceInfo.getPassword(), ReflectionTestUtils.getField(underlying, "password")); - assertEquals(serviceInfo.getVirtualHost(), connector.getVirtualHost()); + URI uri = new URI(uriString); + assertEquals(uri.getHost(), connector.getHost()); + assertEquals(uri.getPort(), connector.getPort()); + com.rabbitmq.client.ConnectionFactory rabbitConnectionFactory = + (com.rabbitmq.client.ConnectionFactory) ReflectionTestUtils.getField(connector, "rabbitConnectionFactory"); + String[] userInfo = uri.getRawUserInfo().split(":"); + assertEquals(userInfo[0], ReflectionTestUtils.getField(rabbitConnectionFactory, "username")); + assertEquals(userInfo[1], ReflectionTestUtils.getField(rabbitConnectionFactory, "password")); + + assertTrue(uri.getPath().endsWith(connector.getVirtualHost())); + } + + private void assertConnectorPropertiesMatchHosts(ConnectionFactory connector, List uriStrings) throws Exception { + Address[] addresses = (Address[]) ReflectionTestUtils.getField(connector, "addresses"); + assertNotNull(addresses); + assertEquals(uriStrings.size(), addresses.length); + + for (int i = 0; i < uriStrings.size(); i++) { + URI uri = new URI(uriStrings.get(i)); + assertEquals(uri.getHost(), addresses[i].getHost()); + assertEquals(uri.getPort(), addresses[i].getPort()); + } } }