Heart's Develop Inside (WP)

Heart's Develop Blog in WordPress.com

Storm wiki : Guaranteeing message processing 정리

leave a comment »

원문

Guaranteeing message processing

주제

  • Storm 은 어떻게 spout 을 통해 나간 메시지들이 처리가 완료되는 것을 보장하는가?
  • Storm 의 신뢰성 기능을 취하기 위해 사용자는 무엇을 해야 하는가?

tuple 이 완전히 처리된다(fully processed)는 것이 무엇을 의미하는가?

  • spout 을 통해 나간 tuple 은 tuple 들을 추가로 생성할 수 있다
  • tuple tree 예시 : spout(sentence) -> bolt(split) -> bolt(word count)
  • tuple 이 완전히 처리된다는 것은 tree 내의 모든 tuple 들이 처리되었다는 것을 의미
  • tuple tree 내 tuple 들이 지정된 타임아웃 내에 모두 처리되지 않으면 tuple 처리가 실패한 것으로 간주
  • 타임아웃 설정
    • topology 단위
    • Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
    • 기본값 30초

tuple 이 완전히 처리되거나 처리에 실패할 경우 어떤 일이 발생하는가?

  • Spout 으로부터 나간 tuple 의 lifecycle 을 살펴보자
  • Spout 인터페이스
    • public interface ISpout extends Serializable {
          void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
          void close();
          void nextTuple();
          void ack(Object msgId);
          void fail(Object msgId);
      }
  • Lifecycle
    • Storm 은 초기에 Spout 역할에 필요한 객체들을 전달하기 위해 Spout.open() 을 호출한다
      • Spout 은 전달인자로 받은 SpoutOutputCollector 를 보관한다
    • Storm 은 Spout 으로부터 tuple 을 얻기 위해 Spout.nextTuple() 을 호출한다
    • Spout 은 SpoutOutputCollector 을 이용하여 tuple 을 output stream 중 하나로 방출한다(emit)
    • tuple 이 방출될 때, Spout 은 tuple 에 식별 용도의 message id 를 발급한다
      • _collector.emit(new Values(“field1”, “field2”, 3), msgId);
    • tuple 은 처리되기 위해 적절한 bolt 들로 보내지고 경우에 따라 새로운 tuple 들이 추가된다(anchoring)
      • Storm 은 tuple tree 를 추적한다
    • Storm 이 tuple tree 의 전체 tuple 이 처리 완료되었다는 것을 감지하면, Storm 은 tuple 을 제공한 Spout task 로 message id 와 함께 ack() 를 호출한다
    • Storm 이 tuple tree 내 tuple 들이 지정된 타임아웃 내에 처리되지 않은 것을 감지하면, Storm 은 tuple 을 제공한 Spout task 로 fail() 을 호출한다
    • (중요) tuple 은 tuple 이 제공된 Spout task 로 ack() 나 fail() 을 호출한다

message processing 을 보장받기 위해 Spout 이 해야 하는 일은 무엇인가?

  • 예시 : KestrelSpout
  • Kestrel queue 에서 message 를 가져올 때, 메시지를 “open” 한다
    • queue 에서 바로 제거되지 않고, “PENDING” 상태로 남겨두는 것
      • 다른 consumer 에게 전달되지 않는다
      • ack 되면 queue 에서 제거된다
    • 클라이언트가 연결을 끊으면 해당 클라이언트가 PENDING 상태로 만든 모든 메시지는 queue 로 되돌아간다
  • 메시지가 “open” 될 때, Kestrel 은 메시지와 함께 고유 id (unique id) 를 제공한다
  • tuple 을 SpoutOutputCollector 로 방출할 때, Kestrel 에서 제공하는 unique id 를 message id 로 사용한다
  • Storm 에 의해 ack() 이나 fail() 이 호출되면, 같이 제공되는 message id 를 통해 Kestrel queue 에 ack 처리하거나 queue 로 되돌린다

Storm 의 신뢰성(reliability) API 란?

  • 신뢰성을 얻기 위해 해야 하는 작업
    • tuple tree 에 node 가 추가되는 경우 Storm 에 알려야 함
      • OutputCollector.emit()
      • Anchored
        • _collector.emit(tuple, new Values(word));
        • output tuple(word) 는 input tuple 에 anchor 된다
        • output tuple 이 실패하면 spout tuple 이 재처리된다
        • output tuple 을 여러 input tuple 로 anchor 를 걸 수 있다
          • tuple-dag
          • output tuple 이 실패하면 input tuple 들의 root (spout) tuple 들이 모두 재처리된다
          • List<Tuple> anchors = newArrayList<Tuple>();
            anchors.add(tuple1);
            anchors.add(tuple2);
            _connector.emit(anchors, new Values(1,23));
      • Unanchored
        • _collector.emit(new Values(word));
        • output tuple 은 tuple tree 에 추가되지 않는다 (input tuple 에 anchor 되지 않는다)
          • tuple tree 의 성공/실패 여부에 영향을 끼치지 않는다
    • 개별 tuple 이 처리 완료 되었을 때 Storm 에 알려야 함
      • Ack
        • OutputCollector.ack()
          • _collector.ack(tuple);
      • Fail
        • OutputCollector.fail()
          • _collector.fail(tuple);
        • tuple tree 의 처리 실패를 명시적으로 알린다
          • timeout 까지 기다리지 않고 spout tuple 을 바로 재처리할 수 있다
      • 처리된 모든 tuple 은 ack 나 fail 처리해야 한다
        • Storm 은 tuple 을 추적하는 데 메모리를 소비
          • 메모리를 전부 소진할 수 있다
    • 일반적인 bolt 패턴
      • input tuple 을 읽고, 가공된 tuple 들을 방출하며, input tuple 을 ack 처리
      • IBasicBolt 는 이런 패턴을 캡슐화
        • public void execute(Tuple tuple, BasicOutputCollector collector)
        • BasicOutputCollector.emit() 은 output tuple 을 자동으로 input tuple 과 anchor 시켜 준다
        • execute() 메소드가 정상적으로 종료되면 자동으로 input tuple 을 ack 처리한다
        • stream 들을 aggregation 하거나 join 하는 경우 IBasicBolt 의 제공 패턴을 벗어남

