본문 바로가기

작업일지/안동버스 API 연동

#4) kafka로 데이터 발행(Publish)과 구독(Subscribe) 동작하기

카프카는 대용량의 실시간 로그처리에 특화되어 있는 솔루션이며, 데이터 유실없이 안정적으로 메세지를 전달할 수 있다. 분산환경에 특화되어 설계되어 있다는 특징이 있으며 다른 메세지 큐(ex.RabbitMQ)보다 성능적으로 뛰어나다고 한다.

 

Publisher-Subscriber 모델

발행과 구독 모델을 사용한다. 발행과 구독이란 메세지를 특정 수신자에게 다이렉트로 전달하는 시스템이 아니다.

Publisher는 메세지를 topic을 통해서 카테고리화하고 Subscriber는 그 topic을 구독함으로써 메세지를 읽어올 수 있다.

Publisher는 topic에 대한 정보만 알고있고, Subscriber도 topic만 바라보기 때문에 발행자와 구독자는 서로 모르는 상태다. 

 

 

출처 : Apache Kafka 0.8.1 Documentation

 

 

topic과 partition

메세지는 topic으로 분류되며, topic은 여러개의 파티션으로 나눠질 수 있다. 파티션의 자리를 차지하는 한칸은 로그라고 불리고 메세지는 로그에 순차적으로 들어가게 된다. 메세지의 상대적인 위치를 나타내는 것을 offset이라고 한다.

 

하나의 토픽에 여러개의 파티션을 나눠서 메세지를 쓰는 이유는 대량의 메세지가 같은 토픽으로 쓰여질 때, 파티션이 하나면 순차적으로 append되므로 처리하는데 버거울 수 있다. 하지만 여러 개의 파티션을 사용 할 경우 메세지나 분산되어 쓰여지기 때문에 처리하는데 있어서 훨씬 빠를 수 있다. 다만 파티션을 늘릴 경우 메세지가 쓰여지는게 순차적이지 않기 때문에 순서가 중요한 모델에선 적합하지 않을 수 있다.

 

consumer

컨슈머는 영어단어 뜻처럼 소비자의 기능을 한다. 즉 메세지를 소비한다. producer의 존재를 모르지만 특정 토픽을 구독함으로써 스스로 상황에 맞게 메세지를 소비한다. 소비를 한 표시는 offset의 위치를 통해 기억하고 consumer 서버의 메모리가 부족하다거나 서버가 죽는 등의 일이 발생해도 이전에 소비했던 offset의 위치를 통해 다시 읽어드릴 수 있다. 그렇기 때문에 매우 안정적이다.

 


kafka와 Springboot 연동

 

1. pom.xml에 dependency 추가

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

 

2. application.yml에 셋팅값 입력

spring:
  profiles:
    active:
    - dblocal
    - redislocal
    - tomcatlocal
    - elasticlocal
    - kafkaLocal
    - mysqllocal
    .
    .
    .
    .
    .
spring:
  profiles: kafkaLocal
  kafka:
    bootstrap-servers:
      - 127.0.0.1:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

    consumer:
      group-id: judy-consumer
      auto-offset-reset: latest # latest - 토픽의 가장 마지막 부터 메시지를 가져옴, earliest - 처음부터 메시지를 가져옴
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      client-id: study-client-judy

 

3. kafkaProducerService 인터페이스 파일을 생성하고 kafkaProducerServiceImpl 클래스를 만들어서 다음과 같이 작성

나는 간단하게 사용할거라 두번째 메소드만 작성했다.....ㅎㅎ

public interface KafkaProducerService {

    /**
     * 카프카 토픽에 데이터를 전송
     *
     * @param topic : kafka topic
     * @param data  : 전송할 데이터
     */
    void sendTopic(String topic, byte[] data);

    void sendTopic(String topic, Object data);

    /**
     * 다중 토픽에 동일한 데이터를 전송
     *
     * @param topicArr : kafka topic list (구분자 : | 파이프라인) (ex. abc|def|hig)
     * @param data     : 전송할 데이터
     */
    void sendMultiTopic(String topicArr, Object data);

}


----
@Slf4j
@Service
public class KafkaProducerServiceImpl implements KafkaProducerService {

    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @Override
    public void sendTopic(String topic, byte[] data) {
        kafkaTemplate.send(topic, data);
    }

    @Override
    public void sendTopic(String topic, Object data) {
        kafkaTemplate.send(topic, data);
    }

    @Override
    public void sendMultiTopic(String topicArr, Object data) {
    }


}

4. 다음처럼 사용하면 judy라는 토픽에 arrivalinfo(특정 정류소의 버스 도착정보가 담긴 객체)라는 메세지를 담아 발행한다.

//kafka에 message 적재
kafkaProducerService.sendTopic("judy", arrivalinfo);

 

