요즘 비동기 프로그래밍에 대한 논의는 앱이나 웹과 같은 프론트 사이드의 진영뿐 아니라 서버 진영에서도 심심치 않게 들리는 화두가 되었습니다. 일례로 Java 21 버전에서 소개된 Virtual Thread와 같이 최신 기술 트렌드에 비동기 프로그래밍은 항상 뜨거운 감자와도 같은 주제입니다. 이번 포스팅에서는 스프링 팀에서 비동기 프로그래밍 방법 중 신흥 강자로 소개되고 있는 Kotlin Corouine이 왜 경량 스레드라고 소개되고 있는지 어떠한 컨셉을 갖고 있는지에 대해 다뤄보고자 합니다.
Why Coroutine?
비동기 프로그래밍이란?
동기(Synchronous)
- 현재 작업의 응답이 끝남과 동시에 다음 작업이 요청된다.
- 함수를 호출하는 곳에서 호출되는 함수가 결과를 반환할 때까지 기다린다.
- 작업 완료 여부를 계속해서 확인한다.
비동기(Asynchronous)
- 현재 작업의 응답이 끝나지 않은 상태에서 다음 작업이 요청된다.
- 함수를 호출하는 곳에서 결과를 기다리지 않고, 다른 함수(callback)에서 결과를 기다린다.
- 작업 완료 여부를 확인하지 않는다.
비동기 프로그래밍 방식 소개
연속적인 두 개의 함수를 처리하는 코드를 각기 다른 비동기 프로그래밍 스타일로 작성해보자.
Callback
interface Callback {
fun onSuccess()
fun onFailure()
}
private val dispatcher: ExecutorService = Executors
.newCachedThreadPool { runnable ->
Thread(runnable).apply {
isDaemon = true
}
}
fun doHardWork(callback: Callback): Disposable {
// .. do Hard Work ..
return dispatcher.submit {
if(isSuccess) {
callback.onSuccess()
} else {
callback.onFailure()
}
}.asDisposable()
}
fun doNextWork(callback: Callback): Disposable {
return dispatcher.submit {
if(isSuccess) {
callback.onSuccess()
} else {
callback.onFailure()
}
}.asDisposable()
}
fun main() {
doHardWork( object: Callback {
override fun onSuccess() {
...
doNextWork(object: Callback {
override fun onSuccess() {
...
}
override fun onFailure() {
...
}
}
}
override fun onFailure() {
// print error log
}
}
)
}
Rx
fun doHardWork(): Single<Boolean> = Single.create { emitter ->
// .. do hard thing ..
emitter.onSuccess(isSuccess)
}.subscribeOn(Schedulers.IO)
fun doNextWork(): Single<Boolean> = Single.create { emitter ->
// ..do next work ..
emitter.onSuccess(isSuccess)
}.subscribeOn(Schedulers.IO)
fun main() {
doHardWork()
.flatMap { isSuccess ->
if(isSuccess) {
doNextWork()
} else {
Single.error("Failed to do hard wrok")
}
.doOnError { // Print error log }
.observeOn(Schedulers.MAIN)
.subscribe { isSuccess ->
println("is it success? $isSuccess")
}
}
Callback 방식과 Rx 방식 비교
- Callback 방식의 경우 연쇄적인 함수를 호출할 경우 Callback 지옥에 빠질 수 있다
- Rx 방식에서는 비동기 작업의 수행 스레드와 결과 처리 스레드 지정을 손쉽게 설정할 수 있다
- Rx 방식에서는 작업 간의 관계 및 스레드 지정이 Chaining 방식으로 작성 가능하여 가독성이 더 좋아진다. (다만, Rx가 제공하는 일련의 Operator를 능숙하게 사용하는 경우에만 가독성의 장점을 챙길 수 있다.)
- exampledata 제어 : map, filter, distinct ..
- event 처리 : doOnError, doOnSubscribe, doOnSuccess, doOnDispose ..
- stream 변경 : flatMap, concatMap, switchMap, compose, switchIfEmpty, …
- 또한 체인에 예외나 취소에 대한 핸들링이 용이하다
- But, Rx 체인의 디버깅은 아직 어렵다. doOn… 등의 operator를 사용해서 디버깅 하는 수준
Coroutine
suspend fun doHardWork(): Boolean {
// .. do hard work ..
return isSuccess
}
suspend fun doNextWork(): Boolean {
return isSuccess
}
fun main() = runBlocking {
launch(Dispatchers.IO) {
if(doHardWork()) {
doNextWork()
}
}
}
Callback 및 Rx 방식과 코루틴 방식 비교
- 코루틴 역시 Rx 처럼 작업 수행 스레드와 결과 처리 스레드를 지정하기 용이하다
- 코루틴에서 원하는 로직을 수행할 때 스레드 지정 및 로직을 처리하는 과정이 일반 동기 프로그래밍 방식과 흡사하다. (가독성이 높고, 학습 하기 쉽다)
- 코루틴 역시 작업의 취소 및 에러 핸들링에 용이하다 (취소 및 에러에 관한 부분이 조금 어려운편..)
- 디버깅에 용이하다
Coroutine 기본 이해
fun main(args: Array<String>) {
GlobalScope.launch {
delay(1000L)
println("World!")
}
println("Hello,")
Thread.sleep(2000L) // for blocking main thread
}
Hello,
World!
- delay() 함수는 코루틴 내에서 사용할 수 있는 중단 함수. 즉, GlobalScope.launch {} 는 코루틴 빌더이다.
- Thread.sleep(2000L) 를 통해 MainThread를 2초간 Block 하여, 메인함수의 종료를 지연시킨다. 좀 더 자연스러운 스타일로 바꿔보자
fun main(args: Array<String>) = runBlocking {
GlobalScope.launch {
delay(1000L)
println("World!")
}
println("Hello,")
delay(2000L)
}
runBlocking{} 블록은 주어진 블록이 완료될 때까지 현재 스레드를 멈추는 코루틴 빌더이다
💡 이렇게 임의의 시간 (2초) 를 가지고 메인 스레드를 Block하는 것은 좋은 로직이 아니다. 부모 코루틴(runBlocking block = main function)은 자식 코루틴 (GlobalScope.launch{}) 의 작업이 완료되고 종료되는데 까지 얼마나 걸리는지 예측할 수 없기 때문이다.
fun main(args: Array<String>) = runBlocking {
val job = GlobalScope.launch {
delay(1000L)
println("World!")
}
println("Hello,")
job.join()
}
💡이 또한, 모든 자식 코루틴이 실행될 때마다 Job 객체를 참조하여 join을 호출하는 방식은 매우 불편하다.
모든 코루틴들은 각자의 스코프를 갖는다. 부모 코루틴과 자식 코루틴이 같은 코루틴 스코프 내에 위치하면 부모 코루틴은 알아서 자식 코루틴이 종료되길 기다린다.
fun main(args: Array<String>) = runBlocking { // this -> runBlock's coroutineScope
launch {
delay(1000L)
println("World!")
}
println("Hello,")
}
launch 함수는 CoroutineScope.launch() 로 CoroutineScope의 확장 함수이다. runBlocking 블록에서 this 참조로 runBlocking의 coroutineScope에 접근이 가능하며, 위의 코드는 부모 코루틴과 자식 코루틴의 코루틴 스코프가 일치한 모습이다.
CoroutineScope
fun main(args: Array<String>) = runBlocking {
launch {
delay(200L)
println("1")
}
coroutineScope {
launch {
delay(500L)
println("2")
}
delay(100L)
println("3")
}
println("4")
}
- 실행결과runBlocking과 coroutineScope의 차이는 runBlocking은 자식 코루틴이 종료되길 기다리면서 현재 스레드를 블락하고 coroutineScope은 그렇지 않다는 차이가 있다.
3 1 2 4
중단 함수
fun main(args: Array<String>) = runBlocking {
val job = launch {
doWorld()
}
val deferred = async {
doWorld2()
}
deferred.await()
println("Hello,")
}
suspend fun doWorld() {
delay(1000L)
println("World!")
}
가벼움
fun main(args: Array<String>) = runBlocking {
repeat(100_000) {
launch {
delay(1000L)
print(".")
}
}
}
Global 코루틴은 deamon thread 처럼 동작한다
fun main(args: Array<String>) = runBlocking {
GlobalScope.launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
}
delay(1300L)
}
I’m sleeping 0 …
I’m sleeping 1 …
I’m sleeping 2 …
Coroutine Context란?
public interface CoroutineContext {
public operator fun <E : Element> get(key: Key<E>): E?
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
public operator fun plus(context: CoroutineContext): CoroutineContext = ...impl...
public fun minusKey(key: Key<*>): CoroutineContext
}
public interface Key<E : Element>
public interface Element : CoroutineContext {
public val key: Key<*>
...overrides...
}
- get() : 연산자 함수로써 주어진 key 에 해당하는 컨텍스트 요소를 반환한다
- fold() : 초기값을 시작으로 제공된 병합 함수를 이용하여 대상 컨텍스트 요소들을 병합한 후 결과를 반환한다
- plus() : 현재 컨텍스트와 파라미터로 주어진 다른 컨텍스트가 갖는 요소들을 모두 포함하는 컨텍스트를 반환한다. 중복을 포함하지 않는다
- minusKey() : 현재 컨텍스트에서 주어진 키를 갖는 요소들을 제외한 새로운 컨텍스트를 반환한다
Element 는 CoroutineContext 를 상속하며 key 를 멤버 속성으로 갖는다. CoroutineContext 를 구성하는 Element 들의 예를 들어보면 CoroutineId, CoroutineName, CoroutineDispatcher, ContinuationInterceptor, CoroutineExceptionHandler 등이 있다.
💡 즉, 코루틴 컨텍스트에는 코루틴 컨텍스트를 상속한 Element 들이 등록될 수 있고, 각 Element들이 등록 될때는 Element의 고유한 Key를 기반으로 등록된다는 것
launch() {} // Only CoroutineId
// CoroutineId + Continuation Interceptor
launch(Dispatchers.IO) {}
// CoroutineId + CoroutineName + Continuation Interceptor
launch(Dispatchers.IO + CoroutineName("SampleCoroutine")){}
// CoroutineId + CoroutineName + Coroutine ExcpetionHandler + Continuation Interceptor
launch(Dispatchers.IO + CoroutineName("SampleCoroutine") + CoroutineExceptionHandler()){}
‘+’ 연산자를 이용해 Element 들 (CoroutineContext) 를 병합해 CombinedContext를 만들어 낸다.
💡 즉, CoroutineContext는 코루틴의 실행환경에 대한 정보라고 생각하면 쉽다!
CoroutineScope이란?
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
CoroutineScope은 단순히 CoroutineContext를 갖고 있을 뿐이다.
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
우리가 위에서 살펴본 launch 와 같은 코루틴 빌더들은 CoroutineScope의 확장함수이며, launch 블록으로 생성된 코루틴들은 CoroutineScope의 CoroutineContext 환경을 그대로 상속한다.
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext.foldCopiesForChildCoroutine() + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
combined를 자세히 보면 현재 CoroutineScope의 coroutineContext와 새로 입력받은 context를 병합한 모습. default 로 EmptyCoroutineContext가 지정되어있는데 이건 뭘까?
public object EmptyCoroutineContext : CoroutineContext, Serializable {
private const val serialVersionUID: Long = 0
private fun readResolve(): Any = EmptyCoroutineContext
public override fun <E : Element> get(key: Key<E>): E? = null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
public override fun plus(context: CoroutineContext): CoroutineContext = context
public override fun minusKey(key: Key<*>): CoroutineContext = this
public override fun hashCode(): Int = 0
public override fun toString(): String = "EmptyCoroutineContext"
}
위의 코드를 보면 newCoroutineContext에서 ‘+’ 연산자로 coroutineContext.foldCopiesForChildCoroutine() + context
를 해봐도 EmptyCoroutineContext는 무시됨을 알 수 있다.
object GlobalScope : CoroutineScope {
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
우리가 사용한 GlobalScope은 Singleton object로 coroutineContext로 EmptyCoroutineContext를 가지고 있다.
왜 Coroutine을 경량화된 Thread라고 할까?
Coroutine은 Thread가 아니다
코루틴이 하나 생성된다고 해서 Thread가 하나 생성되는 것이 아니다. Coroutine은 스케쥴링 가능한 코드 블록 혹은 코드 블록의 집합이다.
launch { } 와 같이 코루틴 빌더를 실행했을 경우 넘긴 코드 블록은 Continuation이라는 단위로 만들어진다. 여기서 말하는 Continuation은 CPS(Continuation Passing Style)에서 이야기하는 Continuation 개념의 구현체이다.
어떤 일을 수행하기 위한 일련의 함수들의 연결을 각 함수의 반환값을 이용하지 않고 Continuation 이라는 추가 파라미터(Callback)를 두어 연결하는 방식으로 Continuation 단위로 dispatcher 를 변경한다거나 실행을 유예한다거나 하는 플로우 컨트롤이 용이해지는 이점이 있다.
Continuation 단위로 변경이 된 코드는 처음, suspended 상태로 대기하고 있다가 resume() 요청으로 resumed 상태로 전환되어 실행되고 추가적인 중단 함수를 만나면 suspended 상태로 전환되었다가 하는 등 suspended, resumed 상태를 번갈아가며 코드가 진행된다.
이 때 resume() 요청을 할 때마다 dispatcher에게 dispatch(스레드 전환)이 필요한지 묻는 isDispatchNeeded() 함수를 통해 확인 후 필요한 경우 적합한 스레드로 전달되어 실행된다.
즉, 스레드 전환이 필요하지 않으면 전환하지 않는다!!
fun main(args: Array<String>) = runBlocking {
repeat(100_000) {
launch(Dispatchers.IO) {
delay(1000L)
print(".")
}
}
}
위에서 본 이 코드에서 OOM이 발생하지 않는 이유이다! launch 빌더에서 Dispatcher를 재정의 하지 않았기 때문에 runBlocking의 dispatcher를 함께 사용한다. runBlocking 빌더는 내부적으로 GlobalScope을 사용하며 Dispatcher 는 BlockingEventLoop 을 사용해 이벤트 루프 기반으로 10만번의 이벤트를 발생하여 점을 출력하게 되며 스레드 부하는 없으므로 OOM을 피할 수 있게 된다.
'팀 이야기' 카테고리의 다른 글
[스프링팀] PK 숨기기 (8) | 2024.10.18 |
---|---|
[스프링] SpringBoot이 ObjectMapper를 구성하는 방법 (0) | 2024.10.16 |
[스프링팀] gRPC (1) | 2024.10.15 |
[스프링팀] Druid (0) | 2023.08.31 |
[스프링팀] 검색 기술의 원리 발표 세션 (1) | 2022.09.14 |