Add scdf-python-app. fix polyglot-python-task issue

This commit is contained in:
Christian Tzolov
2019-05-13 23:41:17 +02:00
parent 0d84299e61
commit 3e1628bd54
10 changed files with 255 additions and 2 deletions

View File

@@ -16,8 +16,10 @@ def get_cmd_arg(name):
for k, v in ((k.lstrip('-'), v) for k, v in (a.split('=') for a in sys.argv[1:])):
d[k].append(v)
return d[name][0]
if bool(d[name]):
return d[name][0]
else:
return None
def get_db_url():
"""Computes sqlalchemy connection URL

View File

@@ -0,0 +1,9 @@
FROM python:3.7.3-slim
RUN pip install kafka-python
RUN pip install flask
ADD /util/* /util/
ADD barista_app.py /
ENTRYPOINT ["python","/barista_app.py"]
CMD []

View File

@@ -0,0 +1,25 @@
Build docker image and push to Docker Hub.
```bash
docker build -t tzolov/scdf_python_app:0.1 .
docker push tzolov/scdf_python_app:0.1
```
NOTE: replace `tzolov` with your docker hub prefix.
Register the docker image as SCDF `app` application:
```bash
app register --type app --name barista-app --uri docker://tzolov/scdf_python_app:0.1
```
Build the Bar pipelines:
```bash
stream create --name orders --definition "customer: time > :orders" --deploy
stream create --name cold-drink-line --definition ":coldDrinks > cold-drinks: log" --deploy
stream create --name hot-drink-line --definition ":hotDrinks > hot-drinks: log" --deploy
stream create --name bar --definition "barista-app"
stream deploy --name bar --properties app.barista-app.spring.cloud.stream.bindings.orders.destination=orders,app.barista-app.spring.cloud.stream.bindings.hot.drink.destination=hotDrinks,app.barista-app.spring.cloud.stream.bindings.cold.drink.destination=coldDrinks
```
![alt text](./scdf-barista-python-polyglot.png "Logo Title Text 1")

View File

@@ -0,0 +1,71 @@
from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
from util.actuator import Actuator
from util.arguments import get_kafka_brokers, get_env_info, get_channel_topic
class Barista:
"""Barista continuously process input drink orders and servers hot drinks for the even order numbers and cold drink
for the ood orders.
The orders come through the `orders` input channel. The hot and cold drinks are served through either the
`hot.drink` or the `cold.drink` output channels.
:param info: Information about the app configuration.
:param kafka_brokers: Kafka brokers connection uri.
:param orders: Orders topic name.
:param hot_drinks: Hot drinks topic name.
:param cold_drinks: Cold drinks topic name.
"""
def __init__(self, info, kafka_brokers, orders, hot_drinks, cold_drinks):
self.kafka_brokers = kafka_brokers
self.orders_topic = orders
self.hot_drink_topic = hot_drinks
self.cold_drink_topic = cold_drinks
# Serve the liveliness and readiness probes via http server in a separate thread.
Actuator.start(port=8080, info=info)
# Ensure the output topics exist.
self.__create_topics_if_missing([self.orders_topic, self.hot_drink_topic, self.cold_drink_topic])
self.consumer = KafkaConsumer(self.orders_topic, bootstrap_servers=self.kafka_brokers)
self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers)
def __create_topics_if_missing(self, topic_names):
admin_client = KafkaAdminClient(bootstrap_servers=self.kafka_brokers, client_id='test')
for topic in topic_names:
try:
new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)
except TopicAlreadyExistsError:
print ('Topic: {} already exists!')
def process_orders(self):
"""
Continuously consumes orders form the input channel and send hot or cold drinks to the output channels.
The even order number stands for a hot drink request while an odd number stands for a cold drink request.
"""
while True:
for message in self.consumer:
if message.value is not None:
if self.is_even_order(message.value):
self.producer.send(self.hot_drink_topic, b'Serve Hot drink for order:' + message.value)
else:
self.producer.send(self.cold_drink_topic, b'Serve Cold drink for order:' + message.value)
@staticmethod
def is_even_order(value):
return int(value[-1:]) % 2 == 0
Barista(
get_env_info(),
get_kafka_brokers(),
get_channel_topic('orders'),
get_channel_topic('hot.drink'),
get_channel_topic('cold.drink')
).process_orders()

View File

@@ -0,0 +1,3 @@
kafka
kafka-python
flask

Binary file not shown.

After

Width:  |  Height:  |  Size: 73 KiB

View File

@@ -0,0 +1,11 @@
#
import time
from util.actuator import Actuator
from util.arguments import get_env_info
Actuator.start(9191, get_env_info())
#
print('OK')
time.sleep(10000)
print('STOPPED')

View File

@@ -0,0 +1,56 @@
import sys
import threading
from flask import Flask, Response
class Actuator:
"""Actuator is used to expose operational information about the running application, such as `health/liveliness`,
`info`, `env`, etc. It uses HTTP endpoints to enable us to interact with it.
The `/actuator/health` and `/actuator/info` handles the Kubernetes liveness and readiness probes requests.
Kubernetes expects HTTP 200 status code to consider the application live and ready.
:param port: the HTTP port used by the Actuator.
:param info_content: The text response to be returned by the /actuator/info endpoint.
"""
def __init__(self, port=8080, info_content='Info'):
self.http_app = Actuator.__create_http_app(info_content)
self.port = port
print(info_content)
sys.stdout.flush()
def __run(self):
self.http_app.run(port=self.port, host='0.0.0.0')
@staticmethod
def __create_http_app(info_description):
app = Flask(__name__)
app.debug = False
app.use_reloader = False
@app.route('/actuator/health')
def health():
return Response('Alive', status=200)
@app.route('/actuator/info')
def info():
return Response(info_description, status=200, content_type='text/plain')
return app
@staticmethod
def start(port=8080, info='Info'):
"""Starts the `Actuator` in a separate thread.
:param port: the HTTP port used by the Actuator. Defaults to 8080.
:param info: The text response to be returned by the /actuator/info endpoint.
"""
try:
thread = threading.Thread(target=Actuator(port, info).__run)
thread.setDaemon(True)
thread.start()
print('Actuator started!')
except KeyboardInterrupt:
sys.exit(0)

View File

@@ -0,0 +1,76 @@
import os
import sys
from collections import defaultdict
def get_cmd_arg(name):
"""Extracts argument value by name. (@author: Chris Schaefer)
Assumes the exec (default) spring-cloud-deployer-k8s argument passing mode.
Args:
name: argument name.
Returns:
value of the requested argument.
"""
d = defaultdict(list)
for k, v in ((k.lstrip('-'), v) for k, v in (a.split('=') for a in sys.argv[1:])):
d[k].append(v)
if bool(d[name]):
return d[name][0]
else:
return ''
def get_stream_app_label():
return get_cmd_arg('spring.cloud.dataflow.stream.app.label')
def get_stream_name():
return get_cmd_arg('spring.cloud.dataflow.stream.name')
def get_channel_topic(channel_name):
"""
For given channel name returns the message broker destinations (e.g. Kafka topics or RabbitMQ exchanges).
We adopt the Spring Cloud Stream using the following format:
spring.cloud.stream.bindings.<channelName>.destination=<value>.
The <channelName> represents the name of the channel being configured (for example, input or output).
:param channel_name: logical channel name as defined in the application.
:return: The target destination of a channel on the bound middleware (for example, the RabbitMQ exchange or Kafka
topic). If the channel is bound as a consumer, it could be bound to multiple destinations, and the
destination names can be specified as comma-separated String values.
"""
return get_cmd_arg('spring.cloud.stream.bindings.{}.destination'.format(channel_name))
def get_kafka_brokers():
return os.getenv('SPRING_CLOUD_STREAM_KAFKA_BINDER_BROKERS', '')
def get_kafka_zk_nodes():
return os.getenv('SPRING_CLOUD_STREAM_KAFKA_BINDER_ZK_NODES', '')
def get_application_guid():
return os.getenv('SPRING_CLOUD_APPLICATION_GUID', '')
def get_application_group():
return os.getenv('SPRING_CLOUD_APPLICATION_GROUP', '')
def get_env_info():
props = ' stream-name={}\n app-name={}\n app-guid={}\n app-group={}\n kafka-brokers={}\n ' \
'kafka-zk={}\n'.format(get_stream_name(), get_stream_app_label(), get_application_guid(),
get_application_group(), get_kafka_brokers(), get_kafka_zk_nodes())
channels = ' Inputs:\n orders={}\n Outputs: \n hot.drink={}\n cold.drink={}\n'.format(
get_channel_topic('orders'), get_channel_topic('hot.drink'), get_channel_topic('cold.drink'))
args = '\n '.join(sys.argv)
envs = ''
# envs = '\n '.join(list(map(lambda k: '{}={}'.format(k, os.environ[k]), os.environ)))
return 'Properties\n{0}\nChannels\n{1}\nArguments\n {2}\n\nEnvironment\n {3}'.format(
props, channels, args, envs)