New K8S AT for jdbc-log

This commit is contained in:
Soby Chacko
2021-04-14 16:16:54 -04:00
parent d0ab4358aa
commit 3fc1b9fd26
7 changed files with 202 additions and 14 deletions

View File

@@ -0,0 +1,19 @@
apiVersion: v1
kind: Service
metadata:
name: jdbc
labels:
app: jdbc
component: jdbc
type: stream-ats
spring-deployment-id: jdbc
spec:
type: LoadBalancer
ports:
- port: 80
name: jdbc-port
targetPort: 8080
protocol: TCP
selector:
app: jdbc
component: jdbc

View File

@@ -0,0 +1,57 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: jdbc
labels:
app: jdbc
type: stream-ats
component: jdbc
spec:
replicas: 1
selector:
matchLabels:
app: jdbc
template:
metadata:
labels:
app: jdbc
component: jdbc
spec:
containers:
- name: jdbc
image: springcloudstream/jdbc-source-kafka:latest
ports:
- containerPort: 8080
env:
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
value: kafka:9092
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES
value: kafka-zk:2181
- name: SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION
value: fromjdbcsource
- name: JDBC_SUPPLIER_QUERY
value: "select id, name, tag from test order by id"
- name: SPRING_DATASOURCE_INITIALIZATION-MODE
value: ALWAYS
- name: SPRING_DATASOURCE_SCHEMA
value: https://raw.githubusercontent.com/spring-cloud/spring-cloud-stream-acceptance-tests/master/db-scripts/sample-schema.sql
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_REPLICATIONFACTOR
value: "1"
- name: LOGGING_FILE_NAME
value: jdbc-source.log
- name: MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE
value: logfile,health,info
- name: SPRING_CLOUD_STREAMAPP_SECURITY_ENABLED
value: "false"
livenessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 10
periodSeconds: 60
readinessProbe:
httpGet:
path: /actuator/info
port: 8080
initialDelaySeconds: 10
periodSeconds: 10

View File

@@ -0,0 +1,19 @@
apiVersion: v1
kind: Service
metadata:
name: log
labels:
app: log
component: log
type: stream-ats
spring-deployment-id: log
spec:
type: LoadBalancer
ports:
- port: 80
name: log-port
targetPort: 8080
protocol: TCP
selector:
app: log
component: log

View File

@@ -0,0 +1,53 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: log
labels:
app: log
type: stream-ats
component: log
spec:
replicas: 1
selector:
matchLabels:
app: log
template:
metadata:
labels:
app: log
component: log
spec:
containers:
- name: log
image: springcloudstream/log-sink-kafka:latest
ports:
- containerPort: 8080
env:
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS
value: kafka:9092
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES
value: kafka-zk:2181
- name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION
value: fromjdbcsource
- name: SPRING_CLOUD_STREAM_KAFKA_BINDER_REPLICATIONFACTOR
value: "1"
- name: LOG_EXPRESSION
value: "'From JDBC Source: '.concat(payload)"
- name: LOGGING_FILE_NAME
value: log-sink.log
- name: MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE
value: logfile,health,info
- name: SPRING_CLOUD_STREAMAPP_SECURITY_ENABLED
value: "false"
livenessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 10
periodSeconds: 60
readinessProbe:
httpGet:
path: /actuator/info
port: 8080
initialDelaySeconds: 10
periodSeconds: 10

View File

