본문 바로가기
Dev/고민과 삽질의 기록들🤔

[Combine] Publisher와 Subscriber간 연결 과정 딥다이브 (ft. OpenCombine)

by Mintta 2023. 11. 3.

현재 MVC로 되어있는 프로젝트를 MVVM으로 리팩토링하기에 앞서 Combine을 공부하고 있습니다.

Combine은 인터페이스 쪽만 모두 public이고 내부 구현은 알 수가 없어서 공부에 좀 어려움이 있었는데, 팀원 덕분에 이 어려움을 해소시킬 수 있는 오픈소스를 찾게 되었습니다. 

 

GitHub - OpenCombine/OpenCombine: Open source implementation of Apple's Combine framework for processing values over time.

 

GitHub - OpenCombine/OpenCombine: Open source implementation of Apple's Combine framework for processing values over time.

Open source implementation of Apple's Combine framework for processing values over time. - GitHub - OpenCombine/OpenCombine: Open source implementation of Apple's Combine framework for proc...

github.com

 

OpenCombine이라는 오픈소스 라이브러리로, Kotlin뿐만 아니라 LLVM, swift 등에서 작업하신 Compiler Developer분이 만드셨고, 스타 수도 2.6K로 어느 정도 신뢰가 가더라고요.

 

Combine을 공부하면서 정말 많은 도움을 받고 있습니다. 진짜 이걸 어떻게 만들었는지 신기할 따름이네요..

 

오늘은 제일 기본이 되는 Publisher Subscriber가 연결되고 데이터가 전달되는 흐름을 OpenCombine과 함께 살펴보겠습니다.

 


예제 코드

Publisher는 PassthroughSubject를 기준으로, Subscriber는 Sink로 잡았습니다.

 

위 코드를 기준으로 실제로 어떤 일이 일어나는지 한번 딥다이브 해봅시다 !

 

0. sink를 했을 뿐인데 구독이 된다 ??? 🤔

보통 우리가 stream을 만들 때, Publisher와 Subscriber가 필요하기 마련입니다. 구독을 하고자 하는 대상과 구독자가 있어야하는 것이죠.

그런데 Publisher는 subject이기 때문에 구독을 하고자 하는 대상은 존재하는건 쉽게 알 수 있습니다. 하지만 구독자가 안보이는데 어떻게 이 코드는 잘 작동할까요 ?? 🤔

 

이는 sink메서드를 보면 알 수 있습니다.

sink()

Publisher의 extension에 존재하는 sink메서드를 보면 메서드 내부에서 Sink라는 Subscriber를 만들어서 Publisher(PassthroughSubject)에 subscribe의 인자로 Sink를 넘기고 있습니다.

 

Publisher의 extension메서드인 sink()를 쓰게 되면 우리가 직접 커스텀 subscriber를 만들 필요 없이, 내부에서 Sink Subscriber를 만들어서 subscribe까지 호출해주기 때문에 위의 코드에서 Subscriber의 모습은 온데간데 없이 안보였던 것이죠 !

1-1. Subscriber가 Publisher에게 구독권을 요청한다.

 

Subscriber가 Publisher에게 구독 요청을 보내는 것으로 시작됩니다.

 

subscribe(subscription:) 메서드는 Publisherextension안에 존재합니다.

extension Publisher - subscribe

extension Publisher

protocol Publisher

protocol Publisher

 

순서가 우리나라 말로는 “Subscriber가 Publisher를 구독한다”의 순서가 자연스럽지만, 여기서는 “Publisher에게 구독받기를 요청한다. Subscriber가”의 순서가 됩니다.

graduationPublisher.subscribe(gradeSubscriber)

 

SubscriberPublisher에게 구독권을 요청한다 !” 라고 이해하고 넘어가면 될 것 같습니다.

코드로 조금 더 살펴보겠습니다.

 

Publisher에게 요청할 때 Publisher에서 보내주는 값의 타입과 값을 받는 Subscription의 타입이 일치해야하기 때문에 P.Output == S.Input 과 일치해야함을 알 수 있습니다.

 

정기구독권같은 구독권을 예를 들어서 구독권을 생성해서 보내줬는데(Publisher) 받고자 하는 쪽(Subscriber)에서 구독권이 아닌 엉뚱한 Int타입으로 받기를 기대하고 있으면 이상하겠죠?

 

