diff --git a/dataflow-website/recipes/polyglot/polyglot-python-app.zip b/dataflow-website/recipes/polyglot/polyglot-python-app.zip index 3254658..962060a 100644 Binary files a/dataflow-website/recipes/polyglot/polyglot-python-app.zip and b/dataflow-website/recipes/polyglot/polyglot-python-app.zip differ diff --git a/dataflow-website/recipes/polyglot/polyglot-python-app/Dockerfile b/dataflow-website/recipes/polyglot/polyglot-python-app/Dockerfile index 128c72a..0a75e52 100644 --- a/dataflow-website/recipes/polyglot/polyglot-python-app/Dockerfile +++ b/dataflow-website/recipes/polyglot/polyglot-python-app/Dockerfile @@ -4,6 +4,6 @@ RUN pip install kafka-python RUN pip install flask ADD /util/* /util/ -ADD barista_app.py / -ENTRYPOINT ["python","/barista_app.py"] -CMD [] \ No newline at end of file +ADD python_router_app.py / +ENTRYPOINT ["python","/python_router_app.py"] +CMD [] diff --git a/dataflow-website/recipes/polyglot/polyglot-python-app/README.md b/dataflow-website/recipes/polyglot/polyglot-python-app/README.md index 9c49584..7d26de9 100644 --- a/dataflow-website/recipes/polyglot/polyglot-python-app/README.md +++ b/dataflow-website/recipes/polyglot/polyglot-python-app/README.md @@ -8,18 +8,7 @@ NOTE: replace `tzolov` with your docker hub prefix. Register the docker image as SCDF `app` application: ```bash -app register --type app --name barista-app --uri docker://tzolov/scdf_python_app:0.1 +app register --type app --name python-router --uri docker://tzolov/scdf_python_app:0.1 ``` -Build the Bar pipelines: -```bash -stream create --name orders --definition "customer: time > :orders" --deploy -stream create --name cold-drink-line --definition ":coldDrinks > cold-drinks: log" --deploy -stream create --name hot-drink-line --definition ":hotDrinks > hot-drinks: log" --deploy -stream create --name bar --definition "barista-app" -stream deploy --name bar --properties app.barista-app.spring.cloud.stream.bindings.orders.destination=orders,app.barista-app.spring.cloud.stream.bindings.hot.drink.destination=hotDrinks,app.barista-app.spring.cloud.stream.bindings.cold.drink.destination=coldDrinks -``` - -![alt text](./scdf-barista-python-polyglot.png "Logo Title Text 1") - diff --git a/dataflow-website/recipes/polyglot/polyglot-python-app/polyglot-python-app-deployment.properties b/dataflow-website/recipes/polyglot/polyglot-python-app/polyglot-python-app-deployment.properties new file mode 100644 index 0000000..5da03bd --- /dev/null +++ b/dataflow-website/recipes/polyglot/polyglot-python-app/polyglot-python-app-deployment.properties @@ -0,0 +1,8 @@ +app.time.spring.cloud.stream.bindings.output.destination=timeDest + +app.python-router.spring.cloud.stream.bindings.input.destination=timeDest +app.python-router.spring.cloud.stream.bindings.even.destination=evenDest +app.python-router.spring.cloud.stream.bindings.odd.destination=oddDest + +app.evenLogger.spring.cloud.stream.bindings.input.destination=evenDest +app.oddLogger.spring.cloud.stream.bindings.input.destination=oddDest \ No newline at end of file diff --git a/dataflow-website/recipes/polyglot/polyglot-python-app/barista_app.py b/dataflow-website/recipes/polyglot/polyglot-python-app/python_router_app.py similarity index 52% rename from dataflow-website/recipes/polyglot/polyglot-python-app/barista_app.py rename to dataflow-website/recipes/polyglot/polyglot-python-app/python_router_app.py index 5f96c2d..b0ff8ce 100644 --- a/dataflow-website/recipes/polyglot/polyglot-python-app/barista_app.py +++ b/dataflow-website/recipes/polyglot/polyglot-python-app/python_router_app.py @@ -6,33 +6,34 @@ from util.actuator import Actuator from util.arguments import get_kafka_brokers, get_env_info, get_channel_topic -class Barista: - """Barista continuously process input drink orders and servers hot drinks for the even order numbers and cold drink - for the ood orders. - The orders come through the `orders` input channel. The hot and cold drinks are served through either the - `hot.drink` or the `cold.drink` output channels. +class Router: + """Router continuously process input stream of timestamps. The even timestamps are re-routed to the `evenDest` + output channel, while the odd timestamps are routed to the `oddDest` output channel. + + The timestamps come through the `input` channel. The even and odd timestamps are resend through either the + `even` or the `odd` output channels. :param info: Information about the app configuration. :param kafka_brokers: Kafka brokers connection uri. - :param orders: Orders topic name. - :param hot_drinks: Hot drinks topic name. - :param cold_drinks: Cold drinks topic name. + :param input_topic: Input timestamps topic name. + :param even_topic: Even timestamps output topic name. + :param odd_topic: Odd timestamps output topic name. """ - def __init__(self, info, kafka_brokers, orders, hot_drinks, cold_drinks): + def __init__(self, info, kafka_brokers, input_topic, even_topic, odd_topic): self.kafka_brokers = kafka_brokers - self.orders_topic = orders - self.hot_drink_topic = hot_drinks - self.cold_drink_topic = cold_drinks + self.input_topic = input_topic + self.even_topic = even_topic + self.odd_topic = odd_topic # Serve the liveliness and readiness probes via http server in a separate thread. Actuator.start(port=8080, info=info) # Ensure the output topics exist. - self.__create_topics_if_missing([self.orders_topic, self.hot_drink_topic, self.cold_drink_topic]) + self.__create_topics_if_missing([self.input_topic, self.even_topic, self.odd_topic]) - self.consumer = KafkaConsumer(self.orders_topic, bootstrap_servers=self.kafka_brokers) + self.consumer = KafkaConsumer(self.input_topic, bootstrap_servers=self.kafka_brokers) self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers) def __create_topics_if_missing(self, topic_names): @@ -46,26 +47,25 @@ class Barista: def process_orders(self): """ - Continuously consumes orders form the input channel and send hot or cold drinks to the output channels. - The even order number stands for a hot drink request while an odd number stands for a cold drink request. + Continuously consumes timestamps form the input channel and send even or odd timestamps to the output channels. """ while True: for message in self.consumer: if message.value is not None: if self.is_even_order(message.value): - self.producer.send(self.hot_drink_topic, b'Serve Hot drink for order:' + message.value) + self.producer.send(self.even_topic, b'Even timestamp: ' + message.value) else: - self.producer.send(self.cold_drink_topic, b'Serve Cold drink for order:' + message.value) + self.producer.send(self.odd_topic, b'Odd timestamp:' + message.value) @staticmethod def is_even_order(value): return int(value[-1:]) % 2 == 0 -Barista( +Router( get_env_info(), get_kafka_brokers(), - get_channel_topic('orders'), - get_channel_topic('hot.drink'), - get_channel_topic('cold.drink') + get_channel_topic('input'), + get_channel_topic('even'), + get_channel_topic('odd') ).process_orders() diff --git a/dataflow-website/recipes/polyglot/polyglot-python-app/scdf-barista-python-polyglot.png b/dataflow-website/recipes/polyglot/polyglot-python-app/scdf-barista-python-polyglot.png deleted file mode 100644 index 0d4e056..0000000 Binary files a/dataflow-website/recipes/polyglot/polyglot-python-app/scdf-barista-python-polyglot.png and /dev/null differ diff --git a/dataflow-website/recipes/polyglot/polyglot-python-app/util/arguments.py b/dataflow-website/recipes/polyglot/polyglot-python-app/util/arguments.py index 43496e0..8d7735a 100644 --- a/dataflow-website/recipes/polyglot/polyglot-python-app/util/arguments.py +++ b/dataflow-website/recipes/polyglot/polyglot-python-app/util/arguments.py @@ -67,8 +67,8 @@ def get_env_info(): props = ' stream-name={}\n app-name={}\n app-guid={}\n app-group={}\n kafka-brokers={}\n ' \ 'kafka-zk={}\n'.format(get_stream_name(), get_stream_app_label(), get_application_guid(), get_application_group(), get_kafka_brokers(), get_kafka_zk_nodes()) - channels = ' Inputs:\n orders={}\n Outputs: \n hot.drink={}\n cold.drink={}\n'.format( - get_channel_topic('orders'), get_channel_topic('hot.drink'), get_channel_topic('cold.drink')) + channels = ' Inputs:\n input={}\n Outputs: \n even={}\n odd={}\n'.format( + get_channel_topic('input'), get_channel_topic('even'), get_channel_topic('odd')) args = '\n '.join(sys.argv) envs = '' # envs = '\n '.join(list(map(lambda k: '{}={}'.format(k, os.environ[k]), os.environ)))