diff --git a/dataflow-website/recipes/polyglot/polyglot-python-task/util/task_args.py b/dataflow-website/recipes/polyglot/polyglot-python-task/util/task_args.py index 2b4fd73..81bf50b 100644 --- a/dataflow-website/recipes/polyglot/polyglot-python-task/util/task_args.py +++ b/dataflow-website/recipes/polyglot/polyglot-python-task/util/task_args.py @@ -16,8 +16,10 @@ def get_cmd_arg(name): for k, v in ((k.lstrip('-'), v) for k, v in (a.split('=') for a in sys.argv[1:])): d[k].append(v) - return d[name][0] - + if bool(d[name]): + return d[name][0] + else: + return None def get_db_url(): """Computes sqlalchemy connection URL diff --git a/dataflow-website/recipes/polyglot/scdf-python-app/Dockerfile b/dataflow-website/recipes/polyglot/scdf-python-app/Dockerfile new file mode 100644 index 0000000..128c72a --- /dev/null +++ b/dataflow-website/recipes/polyglot/scdf-python-app/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.7.3-slim + +RUN pip install kafka-python +RUN pip install flask + +ADD /util/* /util/ +ADD barista_app.py / +ENTRYPOINT ["python","/barista_app.py"] +CMD [] \ No newline at end of file diff --git a/dataflow-website/recipes/polyglot/scdf-python-app/README.md b/dataflow-website/recipes/polyglot/scdf-python-app/README.md new file mode 100644 index 0000000..9c49584 --- /dev/null +++ b/dataflow-website/recipes/polyglot/scdf-python-app/README.md @@ -0,0 +1,25 @@ + +Build docker image and push to Docker Hub. +```bash +docker build -t tzolov/scdf_python_app:0.1 . +docker push tzolov/scdf_python_app:0.1 +``` +NOTE: replace `tzolov` with your docker hub prefix. 
class Barista:
    """Barista continuously processes incoming drink orders and serves hot drinks for the even order numbers and
    cold drinks for the odd ones.

    The orders come through the `orders` input channel. The hot and cold drinks are served through the
    `hot.drink` and `cold.drink` output channels respectively.

    :param info: Information about the app configuration; handed to the Actuator started on port 8080.
    :param kafka_brokers: Kafka brokers connection uri.
    :param orders: Orders topic name.
    :param hot_drinks: Hot drinks topic name.
    :param cold_drinks: Cold drinks topic name.
    """

    def __init__(self, info, kafka_brokers, orders, hot_drinks, cold_drinks):

        self.kafka_brokers = kafka_brokers
        self.orders_topic = orders
        self.hot_drink_topic = hot_drinks
        self.cold_drink_topic = cold_drinks

        # Serve the liveness and readiness probes via an http server in a separate thread.
        Actuator.start(port=8080, info=info)

        # Ensure the input and output topics exist before the consumer and producer are created.
        self.__create_topics_if_missing([self.orders_topic, self.hot_drink_topic, self.cold_drink_topic])

        self.consumer = KafkaConsumer(self.orders_topic, bootstrap_servers=self.kafka_brokers)
        self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers)

    def __create_topics_if_missing(self, topic_names):
        """Creates every topic in `topic_names`, reporting (not failing on) the ones that already exist.

        :param topic_names: iterable of topic names to create with one partition and replication factor one.
        """
        admin_client = KafkaAdminClient(bootstrap_servers=self.kafka_brokers, client_id='test')
        try:
            # Topics are created one by one so that an already existing topic does not prevent
            # the creation of the remaining ones.
            for topic in topic_names:
                try:
                    new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
                    admin_client.create_topics(new_topics=[new_topic], validate_only=False)
                except TopicAlreadyExistsError:
                    print('Topic: {} already exists!'.format(topic))
        finally:
            # The admin connection is only needed during start-up; release it.
            admin_client.close()

    def process_orders(self):
        """
        Continuously consumes orders from the input channel and sends hot or cold drinks to the output channels.
        An even order number stands for a hot drink request while an odd number stands for a cold drink request.

        Messages without a payload are ignored. Messages whose payload does not end with a decimal digit are
        reported on stdout and skipped, so a single malformed order does not stop the processing loop.
        """
        while True:
            for message in self.consumer:
                order = message.value
                if order is None:
                    continue
                try:
                    hot = self.is_even_order(order)
                except ValueError:
                    print('Skipping malformed order: {!r}'.format(order))
                    continue
                if hot:
                    self.producer.send(self.hot_drink_topic, b'Serve Hot drink for order:' + order)
                else:
                    self.producer.send(self.cold_drink_topic, b'Serve Cold drink for order:' + order)

    @staticmethod
    def is_even_order(value):
        """Tells whether the order ends with an even digit.

        :param value: the raw order payload; only its last character is inspected.
        :return: True when the last character is an even decimal digit, False when it is an odd one.
        :raises ValueError: when the payload is empty or its last character is not a decimal digit.
        """
        return int(value[-1:]) % 2 == 0
@staticmethod
def __create_http_app(info_description):
    """Builds the Flask application that answers the actuator HTTP endpoints.

    `/actuator/health` always answers `Alive` and `/actuator/info` answers the given text as `text/plain`;
    both respond with HTTP status 200.

    :param info_description: The text response to be returned by the /actuator/info endpoint.
    :return: the configured (not yet running) Flask application.
    """
    http_app = Flask(__name__)
    http_app.debug = False
    http_app.use_reloader = False

    def health():
        return Response('Alive', status=200)

    def info():
        return Response(info_description, status=200, content_type='text/plain')

    # Explicit registration; the endpoint names are still derived from the view function names.
    http_app.add_url_rule('/actuator/health', view_func=health)
    http_app.add_url_rule('/actuator/info', view_func=info)

    return http_app
def get_cmd_arg(name):
    """Extracts argument value by name. (@author: Chris Schaefer)

    Assumes the exec (default) spring-cloud-deployer-k8s argument passing mode, i.e. every argument
    of interest has the form `--key=value`.

    Only the first `=` separates the key from the value, so values that themselves contain `=`
    are returned intact. Arguments without any `=` (bare flags, positional arguments) are ignored
    instead of aborting the lookup.

    Args:
        name: argument name, without the leading dashes.
    Returns:
        value of the first matching argument, or an empty string when the argument is not present.
    """
    for arg in sys.argv[1:]:
        key, separator, value = arg.partition('=')
        if not separator:
            # Not a key=value pair; nothing to extract from it.
            continue
        if key.lstrip('-') == name:
            return value
    return ''
def get_env_info():
    """Renders a plain-text summary of the application configuration.

    The summary has four sections: the stream/application properties, the channel to destination
    bindings, the raw command line arguments and an environment section that is emitted empty.

    :return: the multi-line summary text.
    """
    property_values = (get_stream_name(), get_stream_app_label(), get_application_guid(),
                       get_application_group(), get_kafka_brokers(), get_kafka_zk_nodes())
    props = (' stream-name={}\n app-name={}\n app-guid={}\n app-group={}\n kafka-brokers={}\n '
             'kafka-zk={}\n').format(*property_values)

    channel_topics = [get_channel_topic(channel) for channel in ('orders', 'hot.drink', 'cold.drink')]
    channels = ' Inputs:\n orders={}\n Outputs: \n hot.drink={}\n cold.drink={}\n'.format(*channel_topics)

    args = '\n '.join(sys.argv)
    envs = ''

    return ''.join([
        'Properties\n', props,
        '\nChannels\n', channels,
        '\nArguments\n ', args,
        '\n\nEnvironment\n ', envs,
    ])