Apache Spark Streaming의 누적 상태 저장 변환



이 블로그 게시물에서는 Spark Streaming의 상태 저장 변환에 대해 설명합니다. Hadoop Spark 경력을위한 누적 추적 및 기술 향상에 대해 모두 알아보십시오.

Prithviraj Bose 제공

이전 블로그에서 Apache Spark Streaming의 창 개념을 사용하는 상태 저장 변환에 대해 설명했습니다. 읽을 수 있습니다 여기 .





이 게시물에서는 Apache Spark Streaming의 누적 상태 저장 작업에 대해 설명합니다. Spark Streaming을 처음 사용하는 경우 창 작동 방식을 이해하기 위해 이전 블로그를 읽는 것이 좋습니다.

스파크 스트리밍의 상태 저장 변환 유형 (계속…)

> 누적 추적

우리는 reduceByKeyAndWindow (…) 키의 상태를 추적하는 API, 그러나 윈도우 화는 특정 사용 사례에 제한을 둡니다. 시간 창으로 제한하지 않고 전체적으로 키의 상태를 누적하려면 어떻게해야합니까? 이 경우 우리는 updateStateByKey (…) 불.



이 API는 Spark 1.3.0에서 도입되었으며 매우 인기가 있습니다. 그러나이 API에는 약간의 성능 오버 헤드가 있으며 시간이 지남에 따라 상태 크기가 증가함에 따라 성능이 저하됩니다. 이 API의 사용법을 보여주는 샘플을 작성했습니다. 코드를 찾을 수 있습니다. 여기 .

Spark 1.6.0은 새로운 API를 도입했습니다. mapWithState (…) 이로 인해 발생하는 성능 오버 헤드를 해결합니다. updateStateByKey (…) . 이 블로그에서는 내가 작성한 샘플 프로그램을 사용하여이 특정 API에 대해 논의 할 것입니다. 코드를 찾을 수 있습니다. 여기 .

코드 연습을 시작하기 전에 체크 포인트에 대해 몇 마디 만하겠습니다. 모든 상태 저장 변환의 경우 체크 포인트가 필수입니다. 검사 점은 드라이버 프로그램이 실패 할 경우 키 상태를 복원하는 메커니즘입니다. 드라이버가 다시 시작되면 체크 포인트 파일에서 키 상태가 복원됩니다. 체크 포인트 위치는 일반적으로 HDFS, Amazon S3 또는 신뢰할 수있는 스토리지입니다. 코드를 테스트하는 동안 로컬 파일 시스템에 저장할 수도 있습니다.



샘플 프로그램에서 우리는 host = localhost 및 port = 9999에서 소켓 텍스트 스트림을 수신합니다. 들어오는 스트림을 (단어, 발생 횟수)로 토큰 화하고 1.6.0 API를 사용하여 단어 수를 추적합니다. mapWithState (…) . 또한 업데이트가없는 키는 StateSpec.timeout API. HDFS에서 검사 점을 지정하고 있으며 검사 점 빈도는 20 초마다입니다.

자바에서 객체 배열 선언

먼저 Spark Streaming 세션을 생성하겠습니다.

Spark-streaming-session

우리는 checkpointDir HDFS에서 다음 개체 메서드를 호출하십시오. getOrCreate (…) . 그만큼 getOrCreate API는 checkpointDir 복원 할 이전 상태가 있는지 확인하기 위해 (존재하는 경우) Spark Streaming 세션을 다시 생성하고 새 데이터로 이동하기 전에 파일에 저장된 데이터에서 키 상태를 업데이트합니다. 그렇지 않으면 새로운 Spark Streaming 세션이 생성됩니다.

그만큼 getOrCreate 체크 포인트 디렉토리 이름과 함수 (우리가 createFunc ) 누구의 서명이어야 하는가 () => StreamingContext .

내부 코드를 살펴 보겠습니다. createFunc .

2 행 : 작업 이름이 'TestMapWithStateJob'이고 배치 간격 = 5 초인 스트리밍 컨텍스트를 만듭니다.

5 행 : 체크 포인트 디렉토리를 설정합니다.

8 행 : 클래스를 사용하여 상태 사양 설정 org.apache.streaming.StateSpec 목적. 먼저 상태를 추적 할 함수를 설정 한 다음 후속 변환 중에 생성 될 결과 DStream의 파티션 수를 설정합니다. 마지막으로 키에 대한 업데이트가 30 초 내에 수신되지 않으면 키 상태가 제거되는 시간 제한 (30 초)을 설정합니다.

라인 12 # : 소켓 스트림 설정, 수신 배치 데이터 플랫 화, 키-값 쌍 생성, 호출 mapWithState , 체크 포인트 간격을 20 초로 설정하고 마지막으로 결과를 인쇄합니다.

Spark 프레임 워크는 th를 호출합니다. e createFunc 이전 값과 현재 상태를 가진 모든 키에 대해. 합계를 계산하고 누적 합계로 상태를 업데이트하고 마지막으로 키에 대한 합계를 반환합니다.

클래스 싱글 톤을 만드는 방법

Github 소스-> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

질문이 있으십니까? 의견란에 언급 해 주시면 연락 드리겠습니다.

관련 게시물:

Apache Spark 및 Scala 시작하기

Spark Streaming에서 창을 사용한 상태 저장 변환