Publishersubscribe메서드 내부에서 반드시 실행되는 함수를 보면 receive(subscriber: Subscriber) 메서드가 있습니다. 이는 Publisher protocol에 정의되어있는 메서드입니다 !

 

Generic을 통해서 Subscriber 프로토콜을 채택한 객체만이 들어올 수 있게끔 제약을 걸어두었습니다.

 

receive(subscriber) 메서드는 이제 Publisher가 구독요청을 보낸 Subscriber를 받아들이는(receive)하는 과정의 시작이라고 보시면 됩니다 !

 

1-2. Publisher가 Subscriber를 Accept하는 과정 시작

receive(subscribe)메서드의 구현부를 살펴보면

Conduit이라는 객체를 생성한 후에 conduit객체를 subscriber에게 보내고 있습니다.

 

Conduit 객체가 뭘까요 ?? 🤔

 

Conduit은 뭘까 ?

우선 Conduit의 사전적 정의부터 알아보겠습니다.

 

도관, 전선관, Publisher와 Subscriber의 연결을 나타내는 subscription과 매우 유사한 뜻인 걸 알 수 있겠네요 !

 

Conduit의 내부도 살펴보겠습니다.

 

Conduit이라는 객체는 ConduitBase라는 객체를 상속받고 있는데, ConduitBase가 Subscription 프로토콜을 채택하고 있는 걸 봐서 이를 상속받은 Conduit이라는 객체는 Subscription의 역할을 수행한다는 것을 알 수 있습니다.

 

이제 conduit이 가지고 있는 프로퍼티들을 살펴보겠습니다.

  • parent: PassthroughSubject (누가, 값을 보내는 주체)
  • downstream: Subscriber (누구에게, 값을 받는 구독자들)
  • demand: Subscriber.Demand (얼마나, 얼만큼의 양을 보낼지)

프로퍼티를 보니 Conduit, 즉 subscription은 누가, 누구에게, 얼만큼의 값을 보내줘야할지에 대한 모든 정보를 가지고 있습니다.

 

"subscription은 그럼 data stream을 관리하는 관리자, 혹은 data stream 그 자체구나 !"


이제 다시 코드로 돌아와서 이렇게 생성한 conduit을  subscriber에게 넘기고 있었습니다.

여기서 subscriber는 Sink subscriber이고, subscriber에게 subscriber 프로토콜에 정의된 receive(subscription) 메서드를 통해 생성한 구독권을 Publisher(PassthroughSubject)가 전달하고 있는 것 입니다.

 

그렇다면 이제 Sink에 있는 receive(subscription)을 볼 차례입니다 !

 

Sink().receive(subscription)

status를 case subscribed(구독된!)으로 바꾸면서 그 연관값으로 subscription을 저장하고 있는 것을 볼 수 있습니다.

그리고 이 후 이제 얼만큼 값을 받을지에 대한 정보를 주는 subscription.request(.unlimited) 메서드를 호출하는 것을 볼 수 있습니다.

 

여기서 subscription은 우리가 생성해서 넣어준 Conduit객체입니다 !

그럼 이제 살펴볼 메서드는 Conduit객체에 있는 request(Demad) 메서드이겠죠 ?

 

request를 통해 받게 된 .unlimited는 사실 UInt.max라는 것을 알 수 있고, demand를 그 만큼 증가시키고 있습니다.

Unsigned Int의 max값이면 사실상 무제한이 맞겠네요..!

(Integar가 4byte = 32bit = 2^32 = 4,294,967,296 (약 42억))

 

여기까지 됐으면 이제 PassthroughSubject와 Sink Subscriber간 data stream 연결 작업은 끝이 납니다 !! 🥳

여기서 바로 끝내기에는 아쉬위니, 값을 보내는 것까지 한번 살펴봅시다 !!

 

Subject(Publisher) -> Subject 값 전달하는 과정 !

protocol Subject
protocol Subscriber

 

값을 보낸다고 했을 때 우리가 사용하게 될 메서드들은 모두 Subject protocol, 그리고 Subscriber protocol에 존재합니다 !

 

이번에 아래 코드를 기준으로 살펴봅시다 !

subject.send(10)

 

