Structured Concurrency

async, launch 와 같은 코루틴 빌더 함수를 사용할 때 각각의 비동기 식의 순서를 조율해야 할 경우가 있고 동기적으로 수행해야 할 경우가 있습니다. 이 경우 가장 원초적인 방법으로는 중첩된 스코프에서 식을 실행하는 방법이 있습니다. 하지만, 중첩된 스코프가 복잡해지면, 고치기도 어렵고 읽기도 어려워지며 유지보수가 어려워지게 됩니다.

이런 경우 조금 더 깔끔한 방법으로 동기연산으로 정의되어 있는 coroutineContext() 빌더 함수, withContext() 빌더 함수를 사용해서 해결합니다.

  • coroutineContext() 코루틴 스코프 빌더 함수
  • withContext() 코루틴 스코프 빌더 함수

이번 문서에서는 대부분의 예제를 coroutineContext() 코루틴 스코프 빌더 함수를 사용하는 방식을 설명했는데, 가장 마지막 챕터에 withContext() 를 사용하는 경우 역시 예제로 정리해두었습니다.


Structured Concurrency 란?

lanch{...}, async{...} 등과 같은 코루틴 빌더를 사용할 때 이 것을 중첩되어서 사용하는 경우가 있을 수 있습니다. 이렇게 중첩해서 lanch{...}, async{...} 을 사용하는 것을 보통 Structured Concurrency 라고 합니다.

예를 들면 아래와 같은 코드를 Structured Concurrency 라고 이야기합니다.

package io.chagchagchag.demo.kotlin_coroutine.structured_concurrency
 
import io.chagchagchag.demo.kotlin_coroutine.helper.logger
import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext
 
fun main(){
  val log = logger()
 
  runBlocking {
    val job = CoroutineScope(EmptyCoroutineContext).launch {
      // ----- (1)
      val applePrice = async {
        delay(1000)
        5000
      }
      val bananaPrice = async {
        delay(1000)
        7000
      }
      val carrotPrice = async {
        delay(1000)
        3000
      }
 
      log.info("(1)")
      val sum = applePrice.await() + bananaPrice.await() + carrotPrice.await()
      // ----- (1)
      log.info("sum = $sum")
      log.info("(2)")
      assert(sum == 5000 + 7000 + 3000)
    }
 
    job.join() // join() 하지 않으면 main 문은 자기 마음대로 끝냅니다.
  }
}

그런데 이렇게 중첩된 CoroutineScope 들을 특정 순서에 맞춰서 호출되게끔 해야 하는 경우가 있습니다. 일반적으로 간단한 코드의 경우 launch{} 를 사용할 경우에는 join() 함수를 사용해서 동기코드처럼 작성하고 async{} 를 사용할 경우에는 await() 함수를 사용해서 동기코드 처럼 사용할 수 있습니다.

그런데 join(), await() 같은 함수를 직접 작성해서 수행하다보면 가독성이 떨어지기도 하고 코드가 길어지면 실수가 생기기도 하고, for 문 내에서 실행할 경우 동기적으로 실행이 되지 않을 수 있습니다.

이 경우 일반적으로 아래와 같은 방식으로 해결할 수 있습니다.

  • 부모역할의 비동기 스코프 내에 job1, job2, job3, ... 의 작업을 수행하게끔하고 부모 비동기 스코프를 밖에서 join()
  • coroutineScope{...} 또는 withContext(context){...} 내에서 job1, job2, job3, ... 의 작업을 수행하도록 코드를 작성

e.g. async

복잡한 async 코드

아래와 같이 사용하는 async 코드가 있다고 하겠습니다.

package io.chagchagchag.demo.kotlin_coroutine.structured_concurrency
 
import io.chagchagchag.demo.kotlin_coroutine.helper.logger
import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext
 
