from kafka import KafkaConsumer, KafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

from util.actuator import Actuator
from util.arguments import get_kafka_brokers, get_env_info, get_channel_topic


class Router:
    """Route an input stream of timestamps to an even or an odd output topic.

    Timestamps arrive through the `input` channel. Each one is re-sent,
    prefixed with a short label, through the `even` output channel when it is
    even and through the `odd` output channel otherwise.

    :param info: Information about the app configuration.
    :param kafka_brokers: Kafka brokers connection uri.
    :param input_topic: Input timestamps topic name.
    :param even_topic: Even timestamps output topic name.
    :param odd_topic: Odd timestamps output topic name.
    """

    def __init__(self, info, kafka_brokers, input_topic, even_topic, odd_topic):
        self.kafka_brokers = kafka_brokers
        self.input_topic = input_topic
        self.even_topic = even_topic
        self.odd_topic = odd_topic

        # Serve the liveness and readiness probes via http server in a separate thread.
        Actuator.start(port=8080, info=info)

        # Ensure the input and both output topics exist before connecting.
        self.__create_topics_if_missing([self.input_topic, self.even_topic, self.odd_topic])

        self.consumer = KafkaConsumer(self.input_topic, bootstrap_servers=self.kafka_brokers)
        self.producer = KafkaProducer(bootstrap_servers=self.kafka_brokers)

    def __create_topics_if_missing(self, topic_names):
        """Create every topic in `topic_names`, tolerating topics that already exist.

        Each topic is requested with a single partition and a replication
        factor of one. Topics are created one at a time so that an already
        existing topic does not prevent the remaining ones from being created.

        :param topic_names: Iterable of topic names to create.
        """
        admin_client = KafkaAdminClient(bootstrap_servers=self.kafka_brokers, client_id='test')
        try:
            for topic in topic_names:
                try:
                    new_topic = NewTopic(name=topic, num_partitions=1, replication_factor=1)
                    admin_client.create_topics(new_topics=[new_topic], validate_only=False)
                except TopicAlreadyExistsError:
                    # Report which topic was skipped; the placeholder was
                    # previously left unfilled.
                    print('Topic: {} already exists!'.format(topic))
        finally:
            # The admin client is only needed for topic creation; release its
            # broker connection instead of leaking it for the process lifetime.
            admin_client.close()

    def process_timestamps(self):
        """Continuously consume timestamps from the input channel and route them.

        Messages whose value is ``None`` are skipped. Every other message is
        forwarded, with a label prefix, to the even or the odd output topic
        depending on :meth:`is_even_timestamp`. This method does not return.

        :raises ValueError: Propagated from :meth:`is_even_timestamp` when a
            message value is empty or does not end in a decimal digit.
        """
        # The outer loop restarts iteration should the consumer iterator ever
        # be exhausted.
        while True:
            for message in self.consumer:
                if message.value is None:
                    continue
                if self.is_even_timestamp(message.value):
                    self.producer.send(self.even_topic, b'Even timestamp: ' + message.value)
                else:
                    # NOTE(review): unlike the even prefix, this one has no
                    # trailing space. Kept byte-identical because downstream
                    # consumers may depend on the exact payload - confirm
                    # before normalizing.
                    self.producer.send(self.odd_topic, b'Odd timestamp:' + message.value)

    @staticmethod
    def is_even_timestamp(value):
        """Return True when the last character of `value` is an even digit.

        Only the final character is inspected, which is enough to decide the
        parity of a value ending in a decimal digit.

        :param value: Raw message payload (bytes as delivered by the consumer).
        :raises ValueError: If `value` is empty or its last character is not a
            decimal digit.
        """
        return int(value[-1:]) % 2 == 0


Router(
    get_env_info(),
    get_kafka_brokers(),
    get_channel_topic('input'),
    get_channel_topic('even'),
    get_channel_topic('odd')
).process_timestamps()