Add azure kafka function sample

- Add sample project with Azure KafkaTrigger and Output Kakfa Bindings
 - Add README docs

 Resolves #836
This commit is contained in:
Christian Tzolov
2022-10-17 19:17:16 +02:00
parent d627742772
commit 5aecb218e7
18 changed files with 1227 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
{
"functionTimeout": "00:05:00",
"version": "2.0",
"extensions": {
"kafka": {
"maxBatchSize": 64,
"SubscriberIntervalInSeconds": 1,
"ExecutorChannelCapacity": 1,
"ChannelFullRetryIntervalInMs": 50
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[3.3.0, 4.0.0)"
}
}

View File

@@ -0,0 +1,12 @@
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"AzureWebJobsDashboard": "",
"FUNCTIONS_WORKER_RUNTIME": "java",
"BrokerList": "localhost:9092",
"ConfluentCloudUsername": "test",
"TriggerKafkaTopic": "trigger"
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 93 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 144 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 214 KiB

View File

@@ -0,0 +1,61 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example;
import java.util.Map;
import java.util.function.Function;
import com.microsoft.azure.functions.ExecutionContext;
import example.entity.KafkaEntity;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.function.json.JsonMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
@SpringBootApplication
public class KafkaTriggerDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaTriggerDemoApplication.class, args);
}
@Bean
public Function<Message<String>, String> uppercase(JsonMapper mapper) {
return message -> {
// // (Optionally) access and use the Azure function context.
ExecutionContext context = (ExecutionContext) message.getHeaders().get("executionContext");
context.getLogger().info("Kafka triggered with data: " + message.getPayload());
// Convert the message payload into Azure's KafkaEntity format.
KafkaEntity kafkaEntity = mapper.fromJson(message.getPayload(), KafkaEntity.class);
// Business logic: convert the JSON string values into uppercase.
if (kafkaEntity.getValue() != null) {
Map<String, Object> valueMap = mapper.fromJson(kafkaEntity.getValue(), Map.class);
if (valueMap != null) {
valueMap.forEach((k, v) -> valueMap.put(k,
v != null && v instanceof String ? ((String) v).toUpperCase() : null));
return mapper.toString(valueMap);
}
}
return mapper.toString(null);
};
}
}

View File

@@ -0,0 +1,69 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example;
import com.microsoft.azure.functions.BrokerAuthenticationMode;
import com.microsoft.azure.functions.BrokerProtocol;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.annotation.KafkaOutput;
import com.microsoft.azure.functions.annotation.KafkaTrigger;
import org.springframework.cloud.function.adapter.azure.FunctionInvoker;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
public class UppercaseHandler extends FunctionInvoker<Message<String>, String> {
@FunctionName("KafkaTrigger")
public void execute(
@KafkaTrigger(
name = "KafkaTrigger",
topic = "%TriggerKafkaTopic%",
brokerList = "%BrokerList%",
consumerGroup = "$Default",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
protocol = BrokerProtocol.PLAINTEXT,
// protocol = BrokerProtocol.SASLSSL,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
dataType = "string") String kafkaEventData,
@KafkaOutput(
name = "kafkaOutput",
topic = "output",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.PLAINTEXT
//protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info(kafkaEventData);
Message<String> message = MessageBuilder.withPayload(kafkaEventData).build();
String response = handleRequest(message, context);
output.setValue(response);
}
}

View File

@@ -0,0 +1,93 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.entity;
import com.fasterxml.jackson.annotation.JsonProperty;
public class KafkaEntity {
@JsonProperty("Offset")
private int offset;
@JsonProperty("Partition")
private int partition;
@JsonProperty("Timestamp")
private String timestamp;
@JsonProperty("Topic")
private String topic;
@JsonProperty("Key")
private String key;
@JsonProperty("Value")
private String value;
@JsonProperty("Headers")
private KafkaHeaders[] headers;
public int getOffset() {
return offset;
}
public void setOffset(int offset) {
this.offset = offset;
}
public int getPartition() {
return partition;
}
public void setPartition(int partition) {
this.partition = partition;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(String timestamp) {
this.timestamp = timestamp;
}
public String getTopic() {
return topic;
}
public void setTopic(String topic) {
this.topic = topic;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
public KafkaHeaders[] getHeaders() {
return headers;
}
public void setHeaders(KafkaHeaders[] headers) {
this.headers = headers;
}
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
}

View File

@@ -0,0 +1,42 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package example.entity;
import com.fasterxml.jackson.annotation.JsonProperty;
public class KafkaHeaders {
@JsonProperty("Key")
private String key;
@JsonProperty("Value")
private String value;
public String getKey() {
return key;
}
public void setKey(String key) {
this.key = key;
}
public String getValue() {
return value;
}
public void setValue(String value) {
this.value = value;
}
}

View File

@@ -0,0 +1,63 @@
version: '2.1'
services:
zookeeper:
image: zookeeper:3.4.9
hostname: zookeeper
ports:
- "2181:2181"
environment:
ZOO_MY_ID: 1
ZOO_PORT: 2181
ZOO_SERVERS: server.1=zookeeper:2888:3888
# volumes:
# - ./zk-single-kafka-single/zookeeper/data:/data
# - ./zk-single-kafka-single/zookeeper/datalog:/datalog
kafka1:
image: confluentinc/cp-kafka:7.2.2
hostname: kafka1
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_LISTENERS: LISTENER_DOCKER_INTERNAL://kafka1:19092,LISTENER_DOCKER_EXTERNAL://${DOCKER_HOST_IP:-127.0.0.1}:9092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: LISTENER_DOCKER_INTERNAL:PLAINTEXT,LISTENER_DOCKER_EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: LISTENER_DOCKER_INTERNAL
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_BROKER_ID: 1
KAFKA_LOG4J_LOGGERS: "kafka.controller=INFO,kafka.producer.async.DefaultEventHandler=INFO,state.change.logger=INFO"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
# volumes:
# - ./zk-single-kafka-single/kafka1/data:/var/lib/kafka/data
depends_on:
- zookeeper
init-kafka:
image: confluentinc/cp-kafka:7.2.2
depends_on:
- kafka1
entrypoint: [ '/bin/sh', '-c' ]
command: |
"
# blocks until kafka is reachable
kafka-topics --bootstrap-server kafka1:19092 --list
echo -e 'Creating kafka topics'
kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic trigger --replication-factor 1 --partitions 1
kafka-topics --bootstrap-server kafka1:19092 --create --if-not-exists --topic output --replication-factor 1 --partitions 1
echo -e 'Successfully created the following topics:'
kafka-topics --bootstrap-server kafka1:19092 --list
"
kconsole:
image: docker.redpanda.com/vectorized/console:latest
restart: on-failure
hostname: kconsole
ports:
- "8080:8080"
environment:
KAFKA_BROKERS: "kafka1:19092"
depends_on:
- kafka1