fun main(){
  val log = logger()
 
  runBlocking {
    val job = CoroutineScope(EmptyCoroutineContext).launch {
      // ----- (1)
      val applePrice = async {
        delay(1000)
        5000
      }
      val bananaPrice = async {
        delay(1000)
        7000
      }
      val carrotPrice = async {
        delay(1000)
        3000
      }
 
      log.info("(1)")
      val sum = applePrice.await() + bananaPrice.await() + carrotPrice.await()
      // ----- (1)
      log.info("sum = $sum")
      log.info("(2)")
      assert(sum == 5000 + 7000 + 3000)
    }
 
    job.join() // join() 하지 않으면 main 문은 자기 마음대로 끝냅니다.
  }
}

장바구니에 사과, 바나나, 당근이 담겨있고 이것의 가격을 하나씩 모두 호출해 온 후 합산을 해서 현재 장바구니의 주문가격을 계산하는 코드입니다.

위의 코드에서 apple.await(), bananaPrice.await(), carrotPrice.await() 을 이용해서 값을 가져와서 합산을 했고, 출력결과를 보면 1초만에 수행이 완료되었습니다. 만약 블로킹 방식의 동기연산으로 수행했으면 3초 걸렸을 작업인데, 각각의 작업을 각각의 코루틴에서 수행되게끔 async{...} 에서 수행했기 때문에 1초만에 수행이 되었습니다.


출력결과

11:19:07.202 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (1)
11:19:08.208 [DefaultDispatcher-worker-3] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- sum = 15000
11:19:08.209 [DefaultDispatcher-worker-3] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (2)

Process finished with exit code 0

이 방식의 문제점은 코드가 점점 복잡해진다는 점입니다.

// ----- (1) 안에 위치한 코드의 이전이나 이후에 코드를 수행해야 할 경우에는 // ----- (1) 의 위치의 처음과 끝이 어디인지 알아야하고 // ----- (1) 내부의 코드를 다른 함수 등으로 분리해내기 쉽지 않기에 유지보수가 어려워지고, 확장성이 떨어진다는 단점이 있습니다.

원하는 특정 async 코드들의 연산은 coroutineScope() 로 감싸기

이번에는 위의 코드들을 아래와 같이 작성해봅니다.

package io.chagchagchag.demo.kotlin_coroutine.structured_concurrency
 
import io.chagchagchag.demo.kotlin_coroutine.helper.logger
import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext
 
fun main(){
  val log = logger()
 
  // ----- (2)
  runBlocking {
    val job = CoroutineScope(EmptyCoroutineContext).launch {
      val cartSum = cartSum1()
      log.info("(1)")
      log.info("cartSum = $cartSum")
      log.info("(2)")
      assert(cartSum == 5000 + 7000 + 3000)
    }
 
    job.join() // join() 하지 않으면 main 문은 자기 마음대로 끝냅니다.
  }
}
 
// ----- (1)
suspend fun cartSum1(): Int{
  return coroutineScope {
    val applePrice = async {
      delay(1000)
      5000
    }
    val bananaPrice = async {
      delay(1000)
      7000
    }
    val carrotPrice = async {
      delay(1000)
      3000
    }
 
    applePrice.await() + bananaPrice.await() + carrotPrice.await()
  }
}

// ----- (1)

  • 사과, 바나나, 당근 가격을 구해오는 async{...} 구문들을 coroutineScope{...} 빌더함수가 생성하는 코루틴 스코프에서 실행하게끔 했습니다. 그리고 반환 값은 applePrice.await() + bananaPrice.await() + carrotPrice.await() 으로 지정해줬습니다.

// ----- (2)

  • 처음 async 코드에 비해 단순해졌습니다. 별도의 함수인 cartSum1()에서 합계를 구하도록 했기 때문입니다.
  • async 에 대해 일일이 await() 을 하는 시점을 알고 있을 필요도 없습니다.

출력결과 역시 원하는 값인 15000이 나왔습니다. 그리고 원하는 대로 1초 내에 연산이 이뤄졌습니다. 만약 모든 작업을 블로킹 기반으로 수행했다면 3초가 걸렸을 것입니다.

이렇게 하는 것이 가능한 이유는 coroutineScope{...} 빌더는 동기식 연산을 지원하기 때문입니다.

