AirScheduleProducer

Kinesis 스트림 프로듀서. 항공편 스케줄 이벤트를 GZIP 압축하여 전송.

클래스 정의

@Service
@Profile("!qa")
class AirScheduleProducer(
    private val streamBridge: StreamBridge,
    private val objectMapper: ObjectMapper,
)

메서드

produce()

fun produce(event: AirScheduleEvent)

AirScheduleEvent를 Kinesis 스트림으로 전송.

Caller설명
KinesisEventListener이벤트 수신 후 호출

동작:

  1. SearchInfoEvent → SearchInfoPayload 변환
  2. FlightItemEvent 목록 → FlightItemPayload 목록 변환
  3. GZIP 압축
  4. AirSchedulePayload 생성
  5. StreamBridge로 전송
  6. 실패 시 에러 로깅

compressPayload()

private fun compressPayload(body: List<FlightItemPayload>): ByteArray

FlightItemPayload 목록을 GZIP 압축.

동작:

  1. ObjectMapper로 JSON 직렬화
  2. GZIPOutputStream으로 압축
  3. ByteArray 반환

스트림 바인딩

속성
출력 바인딩airSchedule-out-0
프로파일!qa (QA 제외)

메시지 구조

{
  "request": {
    "tripType": "ROUND_TRIP",
    "origin": "ICN",
    "destination": "NRT",
    ...
  },
  "body": "<GZIP compressed ByteArray>",
  "isBodyCompressed": true
}

처리 플로우

AirScheduleEvent
    │
    ├──► SearchInfoPayload.of(event.searchInfo)
    │
    └──► event.flightItems
             │
             └──► FlightItemPayload.of() (각각)
                      │
                      └──► JSON 직렬화
                               │
                               └──► GZIP 압축
                                        │
                                        └──► AirSchedulePayload
                                                 │
                                                 └──► StreamBridge.send()
                                                          │
                                                          └──► Kinesis (airSchedule-out-0)

에러 처리

.also { result ->
    if (!result) logger.error("실패: Search Schedules Count: ${event.flightItems.count()}, Channel: ${event.flightItems.first().channel}")
}

전송 실패 시 에러 로그 출력 (항공편 수, 채널 정보).

의존성

의존성용도
StreamBridgeSpring Cloud Stream 전송
ObjectMapperJSON 직렬화

특징

  • Spring Cloud Stream: StreamBridge 사용
  • GZIP 압축: body 데이터 압축으로 전송 효율화
  • QA 환경 제외: @Profile(“!qa”)로 QA에서 비활성화
  • 실패 로깅: 전송 실패 시 에러 로그
  • MessageBuilder: Spring Messaging 메시지 빌더 사용