@@ -146,6 +146,20 @@ function prepare_tcp_log_with_kafka_binder() {
$(wait_for_200 ${LOG_SINK_SERVER_URI}/actuator/logfile)
}
function prepare_jdbc_log_with_kafka_binder() {
kubectl create -f k8s-templates/jdbc-log/jdbc.yaml
kubectl create -f k8s-templates/jdbc-log/jdbc-svc-lb.yaml
kubectl create -f k8s-templates/jdbc-log/log.yaml
kubectl create -f k8s-templates/jdbc-log/log-svc-lb.yaml
JDBC_SOURCE_SERVER_URI=http://$(kubectl get service jdbc | awk '{print $4}' | grep -v EXTERNAL-IP)
LOG_SINK_SERVER_URI=http://$(kubectl get service log | awk '{print $4}' | grep -v EXTERNAL-IP)
$(wait_for_200 ${JDBC_SOURCE_SERVER_URI}/actuator/logfile)
$(wait_for_200 ${LOG_SINK_SERVER_URI}/actuator/logfile)
}
function delete_acceptance_test_components() {
kubectl delete pod,deployment,rc,service -l type="stream-ats"
@@ -302,6 +316,27 @@ then
exit $BUILD_RETURN_VALUE
fi
prepare_jdbc_log_with_kafka_binder
pushd ../spring-cloud-stream-acceptance-tests
../mvnw clean package -Dtest=JdbcLogAcceptanceTests -Dmaven.test.skip=false -Djdbc.source.route=$JDBC_SOURCE_SERVER_URI -Dlog.sink.route=$LOG_SINK_SERVER_URI
BUILD_RETURN_VALUE=$?
popd
delete_acceptance_test_components
if [ "$BUILD_RETURN_VALUE" != 0 ]
then
echo "Early exit due to test failure in tcp-log tests"
duration=$SECONDS
echo "Total time: Build took $(($duration / 60)) minutes and $(($duration % 60)) seconds to complete."
delete_acceptance_test_infra
exit $BUILD_RETURN_VALUE
fi
delete_acceptance_test_infra
duration=$SECONDS

View File

@@ -33,14 +33,18 @@ abstract class AbstractAcceptanceTests {
protected static final Logger logger = LoggerFactory.getLogger(AbstractAcceptanceTests.class);
boolean waitForLogEntry(boolean noBoot2, String app, String route, String... entries) {
return waitForLogEntryInResource(noBoot2, app, route, entries);
return waitForLogEntryInResource(noBoot2, app, route, false, entries);
}
boolean waitForLogEntry(String app, String route, String... entries) {
return waitForLogEntryInResource(false, app, route, entries);
return waitForLogEntryInResource(false, app, route, false, entries);
}
private String getLog(String url) {
boolean waitForLogEntry(String app, String route, boolean toLower, String... entries) {
return waitForLogEntryInResource(false, app, route, toLower, entries);
}
private String getLog(String url, boolean toLower) {
RestTemplate restTemplate = new RestTemplate();
String logFileUrl = String.format("%s/logfile", url);
String log = null;
@@ -56,10 +60,10 @@ abstract class AbstractAcceptanceTests {
} catch (Exception e) {
logger.warn("Error while trying to access logfile from '" + logFileUrl + "' due to : " + e);
}
return log;
return toLower ? log.toLowerCase() : log;
}
private boolean waitForLogEntryInResource(boolean noBoot2, String app, String route, String... entries) {
private boolean waitForLogEntryInResource(boolean noBoot2, String app, String route, boolean toLower, String... entries) {
logger.info("Looking for '" + StringUtils.arrayToCommaDelimitedString(entries) + "' in logfile for " + app + " - " + route);
long timeout = System.currentTimeMillis() + (60 * 1000);
boolean exists = false;
@@ -72,9 +76,10 @@ abstract class AbstractAcceptanceTests {
}
logger.info("Polling to get log file. Remaining poll time = "
+ (timeout - System.currentTimeMillis() + " ms."));
String log = noBoot2 ? getLog(route) : getLog(route + "/actuator");
String log = noBoot2 ? getLog(route, toLower) : getLog(route + "/actuator", toLower);
if (log != null) {
if (Stream.of(entries).allMatch(s -> log.contains(s))) {
if (Stream.of(entries).allMatch(log::contains)) {
exists = true;
}
}

View File

@@ -31,24 +31,24 @@ public class JdbcLogAcceptanceTests extends AbstractAcceptanceTests {
String jdbcSourceUrl = System.getProperty("jdbc.source.route");
String logSinkUrl = System.getProperty("log.sink.route");
boolean foundLogs = waitForLogEntry("JDBC Source", jdbcSourceUrl, "Started JdbcSource");
if(!foundLogs) {
if (!foundLogs) {
fail("Did not find the jdbc source started logging message.");
}
foundLogs = waitForLogEntry("Log Sink", logSinkUrl, "Started LogSink");
if(!foundLogs) {
if (!foundLogs) {
fail("Did not find the log sink started logging message.");
}
// Convert the expected output text to all lowercase due to various DB platforms may write it out in various cases.
verifyLogs(logSinkUrl, "{\"id\":1,\"name\":\"bob\",\"tag\":null}");
verifyLogs(logSinkUrl, "{\"id\":2,\"name\":\"jane\",\"tag\":null}");
verifyLogs(logSinkUrl, "{\"id\":3,\"name\":\"john\",\"tag\":null}");
verifyLogs(logSinkUrl, "{\"id\":1,\"name\":\"Bob\",\"tag\":null}");
verifyLogs(logSinkUrl, "{\"id\":2,\"name\":\"Jane\",\"tag\":null}");
verifyLogs(logSinkUrl, "{\"id\":3,\"name\":\"John\",\"tag\":null}");
}
void verifyLogs(String appUrl, String textToLookfor) {
boolean foundMessage = waitForLogEntry("Log Sink", appUrl, textToLookfor);
boolean foundMessage = waitForLogEntry("Log Sink", appUrl, true, textToLookfor);
if (!foundMessage) {
fail("Did not find the message - " + textToLookfor + " - in the logs");
}