Spark를 사용하는 RDD : Apache Spark의 빌딩 블록



Spark를 사용하는 RDD에 대한이 블로그는 Spark의 기본 단위 인 RDD에 대한 상세하고 포괄적 인 지식을 제공합니다.

, 단어 자체만으로도 모든 Hadoop 엔지니어의 마음에 불꽃을 일으키기에 충분합니다. n 메모리 내 가공 도구 이는 클러스터 컴퓨팅에서 번개처럼 빠릅니다. MapReduce와 비교하여 인 메모리 데이터 공유로 인해 RDD가 10 ~ 100 배 더 빨리 RDD (Resilient Distributed Data set) 덕분에이 모든 것이 가능합니다. Spark 기사를 사용하는이 RDD에서 오늘 초점을 맞춘 핵심 사항은 다음과 같습니다.

RDD가 필요하십니까?

RDD가 필요한 이유-Spark를 사용하는 RDD

세상은 데이터 과학 발전으로 인해 . 알고리즘 기반 회귀 , , 과 실행되는 분산 반복 계산 에이션 여러 컴퓨팅 장치 간의 데이터 재사용 및 공유를 포함하는 패션.

전통 기술은 다음과 같은 안정적인 중간 및 분산 스토리지가 필요했습니다. HDFS 데이터 복제 및 데이터 직렬화를 통한 반복적 인 계산으로 구성되어 프로세스가 훨씬 느려졌습니다. 해결책을 찾는 것은 결코 쉬운 일이 아닙니다.

여기가 RDD (탄력있는 분산 데이터 세트)가 큰 그림으로 나옵니다.

RDD 는 데이터 소스에서 데이터를 가져와 RDD로 드롭하므로 사용하기 쉽고 생성하기도 쉽습니다. 또한 작업이 적용되어 처리됩니다. 그들은 분산 메모리 수집 다음과 같은 권한 읽기 전용 그리고 가장 중요한 것은 내결함성 .

만약에 어떠한 데이터 파티션 RDD는 잃어버린 , 동일 적용하여 재생성 가능 변환 손실 된 파티션에 대한 작업 혈통 , 모든 데이터를 처음부터 처리하지 않습니다. 실시간 시나리오에서 이러한 종류의 접근 방식은 데이터 손실 상황이나 시스템 다운시 기적을 일으킬 수 있습니다.

RDD 란 무엇입니까?

RDD 또는 ( 탄력적 인 분산 데이터 세트 )는 기본 데이터 구조 Spark에서. 용어 탄력성 데이터 또는 데이터를 자동으로 생성하는 기능을 정의합니다. 롤백 ~로 원래 상태 데이터 손실 가능성과 함께 예기치 않은 재난이 발생했을 때.

RDD에 기록 된 데이터는 다음과 같습니다. 분할 에 저장 여러 실행 가능 노드 . 실행중인 노드 실패 런타임에 즉시 백업을 가져옵니다. 다음 실행 가능한 노드 . 이것이 RDD가 다른 기존 데이터 구조와 비교할 때 고급 유형의 데이터 구조로 간주되는 이유입니다. RDD는 정형, 비정형 및 반 정형 데이터를 저장할 수 있습니다.

Spark 블로그를 사용하여 RDD를 진행하고 다른 유형의 데이터 구조보다 우위를 차지하는 RDD의 고유 한 기능에 대해 알아 보겠습니다.

RDD의 특징

  • 메모리 내 (램) 계산 : In-Memory 계산의 개념은 데이터 처리를 더 빠르고 효율적인 단계로 가져갑니다. 공연 시스템의 업그레이드되었습니다.
  • 그의 평가 : Lazy 평가라는 용어는 변형 RDD의 데이터에 적용되지만 출력이 생성되지 않습니다. 대신 적용된 변환은 다음과 같습니다. 기록되었습니다.
  • 고집 : 결과 RDD는 항상 재사용 가능.
  • 대략적인 작업 : 사용자는 다음을 통해 데이터 세트의 모든 요소에 변환을 적용 할 수 있습니다. 지도, 필터 또는 그룹화 작업.
  • 내결함성 : 데이터 손실이있는 경우 시스템은 롤백 그것으로 원래 상태 로그를 사용하여 변형 .
  • 불변성 : 정의, 검색 또는 생성 된 데이터는 변경 시스템에 로그인되면. 기존 RDD에 액세스하여 수정해야하는 경우에는 일련의 RDD를 적용하여 새 RDD를 생성해야합니다. 변환 현재 또는 이전 RDD에 대한 기능.
  • 파티셔닝 : 그것은 중요한 단위 Spark의 병렬 처리 RDD. 기본적으로 생성되는 파티션 수는 데이터 원본을 기반으로합니다. 만들려는 파티션의 수를 결정할 수도 있습니다. 맞춤 파티션 기능.

Spark를 사용하여 RDD 생성

RDD는 다음에서 만들 수 있습니다. 세 가지 방법 :

  1. 데이터 읽기 병렬화 된 컬렉션
