AirScheduleProducer
Kinesis 스트림 프로듀서. 항공편 스케줄 이벤트를 GZIP 압축하여 전송.
클래스 정의
@Service
@Profile("!qa")
class AirScheduleProducer(
private val streamBridge: StreamBridge,
private val objectMapper: ObjectMapper,
)메서드
produce()
fun produce(event: AirScheduleEvent)AirScheduleEvent를 Kinesis 스트림으로 전송.
| Caller | 설명 |
|---|---|
| KinesisEventListener | 이벤트 수신 후 호출 |
동작:
- SearchInfoEvent → SearchInfoPayload 변환
- FlightItemEvent 목록 → FlightItemPayload 목록 변환
- GZIP 압축
- AirSchedulePayload 생성
- StreamBridge로 전송
- 실패 시 에러 로깅
compressPayload()
private fun compressPayload(body: List<FlightItemPayload>): ByteArrayFlightItemPayload 목록을 GZIP 압축.
동작:
- ObjectMapper로 JSON 직렬화
- GZIPOutputStream으로 압축
- ByteArray 반환
스트림 바인딩
| 속성 | 값 |
|---|---|
| 출력 바인딩 | airSchedule-out-0 |
| 프로파일 | !qa (QA 제외) |
메시지 구조
{
"request": {
"tripType": "ROUND_TRIP",
"origin": "ICN",
"destination": "NRT",
...
},
"body": "<GZIP compressed ByteArray>",
"isBodyCompressed": true
}처리 플로우
AirScheduleEvent
│
├──► SearchInfoPayload.of(event.searchInfo)
│
└──► event.flightItems
│
└──► FlightItemPayload.of() (각각)
│
└──► JSON 직렬화
│
└──► GZIP 압축
│
└──► AirSchedulePayload
│
└──► StreamBridge.send()
│
└──► Kinesis (airSchedule-out-0)
에러 처리
.also { result ->
if (!result) logger.error("실패: Search Schedules Count: ${event.flightItems.count()}, Channel: ${event.flightItems.first().channel}")
}전송 실패 시 에러 로그 출력 (항공편 수, 채널 정보).
의존성
| 의존성 | 용도 |
|---|---|
| StreamBridge | Spring Cloud Stream 전송 |
| ObjectMapper | JSON 직렬화 |
특징
- Spring Cloud Stream: StreamBridge 사용
- GZIP 압축: body 데이터 압축으로 전송 효율화
- QA 환경 제외: @Profile(“!qa”)로 QA에서 비활성화
- 실패 로깅: 전송 실패 시 에러 로그
- MessageBuilder: Spring Messaging 메시지 빌더 사용