tuple 이 재처리되는 것을 가정하면 어떻게 어플리케이션이 정확하게 동작하게 만들수 있는가?

  • 정확히 데이터가 한 번만 처리되게 하는 기능 필요
  • 0.7.0 은 Transactional topologies 가 추가됨
  • 0.8 에서는 deprecated 됨
    • Trident API 사용

Storm 은 어떻게 신뢰성을 효율적으로 구현했는가?

  • Acker task
    • 모든 spout tuple 에 대해 tuple tree 를 추적
    • tree 가 종료된 것으로 보이면, spout tuple 을 만든 spout task 에 메시지를 ack
    • Config.TOPOLOGY_ACKERS 설정으로 수를 조정 가능
      • 기본값 1
      • 많은 수의 tuple 을 처리하는 경우 늘린다
  • tuple 과 tuple tree 의 lifecycle
    • tuple 생성 시 임의의 64bit id 를 할당받는다
      • acker 의 추적 용으로 사용
    • 모든 tuple 은 자신이 속한 tuple tree 들의 spout tuple id 를 알고 있다
    • tuple 이 anchor 되면, source tuple 들의 spout tuple id 들을 target tuple 에 복사한다
    • tuple 이 ack 되면 tuple tree 가 어떻게 변경되었는지 정보를 적절한 acker task 에게 보낸다
      • 이 tuple 은 처리가 완료되었다
      • 이 tuple 처리로 다른 tuple 들이 anchor 되었다
    • ack_tree
      C 가 처리되면서 D, E가 C 에 anchor 된 경우
  • spout tuple 을 처리할 acker task 선정방법
    • mod hashing
    • spout tuple 이 emit 될 때 acker task 에 관리 위임
    • acker task 가 tuple tree 가 완료되었다고 판단하면 받았던 spout 의 task id 로 ack 전달
  • acker task 는 추적을 위해 모든 tuple tree 를 보관하진 않는다
    • spout tree 별로 20 바이트 가량의 메모리만 사용
    • (spout tuple id, [spout task id, ack val])
      • ack val
        • 생성되거나 ack 된 tuple id 들을 xor 한 값
        • ack val 이 0이 되면 tuple tree 처리가 종료된 것
        • 충돌로 인해 ack val 이 0이 될 확률은 거의 없음
  • failure case 에 대한 처리
    • task 가 비정상 종료되어 tuple 이 ack 되지 않음
      • tuple tree 처리가 timeout 되어 spout tuple 이 재처리됨
    • acker task 가 비정상 종료
      • acker 가 관리하던 모든 tuple tree 들이 timeout 되고 spout tuple 들이 재처리됨
    • spout task 가 비정상 종료
      • 데이터를 공급하는 외부에서 재처리를 보증해야 함
      • 예 : Kestrel 이나 RabbitMQ 는 pending message 가 있는 client 가 disconnect 되면 message 를 queue 로 복원한다

신뢰성 튜닝

  • acker task 는 가볍다
    • topology 내에 많은 수가 필요하지 않다
    • Storm UI 의 __acker 를 통해 처리량을 보고 acker task 를 늘릴 수 있다
  • 신뢰성이 중요하지 않다면, tuple tree 를 추적하지 않을 수도 있다
    • Config.TOPOLOGY_ACKERS 를 0으로 설정
      • spout tuple 이 emit 되자마자 ack 가 호출됨
      • tuple tree 는 추적되지 않음
    • SpoutOutputCollector.emit() 호출 시 message id 를 부여하지 않음
    • unanchor 로 output tuple 을 생성하여 특정 sub-tree 만 추적하지 않게 할 수 있음
Advertisements

Written by kabhwan

2013년 8월 18일 , 시간: 10:37 오후

Storm에 게시됨

Tagged with

답글 남기기

아래 항목을 채우거나 오른쪽 아이콘 중 하나를 클릭하여 로그 인 하세요:

WordPress.com 로고

WordPress.com의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Twitter 사진

Twitter의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Facebook 사진

Facebook의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

Google+ photo

Google+의 계정을 사용하여 댓글을 남깁니다. 로그아웃 / 변경 )

%s에 연결하는 중

%d 블로거가 이것을 좋아합니다: