Azure samples version and code adjustments

This commit is contained in:
Christian Tzolov
2023-07-20 10:52:16 +02:00
parent 3c080031ad
commit 8883e3b771
13 changed files with 75 additions and 58 deletions

View File

@@ -18,7 +18,6 @@ package example;
import java.util.Map;
import java.util.function.Function;
import com.microsoft.azure.functions.ExecutionContext;
import example.entity.KafkaEntity;
import org.springframework.boot.SpringApplication;

View File

@@ -16,6 +16,8 @@
package example;
import java.util.function.Function;
import com.microsoft.azure.functions.BrokerAuthenticationMode;
import com.microsoft.azure.functions.BrokerProtocol;
import com.microsoft.azure.functions.ExecutionContext;
@@ -24,35 +26,39 @@ import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.KafkaOutput;
import com.microsoft.azure.functions.annotation.KafkaTrigger;
import org.springframework.cloud.function.adapter.azure.FunctionInvoker;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
public class UppercaseHandler extends FunctionInvoker<Message<String>, String> {
@Component
public class UppercaseHandler {
@Autowired
private Function<Message<String>, String> uppercase;
@FunctionName("KafkaTrigger")
public void execute(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "%TriggerKafkaTopic%",
brokerList = "%BrokerList%",
consumerGroup = "$Default",
username = "%ConfluentCloudUsername%",
password = "%ConfluentCloudPassword%",
authenticationMode = BrokerAuthenticationMode.PLAIN,
name = "KafkaTrigger",
topic = "%TriggerKafkaTopic%",
brokerList = "%BrokerList%",
consumerGroup = "$Default",
username = "%ConfluentCloudUsername%",
password = "%ConfluentCloudPassword%",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.PLAINTEXT,
// protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string") String kafkaEventData,
@KafkaOutput(
name = "kafkaOutput",
topic = "output",
topic = "output",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
username = "%ConfluentCloudUsername%",
password = "%ConfluentCloudPassword%",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.PLAINTEXT
// protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
@@ -62,7 +68,7 @@ public class UppercaseHandler extends FunctionInvoker<Message<String>, String> {
Message<String> message = MessageBuilder.withPayload(kafkaEventData).build();
String response = handleRequest(message, context);
String response = uppercase.apply(message);
output.setValue(response);
}