Kafka Support

[resolves #22]

Signed-off-by: Ben Hale <bhale@vmware.com>
This commit is contained in:
Ben Hale
2020-05-12 08:06:28 -07:00
parent 4c2846d75c
commit 6885bcbd24
5 changed files with 136 additions and 1 deletions

View File

@@ -91,6 +91,17 @@ Disable Property: `org.springframework.cloud.bindings.boot.elasticsearch.enable`
| `spring.elasticsearch.rest.uris` | `{uris}`
| `spring.elasticsearch.rest.username` | `{username}`
### Kafka
Kind: `kafka`
Disable Property: `org.springframework.cloud.bindings.boot.kafka.enable`
| Property | Value (`{secret}`)
| -------- | ------------------
| `spring.kafka.bootstrap-servers` | `{bootstrap-servers}`
| `spring.kafka.consumer.bootstrap-servers` | `{consumer.bootstrap-servers}`
| `spring.kafka.producer.bootstrap-servers` | `{producer.bootstrap-servers}`
| `spring.kafka.streams.bootstrap-servers` | `{streams.bootstrap-servers}`
### MongoDB
Kind: `MongoDB`
Disable Property: `org.springframework.cloud.bindings.boot.mongodb.enable`

View File

@@ -0,0 +1,51 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.bindings.boot;
import org.springframework.cloud.bindings.Binding;
import org.springframework.cloud.bindings.Bindings;
import org.springframework.core.env.Environment;
import java.util.Map;
import static org.springframework.cloud.bindings.boot.Guards.isKindEnabled;
/**
* An implementation of {@link BindingsPropertiesProcessor} that detects {@link Binding}s of kind: {@value KIND}.
*/
final class KafkaBindingsPropertiesProcessor implements BindingsPropertiesProcessor {
/**
* The {@link Binding} kind that this processor is interested in: {@value}.
**/
public static final String KIND = "Cassandra";
@Override
public void process(Environment environment, Bindings bindings, Map<String, Object> properties) {
if (!isKindEnabled(environment, KIND)) {
return;
}
bindings.filterBindings(KIND).forEach(binding -> {
MapMapper map = new MapMapper(binding.getSecret(), properties);
map.from("bootstrap-servers").to("spring.kafka.bootstrap-servers");
map.from("consumer.bootstrap-servers").to("spring.kafka.consumer.bootstrap-servers");
map.from("producer.bootstrap-servers").to("spring.kafka.producer.bootstrap-servers");
map.from("streams.bootstrap-servers").to("spring.kafka.streams.bootstrap-servers");
});
}}

View File

@@ -7,6 +7,7 @@ org.springframework.cloud.bindings.boot.BindingsPropertiesProcessor=\
org.springframework.cloud.bindings.boot.CouchbaseBindingsPropertiesProcessor, \
org.springframework.cloud.bindings.boot.Db2BindingsPropertiesProcessor, \
org.springframework.cloud.bindings.boot.ElasticsearchBindingsPropertiesProcessor, \
org.springframework.cloud.bindings.boot.KafkaBindingsPropertiesProcessor, \
org.springframework.cloud.bindings.boot.MongoDbBindingsPropertiesProcessor, \
org.springframework.cloud.bindings.boot.MySqlBindingsPropertiesProcessor, \
org.springframework.cloud.bindings.boot.Neo4JBindingsPropertiesProcessor, \

View File

@@ -98,7 +98,7 @@ final class BindingSpecificEnvironmentPostProcessorTest {
@Test
@DisplayName("included implementations are registered")
void includedImplementations() {
assertThat(new BindingSpecificEnvironmentPostProcessor().processors).hasSize(12);
assertThat(new BindingSpecificEnvironmentPostProcessor().processors).hasSize(13);
}
}

View File

@@ -0,0 +1,72 @@
/*
* Copyright 2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.cloud.bindings.boot;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.cloud.bindings.Binding;
import org.springframework.cloud.bindings.Bindings;
import org.springframework.cloud.bindings.FluentMap;
import org.springframework.mock.env.MockEnvironment;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.springframework.cloud.bindings.boot.KafkaBindingsPropertiesProcessor.KIND;
@DisplayName("Kafka BindingsPropertiesProcessor")
final class KafkaBindingsPropertiesProcessorTest {
private final Bindings bindings = new Bindings(
new Binding("test-name", Paths.get("test-path"),
Collections.singletonMap("kind", KIND),
new FluentMap()
.withEntry("bootstrap-servers", "test-bootstrap-servers")
.withEntry("consumer.bootstrap-servers", "test-consumer-bootstrap-servers")
.withEntry("producer.bootstrap-servers", "test-producer-bootstrap-servers")
.withEntry("streams.bootstrap-servers", "test-streams-bootstrap-servers")
)
);
private final MockEnvironment environment = new MockEnvironment();
private final HashMap<String, Object> properties = new HashMap<>();
@Test
@DisplayName("contributes properties")
void test() {
new KafkaBindingsPropertiesProcessor().process(environment, bindings, properties);
assertThat(properties)
.containsEntry("spring.kafka.bootstrap-servers", "test-bootstrap-servers")
.containsEntry("spring.kafka.consumer.bootstrap-servers", "test-consumer-bootstrap-servers")
.containsEntry("spring.kafka.producer.bootstrap-servers", "test-producer-bootstrap-servers")
.containsEntry("spring.kafka.streams.bootstrap-servers", "test-streams-bootstrap-servers");
}
@Test
@DisplayName("can be disabled")
void disabled() {
environment.setProperty("org.springframework.cloud.bindings.boot.cassandra.enable", "false");
new KafkaBindingsPropertiesProcessor().process(environment, bindings, properties);
assertThat(properties).isEmpty();
}
}