Problem
I’m learning how to use Kafka, I’ve never used something similar in the past. In my job I was required to create a POC using it and integrate it to Spring Boot and save information on MongoDB (because we will need to retrieve information on-demand and I thought that it would be the best approach).
At the consumer I created an app, but I’m not sure if I should have a @RestController
class or this is something that should go into Kafka and how?
Is it ok to have the MyRestController
class? If not, how do I implement this using Kafka?
Right now, the code is working but I would like to improve it especially the Controller part and any extra comments that you could make to improve this.
This is the structure of my project:
KafkaConfigurator
package com.example.demo.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import com.example.demo.model.User;
@EnableKafka
@Configuration
public class KafkaConfiguration {
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<String, User> userConsumerFactory() {
Map<String, Object> config = new HashMap();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), new JsonDeserializer<>(User.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, User> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(userConsumerFactory());
return factory;
}
}
MyRestController
package com.example.demo.controller;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.example.demo.listener.KafkaConsumer;
import com.example.demo.model.User;
@RestController
@RequestMapping(value = "users")
public class MyRestController {
@Autowired
private KafkaConsumer consumer;
@GetMapping(path = "/findUserByOffset/{offset}")
public User getUserByOffset(@PathVariable("offset") Long offset) {
return consumer.getUserByOffset(offset);
}
@GetMapping(path = "/findUsersInRange/{lowerOffset}/{upperOffset}")
public List<User> getUsersByOffsetRange(@PathVariable("lowerOffset") Long lowerOffset, @PathVariable("upperOffset") Long upperOffset) {
return consumer.getUsersByOffsetRange(lowerOffset, upperOffset);
}
}
UserRepository
package com.example.demo.factory;
import java.util.List;
import org.springframework.data.mongodb.repository.MongoRepository;
import org.springframework.data.mongodb.repository.Query;
import com.example.demo.model.User;
public interface UserRepository extends MongoRepository<User, Long>{
User findById(String id);
User findByOffset(Long offset);
@Query("{'offset' : { $gte: ?0, $lte: ?1 }}")
List<User> findInOffsetRange(Long lowerOffset, Long upperOffset);
}
KafkaConsumer
package com.example.demo.listener;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import com.example.demo.factory.UserRepository;
import com.example.demo.model.User;
@Service
public class KafkaConsumer {
@Autowired
UserRepository userRepository;
@KafkaListener(topics = "Kafka_Example", groupId = "group_id")
public void consume(String message) {
System.out.println("Consumed message: " + message);
}
@KafkaListener(topics = "Kafka_Example_json", groupId = "group_json", containerFactory = "userKafkaListenerFactory")
public void consumeJson(User user) {
System.out.println("Consumed JSON message: " + user);
userRepository.save(user);
}
public User getUserByOffset(Long offset) {
return userRepository.findByOffset(offset);
}
public List<User> getUsersByOffsetRange(Long lowerOffset, Long upperOffset) {
return userRepository.findInOffsetRange(lowerOffset, upperOffset);
}
}
User
package com.example.demo.model;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;
@Document
public class User {
@Id
private String id;
private String name;
private String dept;
private Long salary;
private Long offset;
public User(String id, String name, String dept, Long salary, Long offset) {
super();
this.id = id;
this.name = name;
this.dept = dept;
this.salary = salary;
this.offset = offset;
}
public User() {
super();
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getDept() {
return dept;
}
public void setDept(String dept) {
this.dept = dept;
}
public Long getSalary() {
return salary;
}
public void setSalary(Long salary) {
this.salary = salary;
}
public Long getOffset() {
return offset;
}
public void setOffset(Long offset) {
this.offset = offset;
}
@Override
public String toString() {
return "User [name=" + name + ", dept=" + dept + ", salary=" + salary + "]";
}
}
KafkaConsumerDemoApplication
package com.example.demo;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class KafkaConsumerDemoApplication {
public static void main(String[] args) {
SpringApplication.run(KafkaConsumerDemoApplication.class, args);
}
}
application.properties
#server
server.port=8081
#mongodb
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=app1
#logging
logging.level.org.springframework.data=debug
logging.level.=error
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns_xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi_schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.5.RELEASE</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>kafka-consumer-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafka-consumer-demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.data/spring-data-mongodb -->
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb</artifactId>
<version>2.2.6.RELEASE</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
Solution
From your question and code, I understand that you are trying to
- consume events from Kafka topic, persist in MongoDB.
- Also, exposing api’s to retrieve data from Mongo DB and send in response.
If this is the case, solution above looks Good, as you are considering MongoDB as a source of truth/primary DB.
you can also persist data in Kafka as a cache and use configure API’s to get data from Kafka. (Refer KTable, GlobalKTable).