diff --git a/custom-stream-apps/partitioning-consumer-sample-kafka/src/main/resources/application.yml b/custom-stream-apps/partitioning-consumer-sample-kafka/src/main/resources/application.yml index 4811154..70a8320 100644 --- a/custom-stream-apps/partitioning-consumer-sample-kafka/src/main/resources/application.yml +++ b/custom-stream-apps/partitioning-consumer-sample-kafka/src/main/resources/application.yml @@ -4,7 +4,7 @@ spring: kafka: binder: autoAddPartitions: true - minPartitionCount: 4 + minPartitionCount: 3 bindings: input: destination: partitioned.destination diff --git a/custom-stream-apps/partitioning-producer-sample-kafka/src/main/java/demo/producer/Producer.java b/custom-stream-apps/partitioning-producer-sample-kafka/src/main/java/demo/producer/Producer.java index a2b7afb..39d9d45 100644 --- a/custom-stream-apps/partitioning-producer-sample-kafka/src/main/java/demo/producer/Producer.java +++ b/custom-stream-apps/partitioning-producer-sample-kafka/src/main/java/demo/producer/Producer.java @@ -29,7 +29,7 @@ public class Producer { "f", "g", "h", //making them go to partition-0 by making a single char string "fo", "go", "ho", "foo", "goo", "hoo", - "fooz", "gooz", "hooz" + //"fooz", "gooz", "hooz" }; @InboundChannelAdapter(channel = Source.OUTPUT, poller = @Poller(fixedRate = "1000")) diff --git a/custom-stream-apps/partitioning-producer-sample-kafka/src/main/resources/application.yml b/custom-stream-apps/partitioning-producer-sample-kafka/src/main/resources/application.yml index 0284449..70939b4 100644 --- a/custom-stream-apps/partitioning-producer-sample-kafka/src/main/resources/application.yml +++ b/custom-stream-apps/partitioning-producer-sample-kafka/src/main/resources/application.yml @@ -7,5 +7,5 @@ spring: producer: #payload string length - 1 is the partition where it will get stored partition-key-expression: headers['partitionKey'] - 1 - partition-count: 4 + partition-count: 3 required-groups: myGroup #only applicable for rabbit diff --git a/k8s-templates/log-svc-lb.yaml b/k8s-templates/log-svc-lb.yaml index 04be8ec..25191e0 100644 --- a/k8s-templates/log-svc-lb.yaml +++ b/k8s-templates/log-svc-lb.yaml @@ -5,6 +5,7 @@ metadata: labels: app: log component: log + type: acceptance-tests spec: type: LoadBalancer ports: diff --git a/k8s-templates/time-svc-lb.yaml b/k8s-templates/time-svc-lb.yaml index 2e208e4..a995127 100644 --- a/k8s-templates/time-svc-lb.yaml +++ b/k8s-templates/time-svc-lb.yaml @@ -5,6 +5,7 @@ metadata: labels: app: time component: time + type: acceptance-tests spec: type: LoadBalancer ports: diff --git a/k8s-templates/uppercase-transformer-kafka-svc-lb.yaml b/k8s-templates/uppercase-transformer-kafka-svc-lb.yaml new file mode 100644 index 0000000..5b22f0c --- /dev/null +++ b/k8s-templates/uppercase-transformer-kafka-svc-lb.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: Service +metadata: + name: uppercase-transformer-kafka + labels: + app: uppercase-transformer-kafka + type: acceptance-tests + component: uppercase-transformer-kafka +spec: + type: LoadBalancer + ports: + - port: 80 + name: uppercase-transformer-kafka-port + targetPort: 8080 + protocol: TCP + selector: + app: uppercase-transformer-kafka + type: acceptance-tests + component: uppercase-transformer-kafka diff --git a/k8s-templates/uppercase-transformer-kafka.yaml b/k8s-templates/uppercase-transformer-kafka.yaml new file mode 100644 index 0000000..5307e7c --- /dev/null +++ b/k8s-templates/uppercase-transformer-kafka.yaml @@ -0,0 +1,43 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: uppercase-transformer-kafka + labels: + app: uppercase-transformer-kafka + type: acceptance-tests + component: uppercase-transformer-kafka +spec: + replicas: 1 + template: + metadata: + labels: + app: uppercase-transformer-kafka + type: acceptance-tests + component: uppercase-transformer-kafka + spec: + containers: + - name: uppercase-transformer + image: springcloudstream/uppercase-transformer-kafka:latest + ports: + - containerPort: 8080 + env: + - name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS + value: kafka:9092 + - name: SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES + value: kafka-zk:2181 + - name: LOGGING_FILE + value: uppercase-transformer-kafka.log + - name: MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE + value: logfile,health,info + livenessProbe: + httpGet: + path: /actuator/health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 60 + readinessProbe: + httpGet: + path: /actuator/info + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 10 diff --git a/runCFAcceptanceTests.sh b/runCFAcceptanceTests.sh index ad3dc53..0c292b7 100755 --- a/runCFAcceptanceTests.sh +++ b/runCFAcceptanceTests.sh @@ -135,7 +135,7 @@ echo "Prepare artifacts for ticktock testing" prepare_ticktock_13_with_rabbit_binder $1 $2 $3 $4 $5 $6 -./mvnw -P acceptance-tests clean package -Dtest=TickTockAcceptanceTests -Dmaven.test.skip=false -Dtime.source.route=$FULL_TICKTOCK_TIME_SOURCE_ROUTE -Dlog.sink.route=$FULL_TICKTOCK_LOG_SINK_ROUTE +./mvnw -P acceptance-tests clean package -Dtest=TickTock13AcceptanceTests -Dmaven.test.skip=false -Dtime.source.route=$FULL_TICKTOCK_TIME_SOURCE_ROUTE -Dlog.sink.route=$FULL_TICKTOCK_LOG_SINK_ROUTE BUILD_RETURN_VALUE=$? cf stop ticktock-time-source diff --git a/runK8SAcceptanceTests.sh b/runK8SAcceptanceTests.sh index 54bb7e8..2b8c41e 100755 --- a/runK8SAcceptanceTests.sh +++ b/runK8SAcceptanceTests.sh @@ -69,7 +69,26 @@ function prepare_ticktock_latest_with_kafka_binder() { FULL_TICKTOCK_LOG_SINK_ROUTE=http://$LOG_SINK_SERVER_URI } -function delete_ticktock_components() { +function prepare_uppercase_transformer_with_kafka_binder() { + + kubectl create -f k8s-templates/uppercase-transformer-kafka.yaml + kubectl create -f k8s-templates/uppercase-transformer-kafka-svc-lb.yaml + + READY_FOR_TESTS=1 + for i in $( seq 1 "${RETRIES}" ); do + UPPERCASE_TRANSFORMER_KAFKA_SERVER_URI=$(kubectl get service uppercase-transformer-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) + [ '' != $UPPERCASE_TRANSFORMER_KAFKA_SERVER_URI ] && READY_FOR_TESTS=0 && break + echo "Waiting for server external ip for time source and log sinks. Attempt #$i/${RETRIES}... will try again in [${WAIT_TIME}] seconds" >&2 + sleep "${WAIT_TIME}" + done + UPPERCASE_TRANSFORMER_KAFKA_SERVER_URI=$(kubectl get service uppercase-transformer-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) + + $(netcat_port ${UPPERCASE_TRANSFORMER_KAFKA_SERVER_URI} 80) + + FULL_UPPERCASE_ROUTE=http://$UPPERCASE_TRANSFORMER_KAFKA_SERVER_URI +} + +function delete_acceptance_test_components() { kubectl delete pod,deployment,rc,service -l type="acceptance-tests" } @@ -101,14 +120,10 @@ CLUSTER_VERSION=$4 prepare_ticktock_latest_with_kafka_binder ${PROJECT_NAME} ${CLUSTER_NAME} ${GKE_ZONE} ${CLUSTER_VERSION} -./mvnw -P acceptance-tests clean package -Dtest=TickTockAcceptanceTests -Dmaven.test.skip=false -Dtime.source.route=$FULL_TICKTOCK_TIME_SOURCE_ROUTE -Dlog.sink.route=$FULL_TICKTOCK_LOG_SINK_ROUTE +./mvnw -P acceptance-tests clean package -Dtest=TickTockLatestAcceptanceTests -Dmaven.test.skip=false -Dtime.source.route=$FULL_TICKTOCK_TIME_SOURCE_ROUTE -Dlog.sink.route=$FULL_TICKTOCK_LOG_SINK_ROUTE BUILD_RETURN_VALUE=$? -delete_ticktock_components - -delete_kafka_components - -delete_test_cluster ${CLUSTER_NAME} ${GKE_ZONE} ${PROJECT_NAME} +delete_acceptance_test_components if [ "$BUILD_RETURN_VALUE" != 0 ] then @@ -117,9 +132,40 @@ then echo "Total time: Build took $(($duration / 60)) minutes and $(($duration % 60)) seconds to complete." + delete_kafka_components + + delete_test_cluster ${CLUSTER_NAME} ${GKE_ZONE} ${PROJECT_NAME} + exit $BUILD_RETURN_VALUE fi + +prepare_uppercase_transformer_with_kafka_binder + +./mvnw -P acceptance-tests clean package -Dtest=UppercaseTransformerAcceptanceTests -Dmaven.test.skip=false -Duppercase.processor.route=$FULL_UPPERCASE_ROUTE + +BUILD_RETURN_VALUE=$? + +delete_acceptance_test_components + +if [ "$BUILD_RETURN_VALUE" != 0 ] +then + echo "Early exit due to test failure in ticktock tests" + duration=$SECONDS + + echo "Total time: Build took $(($duration / 60)) minutes and $(($duration % 60)) seconds to complete." + + delete_kafka_components + + delete_test_cluster ${CLUSTER_NAME} ${GKE_ZONE} ${PROJECT_NAME} + + exit $BUILD_RETURN_VALUE +fi + +delete_kafka_components + +delete_test_cluster ${CLUSTER_NAME} ${GKE_ZONE} ${PROJECT_NAME} + duration=$SECONDS echo "Cumulative Build Time Across All Tests: Build took $(($duration / 60)) minutes and $(($duration % 60)) seconds to complete." diff --git a/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/TickTockAcceptanceTests.java b/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/TickTock13AcceptanceTests.java similarity index 95% rename from spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/TickTockAcceptanceTests.java rename to spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/TickTock13AcceptanceTests.java index 05f733d..0031809 100644 --- a/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/TickTockAcceptanceTests.java +++ b/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/TickTock13AcceptanceTests.java @@ -23,7 +23,7 @@ import static org.junit.Assert.fail; /** * @author Soby Chacko */ -public class TickTockAcceptanceTests extends AbstractAcceptanceTests { +public class TickTock13AcceptanceTests extends AbstractAcceptanceTests { @Test public void testTickTock13Rabbit() { diff --git a/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/TickTockLatestAcceptanceTests.java b/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/TickTockLatestAcceptanceTests.java new file mode 100644 index 0000000..2e62e12 --- /dev/null +++ b/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/TickTockLatestAcceptanceTests.java @@ -0,0 +1,46 @@ +/* + * Copyright 2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sample.acceptance.tests; + +import org.junit.Test; + +import static org.junit.Assert.fail; + +public class TickTockLatestAcceptanceTests extends AbstractAcceptanceTests { + + @Test + public void testTickTockLatestRabbit() { + + String timeSourceUrl = System.getProperty("time.source.route"); + String logSinkUrl = System.getProperty("log.sink.route"); + + boolean foundLogs = waitForLogEntry("Time Source", timeSourceUrl, "Started TimeSource"); + if(!foundLogs) { + fail("Did not find the time source started logging message."); + } + + foundLogs = waitForLogEntry("Log Sink", logSinkUrl, "Started LogSink"); + if(!foundLogs) { + fail("Did not find the log sink started logging message."); + } + + foundLogs = waitForLogEntry("Log Sink", logSinkUrl, "TICKTOCK - TIMESTAMP:"); + if(!foundLogs) { + fail("Did not find the ticktock messages in log sink"); + } + } +}