send메서드의 구현부를 살펴보면 downstreams라는 ConduitList를 forEach문으로 순회하며 conduit의 offer메서드를 호출하고 있습니다.

여기서 Conduit은 위에서 살펴봤듯이 Subscription을 채택하고 있기에 Subscription을 의미합니다.

Conduit안에 있는 offer구현부를 살펴보면 Subscriber타입의 downstream에 receive(_ output) 메서드를 통해서 Sink subscriber에게 값을 전달하고 있습니다.

 

결국 값을 전달하는 작업을 실제로 수행해주는 건 Conduit, subscription 인거죠.

  • 값의 전달 방향: Publisher → Subscription → Subscriber

 

추가얘기) Subscriber와 Subscription이 Referecen type인 이유

Subscriber가 Reference type인 이유

subscriber.receive(output)

 

conduit에서 실제로 값을 전달할 때 위 코드를 통해 전달합니다. 여기서 Subscriber는 conduit객체를 생성할 때 넘겨받았던 subscriber

입니다. 만약 Subscriber가 value type이였다면 어떻게 될까요 ?

Conduit객체 생성시에 Subscriber객체를 넘기는 순간 값 복사가 일어나게 될테고, 이 때 subscriber에게 값을 넘기는데, 이 subscriber가 우리가 처음 값을 받기를 원했던 원본 subscriber가 아닌 copied된 객체이고, 이것은 우리가 처음 값을 받기로 한 그 객체가 아닐 수 있기 때문에 side effect가 생길 수 있습니다.

Subscription이 Reference type인 이유

Subscription, Conduit은 구독자(Subscriber)에 따라 다른 Subscription을 가지고 있기 때문에 이를 구분하기 위해서 identity가 보장되어야합니다. 그렇기 때문에 identity를 보장할 수 있는 reference type을 사용해야만 합니다.


Wrap up

글로써 순서를 정리해보면 아래와 같습니다.

  1. Subscriber가 Publisher에게 subscribe 메서드를 통해 구독권을 요청합니다.
  2. receive(subscriber:) 메서드 내부에서 구독권(conduit, subscription)을 생성해서
  3. subscriber.receive(subscription: conduit) 을 통해 subscriber에게 구독권을 전달해줍니다.
  4. subscription.request(demand) 메서드를 통해 받을 값의 양에 대한 요청사항을 요청합니다.
  5. receive(_:) 메서드를 통해서 Publisher가 Subscriber에게 값을 전달

완료가 되었을 때는 completion을 receive하는 것도 확인할 수 있습니다.

 

WWDC 영상에서는 위의 과정 아래의 그림과 같은 보여주고 있습니다.

 

조금 더 자세히 안에서 실행되는 메서드도 포함해서 다시 정리해보겠습니다.

 

그림에서 아래는 공간상 적을 수 없겠지만,

 

모든 과정이 끝나면 Subscriber 프로토콜안에 정의된 receive(completion:)를 호출하고 끝이 나게 될 것 입니다.


마무리

Combine의 기능을 그대로 구현한 OpenCombine을 딥다이브해보면서 실제 apple document와 비교하면서 읽어봐도 정말 잘 구현되어있다고 느꼈고, 실제로도 이런식으로 로직이 돌아가겠구나하는 것을 느끼면서 이걸 만든 사람은 진짜 뭐지...싶네요

 

앞으로 쓸 다음글들의 주제는

  • Map Operator 실행 로직 딥다이브
  • Publisher중 Future만 왜 reference type인지, async/await로 완벽히 대체되는 것에 대한 생각
  • MVVM Input Output 리팩토링 과정에서 있었던 결과값을 -> stream으로 바꾸는 과정 (flatmap, deferred, future)
  • CancelBag을 쓰는 이유,
  • Cancel되는 로직

이외에도 Swift 기본에 대한 이야기도 하고싶고..

최근에는 들어오는 Input지식들이 너무 많고, 제가 느끼기에 간단하지 않은 것들이라 정리하고싶은 것들 뿐인데 아티클 하나 쓰는게 정말 시간을 많이 잡아먹어서 써야할 것들이 계속해서 쌓이고 늘어가네요..ㅎㅎ

 

그래도 이렇게 아티클로 한번 정리하고나면 정말 뿌듯하고 그러네요 나머지 글들도 화이팅해서 써보겠습니다 !

댓글