1.首先在pom文件添加依赖
The managed version is 1.1.7.RELEASE. The artifact is managed in org.springframework.boot:spring-boot-dependencies:1.5.9.RELEASE.
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
2.yml文件内容如下:
spring:
  datasource:
    url: jdbc:postgresql://127.0.0.1:5432/dev
    username: dev
    password: dev
  dubbo:
    registry:
      address: zookeeper://127.0.0.1:2181
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: grp1
3.Consumer java类
@Componentpublic class JobTrackingConsumer implements BaseConsumer{ private final static Logger logger = LoggerFactory.getLogger(JobTrackingConsumer.class); @Autowired private JobBasicCache cache; @KafkaListener(topics = {CommonConstant.FTP_CATEGPRY_ID, CommonConstant.HTTP_CATEGPRY_ID}) @Override public void consume(ConsumerRecord consumer) { OptionalkafkaMessage = (Optional ) Optional.ofNullable(consumer.value()); if (kafkaMessage.isPresent()) { logger.info("topic:{}, msg:{}", consumer.topic(), kafkaMessage.get()); cache.push(consumer.topic().toString(), kafkaMessage.get()); } }}