ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 재고시스템으로 알아보는 동시성이슈 해결방법
    📖 개발 공부 2023. 8. 2. 01:17

    인프런 재고시스템으로 알아보는 동시성이슈 해결방법 강의를 보면서 정리한 내용입니다.

     

    재고시스템으로 알아보는 동시성이슈 해결방법 - 인프런 | 강의

    동시성 이슈란 무엇인지 알아보고 처리하는 방법들을 학습합니다., 동시성 이슈 처리도 자신있게! 간단한 재고 시스템으로 차근차근 배워보세요. 백엔드 개발자라면 꼭 알아야 할 동시성 이슈

    www.inflearn.com

     

    실습한 레포지토리 👉 https://github.com/ayoung0073/stock

    • 코틀린으로 작성
    • mysql, redis, mongo 세 커밋으로 나누어서 코드 작성

    동시성 문제

    동시성 문제란 동일한 자원에 대해 여러 스레드가 동시에 접근 하면서 발생하는 문제다.

    재고시스템에서 재고 감소 로직에서 동시성 문제가 발생할 수 있다.

    Synchronized

    다음은 Synchronized 어노테이션을 활용하여 동시성 문제를 해결하는 방법이다.

    이는 한 개의 스레드만 접근하도록 할 수 있다.

    @Service
    class StockService(
        private val stockRepository: StockRepository
    ) {
        @Synchronized
        fun decrease(id: Long, quantity: Long) {
            val stock = stockRepository.findById(id).orElseThrow()
            stock.decrease(quantity)
            stockRepository.saveAndFlush(stock)
        }
    }
    

     

    위의 방법으로 다음 테스트 코드를 통과할 수 있을까?

    @Test
    fun `동시에_100명이_주문`() {
        val threadCount = 100
        val executorService = Executors.newFixedThreadPool(32)
        val latch = CountDownLatch(threadCount)
        for (i in 0 until threadCount) {
            executorService.submit {
                try {
                    stockService.decrease(1L, 1L)
                } finally {
                    latch.countDown()
                }
            }
        }
        latch.await()
        val stock = stockRepository.findById(1L).orElseThrow()
    
        // 100 - (100 * 1) = 0
        assertEquals(0, stock.quantity)
    }

    실패하였다.

    원인은 @Transactional 어노테이션에 있다.

    @Transactional이 있으면 우리가 만든 클래스를 래핑한 클래스를 새로 만들어서 실행한다.

    class TransactionStockService(
        private val stockService: StockService
    ) {
        fun decrease(id: Long, quantity: Long) {
            startTransaction()
            stockService.decrease(id, quantity)
            endTransaction()
        }
    
        fun startTransaction() {
        }
    
        fun endTransaction() {
        }
    }
    

    transaction 종료 시점에 데이터베이스에 업데이트 시키는데 여기서 문제가 발생한다.

    decrease 메서드가 완료되었고, 실제 DB에 업데이트하기 전에 다른 스레드가 decrease를 호출할 수 있다.

    그렇게 되면 다른 스레드는 갱신되기 전의 값을 가져와서 문제가 발생하는 것이다.

     

    해결방법: @Transactional 어노테이션 제거

    @Transactional을 제거하면 Proxy로 동작하지 않기 때문에 해결 할 수 있다.

     

    문제점

    @Synchronized는 하나의 프로세스 안에서만 보장이 된다.

    서버가 2대 이상인 경우엔 데이터베이스 접근을 여러 곳에서 할 수 있어서, 동시성을 보장할 수 없다.

     

    MySQL에서 지원하는 방법

    MySQL에서는 3가지 Lock 방법으로 동시성 문제를 해결할 수 있다.

    1. Pessimistic Lock: 실제로 데이터에 Lock 을 걸어서 정합성을 맞추는 방법
    2. Optimistic Lock: 실제로 Lock 을 이용하지 않고 버전을 이용함으로써 정합성을 맞추는 방법
    3. Named Lock: 이름을 가진 metadata locking

    하나하나씩 살펴보자

    1. Pessimistic Lock

    실제로 데이터에 Lock 을 걸어서 정합성을 맞추는 방법이다. exclusive lock 을 걸게 되며 다른 트랜잭션에서는 lock 이 해제되기 전에 데이터를 가져갈 수 없게 된다.

    데드락 이 걸릴 수 있기때문에 주의하여 사용하여야 한다.

    데이터에는 락을 가진 스레드만 접근 가능해서 문제를 해결할 수 있다.

    spring data jpa에서는 Lock이라는 어노테이션으로 Pessimistic Lock 를 쉽게 적용할 수 있다.

    @Service
    class PessimisticLockStockService(
        private val stockRepository: StockRepository
    ) {
    
        @Transactional
        fun decrease(id: Long, quantity: Long) {
            val stock = stockRepository.findByIdWithPessimisticLock(id) // Lock을 걸고 데이터를 가지고 온 후
            stock.decrease(quantity) // 재고 감소 시킨 후
    
            stockRepository.saveAndFlush(stock) // 재고 저장
        }
    }
    
    interface StockRepository : JpaRepository<Stock, Long> {
    
        @Lock(value = LockModeType.PESSIMISTIC_WRITE)
        @Query("select s from Stock s where s.id = :id")
        fun findByIdWithPessimisticLock(id: Long): Stock
    
    }
    

    충돌이 빈번하게 일어나는 경우 Optimistic Lock보다 성능이 좋을 수 있다.

    Lock을 통해 업데이트를 제어하기 때문에 데이터 정합성이 어느정도 보장된다.

    but, 별도의 Lock이 있기 때문에 성능 감소가 있을 수 있다.

     

    테스트 코드

    @Test
    fun `동시에_100명이_주문`() {
        val threadCount = 100
        val executorService = Executors.newFixedThreadPool(32)
        val latch = CountDownLatch(threadCount)
        for (i in 0 until threadCount) {
            executorService.submit {
                try {
                    stockService.decrease(1L, 1L)
                } finally {
                    latch.countDown()
                }
            }
        }
        latch.await()
        val stock = stockRepository.findById(1L).orElseThrow()
    
        // 100 - (100 * 1) = 0
        assertEquals(0, stock.quantity)
    }
    

    재고 100개

    재고 1,000개

    2. Optimistic Lock

    실제로 Lock 을 이용하지 않고 버전을 이용함으로써 정합성을 맞추는 방법이다. 먼저 데이터를 읽은 후에 update 를 수행할 때 현재 내가 읽은 버전이 맞는지 확인하며 업데이트한다. 내가 읽은 버전에서 수정사항이 생겼을 경우에는 application에서 다시 읽은 후에 작업을 수행해야한다.

    OptimisticLock은 실패했을 때 재시도를 해줘야하므로 Facade 라는 패키지를 만들어서 OptimisticLockFacade 클래스를 만들자.

    @Service
    class OptimisticLockStockFacade(
        private val optimisticLockStockService: OptimisticLockStockService,
    ) {
        fun decrease(id: Long, quantity: Long) {
            while (true) {
                try {
                    optimisticLockStockService.decrease(id, quantity)
                    break
                } catch (e: Exception) {
                    Thread.sleep(50)
                }
            }
        }
    }
    
    @Service
    class OptimisticLockStockService(
        private val stockRepository: StockRepository
    ) {
        @Transactional
        fun decrease(id: Long, quantity: Long) {
            val stock = stockRepository.findByIdWithOptimisticLock(id)
            stock.decrease(quantity)
    
            stockRepository.saveAndFlush(stock)
        }
    }
    
    • 장점: 별도의 lock을 잡지 않아서 Pessimistic Lock보다 성능상 이점이 있다.
    • 단점: update 실패했을 때 재시도 로직을 개발자가 직접 작성해야한다.

    충돌이 빈번하게 일어날 것이라 예상한다면 Pessimistic Lock를 이용하는게 성능상 좋을 수 있다

     

    테스트 코드

    @Test
    fun `동시에_100명이_주문`() {
        val threadCount = 100
        val executorService = Executors.newFixedThreadPool(32)
        val latch = CountDownLatch(threadCount)
        for (i in 0 until threadCount) {
            executorService.submit {
                try {
                    optimisticLockStockFacade.decrease(1L, 1L)
                } catch (e: RuntimeException) {
                    throw e
                } finally {
                    latch.countDown()
                }
            }
        }
        latch.await()
        val stock = stockRepository.findById(1L).orElseThrow()
    
        // 100 - (100 * 1) = 0
        assertEquals(0, stock.quantity)
    }
    

     

    재고 100개

     

    3. Named Lock

    이름을 가진 metadata locking 이다. 이름을 가진 lock 을 획득한 후 해제할 때까지 다른 세션은 이 lock 을 획득할 수 없도록 한다. 주의할 점으로는 transaction 이 종료될 때 lock 이 자동으로 해제되지 않기 때문에, 별도의 명령어로 해제를 수행해주거나 선점시간이 끝나야 해제된다.

    stock에 lock을 거는게 아니라 메타데이터에 락을 검

     

    (실제 적용할 때는 데이터 소스 분리해서 사용하는 걸 추천한다. 커넥션 풀이 부족해지는 현상이 발생할 수 있어 다른 서비스에 영향을 줄 수 있다)

    로직 수행 전후로 getlock과 releaseLock을 수행해야하므로 facade 클래스를 만들어 수행한다

    StockService는 부모의 transaction 별도로 수행되어야 해서 propagation을 변경해준다.

    @Service
    class NamedLockStockFacade(
        private val lockRepository: LockRepository,
        private val stockService: StockService,
    ) {
        fun decrease(id: Long, quantity: Long) {
            try {
                lockRepository.getLock(id.toString())
                stockService.decrease(id, quantity)
            } finally {
                lockRepository.releaseLock(id.toString())
            }
        }
    }
    
    @Service
    class StockService(
        private val stockRepository: StockRepository
    ) {
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        @Synchronized
        fun decrease(id: Long, quantity: Long) {
            val stock = stockRepository.findById(id).orElseThrow()
            stock.decrease(quantity)
            stockRepository.saveAndFlush(stock)
        }
    }
    

    nameLock은 주로 분산락을 구현할 때 사용한다. Pessimistic는 timeout을 구현하기 힘들지만 namedLock은 손쉽게 구현할 수 있다. 이외에 데이터 삽입시에 정합성을 맞춰야 하는 경우에도 사용할 수 있다

    하지만 이방법은 transaction 종료시 session과 lock 관리를 해줘야해서 주의해서 사용해야하고, 실제 사용할 때 구현방법이 복잡할 수 있다.

    테스트 코드

    @Test
    fun `동시에_100명이_주문`() {
        val threadCount = 100
        val executorService = Executors.newFixedThreadPool(32)
        val latch = CountDownLatch(threadCount)
        for (i in 0 until threadCount) {
            executorService.submit {
                try {
                    namedLockStockFacade.decrease(1L, 1L)
                } catch (e: RuntimeException) {
                    throw e
                } finally {
                    latch.countDown()
                }
            }
        }
        latch.await()
        val stock = stockRepository.findById(1L).orElseThrow()
    
        // 100 - (100 * 1) = 0
        assertEquals(0, stock.quantity)
    }
    

    재고 100개

    재고 1,000개

     

    Pessimistic과 유사하지만, Pessimistic는 row와 table 단위로 lock하지만, Named Lock은 Metadata에 lock 하는 방식이다.

     


    Redis 활용

    동시성 문제를 Redis를 활용해서 해결할 수 있다. Redis 활용엔 2가지 방법이 있다.

    Lettuce

    • setnx 명령어를 활용하여 분산락 구현이는 spin lock 방식이어서 retry 로직을 개발자가 직접 개발해야한다.
    • key와 value가 set 되었다면 1, 되지 않았다면 0을 반환하는 특성을 사용하여 atomic하게 Lock 획득 여부를 결정한다.
    • setnx는 set if not exists의 약어로, key/value를 set할 때 기존 key가 없을 때만 set하는 명령어이다.
    • spin lock 방식
    • lock을 획득하려는 스레드가 lock을 획득할 수 있는지 반복적으로 확인하면서 lock 획득을 시도하는 방식이다.

     

     

    잠시 setnx를 실습해보자.

    docker exec -it <도커컨테이너ID> redis-cli # redis-cli 실행
    
    127.0.0.1:6379> setnx 1 lock
    (integer) 1 # 성공
    127.0.0.1:6379> setnx 1 lock 
    (integer) 0 # 성공 X
    127.0.0.1:6379> del 1
    (integer) 1
    127.0.0.1:6379> setnx 1 lock
    (integer) 1
    

    Lettuce를 활용하는 방식은 MySQL의 NamedLock을 활용하는 방식과 유사하다.

    다른 점은 세션 관리에 신경쓰지 않아도 된다는 점이다.

    @Component
    class LettuceLockStockFacade(
        private val redisLockRepository: RedisLockRepository,
        private val stockService: StockService
    ) {
        fun decrease(key: Long, quantity: Long) {
            while (!redisLockRepository.lock(key.toString())) {
                Thread.sleep(150)
            }
            try {
                stockService.decrease(key, quantity)
            } finally {
                redisLockRepository.unlock(key.toString())
            }
        }
    }
    
    @Component
    class RedisLockRepository(
        private val redisTemplate: RedisTemplate<String, String>
    ) {
        fun lock(key: String): Boolean {
            return redisTemplate
                .opsForValue()
                .setIfAbsent(key, "lock", Duration.ofMillis(3_000))!! // setnx
        }
    
        fun unlock(key: String): Boolean {
            return redisTemplate.delete(key)
        }
    }
    

    이는 구현이 간단하지만, spin lock 방식이어서 redis에 부하를 줄 수 있음 (여기서는 Thread.sleep을 이용해 부하를 줄였다.)

     

     

    테스트 코드

    @Test
    fun `동시에_100명이_주문`() {
        val threadCount = 100
        val executorService = Executors.newFixedThreadPool(32)
        val latch = CountDownLatch(threadCount)
        for (i in 0 until threadCount) {
            executorService.submit {
                try {
                    lettuceLockStockFacade.decrease(1L, 1L)
                    println("decrease")
                } catch (e: Exception) {
                    e.printStackTrace()
                } finally {
                    latch.countDown()
                    println("countDown")
                }
            }
        }
        latch.await()
        val stock = stockRepository.findById(1L).orElseThrow()
    
        // 100 - (100 * 1) = 0
        assertEquals(0, stock.quantity)
    }
    

    재고 100개

    재고 1,000개

     

     

    Redisson

    pub/sub 기반으로 lock 구현을 제공한다.

    • pub/sub: 채널을 하나 만들고 lock을 점유중인 스레드가 lock을 획득하려는 스레드에게 해제를 알려주는 방식이다.
    • 이 방식은 별도의 retry 방식을 따로 작성하지 않아도 된다.

    @Component
    class RedissonLockStockFacade(
        private val redissonClient: RedissonClient,
        private val stockService: StockService
    ) {
        fun decrease(key: Long, quantity: Long) {
            val lock: RLock = redissonClient.getLock(key.toString())
            try {
                val available = lock.tryLock(10, 1, TimeUnit.SECONDS)
                println("available: $available")
                if (!available) {
                    println("lock 획득 실패")
                    return
                }
                stockService.decrease(key, quantity)
            } catch (e: InterruptedException) {
                throw RuntimeException(e)
            } finally {
                lock.unlock()
            }
        }
    }
    

     

    테스트 코드

    @Test
    fun `동시에_100명이_주문`() {
        val threadCount = 100
        val executorService = Executors.newFixedThreadPool(32)
        val latch = CountDownLatch(threadCount)
        for (i in 0 until threadCount) {
            executorService.submit {
                try {
                    redissonLockStockFacade.decrease(1L, 1L)
                } finally {
                    latch.countDown()
                }
            }
        }
        latch.await()
        val stock = stockRepository.findById(1L).orElseThrow()
    
        // 100 - (100 * 1) = 0
        assertEquals(0, stock.quantity)
    }
    

    재고 100개

    재고 1,000개

    • 장점
      • 락 획득 재시도를 기본으로 제공한다.
      • pub-sub 방식으로 구현이 되어있기 때문에 lettuce 와 비교했을 때 redis 에 부하가 덜 간다.
    • 단점
      • 별도의 라이브러리를 사용해야한다.
      • lock을 라이브러리 차원에서 제공해주기 때문에 사용법을 공부해야 한다.

    실무에서는

    • 재시도가 필요하지 않은 lock 은 lettuce 활용하고,
    • 재시도가 필요한 경우에는 redisson 를 활용한다.

     

    Mysql 과 Redis 비교하기

    Mysql

    • 이미 MySQL을 사용하고 있다면 별도의 비용없이 사용가능하다.
    • 어느정도의 트래픽까지는 문제없이 활용이 가능하다.
    • Redis 보다는 성능이 좋지 않다.

    Redis

    • 활용중인 Redis가 없다면 별도의 구축 비용과 인프라 관리 비용이 발생한다.
    • MySQL보다 성능이 좋다.

    [번외] Mongo의 findAndModify 이용하여 동시성 문제 해결해보기

    위 강의는 없지만 mongo에서도 재고 관리 로직을 간단히 작성할 수 있을 것 같아 추가해본다.

    db.collection.findAndModify()

    Mongo는 document 단위로 원자성을 보장하여서, 동시성 문제를 해결할 수 있다.

    findAndModify를 이용하여 재고관리 로직을 작성해보았다.

    @Repository
    class MongoStockRepository(
        private val mongoTemplate: MongoTemplate
    ) {
        /*
            몽고는 document에 수행하는 모든 명령어가 원자성을 보장한다.
         */
        fun incQuantity(id: Long, quantity: Long) {
            mongoTemplate.findAndModify(
                Query(
                    Criteria("_id").`is`(id).and("quantity").gt(0)
                ),
                Update().inc("quantity", quantity),
                StockDocument::class.java
            )
        }
    }
    
    @Service
    class MongoStockService(
        private val mongoStockRepository: MongoStockRepository
    ) {
        fun decrease(id: Long, quantity: Long) {
            mongoStockRepository.incQuantity(id, -quantity)
        }
    }
    

    테스트 코드

    @Test
    fun `동시에_100명이_주문`() {
        val threadCount = 100
        val executorService = Executors.newFixedThreadPool(32)
        val latch = CountDownLatch(threadCount)
        for (i in 0 until threadCount) {
            executorService.submit {
                try {
                    stockMongoService.decrease(1L, 1L)
                } finally {
                    latch.countDown()
                }
            }
        }
        latch.await()
        val stock = mongoStockRepository.findById(1L)
    
        // 100 - (100 * 1) = 0
        assertEquals(0, stock?.quantity)
    }
    

    재고 100개

    재고 1,000개

    반응형

    '📖 개발 공부' 카테고리의 다른 글

    [HTTP] HTTP 메서드  (0) 2023.08.09
    RPC, gRPC, stub  (0) 2023.08.03
    CQRS 패턴  (0) 2023.07.25
    [Kubernetes] sidecar pattern (사이드카 패턴)  (0) 2023.07.10
    비동기 asynchronous  (0) 2023.07.02

    댓글

Designed by Tistory.