Files
2019-05-17 20:28:20 +02:00

72 lines
2.8 KiB
Python

from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError
from util.actuator import Actuator
from util.arguments import get_kafka_brokers, get_env_info, get_channel_topic
class Router:
"""Router continuously process input stream of timestamps. The even timestamps are re-routed to the `evenDest`
output channel, while the odd timestamps are routed to the `oddDest` output channel.
The timestamps come through the `input` channel. The even and odd timestamps are resend through either the
`even` or the `odd` output channels.
:param info: Information about the app configuration.
:param kafka_brokers: Kafka brokers connection uri.
:param input_topic: Input timestamps topic name.
:param even_topic: Even timestamps output topic name.
:param odd_topic: Odd timestamps output topic name.
"""
def __init__(self, info, kafka_brokers, input_topic, even_topic, odd_topic):
self.kafka_brokers = kafka_brokers
self.input_topic = input_topic
self.even_topic = even_topic
self.odd_topic = odd_topic
# Serve the liveliness and readiness probes via http server in a separate thread.
Actuator.start(port=8080, info=info)
# Ensure the output topics exist.
self.__create_topics_if_missing([self.input_topic, self.even_topic, self.odd_topic])
self.consumer = KafkaConsumer(self.input_topic, bootstrap_servers=self.kafka_brokers)
self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers)
def __create_topics_if_missing(self, topic_names):
admin_client = KafkaAdminClient(bootstrap_servers=self.kafka_brokers, client_id='test')
for topic in topic_names:
try:
new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)
except TopicAlreadyExistsError:
print ('Topic: {} already exists!')
def process_timestamps(self):
"""
Continuously consumes timestamps form the input channel and send even or odd timestamps to the output channels.
"""
while True:
for message in self.consumer:
if message.value is not None:
if self.is_even_timestamp(message.value):
self.producer.send(self.even_topic, b'Even timestamp: ' + message.value)
else:
self.producer.send(self.odd_topic, b'Odd timestamp:' + message.value)
@staticmethod
def is_even_timestamp(value):
return int(value[-1:]) % 2 == 0
Router(
get_env_info(),
get_kafka_brokers(),
get_channel_topic('input'),
get_channel_topic('even'),
get_channel_topic('odd')
).process_timestamps()