java - Running a Kafka consumer (new Consumer API) forever
I have built a queueing system on Apache Kafka. The application produces messages to a particular Kafka topic, and at the consumer end I have to consume all the records produced to that topic.
I wrote the consumer using the new Java consumer API. The code looks like this:
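    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    // Consumer configuration: broker address, consumer group, and String deserializers.
    Properties props = new Properties();
    props.put("bootstrap.servers", kafkaBrokerIp + ":9092");
    props.put("group.id", groupId);
    props.put("enable.auto.commit", "true");
    props.put("session.timeout.ms", "30000");
    props.put("auto.offset.reset", "earliest");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("consumertest"));

    // Poll forever; each call waits up to 100 ms for new records.
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.println("Data received: " + record.value());
        }
    }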
Here I need to run the consumer forever, so that any record pushed to the Kafka topic by the producer is instantly consumed and processed.
My confusion is: is it the right way to use an infinite while loop (like in the sample code) to consume the data?
It works for me, but you might want to put the inner loop in a try/catch block in case it throws exceptions. Also consider a periodic reconnect task in case you get disconnected.
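To illustrate the try/catch suggestion, here is a minimal sketch of the loop shape, written against the plain JDK so it runs without a broker. Everything in it is hypothetical and is not Kafka's API: `PollLoop`, `poller` (standing in for `consumer.poll(...)`), `handler` (standing in for the per-record processing), `stop()` and `failedCount()` are names made up for this example. It assumes that a failure on one record should be logged and skipped rather than end the loop, which may not suit every application.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a long-running consumer loop; none of these names come from Kafka.
class PollLoop implements Runnable {
    private final Supplier<List<String>> poller;   // assumption: stands in for consumer.poll(...)
    private final Consumer<String> handler;        // assumption: stands in for record processing
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final AtomicInteger failed = new AtomicInteger();

    PollLoop(Supplier<List<String>> poller, Consumer<String> handler) {
        this.poller = poller;
        this.handler = handler;
    }

    // Ask the loop to exit; it stops after the batch currently being processed.
    void stop() {
        running.set(false);
    }

    // Number of records whose handler threw an exception.
    int failedCount() {
        return failed.get();
    }

    @Override
    public void run() {
        // Same shape as while (true), but with a flag so the loop can be shut down.
        while (running.get()) {
            List<String> batch = poller.get();
            for (String value : batch) {
                try {
                    handler.accept(value);
                } catch (RuntimeException e) {
                    // One bad record is counted and logged; the loop keeps going.
                    failed.incrementAndGet();
                    System.err.println("Skipping record '" + value + "': " + e);
                }
            }
        }
    }
}
```

In a real consumer the `poller` role would be played by `consumer.poll(...)` and the loop would typically run on its own thread. Note that a flag alone does not interrupt a poll that is currently blocked; with `KafkaConsumer`, calling `consumer.wakeup()` from another thread makes the blocked `poll` throw a `WakeupException`, which the loop can catch before calling `consumer.close()`.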