diff --git a/dataflow-website/recipes/polyglot/polyglot-python-processor b/dataflow-website/recipes/polyglot/polyglot-python-processor
deleted file mode 160000
index f2434c3..0000000
--- a/dataflow-website/recipes/polyglot/polyglot-python-processor
+++ /dev/null
@@ -1 +0,0 @@
-Subproject commit f2434c39b7a77b76204d19d188c7b662ce322900
diff --git a/dataflow-website/recipes/polyglot/polyglot-python-processor2/Dockerfile b/dataflow-website/recipes/polyglot/polyglot-python-processor2/Dockerfile
new file mode 100644
index 0000000..fe0c3d7
--- /dev/null
+++ b/dataflow-website/recipes/polyglot/polyglot-python-processor2/Dockerfile
@@ -0,0 +1,12 @@
+FROM springcloud/openjdk:latest
+
+RUN apt-get update && apt-get install --no-install-recommends -y \
+    python-pip \
+    && rm -rf /var/lib/apt/lists/*
+
+RUN pip install kafka-python
+
+COPY python_processor.py /processor/
+COPY util/*.py /processor/util/
+
+ENTRYPOINT ["python", "/processor/python_processor.py"]
diff --git a/dataflow-website/recipes/polyglot/polyglot-python-processor2/README.md b/dataflow-website/recipes/polyglot/polyglot-python-processor2/README.md
new file mode 100644
index 0000000..c94e881
--- /dev/null
+++ b/dataflow-website/recipes/polyglot/polyglot-python-processor2/README.md
@@ -0,0 +1,68 @@
+Build docker image:
+
+`$ docker build -t chrisjs/python-processor:latest .`
+
+Push to Docker Hub if desired:
+
+`$ docker push chrisjs/python-processor:latest`
+
+NOTE: replace `chrisjs` with your docker hub user/org
+
+
+Register the processor so it can be used in a stream between `http` and `log`:
+
+```
+dataflow:>app import --uri http://bit.ly/Einstein-SR2-stream-applications-kafka-docker
+dataflow:>app register --type processor --name python-processor --uri docker://chrisjs/python-processor:latest
+```
+
+NOTE: replace `chrisjs` with your docker hub user/org
+
+This example uses minikube as the target k8s env.
+
+Get IP:
+
+```
+$ minikube ip
+192.168.99.104
+```
+
+Watch logs (replace `LOG_SINK_POD` with the name of the log sink pod shown by `kubectl get pods`):
+
+```
+$ kubectl logs -f LOG_SINK_POD
+```
+
+Post and log a string:
+
+```
+dataflow:>stream create --name test --definition "http --server.port=32123 | python-processor | log"
+dataflow:>stream deploy test --properties "deployer.http.kubernetes.createNodePort=32123"
+dataflow:>http post --target http://192.168.99.104:32123 --data "hello world"
+> POST (text/plain) http://192.168.99.104:32123 hello world
+> 202 ACCEPTED
+```
+
+Inspect logs for posted message:
+
+```
+INFO 1 --- [container-0-C-1] log-sink : hello world
+```
+
+Post and log a reversed string:
+
+```
+dataflow:>stream destroy --name test
+dataflow:>stream create --name test --definition "http --server.port=32123 | python-processor --reverestring=true | log"
+dataflow:>stream deploy test --properties "deployer.http.kubernetes.createNodePort=32123"
+dataflow:>http post --target http://192.168.99.104:32123 --data "hello world"
+> POST (text/plain) http://192.168.99.104:32123 hello world
+> 202 ACCEPTED
+```
+
+Inspect logs for posted message:
+
+```
+INFO 1 --- [container-0-C-1] log-sink : dlrow olleh
+```
+
diff --git a/dataflow-website/recipes/polyglot/polyglot-python-processor2/python_processor.py b/dataflow-website/recipes/polyglot/polyglot-python-processor2/python_processor.py
new file mode 100755
index 0000000..3a86f10
--- /dev/null
+++ b/dataflow-website/recipes/polyglot/polyglot-python-processor2/python_processor.py
@@ -0,0 +1,32 @@
+#!/usr/bin/env python
+"""Kafka processor that forwards each input message, optionally reversed."""
+
+import os
+import sys
+
+from kafka import KafkaConsumer, KafkaProducer
+from util.http_status_server import HttpHealthServer
+from util.task_args import get_kafka_binder_brokers, get_input_channel, get_output_channel, get_reverse_string
+
+consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[get_kafka_binder_brokers()])
+producer = KafkaProducer(bootstrap_servers=[get_kafka_binder_brokers()])
+
+# Command line arguments cannot change while the process runs, so resolve them
+# once instead of re-parsing sys.argv (and re-printing the "unknown arg"
+# notice) for every message.
+output_channel = get_output_channel()
+reverse_string = get_reverse_string()
+reverse_enabled = reverse_string is not None and reverse_string.lower() == "true"
+
+# Serve the health/info endpoints in the background.
+HttpHealthServer.run_thread()
+
+while True:
+    for message in consumer:
+        output_message = message.value
+
+        if reverse_enabled:
+            # Slicing reverses both str and bytes payloads.
+            output_message = message.value[::-1]
+
+        producer.send(output_channel, output_message)
diff --git a/dataflow-website/recipes/polyglot/polyglot-python-processor2/util/__init__.py b/dataflow-website/recipes/polyglot/polyglot-python-processor2/util/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/dataflow-website/recipes/polyglot/polyglot-python-processor2/util/http_status_server.py b/dataflow-website/recipes/polyglot/polyglot-python-processor2/util/http_status_server.py
new file mode 100644
index 0000000..519e432
--- /dev/null
+++ b/dataflow-website/recipes/polyglot/polyglot-python-processor2/util/http_status_server.py
@@ -0,0 +1,31 @@
+import threading
+from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
+
+class HttpHealthServer(BaseHTTPRequestHandler):
+    """Minimal HTTP handler answering the actuator health and info paths."""
+
+    def do_GET(self):
+        """Reply 200 for /actuator/health and /actuator/info, 404 otherwise."""
+        if (self.path == '/actuator/health') or (self.path == '/actuator/info'):
+            self.send_ok_response()
+        else:
+            self.send_missing_response()
+
+        self.send_header('Content-type', 'text/html')
+        self.end_headers()
+
+    def send_ok_response(self):
+        """Start a 200 response."""
+        self.send_response(200)
+
+    def send_missing_response(self):
+        """Start a 404 response."""
+        self.send_response(404)
+
+    @staticmethod
+    def run_thread(port=8080):
+        """Serve requests on ``port`` from a daemon thread."""
+        http_server = HTTPServer(('', port), HttpHealthServer)
+        thread = threading.Thread(name='httpd_server', target=http_server.serve_forever)
+        thread.setDaemon(True)
+        thread.start()
diff --git a/dataflow-website/recipes/polyglot/polyglot-python-processor2/util/task_args.py b/dataflow-website/recipes/polyglot/polyglot-python-processor2/util/task_args.py
new file mode 100644
index 0000000..d7acc20
--- /dev/null
+++ b/dataflow-website/recipes/polyglot/polyglot-python-processor2/util/task_args.py
@@ -0,0 +1,43 @@
+import os
+import sys
+from collections import defaultdict
+
+def get_cmd_arg(name):
+    """Return the first value given for ``name=value`` on the command line.
+
+    Leading dashes on the key are ignored. Prints a notice and returns None
+    when the argument is absent.
+    """
+    d = defaultdict(list)
+    for cmd_args in sys.argv[1:]:
+        # Split on the first '=' only so values that contain '=' are kept.
+        cmd_arg = cmd_args.split('=', 1)
+        if len(cmd_arg) == 2:
+            d[cmd_arg[0].lstrip('-')].append(cmd_arg[1])
+
+    if name in d:
+        return d[name][0]
+    else:
+        print('Unknown command line arg requested: {}'.format(name))
+
+def get_env_var(name):
+    """Return environment variable ``name``; print a notice and return None if unset."""
+    if name in os.environ:
+        return os.environ[name]
+    else:
+        print('Unknown environment variable requested: {}'.format(name))
+
+def get_kafka_binder_brokers():
+    """Return the value of SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS."""
+    return get_env_var('SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS')
+
+def get_input_channel():
+    """Return the input destination passed on the command line."""
+    return get_cmd_arg("spring.cloud.stream.bindings.input.destination")
+
+def get_output_channel():
+    """Return the output destination passed on the command line."""
+    return get_cmd_arg("spring.cloud.stream.bindings.output.destination")
+
+def get_reverse_string():
+    """Return the raw ``reverestring`` argument, or None when it was not passed."""
+    return get_cmd_arg("reverestring")