Replace the Barista by Python-Router app use case
This commit is contained in:
Binary file not shown.
@@ -4,6 +4,6 @@ RUN pip install kafka-python
|
||||
RUN pip install flask
|
||||
|
||||
ADD /util/* /util/
|
||||
ADD barista_app.py /
|
||||
ENTRYPOINT ["python","/barista_app.py"]
|
||||
CMD []
|
||||
ADD python_router_app.py /
|
||||
ENTRYPOINT ["python","/python_router_app.py"]
|
||||
CMD []
|
||||
|
||||
@@ -8,18 +8,7 @@ NOTE: replace `tzolov` with your docker hub prefix.
|
||||
|
||||
Register the docker image as SCDF `app` application:
|
||||
```bash
|
||||
app register --type app --name barista-app --uri docker://tzolov/scdf_python_app:0.1
|
||||
app register --type app --name python-router --uri docker://tzolov/scdf_python_app:0.1
|
||||
```
|
||||
|
||||
Build the Bar pipelines:
|
||||
```bash
|
||||
stream create --name orders --definition "customer: time > :orders" --deploy
|
||||
stream create --name cold-drink-line --definition ":coldDrinks > cold-drinks: log" --deploy
|
||||
stream create --name hot-drink-line --definition ":hotDrinks > hot-drinks: log" --deploy
|
||||
stream create --name bar --definition "barista-app"
|
||||
stream deploy --name bar --properties app.barista-app.spring.cloud.stream.bindings.orders.destination=orders,app.barista-app.spring.cloud.stream.bindings.hot.drink.destination=hotDrinks,app.barista-app.spring.cloud.stream.bindings.cold.drink.destination=coldDrinks
|
||||
```
|
||||
|
||||

|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,8 @@
|
||||
app.time.spring.cloud.stream.bindings.output.destination=timeDest
|
||||
|
||||
app.python-router.spring.cloud.stream.bindings.input.destination=timeDest
|
||||
app.python-router.spring.cloud.stream.bindings.even.destination=evenDest
|
||||
app.python-router.spring.cloud.stream.bindings.odd.destination=oddDest
|
||||
|
||||
app.evenLogger.spring.cloud.stream.bindings.input.destination=evenDest
|
||||
app.oddLogger.spring.cloud.stream.bindings.input.destination=oddDest
|
||||
@@ -6,33 +6,34 @@ from util.actuator import Actuator
|
||||
from util.arguments import get_kafka_brokers, get_env_info, get_channel_topic
|
||||
|
||||
|
||||
class Barista:
|
||||
"""Barista continuously process input drink orders and servers hot drinks for the even order numbers and cold drink
|
||||
for the ood orders.
|
||||
The orders come through the `orders` input channel. The hot and cold drinks are served through either the
|
||||
`hot.drink` or the `cold.drink` output channels.
|
||||
class Router:
|
||||
"""Router continuously process input stream of timestamps. The even timestamps are re-routed to the `evenDest`
|
||||
output channel, while the odd timestamps are routed to the `oddDest` output channel.
|
||||
|
||||
The timestamps come through the `input` channel. The even and odd timestamps are resend through either the
|
||||
`even` or the `odd` output channels.
|
||||
|
||||
:param info: Information about the app configuration.
|
||||
:param kafka_brokers: Kafka brokers connection uri.
|
||||
:param orders: Orders topic name.
|
||||
:param hot_drinks: Hot drinks topic name.
|
||||
:param cold_drinks: Cold drinks topic name.
|
||||
:param input_topic: Input timestamps topic name.
|
||||
:param even_topic: Even timestamps output topic name.
|
||||
:param odd_topic: Odd timestamps output topic name.
|
||||
"""
|
||||
|
||||
def __init__(self, info, kafka_brokers, orders, hot_drinks, cold_drinks):
|
||||
def __init__(self, info, kafka_brokers, input_topic, even_topic, odd_topic):
|
||||
|
||||
self.kafka_brokers = kafka_brokers
|
||||
self.orders_topic = orders
|
||||
self.hot_drink_topic = hot_drinks
|
||||
self.cold_drink_topic = cold_drinks
|
||||
self.input_topic = input_topic
|
||||
self.even_topic = even_topic
|
||||
self.odd_topic = odd_topic
|
||||
|
||||
# Serve the liveliness and readiness probes via http server in a separate thread.
|
||||
Actuator.start(port=8080, info=info)
|
||||
|
||||
# Ensure the output topics exist.
|
||||
self.__create_topics_if_missing([self.orders_topic, self.hot_drink_topic, self.cold_drink_topic])
|
||||
self.__create_topics_if_missing([self.input_topic, self.even_topic, self.odd_topic])
|
||||
|
||||
self.consumer = KafkaConsumer(self.orders_topic, bootstrap_servers=self.kafka_brokers)
|
||||
self.consumer = KafkaConsumer(self.input_topic, bootstrap_servers=self.kafka_brokers)
|
||||
self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers)
|
||||
|
||||
def __create_topics_if_missing(self, topic_names):
|
||||
@@ -46,26 +47,25 @@ class Barista:
|
||||
|
||||
def process_orders(self):
|
||||
"""
|
||||
Continuously consumes orders form the input channel and send hot or cold drinks to the output channels.
|
||||
The even order number stands for a hot drink request while an odd number stands for a cold drink request.
|
||||
Continuously consumes timestamps form the input channel and send even or odd timestamps to the output channels.
|
||||
"""
|
||||
while True:
|
||||
for message in self.consumer:
|
||||
if message.value is not None:
|
||||
if self.is_even_order(message.value):
|
||||
self.producer.send(self.hot_drink_topic, b'Serve Hot drink for order:' + message.value)
|
||||
self.producer.send(self.even_topic, b'Even timestamp: ' + message.value)
|
||||
else:
|
||||
self.producer.send(self.cold_drink_topic, b'Serve Cold drink for order:' + message.value)
|
||||
self.producer.send(self.odd_topic, b'Odd timestamp:' + message.value)
|
||||
|
||||
@staticmethod
|
||||
def is_even_order(value):
|
||||
return int(value[-1:]) % 2 == 0
|
||||
|
||||
|
||||
Barista(
|
||||
Router(
|
||||
get_env_info(),
|
||||
get_kafka_brokers(),
|
||||
get_channel_topic('orders'),
|
||||
get_channel_topic('hot.drink'),
|
||||
get_channel_topic('cold.drink')
|
||||
get_channel_topic('input'),
|
||||
get_channel_topic('even'),
|
||||
get_channel_topic('odd')
|
||||
).process_orders()
|
||||
Binary file not shown.
|
Before Width: | Height: | Size: 73 KiB |
@@ -67,8 +67,8 @@ def get_env_info():
|
||||
props = ' stream-name={}\n app-name={}\n app-guid={}\n app-group={}\n kafka-brokers={}\n ' \
|
||||
'kafka-zk={}\n'.format(get_stream_name(), get_stream_app_label(), get_application_guid(),
|
||||
get_application_group(), get_kafka_brokers(), get_kafka_zk_nodes())
|
||||
channels = ' Inputs:\n orders={}\n Outputs: \n hot.drink={}\n cold.drink={}\n'.format(
|
||||
get_channel_topic('orders'), get_channel_topic('hot.drink'), get_channel_topic('cold.drink'))
|
||||
channels = ' Inputs:\n input={}\n Outputs: \n even={}\n odd={}\n'.format(
|
||||
get_channel_topic('input'), get_channel_topic('even'), get_channel_topic('odd'))
|
||||
args = '\n '.join(sys.argv)
|
||||
envs = ''
|
||||
# envs = '\n '.join(list(map(lambda k: '{}={}'.format(k, os.environ[k]), os.environ)))
|
||||
|
||||
Reference in New Issue
Block a user