diff --git a/kubernetes/k8s-templates/http-transfomer-log/http-svc.yaml b/kubernetes/k8s-templates/http-transfomer-log/http-svc.yaml new file mode 100644 index 0000000..9577d2b --- /dev/null +++ b/kubernetes/k8s-templates/http-transfomer-log/http-svc.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: Service +metadata: + name: http-source + labels: + app: http-source + component: http-source + type: stream-ats + spring-deployment-id: http-source +spec: + type: NodePort + ports: + - port: 80 + name: http-source-port + targetPort: 8080 + protocol: TCP + selector: + app: http-source + component: http-source diff --git a/kubernetes/k8s-templates/http-transfomer-log/http.yaml b/kubernetes/k8s-templates/http-transfomer-log/http.yaml new file mode 100644 index 0000000..94d6225 --- /dev/null +++ b/kubernetes/k8s-templates/http-transfomer-log/http.yaml @@ -0,0 +1,46 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: http-source + labels: + app: http-source + type: stream-ats + component: http-source +spec: + replicas: 1 + template: + metadata: + labels: + app: http-source + component: http-source + spec: + containers: + - name: http-source + image: springcloudstream/http-source-kafka:latest + ports: + - containerPort: 8080 + env: + - name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS + value: kafka:9092 + - name: SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES + value: kafka-zk:2181 + - name: SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION + value: http-out + - name: LOGGING_FILE + value: http-source.log + - name: MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE + value: logfile,health,info + - name: SPRING_CLOUD_STREAMAPP_SECURITY_ENABLED + value: "false" + livenessProbe: + httpGet: + path: /actuator/health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 60 + readinessProbe: + httpGet: + path: /actuator/info + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 10 diff --git a/kubernetes/k8s-templates/http-transfomer-log/log-svc-lb.yaml b/kubernetes/k8s-templates/http-transfomer-log/log-svc-lb.yaml new file mode 100644 index 0000000..e6541a5 --- /dev/null +++ b/kubernetes/k8s-templates/http-transfomer-log/log-svc-lb.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: Service +metadata: + name: log + labels: + app: log + component: log + type: stream-ats + spring-deployment-id: log +spec: + type: NodePort + ports: + - port: 80 + name: log-port + targetPort: 8080 + protocol: TCP + selector: + app: log + component: log diff --git a/kubernetes/k8s-templates/uppercase-transformer-kafka.yaml b/kubernetes/k8s-templates/http-transfomer-log/log.yaml similarity index 69% rename from kubernetes/k8s-templates/uppercase-transformer-kafka.yaml rename to kubernetes/k8s-templates/http-transfomer-log/log.yaml index 828ccbb..acc3d48 100644 --- a/kubernetes/k8s-templates/uppercase-transformer-kafka.yaml +++ b/kubernetes/k8s-templates/http-transfomer-log/log.yaml @@ -1,23 +1,22 @@ apiVersion: extensions/v1beta1 kind: Deployment metadata: - name: uppercase-transformer-kafka + name: log labels: - app: uppercase-transformer-kafka - type: acceptance-tests - component: uppercase-transformer-kafka + app: log + type: stream-ats + component: log spec: replicas: 1 template: metadata: labels: - app: uppercase-transformer-kafka - type: acceptance-tests - component: uppercase-transformer-kafka + app: log + component: log spec: containers: - - name: uppercase-transformer - image: springcloudstream/uppercase-transformer-kafka:latest + - name: log + image: springcloudstream/log-sink-kafka:latest ports: - containerPort: 8080 env: @@ -25,8 +24,12 @@ spec: value: kafka:9092 - name: SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES value: kafka-zk:2181 + - name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION + value: transformer-out + - name: LOG_EXPRESSION + value: "'From Transformer: '.concat(payload)" - name: LOGGING_FILE - value: uppercase-transformer-kafka.log + value: log-sink.log - name: MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE value: logfile,health,info - name: SPRING_CLOUD_STREAMAPP_SECURITY_ENABLED diff --git a/kubernetes/k8s-templates/http-transfomer-log/transform-processor-kafka-svc-lb.yaml b/kubernetes/k8s-templates/http-transfomer-log/transform-processor-kafka-svc-lb.yaml new file mode 100644 index 0000000..1e56a21 --- /dev/null +++ b/kubernetes/k8s-templates/http-transfomer-log/transform-processor-kafka-svc-lb.yaml @@ -0,0 +1,20 @@ +apiVersion: v1 +kind: Service +metadata: + name: transform-processor-kafka + labels: + app: transform-processor-kafka + type: stream-ats + component: transform-processor-kafka + spring-deployment-id: transform-processor +spec: + type: NodePort + ports: + - port: 80 + name: transform-processor-kafka-port + targetPort: 8080 + protocol: TCP + selector: + app: transform-processor-kafka + type: acceptance-tests + component: transform-processor-kafka diff --git a/kubernetes/k8s-templates/http-transfomer-log/transform-processor-kafka.yaml b/kubernetes/k8s-templates/http-transfomer-log/transform-processor-kafka.yaml new file mode 100644 index 0000000..9f7be2a --- /dev/null +++ b/kubernetes/k8s-templates/http-transfomer-log/transform-processor-kafka.yaml @@ -0,0 +1,51 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: transform-processor-kafka + labels: + app: transform-processor-kafka + type: stream-ats + component: transform-processor-kafka +spec: + replicas: 1 + template: + metadata: + labels: + app: transform-processor-kafka + type: acceptance-tests + component: transform-processor-kafka + spec: + containers: + - name: transform-processor + image: springcloudstream/transform-processor-kafka:latest + ports: + - containerPort: 8080 + env: + - name: SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS + value: kafka:9092 + - name: SPRING_CLOUD_STREAM_KAFKA_BINDER_ZKNODES + value: kafka-zk:2181 + - name: SPRING_CLOUD_STREAM_BINDINGS_INPUT_DESTINATION + value: http-out + - name: SPRING_CLOUD_STREAM_BINDINGS_OUTPUT_DESTINATION + value: transformer-out + - name: TRANSFORMER_EXPRESSION + value: payload.toUpperCase() + - name: LOGGING_FILE + value: transform-processor-kafka.log + - name: MANAGEMENT_ENDPOINTS_WEB_EXPOSURE_INCLUDE + value: logfile,health,info + - name: SPRING_CLOUD_STREAMAPP_SECURITY_ENABLED + value: "false" + livenessProbe: + httpGet: + path: /actuator/health + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 60 + readinessProbe: + httpGet: + path: /actuator/info + port: 8080 + initialDelaySeconds: 10 + periodSeconds: 10 diff --git a/kubernetes/k8s-templates/kafka-deployment.yaml b/kubernetes/k8s-templates/kafka-deployment.yaml index 7e17b3d..aa550cd 100644 --- a/kubernetes/k8s-templates/kafka-deployment.yaml +++ b/kubernetes/k8s-templates/kafka-deployment.yaml @@ -4,7 +4,7 @@ metadata: name: kafka-broker labels: app: kafka - type: acceptance-tests-binder + type: stream-ats-kafka component: kafka-broker spec: replicas: 1 diff --git a/kubernetes/k8s-templates/kafka-svc.yaml b/kubernetes/k8s-templates/kafka-svc.yaml index 8126480..c8303d4 100644 --- a/kubernetes/k8s-templates/kafka-svc.yaml +++ b/kubernetes/k8s-templates/kafka-svc.yaml @@ -5,6 +5,7 @@ metadata: labels: app: kafka component: kafka-broker + type: stream-ats-kafka spec: ports: - port: 9092 diff --git a/kubernetes/k8s-templates/kafka-zk-deployment.yaml b/kubernetes/k8s-templates/kafka-zk-deployment.yaml index 812dfd4..d0355a5 100644 --- a/kubernetes/k8s-templates/kafka-zk-deployment.yaml +++ b/kubernetes/k8s-templates/kafka-zk-deployment.yaml @@ -4,7 +4,7 @@ metadata: name: kafka-zk labels: app: kafka - type: acceptance-tests-binder + type: stream-ats-kafka component: kafka-zk spec: replicas: 1 diff --git a/kubernetes/k8s-templates/kafka-zk-svc.yaml b/kubernetes/k8s-templates/kafka-zk-svc.yaml index 1215ab6..7d2b15c 100644 --- a/kubernetes/k8s-templates/kafka-zk-svc.yaml +++ b/kubernetes/k8s-templates/kafka-zk-svc.yaml @@ -5,6 +5,7 @@ metadata: labels: app: kafka component: kafka-zk + type: stream-ats-kafka spec: ports: - name: client diff --git a/kubernetes/k8s-templates/log-svc-lb.yaml b/kubernetes/k8s-templates/log-svc-lb.yaml index 25191e0..e6541a5 100644 --- a/kubernetes/k8s-templates/log-svc-lb.yaml +++ b/kubernetes/k8s-templates/log-svc-lb.yaml @@ -5,9 +5,10 @@ metadata: labels: app: log component: log - type: acceptance-tests + type: stream-ats + spring-deployment-id: log spec: - type: LoadBalancer + type: NodePort ports: - port: 80 name: log-port diff --git a/kubernetes/k8s-templates/log.yaml b/kubernetes/k8s-templates/log.yaml index f1beeb1..3b5c096 100644 --- a/kubernetes/k8s-templates/log.yaml +++ b/kubernetes/k8s-templates/log.yaml @@ -4,7 +4,7 @@ metadata: name: log labels: app: log - type: acceptance-tests + type: stream-ats component: log spec: replicas: 1 diff --git a/kubernetes/k8s-templates/time-svc-lb.yaml b/kubernetes/k8s-templates/time-svc-lb.yaml index a995127..ea407ff 100644 --- a/kubernetes/k8s-templates/time-svc-lb.yaml +++ b/kubernetes/k8s-templates/time-svc-lb.yaml @@ -5,9 +5,10 @@ metadata: labels: app: time component: time - type: acceptance-tests + type: stream-ats + spring-deployment-id: time spec: - type: LoadBalancer + type: NodePort ports: - port: 80 name: time-port diff --git a/kubernetes/k8s-templates/time.yaml b/kubernetes/k8s-templates/time.yaml index 0e6a63a..ce67a12 100644 --- a/kubernetes/k8s-templates/time.yaml +++ b/kubernetes/k8s-templates/time.yaml @@ -4,7 +4,7 @@ metadata: name: time labels: app: time - type: acceptance-tests + type: stream-ats component: time spec: replicas: 1 diff --git a/kubernetes/k8s-templates/uppercase-transformer-kafka-svc-lb.yaml b/kubernetes/k8s-templates/uppercase-transformer-kafka-svc-lb.yaml deleted file mode 100644 index 5b22f0c..0000000 --- a/kubernetes/k8s-templates/uppercase-transformer-kafka-svc-lb.yaml +++ /dev/null @@ -1,19 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: uppercase-transformer-kafka - labels: - app: uppercase-transformer-kafka - type: acceptance-tests - component: uppercase-transformer-kafka -spec: - type: LoadBalancer - ports: - - port: 80 - name: uppercase-transformer-kafka-port - targetPort: 8080 - protocol: TCP - selector: - app: uppercase-transformer-kafka - type: acceptance-tests - component: uppercase-transformer-kafka diff --git a/kubernetes/runK8SAcceptanceTests.sh b/kubernetes/runK8SAcceptanceTests.sh index 0865128..be7128e 100755 --- a/kubernetes/runK8SAcceptanceTests.sh +++ b/kubernetes/runK8SAcceptanceTests.sh @@ -4,7 +4,8 @@ # First argument is GKE Project # Second argument is GKE Cluster # Third argument is GKE Zone -# Fourth argument is GKE Cluster version +# Fourth arg namespace +# Fifth arg is base domain pushd () { command pushd "$@" > /dev/null @@ -14,137 +15,101 @@ popd () { command popd "$@" > /dev/null } -function netcat_port() { - local READY_FOR_TESTS=1 - for i in $( seq 1 "${RETRIES}" ); do - nc -w1 ${1} $2 &2 - sleep "${WAIT_TIME}" - done - return ${READY_FOR_TESTS} +function wait_for_200 { + local READY_FOR_TESTS=1 + for i in $( seq 1 "${RETRIES}" ); do + STATUS=$(curl -s -o /dev/null -w '%{http_code}' ${1}) + if [ $STATUS -eq 200 ]; then + READY_FOR_TESTS=0 + break + else + echo "Failed to connect to ${1} with status code: $STATUS. Attempt #$i/${RETRIES}... will try again in [${WAIT_TIME}] seconds" >&2 + sleep "${WAIT_TIME}" + fi + done + return ${READY_FOR_TESTS} } function prepare_ticktock_latest_with_kafka_binder() { - - grep $CLUSTER_NAME ~/.kube/config - grep_result=$? - if [ $grep_result = 0 ]; then - kubectl config delete-context $CLUSTER_NAME - fi - - gcloud container --project ${PROJECT_NAME} clusters create ${CLUSTER_NAME} --zone ${GKE_ZONE} --machine-type "custom-2-4096" \ - --cluster-version ${CLUSTER_VERSION} --num-nodes "2" --image-type "COS" --disk-size "25" --network "default" \ - --enable-cloud-logging --enable-cloud-monitoring \ - --scopes "https://www.googleapis.com/auth/compute","https://www.googleapis.com/auth/devstorage.read_only","https://www.googleapis.com/auth/logging.write","https://www.googleapis.com/auth/monitoring","https://www.googleapis.com/auth/servicecontrol","https://www.googleapis.com/auth/service.management.readonly","https://www.googleapis.com/auth/trace.append" - - gcloud container clusters get-credentials ${CLUSTER_NAME} --zone ${GKE_ZONE} --project ${PROJECT_NAME} - - kubectl create -f k8s-templates/kafka-zk-deployment.yaml - kubectl create -f k8s-templates/kafka-zk-svc.yaml - - kubectl create -f k8s-templates/kafka-deployment.yaml - kubectl create -f k8s-templates/kafka-svc.yaml - kubectl create -f k8s-templates/time.yaml kubectl create -f k8s-templates/time-svc-lb.yaml kubectl create -f k8s-templates/log.yaml kubectl create -f k8s-templates/log-svc-lb.yaml - READY_FOR_TESTS=1 - for i in $( seq 1 "${RETRIES}" ); do - TIME_SOURCE_SERVER_URI=$(kubectl get service time | awk '{print $4}' | grep -v EXTERNAL-IP) - LOG_SINK_SERVER_URI=$(kubectl get service log | awk '{print $4}' | grep -v EXTERNAL-IP) - [ '' != $TIME_SOURCE_SERVER_URI ] && [ '' != $LOG_SINK_SERVER_URI ] && READY_FOR_TESTS=0 && break - echo "Waiting for server external ip for time source and log sinks. Attempt #$i/${RETRIES}... will try again in [${WAIT_TIME}] seconds" >&2 - sleep "${WAIT_TIME}" - done - TIME_SOURCE_SERVER_URI=$(kubectl get service time | awk '{print $4}' | grep -v EXTERNAL-IP) - LOG_SINK_SERVER_URI=$(kubectl get service log | awk '{print $4}' | grep -v EXTERNAL-IP) + TIME_SOURCE_SERVER_URI=https://time.${CLUSTER_NAME}.${BASE_DOMAIN} + LOG_SINK_SERVER_URI=https://log.${CLUSTER_NAME}.${BASE_DOMAIN} - $(netcat_port ${TIME_SOURCE_SERVER_URI} 80) - $(netcat_port ${LOG_SINK_SERVER_URI} 80) - - FULL_TICKTOCK_TIME_SOURCE_ROUTE=http://$TIME_SOURCE_SERVER_URI - FULL_TICKTOCK_LOG_SINK_ROUTE=http://$LOG_SINK_SERVER_URI + $(wait_for_200 ${TIME_SOURCE_SERVER_URI}/actuator/logfile) + $(wait_for_200 ${LOG_SINK_SERVER_URI}/actuator/logfile) } -function prepare_uppercase_transformer_with_kafka_binder() { +function prepare_http_transform_log_with_kafka_binder() { - kubectl create -f k8s-templates/uppercase-transformer-kafka.yaml - kubectl create -f k8s-templates/uppercase-transformer-kafka-svc-lb.yaml + kubectl create -f k8s-templates/http-transfomer-log/http.yaml + kubectl create -f k8s-templates/http-transfomer-log/http-svc.yaml + kubectl create -f k8s-templates/http-transfomer-log/transform-processor-kafka.yaml + kubectl create -f k8s-templates/http-transfomer-log/transform-processor-kafka-svc-lb.yaml + kubectl create -f k8s-templates/http-transfomer-log/log.yaml + kubectl create -f k8s-templates/http-transfomer-log/log-svc-lb.yaml - READY_FOR_TESTS=1 - for i in $( seq 1 "${RETRIES}" ); do - UPPERCASE_TRANSFORMER_KAFKA_SERVER_URI=$(kubectl get service uppercase-transformer-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) - [ '' != $UPPERCASE_TRANSFORMER_KAFKA_SERVER_URI ] && READY_FOR_TESTS=0 && break - echo "Waiting for server external ip for time source and log sinks. Attempt #$i/${RETRIES}... will try again in [${WAIT_TIME}] seconds" >&2 - sleep "${WAIT_TIME}" - done - UPPERCASE_TRANSFORMER_KAFKA_SERVER_URI=$(kubectl get service uppercase-transformer-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) + HTTP_SOURCE_SERVER_URI=https://http-source.${CLUSTER_NAME}.${BASE_DOMAIN} + TRANSFORMER_PROCESSOR_SERVER_URI=https://transform-processor-kafka.${CLUSTER_NAME}.${BASE_DOMAIN} + LOG_SINK_SERVER_URI=https://log.${CLUSTER_NAME}.${BASE_DOMAIN} - $(netcat_port ${UPPERCASE_TRANSFORMER_KAFKA_SERVER_URI} 80) + $(wait_for_200 ${HTTP_SOURCE_SERVER_URI}/actuator/logfile) + $(wait_for_200 ${TRANSFORMER_PROCESSOR_SERVER_URI}/actuator/logfile) + $(wait_for_200 ${LOG_SINK_SERVER_URI}/actuator/logfile) - FULL_UPPERCASE_ROUTE=http://$UPPERCASE_TRANSFORMER_KAFKA_SERVER_URI -} - -function prepare_partitioning_test_with_kafka_binder() { - - kubectl create -f k8s-templates/partitioning-consumer1-sample-kafka.yaml - kubectl create -f k8s-templates/partitioning-consumer1-sample-kafka-svc-lb.yaml - - kubectl create -f k8s-templates/partitioning-consumer2-sample-kafka.yaml - kubectl create -f k8s-templates/partitioning-consumer2-sample-kafka-svc-lb.yaml - - kubectl create -f k8s-templates/partitioning-consumer3-sample-kafka.yaml - kubectl create -f k8s-templates/partitioning-consumer3-sample-kafka-svc-lb.yaml - - kubectl create -f k8s-templates/partitioning-producer-sample-kafka.yaml - kubectl create -f k8s-templates/partitioning-producer-sample-kafka-svc-lb.yaml - - - READY_FOR_TESTS=1 - for i in $( seq 1 "${RETRIES}" ); do - PARTITIONING_PRODUCER_SERVER_URI=$(kubectl get service partitioning-producer-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) - PARTITIONING_CONSUMER1_SERVER_URI=$(kubectl get service partitioning-consumer1-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) - PARTITIONING_CONSUMER2_SERVER_URI=$(kubectl get service partitioning-consumer2-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) - PARTITIONING_CONSUMER3_SERVER_URI=$(kubectl get service partitioning-consumer3-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) - [ '' != $PARTITIONING_PRODUCER_SERVER_URI ] && [ '' != $PARTITIONING_CONSUMER1_SERVER_URI ] && [ '' != $PARTITIONING_CONSUMER2_SERVER_URI ] && [ '' != $PARTITIONING_CONSUMER3_SERVER_URI ] && READY_FOR_TESTS=0 && break - echo "Waiting for server external ip for partitioning producer/consumer apps. Attempt #$i/${RETRIES}... will try again in [${WAIT_TIME}] seconds" >&2 - sleep "${WAIT_TIME}" - done - - PARTITIONING_PRODUCER_SERVER_URI=$(kubectl get service partitioning-producer-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) - PARTITIONING_CONSUMER1_SERVER_URI=$(kubectl get service partitioning-consumer1-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) - PARTITIONING_CONSUMER2_SERVER_URI=$(kubectl get service partitioning-consumer2-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) - PARTITIONING_CONSUMER3_SERVER_URI=$(kubectl get service partitioning-consumer3-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) - - $(netcat_port ${PARTITIONING_PRODUCER_SERVER_URI} 80) - $(netcat_port ${PARTITIONING_CONSUMER1_SERVER_URI} 80) - $(netcat_port ${PARTITIONING_CONSUMER2_SERVER_URI} 80) - $(netcat_port ${PARTITIONING_CONSUMER3_SERVER_URI} 80) - - FULL_PARTITIONING_PRODUCER_ROUTE=http://$PARTITIONING_PRODUCER_SERVER_URI - FULL_PARTITIONING_CONSUMER1_ROUTE=http://$PARTITIONING_CONSUMER1_SERVER_URI - FULL_PARTITIONING_CONSUMER2_ROUTE=http://$PARTITIONING_CONSUMER2_SERVER_URI - FULL_PARTITIONING_CONSUMER3_ROUTE=http://$PARTITIONING_CONSUMER3_SERVER_URI + curl -X POST -H "Content-Type: text/plain" --data "foobar" $HTTP_SOURCE_SERVER_URI } +# +#function prepare_partitioning_test_with_kafka_binder() { +# +# kubectl create -f k8s-templates/partitioning-consumer1-sample-kafka.yaml +# kubectl create -f k8s-templates/partitioning-consumer1-sample-kafka-svc-lb.yaml +# +# kubectl create -f k8s-templates/partitioning-consumer2-sample-kafka.yaml +# kubectl create -f k8s-templates/partitioning-consumer2-sample-kafka-svc-lb.yaml +# +# kubectl create -f k8s-templates/partitioning-consumer3-sample-kafka.yaml +# kubectl create -f k8s-templates/partitioning-consumer3-sample-kafka-svc-lb.yaml +# +# kubectl create -f k8s-templates/partitioning-producer-sample-kafka.yaml +# kubectl create -f k8s-templates/partitioning-producer-sample-kafka-svc-lb.yaml +# +# +# READY_FOR_TESTS=1 +# for i in $( seq 1 "${RETRIES}" ); do +# PARTITIONING_PRODUCER_SERVER_URI=$(kubectl get service partitioning-producer-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) +# PARTITIONING_CONSUMER1_SERVER_URI=$(kubectl get service partitioning-consumer1-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) +# PARTITIONING_CONSUMER2_SERVER_URI=$(kubectl get service partitioning-consumer2-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) +# PARTITIONING_CONSUMER3_SERVER_URI=$(kubectl get service partitioning-consumer3-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) +# [ '' != $PARTITIONING_PRODUCER_SERVER_URI ] && [ '' != $PARTITIONING_CONSUMER1_SERVER_URI ] && [ '' != $PARTITIONING_CONSUMER2_SERVER_URI ] && [ '' != $PARTITIONING_CONSUMER3_SERVER_URI ] && READY_FOR_TESTS=0 && break +# echo "Waiting for server external ip for partitioning producer/consumer apps. Attempt #$i/${RETRIES}... will try again in [${WAIT_TIME}] seconds" >&2 +# sleep "${WAIT_TIME}" +# done +# +# PARTITIONING_PRODUCER_SERVER_URI=$(kubectl get service partitioning-producer-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) +# PARTITIONING_CONSUMER1_SERVER_URI=$(kubectl get service partitioning-consumer1-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) +# PARTITIONING_CONSUMER2_SERVER_URI=$(kubectl get service partitioning-consumer2-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) +# PARTITIONING_CONSUMER3_SERVER_URI=$(kubectl get service partitioning-consumer3-sample-kafka | awk '{print $4}' | grep -v EXTERNAL-IP) +# +# $(netcat_port ${PARTITIONING_PRODUCER_SERVER_URI} 80) +# $(netcat_port ${PARTITIONING_CONSUMER1_SERVER_URI} 80) +# $(netcat_port ${PARTITIONING_CONSUMER2_SERVER_URI} 80) +# $(netcat_port ${PARTITIONING_CONSUMER3_SERVER_URI} 80) +# +# FULL_PARTITIONING_PRODUCER_ROUTE=http://$PARTITIONING_PRODUCER_SERVER_URI +# FULL_PARTITIONING_CONSUMER1_ROUTE=http://$PARTITIONING_CONSUMER1_SERVER_URI +# FULL_PARTITIONING_CONSUMER2_ROUTE=http://$PARTITIONING_CONSUMER2_SERVER_URI +# FULL_PARTITIONING_CONSUMER3_ROUTE=http://$PARTITIONING_CONSUMER3_SERVER_URI +#} function delete_acceptance_test_components() { - kubectl delete pod,deployment,rc,service -l type="acceptance-tests" -} - -function delete_kafka_components() { - - kubectl delete pod,deployment,rc,service -l app="kafka" -} - -function delete_test_cluster() { - - gcloud container clusters delete ${CLUSTER_NAME} --zone ${GKE_ZONE} --project ${PROJECT_NAME} --quiet - + kubectl delete pod,deployment,rc,service -l type="stream-ats" } #Main script starting @@ -157,13 +122,26 @@ RETRIES="${RETRIES:-60}" PROJECT_NAME=$1 CLUSTER_NAME=$2 GKE_ZONE=$3 -CLUSTER_VERSION=$4 +NAMESPACE=$4 +BASE_DOMAIN=$5 -prepare_ticktock_latest_with_kafka_binder ${PROJECT_NAME} ${CLUSTER_NAME} ${GKE_ZONE} ${CLUSTER_VERSION} +gcloud container clusters get-credentials ${CLUSTER_NAME} --zone ${GKE_ZONE} --project ${PROJECT_NAME} + +C_TMP=$(kubectl config get-contexts | grep ${CLUSTER_NAME} | awk '{print $2}') +kubectl config use-context $C_TMP +kubectl config set-context $C_TMP --namespace ${NAMESPACE} + +kubectl create -f k8s-templates/kafka-zk-deployment.yaml +kubectl create -f k8s-templates/kafka-zk-svc.yaml + +kubectl create -f k8s-templates/kafka-deployment.yaml +kubectl create -f k8s-templates/kafka-svc.yaml + +prepare_ticktock_latest_with_kafka_binder pushd ../spring-cloud-stream-acceptance-tests -../mvnw clean package -Dtest=TickTockLatestAcceptanceTests -Dmaven.test.skip=false -Dtime.source.route=$FULL_TICKTOCK_TIME_SOURCE_ROUTE -Dlog.sink.route=$FULL_TICKTOCK_LOG_SINK_ROUTE +../mvnw clean package -Dtest=TickTockLatestAcceptanceTests -Dmaven.test.skip=false -Dtime.source.route=$TIME_SOURCE_SERVER_URI -Dlog.sink.route=$LOG_SINK_SERVER_URI BUILD_RETURN_VALUE=$? popd @@ -177,18 +155,14 @@ then echo "Total time: Build took $(($duration / 60)) minutes and $(($duration % 60)) seconds to complete." - delete_kafka_components - sleep 60 - delete_test_cluster ${CLUSTER_NAME} ${GKE_ZONE} ${PROJECT_NAME} - exit $BUILD_RETURN_VALUE fi -prepare_uppercase_transformer_with_kafka_binder +prepare_http_transform_log_with_kafka_binder pushd ../spring-cloud-stream-acceptance-tests -../mvnw clean package -Dtest=UppercaseTransformerAcceptanceTests -Dmaven.test.skip=false -Duppercase.processor.route=$FULL_UPPERCASE_ROUTE +../mvnw clean package -Dtest=HttpTransformerLogAcceptanceTests -Dmaven.test.skip=false -Dhttp.source.route=$HTTP_SOURCE_SERVER_URI -Dtransformer.processor.route=$TRANSFORMER_PROCESSOR_SERVER_URI -Dlog.sink.route=$LOG_SINK_SERVER_URI BUILD_RETURN_VALUE=$? popd @@ -234,9 +208,10 @@ fi # exit $BUILD_RETURN_VALUE #fi -delete_kafka_components -sleep 60 -delete_test_cluster ${CLUSTER_NAME} ${GKE_ZONE} ${PROJECT_NAME} +#delete_kafka_components +#delete_test_cluster ${CLUSTER_NAME} ${GKE_ZONE} ${PROJECT_NAME} + +kubectl delete pod,deployment,rc,service -l type=stream-ats-kafka duration=$SECONDS diff --git a/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/HttpTransformerLogAcceptanceTests.java b/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/HttpTransformerLogAcceptanceTests.java new file mode 100644 index 0000000..2db2904 --- /dev/null +++ b/spring-cloud-stream-acceptance-tests/src/test/java/sample/acceptance/tests/HttpTransformerLogAcceptanceTests.java @@ -0,0 +1,37 @@ +package sample.acceptance.tests; + +import org.junit.Test; + +import static org.junit.Assert.fail; + +public class HttpTransformerLogAcceptanceTests extends AbstractAcceptanceTests { + + @Test + public void testUppercaseTransformerRabbit() { + + String httpSourceUrl = System.getProperty("http.source.route"); + String transformerProcessorUrl = System.getProperty("transformer.processor.route"); + String logSinkUrl = System.getProperty("log.sink.route"); + + boolean foundLogs = waitForLogEntry("HTTP Source", httpSourceUrl, "Started HttpSourceKafkaApplication"); + if(!foundLogs) { + fail("Did not find the HttpSourceKafkaApplication started logging messages."); + } + + foundLogs = waitForLogEntry("Transform Processor", transformerProcessorUrl, "Started TransformProcessorKafkaApplication"); + if(!foundLogs) { + fail("Did not find the TransformProcessorKafkaApplication started logging messages."); + } + + foundLogs = waitForLogEntry("Log Sink", logSinkUrl, "Started LogSinkKafkaApplication"); + if(!foundLogs) { + fail("Did not find the log sink started logging message."); + } + + foundLogs = waitForLogEntry("Uppercase Transformer", logSinkUrl, "From Transformer: FOOBAR"); + if(!foundLogs) { + fail("Did not find the logging messages."); + } + + } +}