여기까지가 발행이다. 구독하는 클래스는 다음과 같다.

(사실 프로젝트를 하나 더 파서하면 좀더 명확하게 카프카의 역할을 볼 수 있으나 하나의 프로젝트안에서 프로듀서와 컨슈머를 구현했다)

 

@KafkaListener(topic = {'judy'}) 어노테이션을 보면 컨슈머는 judy라는 토픽을 구독하고 있으며 토픽에 메세지가 쓰여지면 자동으로 judyTopic 메소드가 실행된다. 필자는 넘어온 메세지를 파싱한 후 MYSQL에 JPA를 활용해서 데이터를 적재했다.

@Slf4j
@Service
public class KafkaConsumer {

    @Autowired
    BusStationInfoRepository busStationInfoRepository;

    @Autowired
    BusArrivalInfoRepository busArrivalInfoRepository;

    @KafkaListener(topics = {"judy"})
    public void judyTopic(ConsumerRecord<String, String> consumerRecord) throws ParseException {
        log.info("kafka client(judy) start [{}]", consumerRecord);
        int partition = consumerRecord.partition();
        long topicOffset = consumerRecord.offset();
        String topicName = consumerRecord.topic();
        String topicMessage = consumerRecord.value();

        //String to Json
        JSONParser parser = new JSONParser();
        Object obj = parser.parse(topicMessage);
        JSONObject jsonObj = (JSONObject) obj;

        //정류소 정보 MYSQL에 저장
        SimpleDateFormat formatter = new SimpleDateFormat ("yyyyMMddHHmmssSSS", java.util.Locale.KOREA);
        String regiStationKey = formatter.format(new Date()) + "STATION" + ((JSONObject) jsonObj.get("parameter")).get("stationId").toString();
        String stationId = ((JSONObject) jsonObj.get("parameter")).get("stationId").toString();
        BusStationInfo busStationInfo = new BusStationInfo();
        busStationInfo.setREGISTRATION_ID(regiStationKey);
        busStationInfo.setSTATION_ID(stationId);
        busStationInfo.setCOUNT(String.valueOf(jsonObj.get("count")));

        BusStationInfo busStationInfo1 = busStationInfoRepository.save(busStationInfo);

        //도착정보 MYSQL에 저장
        //String to Json
        JSONParser parserResult = new JSONParser();
        Object resultobj = parserResult.parse(jsonObj.get("results").toString());
        JSONArray resultJsonArray = (JSONArray) resultobj;

        for(int i=0; i<resultJsonArray.size(); i++){
            JSONObject resultJsonObj = (JSONObject) resultJsonArray.get(i);
            String regiArrivalKey = formatter.format(new Date()) + "ARRIVAL" + String.valueOf(resultJsonObj.get("routeId"));

            BusArrivalInfo busArrivalInfo = new BusArrivalInfo();
            busArrivalInfo.setREGISTRATION_ID(regiArrivalKey);
            busArrivalInfo.setPARENT_REGISTRATION_ID(regiStationKey);
            busArrivalInfo.setSTATION_ID(stationId);
            busArrivalInfo.setROUTEID(String.valueOf(resultJsonObj.get("routeId")));
            busArrivalInfo.setROUTENUM(String.valueOf(resultJsonObj.get("routeNum")));
            busArrivalInfo.setROUTENM(String.valueOf(resultJsonObj.get("routeNm")));
            busArrivalInfo.setVIA(String.valueOf(resultJsonObj.get("via")));
            busArrivalInfo.setSTATIONORD(String.valueOf(resultJsonObj.get("stationOrd")));
            busArrivalInfo.setARRVEHLD(String.valueOf(resultJsonObj.get("arrvehld")));
            busArrivalInfo.setPLATENO(String.valueOf(resultJsonObj.get("plateNo")));
            busArrivalInfo.setPOSTPLATENO(String.valueOf(resultJsonObj.get("postPlateNo")));
            busArrivalInfo.setPREDICTTM(String.valueOf(resultJsonObj.get("predictTm")));
            busArrivalInfo.setREMAINSTATION(String.valueOf(resultJsonObj.get("remainStation")));
            busArrivalInfo.setGOVCD(String.valueOf(resultJsonObj.get("govCd")));
            busArrivalInfo.setGOVCDNM(String.valueOf(resultJsonObj.get("govCdNm")));

            BusArrivalInfo busArrivalInfo1 = busArrivalInfoRepository.save(busArrivalInfo);
        }

        log.info("kafka client(judyTopic) partition:{}, topicOffset:{}, topicName:{}, topicMessage{}", partition, topicOffset, topicName, topicMessage);
    }

}

 

kafka를 통해 넘어온 메세지를 잘 적재했다~

반응형