Resolve a GitHub python-processor issue

This commit is contained in:
Christian Tzolov
2019-05-16 12:36:57 +02:00
parent 5669221fb0
commit 3b6cdc1417
7 changed files with 161 additions and 1 deletions

Submodule dataflow-website/recipes/polyglot/polyglot-python-processor deleted from f2434c39b7

View File

@@ -0,0 +1,12 @@
# Image for the Python stream processor, built on the Spring Cloud OpenJDK base image.
FROM springcloud/openjdk:latest
# Install pip, then remove the apt package lists so they are not kept in the layer.
RUN apt-get update && apt-get install --no-install-recommends -y \
python-pip \
&& rm -rf /var/lib/apt/lists/*
# kafka-python provides the `kafka` module imported by python_processor.py.
RUN pip install kafka-python
# Copy the processor script and its util helper modules under /processor.
COPY python_processor.py /processor/
COPY util/*.py /processor/util/
# Run the processor script with the Python interpreter.
# NOTE(review): this is the exec (JSON-array) form, so "$@" is presumably passed
# to the script as a literal argument rather than expanded by a shell - confirm
# this is intended.
ENTRYPOINT ["python", "/processor/python_processor.py", "$@", "--"]

View File

@@ -0,0 +1,67 @@
Build docker image:
`$ docker build -t chrisjs/python-processor:latest .`
Push to Docker Hub if desired:
`$ docker push chrisjs/python-processor:latest`
NOTE: replace `chrisjs` with your own Docker Hub user or organization.
Import the stream applications and register this processor so it can be used in a stream between `http` and `log`:
```
dataflow:>app import --uri http://bit.ly/Einstein-SR2-stream-applications-kafka-docker
dataflow:>app register --type processor --name python-processor --uri docker://chrisjs/python-processor:latest
```
NOTE: replace `chrisjs` with your own Docker Hub user or organization.
This example uses minikube as the target Kubernetes environment.
Get IP:
```
$ minikube ip
192.168.99.104
```
Watch logs:
```
$ kubectl logs -f <log pod name>
```
Post and log a string:
```
dataflow:>stream create --name test --definition "http --server.port=32123 | python-processor | log"
dataflow:>stream deploy test --properties "deployer.http.kubernetes.createNodePort=32123"
dataflow:>http post --target http://192.168.99.104:32123 --data "hello world"
> POST (text/plain) http://192.168.99.104:32123 hello world
> 202 ACCEPTED
```
Inspect logs for posted message:
```
INFO 1 --- [container-0-C-1] log-sink : hello world
```
Post and log a reversed string:
```
dataflow:>stream create --name test --definition "http --server.port=32123 | python-processor --reverestring=true | log"
dataflow:>stream deploy test --properties "deployer.http.kubernetes.createNodePort=32123"
dataflow:>http post --target http://192.168.99.104:32123 --data "hello world"
> POST (text/plain) http://192.168.99.104:32123 hello world
> 202 ACCEPTED
```
Inspect logs for posted message:
```
INFO 1 --- [container-0-C-1] log-sink : dlrow olleh
```

View File

@@ -0,0 +1,24 @@
#!/usr/bin/env python
import os
import sys
from kafka import KafkaConsumer, KafkaProducer
from util.http_status_server import HttpHealthServer
from util.task_args import get_kafka_binder_brokers, get_input_channel, get_output_channel, get_reverse_string
# Processor entry point: consume messages from the input topic, optionally
# reverse each payload, and publish the result to the output topic.

# Resolve all configuration once at startup. Command-line arguments and
# environment variables do not change while the process runs, and looking them
# up per message re-parsed sys.argv and re-printed the "Unknown command line
# arg" notice for every message when the reverse flag was absent.
brokers = get_kafka_binder_brokers()
output_channel = get_output_channel()
reverse_string = get_reverse_string()
reverse_enabled = reverse_string is not None and reverse_string.lower() == "true"

consumer = KafkaConsumer(get_input_channel(), bootstrap_servers=[brokers])
producer = KafkaProducer(bootstrap_servers=[brokers])

# Serve the health/info endpoints from a background daemon thread.
HttpHealthServer.run_thread()

while True:
    for message in consumer:
        output_message = message.value
        if reverse_enabled:
            # Slicing reverses a Python 2 str and a Python 3 bytes payload
            # alike; "".join(reversed(...)) raises TypeError for bytes.
            output_message = output_message[::-1]
        producer.send(output_channel, output_message)

View File

@@ -0,0 +1,25 @@
import threading
from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
class HttpHealthServer(BaseHTTPRequestHandler):
    """Minimal request handler for the actuator health and info probes.

    A GET for ``/actuator/health`` or ``/actuator/info`` is answered with
    status 200; a GET for any other path is answered with status 404. Only
    the status line and headers are sent; no response body is written.
    """

    def do_GET(self):
        """Answer a GET request with a status code and a Content-type header."""
        is_probe_path = self.path in ('/actuator/health', '/actuator/info')
        respond = self.send_ok_response if is_probe_path else self.send_missing_response
        respond()
        # Both outcomes share the same header and must terminate the headers.
        self.send_header('Content-type', 'text/html')
        self.end_headers()

    def send_ok_response(self):
        """Emit the 200 status line."""
        self.send_response(200)

    def send_missing_response(self):
        """Emit the 404 status line."""
        self.send_response(404)

    @staticmethod
    def run_thread(port=8080):
        """Start an HTTPServer using this handler on a daemon thread.

        Args:
            port: TCP port to listen on; the server is bound to ``('', port)``.
        """
        server = HTTPServer(('', port), HttpHealthServer)
        worker = threading.Thread(target=server.serve_forever, name='httpd_server')
        # Daemon thread: it must not keep the process alive on its own.
        worker.daemon = True
        worker.start()

View File

@@ -0,0 +1,33 @@
import os
import sys
from collections import defaultdict
def get_cmd_arg(name):
    """Return the value of the first ``name=value`` command-line argument.

    Each entry of ``sys.argv[1:]`` is split at its first ``=``; leading dashes
    are stripped from the key, so ``--name=value`` and ``name=value`` both
    match. Splitting only at the first ``=`` keeps values that themselves
    contain ``=`` (for example ``--opt=a=b`` yields ``a=b``) instead of
    silently dropping the whole argument.

    Args:
        name: Argument name without leading dashes.

    Returns:
        The value of the first matching argument, or None when no argument
        matches (a notice is printed in that case).
    """
    for raw_arg in sys.argv[1:]:
        key, separator, value = raw_arg.partition('=')
        # Entries without '=' carry no value and are ignored.
        if separator and key.lstrip('-') == name:
            return value
    print('Unknown command line arg requested: {}'.format(name))
    return None
def get_env_var(name):
    """Look up the environment variable ``name``.

    Returns:
        The variable's value, or None when it is not set (a notice is printed
        in that case).
    """
    try:
        return os.environ[name]
    except KeyError:
        print('Unknown environment variable requested: {}'.format(name))
        return None
def get_kafka_binder_brokers():
    """Return the value of SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS.

    The lookup goes through get_env_var, so the result is None when the
    variable is not set.
    """
    brokers = get_env_var('SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS')
    return brokers
def get_input_channel():
    """Return the input destination passed on the command line.

    Reads the ``spring.cloud.stream.bindings.input.destination`` argument via
    get_cmd_arg; the result is None when the argument is absent.
    """
    input_destination = get_cmd_arg("spring.cloud.stream.bindings.input.destination")
    return input_destination
def get_output_channel():
    """Return the output destination passed on the command line.

    Reads the ``spring.cloud.stream.bindings.output.destination`` argument via
    get_cmd_arg; the result is None when the argument is absent.
    """
    output_destination = get_cmd_arg("spring.cloud.stream.bindings.output.destination")
    return output_destination
def get_reverse_string():
    """Return the raw value of the ``reverestring`` command-line option.

    The option name is spelled ``reverestring`` (sic). The value is returned
    as given on the command line, or None when the option is absent.
    """
    reverse_flag = get_cmd_arg("reverestring")
    return reverse_flag