72 lines
2.8 KiB
Python
72 lines
2.8 KiB
Python
from kafka import KafkaConsumer, KafkaProducer
|
|
from kafka.admin import KafkaAdminClient, NewTopic
|
|
from kafka.errors import TopicAlreadyExistsError
|
|
|
|
from util.actuator import Actuator
|
|
from util.arguments import get_kafka_brokers, get_env_info, get_channel_topic
|
|
|
|
|
|
class Router:
|
|
"""Router continuously process input stream of timestamps. The even timestamps are re-routed to the `evenDest`
|
|
output channel, while the odd timestamps are routed to the `oddDest` output channel.
|
|
|
|
The timestamps come through the `input` channel. The even and odd timestamps are resend through either the
|
|
`even` or the `odd` output channels.
|
|
|
|
:param info: Information about the app configuration.
|
|
:param kafka_brokers: Kafka brokers connection uri.
|
|
:param input_topic: Input timestamps topic name.
|
|
:param even_topic: Even timestamps output topic name.
|
|
:param odd_topic: Odd timestamps output topic name.
|
|
"""
|
|
|
|
def __init__(self, info, kafka_brokers, input_topic, even_topic, odd_topic):
|
|
|
|
self.kafka_brokers = kafka_brokers
|
|
self.input_topic = input_topic
|
|
self.even_topic = even_topic
|
|
self.odd_topic = odd_topic
|
|
|
|
# Serve the liveliness and readiness probes via http server in a separate thread.
|
|
Actuator.start(port=8080, info=info)
|
|
|
|
# Ensure the output topics exist.
|
|
self.__create_topics_if_missing([self.input_topic, self.even_topic, self.odd_topic])
|
|
|
|
self.consumer = KafkaConsumer(self.input_topic, bootstrap_servers=self.kafka_brokers)
|
|
self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers)
|
|
|
|
def __create_topics_if_missing(self, topic_names):
|
|
admin_client = KafkaAdminClient(bootstrap_servers=self.kafka_brokers, client_id='test')
|
|
for topic in topic_names:
|
|
try:
|
|
new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
|
|
admin_client.create_topics(new_topics=[new_topic], validate_only=False)
|
|
except TopicAlreadyExistsError:
|
|
print ('Topic: {} already exists!')
|
|
|
|
def process_timestamps(self):
|
|
"""
|
|
Continuously consumes timestamps form the input channel and send even or odd timestamps to the output channels.
|
|
"""
|
|
while True:
|
|
for message in self.consumer:
|
|
if message.value is not None:
|
|
if self.is_even_timestamp(message.value):
|
|
self.producer.send(self.even_topic, b'Even timestamp: ' + message.value)
|
|
else:
|
|
self.producer.send(self.odd_topic, b'Odd timestamp:' + message.value)
|
|
|
|
@staticmethod
|
|
def is_even_timestamp(value):
|
|
return int(value[-1:]) % 2 == 0
|
|
|
|
|
|
Router(
|
|
get_env_info(),
|
|
get_kafka_brokers(),
|
|
get_channel_topic('input'),
|
|
get_channel_topic('even'),
|
|
get_channel_topic('odd')
|
|
).process_timestamps()
|