Adding new K8S AT for tcp-source | log

This commit is contained in:
Soby Chacko
2021-04-14 12:30:24 -04:00
parent e74db5c272
commit d0ab4358aa
7 changed files with 243 additions and 1 deletions

View File

@@ -0,0 +1,19 @@
apiVersion: v1
kind: Service
metadata:
name: log
labels:
app: log
component: log
type: stream-ats
spring-deployment-id: log
spec:
type: LoadBalancer
ports:
- port: 80
name: log-port
targetPort: 8080
protocol: TCP
selector:
app: log
component: log

View File

@@ -0,0 +1,53 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: log
labels:
app: log
type: stream-ats
component: log
spec:
replicas: 1
selector:
matchLabels:
app: log
template:
metadata:
labels:
app: log
component: log
spec:
containers:
- name: log
image: springcloudstream/log-sink-kafka:latest
ports:
- containerPort: 8080
env:
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
value: kafka:9092
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES
value: kafka-zk:2181
- name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION
value: tcp.out
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_REPLICATIONFACTOR
value: "1"
- name: LOG_EXPRESSION
value: "'From TCP Source: '.concat(payload)"
- name: LOGGING_FILE_NAME
value: log-sink.log
- name: MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE
value: logfile,health,info
- name: SPRING_CLOUD_STREAMAPP_SECURITY_ENABLED
value: "false"
livenessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 10
periodSeconds: 60
readinessProbe:
httpGet:
path: /actuator/info
port: 8080
initialDelaySeconds: 10
periodSeconds: 10

View File

@@ -0,0 +1,23 @@
apiVersion: v1
kind: Service
metadata:
name: tcp
labels:
app: tcp
component: tcp
type: stream-ats
spring-deployment-id: tcp
spec:
type: LoadBalancer
ports:
- port: 80
name: tcp-port-1
targetPort: 8080
protocol: TCP
- port: 1234
name: tcp-port-2
targetPort: 1234
protocol: TCP
selector:
app: tcp
component: tcp

View File

@@ -0,0 +1,53 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: tcp
labels:
app: tcp
type: stream-ats
component: tcp
spec:
replicas: 1
selector:
matchLabels:
app: tcp
template:
metadata:
labels:
app: tcp
component: tcp
spec:
containers:
- name: tcp
image: springcloudstream/tcp-source-kafka:latest
ports:
- containerPort: 8080
env:
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
value: kafka:9092
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES
value: kafka-zk:2181
- name: SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION
value: tcp.out
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_REPLICATIONFACTOR
value: "1"
- name: TCP_PORT
value: "1234"
- name: LOGGING_FILE_NAME
value: tcp-source.log
- name: MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE
value: logfile,health,info
- name: SPRING_CLOUD_STREAMAPP_SECURITY_ENABLED
value: "false"
livenessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 10
periodSeconds: 60
readinessProbe:
httpGet:
path: /actuator/info
port: 8080
initialDelaySeconds: 10
periodSeconds: 10

View File

@@ -130,6 +130,22 @@ function prepare_http_router_log_with_kafka_binder() {
curl -X POST -H "Content-Type: text/plain" --data "ijklmnop" $HTTP_SOURCE_SERVER_URI
}
function prepare_tcp_log_with_kafka_binder() {
kubectl create -f k8s-templates/tcp-log/tcp.yaml
kubectl create -f k8s-templates/tcp-log/tcp-svc-lb.yaml
kubectl create -f k8s-templates/tcp-log/log.yaml
kubectl create -f k8s-templates/tcp-log/log-svc-lb.yaml
TCP_SOURCE_SERVER_URI=http://$(kubectl get service tcp | awk '{print $4}' | grep -v EXTERNAL-IP)
LOG_SINK_SERVER_URI=http://$(kubectl get service log | awk '{print $4}' | grep -v EXTERNAL-IP)
TCP_SOURCE_SERVER_IP=$(kubectl get service tcp | awk '{print $4}' | grep -v EXTERNAL-IP)
$(wait_for_200 ${TCP_SOURCE_SERVER_URI}/actuator/logfile)
$(wait_for_200 ${LOG_SINK_SERVER_URI}/actuator/logfile)
}
function delete_acceptance_test_components() {
kubectl delete pod,deployment,rc,service -l type="stream-ats"
@@ -265,6 +281,27 @@ then
exit $BUILD_RETURN_VALUE
fi
prepare_tcp_log_with_kafka_binder
pushd ../spring-cloud-stream-acceptance-tests
../mvnw clean package -Dtest=TcpLogAcceptanceTests -Dmaven.test.skip=false -Dtcp.source.route=$TCP_SOURCE_SERVER_URI -Dlog.sink.route=$LOG_SINK_SERVER_URI -Dtcp.source.ip=$TCP_SOURCE_SERVER_IP
BUILD_RETURN_VALUE=$?
popd
delete_acceptance_test_components
if [ "$BUILD_RETURN_VALUE" != 0 ]
then
echo "Early exit due to test failure in tcp-log tests"
duration=$SECONDS
echo "Total time: Build took $(($duration / 60)) minutes and $(($duration % 60)) seconds to complete."
delete_acceptance_test_infra
exit $BUILD_RETURN_VALUE
fi
delete_acceptance_test_infra
duration=$SECONDS

View File

@@ -30,7 +30,7 @@ import java.util.stream.Stream;
*/
abstract class AbstractAcceptanceTests {
private static final Logger logger = LoggerFactory.getLogger(AbstractAcceptanceTests.class);
protected static final Logger logger = LoggerFactory.getLogger(AbstractAcceptanceTests.class);
boolean waitForLogEntry(boolean noBoot2, String app, String route, String... entries) {
return waitForLogEntryInResource(noBoot2, app, route, entries);

View File

@@ -0,0 +1,57 @@
/*
* Copyright 2021-2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sample.acceptance.tests;
import java.io.IOException;
import java.net.Socket;
import javax.net.SocketFactory;
import org.junit.Test;
import static org.junit.Assert.fail;
public class TcpLogAcceptanceTests extends AbstractAcceptanceTests {
@Test
public void testTcpSourceToLogSink() throws IOException {
String tcpSourceUrl = System.getProperty("tcp.source.route");
String logSinkUrl = System.getProperty("log.sink.route");
String tcpSourceIp = System.getProperty("tcp.source.ip");
boolean foundLogs = waitForLogEntry("TCP Source", tcpSourceUrl, "Started TcpSource");
if (!foundLogs) {
fail("Did not find the time source started logging message.");
}
foundLogs = waitForLogEntry("Log Sink", logSinkUrl, "Started LogSink");
if (!foundLogs) {
fail("Did not find the log sink started logging message.");
}
logger.info("Trying to open socket connection to: [" + tcpSourceIp + ":1234]");
Socket socket = SocketFactory.getDefault().createSocket(tcpSourceIp, 1234);
socket.getOutputStream().write(("Hello World from TCP Source!!" + "\r\n").getBytes());
socket.close();
foundLogs = waitForLogEntry("Log Sink", logSinkUrl, "From TCP Source: Hello World from TCP Source!!");
if (!foundLogs) {
fail("Did not find the tcp source messages in log sink");
}
}
}