parametrize kafka pass config

This commit is contained in:
Christian Tzolov
2022-10-21 15:41:20 +02:00
parent 5cbe143ca3
commit 45890427e2
4 changed files with 9 additions and 7 deletions

View File

@@ -38,10 +38,6 @@ public class KafkaTriggerDemoApplication {
public Function<Message<String>, String> uppercase(JsonMapper mapper) {
return message -> {
// // (Optionally) access and use the Azure function context.
ExecutionContext context = (ExecutionContext) message.getHeaders().get("executionContext");
context.getLogger().info("Kafka triggered with data: " + message.getPayload());
// Convert the message payload into Azure's KafkaEntity format.
KafkaEntity kafkaEntity = mapper.fromJson(message.getPayload(), KafkaEntity.class);

View File

@@ -39,7 +39,7 @@ public class UppercaseHandler extends FunctionInvoker<Message<String>, String> {
brokerList = "%BrokerList%",
consumerGroup = "$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
password = "%ConfluentCloudPassword%",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.PLAINTEXT,
// protocol = BrokerProtocol.SASLSSL,
@@ -50,11 +50,11 @@ public class UppercaseHandler extends FunctionInvoker<Message<String>, String> {
topic = "output",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
password = "%ConfluentCloudPassword%",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.PLAINTEXT
//protocol = BrokerProtocol.SASLSSL
// protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {