1. 글을 작성하게 된 계기
2. 적용한 기술
조회한 데이터의 내용이 일정 시간 동안 변하지 않기 때문에, 캐시를 얼마나 효율적으로 사용하는지가 관건이라 판단했습니다. 따라서 캐시를 사용했는데요, 여기서 구현/적용한 내용은 다음과 같습니다.
- 비동기 데이터 조회
- Fallback 패턴
- 캐시 갱신
2-1. 비동기 데이터 조회
외부 API를 호출해 데이터를 조회한 후, 이를 저장/갱신하는 상황을 가정해 보겠습니다. 이를 순차적으로 처리하면, 응답을 받을 동안 기다려야 하기 때문에 호출 횟수만큼 전체 처리 시간이 증가하게 됩니다. 예를 들어, API를 호출하고 한 번 응답을 받는 데 1초가 걸린다면, 전체 처리 시간은 10초인 것입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class DataProvider(
private val restTemplate: RestTemplate
) {
fun fetchData() {
for (pageIdx in 0 until limit) {
// 데이터 조회
val data = restTemplate.getForObject(${URL}?pageNo=${pageIdx}&pageSize = 200, Response::class.java)
// 후처리 및 저장. 로직 반복.
saveAll(data)
......
}
}
}
이는 비동기를 활용해 데이터를 조회/저장하는 방식으로 개선할 수 있습니다. 이에 대한 상세한 원리와 설명은 해당 포스팅을 참조해 주세요.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class DataProvider(
private val restTemplate: RestTemplate
) {
fun fetchData() {
val futures = mutableSetOf<CompletableFuture<Void>>()
val data = ConcurrentHashMap.newKeySet<IssuePrice>()
for (pageIdx in 0 until limit) {
// 비동기 데이터 조회
val future = CompletableFuture.supplyAsync {
restTemplate.getForObject(${URL}?pageNo = ${pageIdx}&pageSize = 200, Response::class.java)
}.thenApplyAsync { response ->
data.addAll(response.data)
}.thenApply { _ -> null }
futures.add(future)
}
}
}
이때, 각 작업이 비동기로 진행되다 보니, 트랜잭션 관리, 데이터 유실에 대해 신경 써야 합니다. 따라서 데이터를 받아오는 구간과 이를 후처리한 후 저장하는 구간 각각에서 데이터가 유실되지 않도록 관리해야 합니다. 또한 비동기를 사용할 경우, 일반적인 예외 처리 방법과 조금 다르기 때문에 이 부분 또한 고려해야 합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
@Service
class DirectionalApiService(
......
) {
@Async
fun fetchData(pageIdx: Int): CompletableFuture<List<IssuePrice>> {
val url = properties.dataFetchingUrl + "?page=$pageIdx&size=200"
return CompletableFuture.supplyAsync {
restTemplate.getForObject(url, PriceResponse::class.java)
}.thenApply { response ->
createIssues(response)
}.handle { result, ex -> // 예외 처리
handle(ex, pageIdx, result)
}
}
......
// 이벤트 유실 대비
private fun handle(
throwable: Throwable?,
pageIdx: Int,
result: List<IssuePrice>?,
): List<IssuePrice> {
if (throwable != null) {
deadLetters.recordDataFetchingEvent(pageIdx)
return emptyList()
}
if (result == null) {
deadLetters.recordDataFetchingEvent(pageIdx)
return emptyList()
}
return result
}
}
과제전형이기 때문에 이 부분은 유실된 작업만 저장하고, 별도로 처리하진 않았습니다.
2-2. Fallback 패턴
글로벌 캐시를 사용할 경우, 캐시 서버가 다운 되면 순간적으로 관계형 데이터베이스로 부하가 뻗칠 수 있습니다. 이를 방지하기 위해 fallback 패턴을 적용했습니다. 1차 캐시 서버가 다운됐을 경우, 에러 응답을 내려주는 것이 아닌, 2차 저장소를 조회 해 데이터를 내려주는 것입니다.
이는 썬더링 허드 문제(Thundering herd problem)라고도 부릅니다.
만약 과제가 아니라, 실제 상황에서 트래픽이 많다면 레디스를 클러스터 모드로 구성 후, 1차, 2차 캐시에 배치할 것 같습니다. 3차 저장소로 MongoDB를 사용한 이유는 관계형 데이터베이스보다는 조회 속도가 빠르고, 어느 정도의 안정성이 필요 해서 입니다.
이에 대한 아이디어는 배달의민족 최전방 시스템! ‘가게노출 시스템’을 소개합니다.를 참조해주세요.
레디스는 빠른 응답 속도를 가지고 있지만, 데이터가 휘발될 수 있어 불안정합니다. 물론 AOF 모드를 사용할 수 있지만, 매번 I/O 작업이 발생하기 때문에 이를 사용하기 조금 꺼려졌습니다.
또한 MongoDB는 RDB 보다는 읽기에서 좋은 성능을 보입니다.
또한 읽기 작업에서 어느 정도 객체를 중심으로 데이터를 처리할 수 있습니다. MongoDB는 문서로 매핑해, 객체 형태로 데이터를 다루기 때문입니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Document(collection = "indices")
class IndexDocument(
@Field(name = "index_id")
val indexId: Long,
@Field(name = "index_code")
val indexCode: String,
@Field(name = "name")
val name: String,
@Field(name = "issues")
private var _issue: List<IssueDocument>? = emptyList(),
@Field(name = "average_change_rate")
private var _averageChangeRate: BigDecimal? = BigDecimal.ZERO,
) {
@Id
@Field(name = "_id")
private var _id: String? = UUID.randomUUID().toString()
......
그렇다고 MongoDB가 완전히 객체지향적 데이터베이스는 아닙니다. 이는 해당 포스팅을 참조해주세요.
DTO는 데이터를 너무 직접적으로 알기 때문에, 변경 사항에 유연하게 대응할 수 없지만, 문서는 스키마 제약사항이 없으며, 데이터의 추가/삭제가 유연합니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 필드를 직접적으로 아는 DTO는 변경 사항에 유연하게 대처하기 힘듭니다.
internal data class PriceResponse(
val content: List<PriceContent> = emptyList(),
val pageable: Pageable,
val totalElements: Int,
......
val empty: Boolean,
)
internal data class PriceContent(
val issueCode: String,
val close: BigDecimal,
val change: BigDecimal,
val changeRate: BigDecimal,
val open: BigDecimal,
val high: BigDecimal,
val low: BigDecimal,
val tradingVolume: BigDecimal,
val tradingValue: BigDecimal,
val listedShares: BigDecimal,
val previousClose: BigDecimal,
)
2-3. 캐시 갱신
캐시를 공급하는 쪽과 사용하는 쪽을 분리해 주는 것이 좋다는 생각이 들었습니다. 예를 들어, 아래와 같은 코드는 캐시를 조회하는 관심사와 캐시를 저장하는 관심사가 합쳐져 있습니다. 만약 캐시가 반드시 일정 주기로 갱신 돼야 한다면, 이 메서드로 인해 어느 타이밍에 캐시가 갱신될지 알 수 없게 됩니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Service
class CacheService(
private val cacheRepository:CacheRepository,
private val repository:EntityRepository
) {
fun findById(id:Long):Cache {
val findCache = cacheRepository.findById(id)
if(findCache != null){
return findCache
}
val findEntity = repository.findById(id)
val newCache = Cache(findEntity)
// 일정 주기로 캐시를 갱신해야 하는데, 캐시 저장소에 데이터가 없는 경우 갑자기 캐시가 저장된다.
cacheRepository.save(newCache)
return newCache
}
}
쉽게 말해, 30분 마다 캐시가 갱신 돼야 하는데, 위 메서드로 인해 캐시가 없는 경우 곧바로 캐시를 갱신합니다. 30분 마다 갱신된 새로운 데이터를 공급해야 하는데, 올바르지 않은 데이터가 캐시에 등록될 수도 있는 것입니다.
이렇게 되면, 캐시 신선도, 데이터 정합성과 같은 이슈가 발생할 수 있습니다. 물론 이는 이벤트/메시지 큐를 활용해 문제를 해결할 수 있지만, 이에 대해서는 별도의 포스팅을 하도록 하겠습니다.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Service
class CacheService(
private val cacheRepository: CacheRepository,
private val repository: EntityRepository,
private val eventPublisher: MessageQueue
) {
fun findById(id: Long): Cache {
val findCache = cacheRepository.findById(id)
if (findCache != null) {
return findCache
}
val findEntity = repository.findById(id)
val event = CacheSaveEvent(findEntity.id)
// 이벤트 발행
eventPublisher.publishEvent(event)
return newCache
}
}
이벤트에 대한 글은 해당 포스팅을 참조해주세요.
3. 정리
평소 학습했던 내용들을 종합해 적용한 과제였습니다. 이런저런 구현 사항들이 많아 생각했던 것보다 시간이 조금 걸렸는데요, 그래도 오랜만에 구현 연습을 하며 재미나게 진행했습니다. 이 외에도 직렬화, 캐시 관리 전략, 데드레터 처리 등 다양한 주제들이 있었는데, 이는 다음 기회에 포스팅 하도록 하겠습니다.