coroutineScope{...} 코드를 보면 블로킹 연산을 수행하는 것이라고 착각할 수 있겠지만, 블로킹연산이 아닌 논블로킹 기반 동기연산을 지원합니다. 코틀린 엔진이coroutineScope{...}의 수행이 끝날 때 까지 뒷단에서는 다른 코루틴을 수행하게 됩니다.

출력결과

11:32:50.036 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (1)
11:32:50.044 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- cartSum = 15000
11:32:50.044 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (2)

Process finished with exit code 0

e.g. launch

launch 는 async 와는 다르게 값을 리턴하지 않습니다.

복잡한 launch 코드

아래 코드는 원하는 순서대로 수행됨을 보장하지 않는 launch{...} 를 사용한 코드입니다.

package io.chagchagchag.demo.kotlin_coroutine.structured_concurrency
 
import io.chagchagchag.demo.kotlin_coroutine.helper.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.EmptyCoroutineContext
 
fun main(){
  val log = logger()
 
  runBlocking {
    val job = CoroutineScope(EmptyCoroutineContext).launch {
      val order = launch {
        delay(1000)
        log.info("주문 완료")
      }
 
      val pay = launch {
        delay(1000)
        log.info("결제 완료")
      }
 
      val delivery_reservation = launch {
        delay(1000)
        log.info("배송 예약 완료")
      }
 
      log.info("(1)")
      order.join()
      pay.join()
      delivery_reservation.join()
      log.info("(2)")
    }
 
    job.join()
  }
}

위 코드의 결과는 아래와 같습니다.

원했던 결과는 주문 완료결제 완료배송 예약 완료 인데, 출력결과는 주문 완료배송 예약 완료결제 완료 가 되었습니다.

출력결과

12:01:33.193 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (1)
12:01:34.187 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- 주문 완료
12:01:34.198 [DefaultDispatcher-worker-3] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- 배송 예약 완료
12:01:34.198 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- 결제 완료
12:01:34.199 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (2)

Process finished with exit code 0

이번에는 아래와 같은 코드를 작성해봅니다.

package io.chagchagchag.demo.kotlin_coroutine.structured_concurrency
 
import io.chagchagchag.demo.kotlin_coroutine.helper.logger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.EmptyCoroutineContext
 
fun main(){
  val log = logger()
 
  runBlocking {
    val job = CoroutineScope(EmptyCoroutineContext).launch {
      val order = launch {
        delay(1000)
        log.info("주문 완료")
 
        val pay = launch {
          delay(1000)
          log.info("결제 완료")
 
          val delivery_reservation = launch {
            delay(1000)
            log.info("배송 예약 완료")
          }
          delivery_reservation.join()
        }
        pay.join()
      }
 
      log.info("(1)")
      order.join()
      log.info("(2)")
    }
 
    job.join()
  }
}

위 코드의 결과는 아래와 같습니다. 원했던 결과인 주문 완료결제 완료배송 예약 완료 를 정상적으로 잘 만들어 냈음을 알 수 있습니다.

12:08:16.478 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (1)
12:08:17.491 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- 주문 완료
12:08:18.496 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- 결제 완료
12:08:19.511 [DefaultDispatcher-worker-2] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- 배송 예약 완료
12:08:19.513 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (2)

Process finished with exit code 0

이런 코드를 어떻게 단순화할 수 있을 까요?

원하는 특정 launch 코드 들의 연산을 coroutineScope 로 감싸기

첫번째 방법은 순서대로 수행되어야 하는 각각의 launch{} 코드 들을 coroutineScope{...} 빌더 함수 내에서 실행하는 방식입니다.

package io.chagchagchag.demo.kotlin_coroutine.structured_concurrency
 
import io.chagchagchag.demo.kotlin_coroutine.helper.logger
import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext
 
fun main(){
  val log = logger()
 
  runBlocking {
    val job = CoroutineScope(EmptyCoroutineContext).launch {
      log.info("(1)")
      coroutineScope {
        launch {
          delay(1000)
          log.info("주문 완료")
        }
 
        launch {
          delay(1000)
          log.info("결제 완료")
        }
 
        launch {
          delay(1000)
          log.info("배송 예약 완료")
        }
      }
      log.info("(2)")
    }
    job.join()
  }
}