val PCRDD = spark.sparkContext.parallelize (Array ( 'Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. 지원 변환 이전 RDD에서
val words = spark.sparkContext.parallelize (Seq ( 'Spark', 'is', 'a', 'very', 'powerful', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). foreach (println)
  1. 데이터 읽기 외부 저장 또는 같은 파일 경로 HDFS 또는 HBase
val Sparkfile = spark.read.textFile ( '/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

RDD에서 수행되는 작업 :

주로 RDD에서 수행되는 두 가지 유형의 작업이 있습니다.

  • 변형
  • 행위

변형 : 그만큼 작업 우리는 RDD에 적용합니다 필터, 액세스 수정 부모 RDD의 데이터를 생성하여 연속 RDD 불린다 변환 . 새 RDD는 이전 RDD에 대한 포인터를 반환하여 이들 간의 종속성을 보장합니다.

변환은 게으른 평가, 즉, 작업중인 RDD에 적용된 작업은 기록되지만 실행. 시스템은 트리거를 트리거 한 후 결과 또는 예외를 발생시킵니다. 동작 .

변환을 아래와 같이 두 가지 유형으로 나눌 수 있습니다.

  • 좁은 변환
  • 넓은 변화

좁은 변환 우리는 좁은 변형을 단일 파티션 RDD를 처리하는 데 필요한 데이터를 단일 파티션에서 사용할 수 있으므로 상위 RDD의 부모 ASD . 좁은 변환의 예는 다음과 같습니다.

  • 지도()
  • 필터()
  • flatMap ()
  • 분할()
  • mapPartitions ()

광범위한 변환 : 우리는 다중 파티션 새 RDD를 생성합니다. RDD를 처리하는 데 필요한 데이터는 컴퓨터의 여러 파티션에서 사용할 수 있습니다. 부모 ASD . 광범위한 변환의 예는 다음과 같습니다.

  • reduceBy ()
  • 노동 조합()

행위 : 작업이 Apache Spark를 적용하도록 지시합니다. 계산 결과 또는 예외를 드라이버 RDD로 다시 전달합니다. 다음과 같은 조치는 거의 없습니다.

  • 수집()
  • 카운트()
  • 취하다()
  • 먼저()

RDD에 대한 작업을 실제로 적용 해 보겠습니다.

해시 맵과 해시 테이블의 차이점

IPL (인도 프리미어 리그) 최고 수준의 힙한 크리켓 토너먼트입니다. 이제 오늘은 IPL 데이터 세트에 손을 대고 Spark를 사용하여 RDD를 실행 해 보겠습니다.

  • 첫째, IPL의 CSV 일치 데이터를 다운로드하겠습니다. 다운로드 후 행과 열이있는 EXCEL 파일로 보이기 시작합니다.

다음 단계에서는 스파크를 실행하고 해당 위치에서 matches.csv 파일을로드합니다.csv파일 위치는 “/user/edureka_566977/test/matches.csv”

이제 시작하겠습니다. 변환 첫 번째 부분 :

  • 지도():

우리는 사용 지도 변환 RDD의 모든 요소에 특정 변환 작업을 적용합니다. 여기에서 CKfile이라는 이름으로 RDD를 생성합니다.csv파일. States라는 또 다른 RDD를 만들어 도시 정보 저장 .

spark2-shell val CKfile = sc.textFile ( '/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val 상태 = CKfile.map (_. split ( ',') (2)) states.collect (). foreach (println)

  • 필터():

필터 변환, 이름 자체가 그 용도를 설명합니다. 이 변환 작업을 사용하여 주어진 데이터 모음에서 선택적 데이터를 필터링합니다. 우리는 적용합니다 필터 작동 올해의 IPL 경기 기록을 얻으려면 여기 2017 년 fil RDD에 저장합니다.

val fil = CKfile.filter (line => line.contains ( '2017')) fil.collect (). foreach (println)

  • flatMap () :

flatMap은 newRDD를 만들기 위해 RDD의 각 요소에 변환 작업을 적용합니다. 지도 변환과 유사합니다. 여기에서 우리는 적용합니다플랫 맵...에 하이데라바드시의 성냥을 뱉어 데이터를filRDDRDD.

val filRDD = fil.flatMap (line => line.split ( 'Hyderabad')). collect ()

  • 분할():

RDD에 쓰는 모든 데이터는 특정 개수의 파티션으로 분할됩니다. 이 변환을 사용하여 파티션 수 데이터는 실제로 분할됩니다.

val fil = CKfile.filter (line => line.contains ( '2017')) fil.partitions.size

  • mapPartitions () :

MapPatitions를 Map ()의 대안으로 간주하고각각() 함께. 여기서 mapPartitions를 사용하여 행 수 파일 RDD에 있습니다.

val fil = CKfile.filter (line => line.contains ( '2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • reduceBy () :

우리는 사용ReduceBy() 의 위에 키-값 쌍 . 우리는이 변환을csv파일을 사용하여 플레이어를 찾습니다. 가장 높은 남자 .

val ManOfTheMatch = CKfile.map (_. split ( ',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • 노동 조합():

이름은 모든 것을 설명합니다. 우리는 결합 변환을 사용하여 두 개의 RDD를 함께 클럽 . 여기에서는 fil과 fil2라는 두 개의 RDD를 생성합니다. fil RDD에는 2017 년 IPL 일치 레코드가 포함되고 fil2 RDD에는 2016 IPL 일치 레코드가 포함됩니다.

val fil = CKfile.filter (line => line.contains ( '2017')) val fil2 = CKfile.filter (line => line.contains ( '2016')) val uninRDD = fil.union (fil2)

우리는 동작 실제 출력을 보여주는 부분 :

  • 수집():

수집은 우리가 사용하는 행동입니다 내용을 표시 RDD에서.

val CKfile = sc.textFile ( '/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • 카운트():

카운트계산하는 데 사용하는 작업입니다. 레코드 수 RDD에 존재.여기이 작업을 사용하여 matches.csv 파일의 총 레코드 수를 계산합니다.

val CKfile = sc.textFile ( '/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • 취하다():

Take는 collect와 유사한 Action 작업이지만 유일한 차이점은 선택적 행 수 사용자 요청에 따라. 여기에 다음 코드를 적용하여 상위 10 개 주요 보고서.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10) .foreach (println)

  • 먼저():

First ()는 collect () 및 take ()와 유사한 작업 작업입니다.그것최상위 보고서를 인쇄하는 데 사용됩니다. 출력 여기에서는 first () 연산을 사용하여 특정 도시에서 플레이되는 최대 경기 수 출력으로 뭄바이를 얻습니다.

val CKfile = sc.textFile ( '/ user / edureka_566977 / test / matches.csv') val 상태 = CKfile.map (_. split ( ',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Spark를 사용하여 RDD를 배우는 과정을 더욱 흥미롭게 만들기 위해 흥미로운 사용 사례를 생각해 냈습니다.

Spark를 사용하는 RDD : Pokemon 사용 사례

  • 첫째, Pokemon.csv 파일을 다운로드하고 Matches.csv 파일에서했던 것처럼 spark-shell에로드하겠습니다.
val PokemonDataRDD1 = sc.textFile ( '/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

실제로 포켓몬은 다양한 종류로 제공됩니다. 몇 가지 종류를 찾아 보겠습니다.

  • Pokemon.csv 파일에서 스키마 제거

우리는 개요 Pokemon.csv 파일의. 따라서 제거합니다.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • 수 찾기 파티션 pokemon.csv가 배포됩니다.
println ( 'No.ofpartitions ='+ NoHeader.partitions.size)

  • 물 포켓몬

찾기 물 포켓몬의 수

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ( 'Water')) WaterRDD.collect (). foreach (println)

  • 파이어 포켓몬

찾기 파이어 포켓몬 수

val FireRDD = PokemonDataRDD1.filter (line => line.contains ( 'Fire')) FireRDD.collect (). foreach (println)

  • 우리는 또한 인구 카운트 기능을 사용하는 다른 유형의 포켓몬
WaterRDD.count () FireRDD.count ()

  • 나는 게임을 좋아하기 때문에 방어 전략 포켓몬을 찾아 보자 최대 방어.
val defenceList = NoHeader.map {x => x.split ( ',')}. map {x => (x (6) .toDouble)} println ( 'Highest_Defence :'+ defenceList.max ())

  • 우리는 최대를 압니다 방어력 값 하지만 우리는 어떤 포켓몬인지 모릅니다. 그래서 우리는 포켓몬.
val defWithPokemonName = NoHeader.map {x => x.split ( ',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (주문 [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • 이제 포켓몬을 최소 방어
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • 이제 포켓몬을 덜 방어적인 전략.
val PokemonDataRDD2 = sc.textFile ( '/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (Head)) val defWithPokemonName2 = NoHeader2 .map {x => x.split ( ',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordering [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

그래서 이것으로 Spark 기사를 사용하여이 RDD를 끝냅니다. RDD에 대한 귀하의 지식, 기능 및 RDD에서 수행 할 수있는 다양한 유형의 작업에 대해 약간의 빛을 냈기를 바랍니다.

추가 기능은 무엇입니까?

이 기사는 Cloudera Hadoop 및 Spark 개발자 인증 시험 (CCA175)을 준비하도록 설계되었습니다. Spark RDD, Spark SQL, Spark MLlib 및 Spark Streaming을 포함하는 Apache Spark 및 Spark 에코 시스템에 대한 심층적 인 지식을 얻을 수 있습니다. Scala 프로그래밍 언어, HDFS, Sqoop, Flume, Spark GraphX ​​및 Kafka와 같은 메시징 시스템에 대한 포괄적 인 지식을 얻을 수 있습니다.