출력결과는 원했던 결과인 주문 완료결제 완료배송 예약 완료 를 정상적으로 잘 만들어 냈음을 알 수 있습니다.

coroutineScope 내에서 실행되는 자식 스코프 들은 coroutineScope 빌더 함수 내의 부모스코프가 각 자식 스코프들의 라이프사이클을 관리하고 확인하면서 각각의 자식 coroutine 들이 동기적으로 동작되게끔 관리합니다. 즉, coroutineScope 빌더 함수 내에서는 여러개의 코루틴 스코프가 있더라도 각각의 코루틴 스코프는 동기연산을 하듯 순서대로 동작합니다.

12:15:59.359 [DefaultDispatcher-worker-2] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (1)
12:16:00.381 [DefaultDispatcher-worker-2] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- 주문 완료
12:16:00.381 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- 결제 완료
12:16:00.395 [DefaultDispatcher-worker-2] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- 배송 예약 완료
12:16:00.395 [DefaultDispatcher-worker-2] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (2)

Process finished with exit code 0

e.g. withContext

e.g 1. context 공유 여부 확인

package io.chagchagchag.demo.kotlin_coroutine.structured_concurrency
 
import io.chagchagchag.demo.kotlin_coroutine.helper.logger
import kotlinx.coroutines.*
 
fun main(){
  val log = logger()
 
  runBlocking {
    val job = CoroutineScope(Dispatchers.Default).launch {
      log.info("(before) 부모 코루틴 컨텍스트 : ${this.coroutineContext}")
 
      withContext(Dispatchers.IO){
        log.info("withContext 컨텍스트 : ${this.coroutineContext}")
      }
 
      log.info("(after) 부모 코루틴 컨텍스트 : ${this.coroutineContext}")
    }
 
    job.join()
  }
}

출력결과를 살펴보면, withContext() 가 사용하는 컨텍스트는 부모 코루틴 컨텍스트는 전혀 다르다는 것을 알 수 있습니다.

출력결과

13:49:45.660 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (before) 부모 코루틴 컨텍스트 : [StandaloneCoroutine{Active}@3b805dfa, Dispatchers.Default]
13:49:45.666 [DefaultDispatcher-worker-1] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- withContext 컨텍스트 : [DispatchedCoroutine{Active}@58c189ea, Dispatchers.IO]
13:49:45.667 [DefaultDispatcher-worker-3] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (after) 부모 코루틴 컨텍스트 : [StandaloneCoroutine{Active}@3b805dfa, Dispatchers.Default]

Process finished with exit code 0

e.g 2. 비동기 코드 동기연산 예제

package io.chagchagchag.demo.kotlin_coroutine.structured_concurrency
 
import io.chagchagchag.demo.kotlin_coroutine.helper.logger
import kotlinx.coroutines.*
import kotlin.coroutines.EmptyCoroutineContext
 
fun main(){
  val log = logger()
 
  runBlocking {
    val job = CoroutineScope(EmptyCoroutineContext).launch {
      val sum = sumCart2()
      log.info("(1)")
      log.info("cartSum = $sum")
      log.info("(2)")
      assert(sum == 5000 + 7000 + 3000)
    }
 
    job.join()
  }
}
 
suspend fun sumCart2() : Int{
  return withContext(Dispatchers.IO){
    val applePrice = async {
      delay(1000)
      5000
    }
    val bananaPrice = async {
      delay(1000)
      7000
    }
    val carrotPrice = async {
      delay(1000)
      3000
    }
    applePrice.await() + bananaPrice.await() + carrotPrice.await()
  }
}

확인해보면, 각각의 async 를 수행한 후에 합산이 정상적으로 잘 이루어짐을 확인 가능합니다.

출력결과

14:00:51.869 [DefaultDispatcher-worker-2] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (1)
14:00:51.874 [DefaultDispatcher-worker-2] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- cartSum = 15000
14:00:51.875 [DefaultDispatcher-worker-2] INFO io.chagchagchag.demo.kotlin_coroutine.helper.LoggingObject -- (2)

Process finished with exit code 0