SQL에서 NoSQL 데이터베이스로 데이터를 전송하는 DBInputFormat



이 블로그의 목적은 SQL 데이터베이스에서 HDFS로 데이터를 전송하는 방법, SQL 데이터베이스에서 NoSQL 데이터베이스로 데이터를 전송하는 방법을 배우는 것입니다.

이 블로그에서는 Hadoop 기술의 가장 중요한 구성 요소 중 하나 인 MapReduce의 기능과 가능성을 살펴 봅니다.

오늘날 기업들은 대용량 데이터를 효과적으로 처리 할 수있는 능력 때문에 데이터 스토리지를위한 첫 번째 선택으로 Hadoop 프레임 워크를 채택하고 있습니다. 그러나 우리는 또한 데이터가 다재다능하고 다양한 구조와 형식으로 존재한다는 것을 알고 있습니다. 매우 다양한 데이터와 다양한 형식을 제어하려면 모든 종류를 수용하면서도 효과적이고 일관된 결과를 생성하는 메커니즘이 있어야합니다.





Hadoop 프레임 워크에서 가장 강력한 구성 요소는 MapReduce로 데이터 및 구조에 대한 제어를 다른 구성 요소보다 더 잘 제공 할 수 있습니다. 학습 곡선의 오버 헤드와 프로그래밍 복잡성이 필요하지만 이러한 복잡성을 처리 할 수 ​​있다면 Hadoop으로 모든 종류의 데이터를 확실히 처리 할 수 ​​있습니다.

MapReduce 프레임 워크는 모든 처리 작업을 기본적으로 Map과 Reduce의 두 단계로 나눕니다.



이러한 단계를 위해 원시 데이터를 준비하려면 몇 가지 기본 클래스와 인터페이스를 이해해야합니다. 이러한 재 처리를위한 슈퍼 클래스는 InputFormat.

그만큼 InputFormat class는 Hadoop MapReduce API의 핵심 클래스 중 하나입니다. 이 클래스는 두 가지 주요 사항을 정의합니다.

  • 데이터 분할
  • 기록 독자

데이터 분할 개별 맵 작업의 크기와 잠재적 실행 서버를 모두 정의하는 Hadoop MapReduce 프레임 워크의 기본 개념입니다. 그만큼 레코드 리더 입력 파일에서 실제로 레코드를 읽고이를 매퍼에게 제출하는 역할을합니다.



매퍼 수는 분할 수에 따라 결정됩니다. 분할을 만드는 것은 InputFormat의 작업입니다. 대부분의 시간 분할 크기는 블록 크기와 동일하지만 항상 HDFS 블록 크기를 기반으로 분할이 생성되는 것은 아닙니다. InputFormat의 getSplits () 메서드가 어떻게 재정의되었는지에 따라 전적으로 달라집니다.

MR 분할과 HDFS 블록에는 근본적인 차이가 있습니다. 블록은 물리적 데이터 청크 인 반면 분할은 매퍼가 읽는 논리적 청크입니다. 분할은 입력 데이터를 포함하지 않으며 데이터의 참조 또는 주소 만 보유합니다. 분할에는 기본적으로 두 가지가 있습니다. 바이트 단위의 길이와 문자열 인 저장 위치 집합입니다.

이를 더 잘 이해하기 위해 MR을 사용하여 MySQL에 저장된 데이터를 처리하는 한 가지 예를 들어 보겠습니다. 이 경우 블록의 개념이 없기 때문에“분할은 항상 HDFS 블록을 기반으로 생성된다”라는 이론,실패. 한 가지 가능성은 MySQL 테이블의 행 범위를 기반으로 분할을 만드는 것입니다 (관계형 데이터베이스에서 데이터를 읽기위한 입력 형식 인 DBInputFormat이 수행하는 작업입니다). n 개의 행으로 구성된 k 개의 분할이있을 수 있습니다.

입력 파일의 총 크기 (바이트)를 기반으로 분할이 생성되는 것은 FileInputFormat (파일에 저장된 데이터를 처리하기위한 InputFormat)을 기반으로하는 InputFormats에만 해당됩니다. 그러나 입력 파일의 FileSystem 블록 크기는 입력 분할에 대한 상한으로 처리됩니다. HDFS 블록 크기보다 작은 파일이있는 경우 해당 파일에 대해 하나의 매퍼 만 제공됩니다. 다른 동작을 원하면 mapred.min.split.size를 사용할 수 있습니다. 그러나 다시 InputFormat의 getSplits ()에만 의존합니다.

org.apache.hadoop.mapreduce.lib.input 패키지에서 사용할 수있는 기존 입력 형식이 너무 많습니다.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

C ++ 범위 연산자

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

기본값은 TextInputFormat입니다.

마찬가지로 리듀서에서 데이터를 읽고 HDFS에 저장하는 출력 형식이 너무 많습니다.

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

기본값은 TextOutputFormat입니다.

이 블로그를 다 읽었을 때 다음을 배웠을 것입니다.

  • 지도 축소 프로그램 작성 방법
  • Mapreduce에서 사용할 수있는 다양한 유형의 InputFormat 정보
  • InputFormats의 필요성은 무엇입니까?
  • 사용자 지정 InputFormat을 작성하는 방법
  • SQL 데이터베이스에서 HDFS로 데이터를 전송하는 방법
  • SQL (여기서는 MySQL) 데이터베이스에서 NoSQL 데이터베이스 (여기서는 Hbase)로 데이터를 전송하는 방법
  • 한 SQL 데이터베이스에서 SQL 데이터베이스의 다른 테이블로 데이터를 전송하는 방법 (동일한 SQL 데이터베이스에서이 작업을 수행하는 경우 그다지 중요하지 않을 수 있습니다. 그러나 동일한 지식을 갖는 데 아무런 문제가 없습니다. 사용 방법)

전제 조건 :

  • Hadoop 사전 설치
  • SQL 사전 설치
  • Hbase 사전 설치
  • 자바 기본 이해
  • MapReduce 지식
  • Hadoop 프레임 워크 기본 지식

여기서 해결할 문제 설명을 이해하겠습니다.

관계형 데이터베이스 Edureka의 MySQL DB에 직원 테이블이 있습니다. 이제 비즈니스 요구 사항에 따라 관계형 DB에서 사용할 수있는 모든 데이터를 Hadoop 파일 시스템, 즉 Hbase로 알려진 NoSQL DB 인 HDFS로 이동해야합니다.

이 작업을 수행 할 수있는 많은 옵션이 있습니다.

  • Sqoop
  • 플룸
  • MapReduce

이제이 작업을 위해 다른 도구를 설치하고 구성하지 않으려 고합니다. Hadoop의 처리 프레임 워크 인 MapReduce는 하나의 옵션 만 남았습니다. MapReduce 프레임 워크는 전송하는 동안 데이터를 완전히 제어 할 수 있습니다. 열을 조작하고 두 대상 위치 중 하나에 직접 배치 할 수 있습니다.

노트 :

  • MySQL 테이블에서 테이블을 가져 오려면 MySQL 커넥터를 다운로드하여 Hadoop의 클래스 경로에 넣어야합니다. 이를 위해 com.mysql.jdbc_5.1.5.jar 커넥터를 다운로드하여 Hadoop_home / share / Hadoop / MaPreduce / lib 디렉토리에 보관하십시오.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • 또한 MR 프로그램이 Hbase에 액세스 할 수 있도록 모든 Hbase jar를 Hadoop 클래스 경로 아래에 두십시오. 이렇게하려면 다음 명령을 실행하십시오. :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

이 작업을 실행하는 데 사용한 소프트웨어 버전은 다음과 같습니다.

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • 이클립스 문

호환성 문제에서 프로그램을 피하기 위해 독자에게 유사한 환경에서 명령을 실행하도록 규정합니다.

사용자 지정 DBInputWritable :

package com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implements Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException // Resultset 객체는 SQL 문에서 반환 된 데이터를 나타냅니다. {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) throws IOException { } public void write (PreparedStatement ps) throws SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return name} public String getDept () {return dept}}

사용자 지정 DBOutputWritable :

package com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable 구현 Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException {} public void write (DataOutput out) throws IOException {} public void write (PreparedStatement ps) throws SQLException {ps.setString (1, name) ps.setInt (2, id) ps.setString (3, dept)}}

입력 테이블 :

데이터베이스 edureka 생성
테이블 생성 emp (empid int not null, name varchar (30), dept varchar (20), primary key (empid))
emp 값에 삽입 (1, 'abhay', 'developement'), (2, 'brundesh', 'test')
emp에서 * 선택

사례 1 : MySQL에서 HDFS로 전송

package com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // 드라이버 클래스'jdbc : mysql : // localhost : 3306 / edureka ', // db url'root ', // 사용자 이름'root ') // password Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) FileOutputFormat.setOutputPath (job, new Path (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] { 'empid', 'name', 'dept'} / / table columns) 경로 p = new Path (args [0]) FileSystem fs = FileSystem.get (new URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0 : 1)}}

이 코드 조각을 통해 소스 SQL DB에 액세스하기위한 입력 형식을 준비하거나 구성 할 수 있습니다. 매개 변수에는 드라이버 클래스, URL에는 SQL 데이터베이스의 주소, 사용자 이름 및 암호가 포함됩니다.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // 드라이버 클래스 'jdbc : mysql : // localhost : 3306 / edureka', // db url 'root', // 사용자 이름 'root') //암호

이 코드를 사용하면 데이터베이스의 테이블 세부 정보를 전달하고 작업 개체에 설정할 수 있습니다. 매개 변수는 물론 작업 인스턴스, DBWritable 인터페이스를 구현해야하는 사용자 정의 쓰기 가능 클래스, 소스 테이블 이름, 조건이 다른 경우 null, 정렬 매개 변수가 다른 경우 null, 테이블 열 목록이 각각 포함됩니다.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] { 'empid', 'name', 'dept'} // table columns)

매퍼

package com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable 공용 클래스 Map extends Mapper {
protected void map (LongWritable key, DBInputWritable value, Context ctx) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (new Text (name + ''+ id + ''+ dept), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

감속기 : ID 감속기 사용

실행할 명령 :

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

출력 : HDFS로 전송 된 MySQL 테이블

hadoop dfs -ls / dbtohdfs / *

사례 2 : MySQL의 한 테이블에서 MySQL의 다른 테이블로 전송

MySQL에서 출력 테이블 생성

employee1 (name varchar (20), id int, dept varchar (20)) 테이블 생성

package com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) throws Exception {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // driver class 'jdbc : mysql : // localhost : 3306 / edureka ', // db url'root ', // user name'root ') // password Job job = new Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] { 'empid ','name ','dept '} // 테이블 열) DBOutputFormat.setOutput (job,'employee1 ', // 출력 테이블 이름 new String [] {'name ','id ','dept '} // table 열) System.exit (job.waitForCompletion (true)? 0 : 1)}}

이 코드를 통해 SQL DB에서 출력 테이블 이름을 구성 할 수 있습니다. 매개 변수는 각각 작업 인스턴스, 출력 테이블 이름 및 출력 열 이름입니다.

DBOutputFormat.setOutput (job, 'employee1', // 출력 테이블 이름 new String [] { 'name', 'id', 'dept'} // table columns)

매퍼 : 사례 1과 동일

감속기 :

package com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce extends Reducer {protected void reduce (Text key, Iterable values, Context ctx) {int sum = 0 String line [] = key.toString (). split ( '') try {ctx.write (new DBOutputWritable (line [0] .toString (), Integer.parseInt (line [1] .toString ()), line [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

실행할 명령 :

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

출력 : MySQL의 EMP 테이블에서 MySQL의 다른 테이블 Employee1로 데이터 전송

사례 3 : MySQL의 테이블에서 NoSQL (Hbase) 테이블로 전송

SQL 테이블의 출력을 수용하기 위해 Hbase 테이블 만들기 :

'직원', 'official_info'생성

드라이버 클래스 :

package Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) throws Exception {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // 드라이버 클래스 'jdbc : mysql : // localhost : 3306 / edureka' , // db url 'root', // 사용자 이름 'root') // password Job job = new Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ( 'employee', Reduce.class, job) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (TableOutputFormat. class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // input table name null, null, new String [] { 'empid', 'name', 'dept'} // table columns) System.exit (job.waitForCompletion (true)? 0 : 1)}}

이 코드를 사용하면 hbase의 경우 ImmutableBytesWritable 인 출력 키 클래스를 구성 할 수 있습니다.

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

여기서 우리는 테이블에서 작동 할 hbase 테이블 이름과 감속기를 전달합니다.

TableMapReduceUtil.initTableReducerJob ( 'employee', Reduce.class, job)

매퍼 :

package Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map extends Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, DBInputWritable value, Context context) {try {String line = value.getName () String cd = value.getId () + ''String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

이 코드에서는 DBinputwritable 클래스의 getter에서 값을 가져 와서 전달합니다.
ImmutableBytesWritable을 사용하여 Hbase가 이해하는 bytewriatble 형식의 감속기에 도달합니다.

String line = value.getName () String cd = value.getId () + ''String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ''+ dept ))

감속기 :

package Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce extends TableReducer {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException {String [] cause = null // loop values for (Text val : values) {cause = val.toString (). split ( '')} // HBase에 넣기 put = new Put (key.get ()) put.add (Bytes.toBytes ( 'official_info' ), Bytes.toBytes ( 'name'), Bytes.toBytes (cause [0])) put.add (Bytes.toBytes ( 'official_info'), Bytes.toBytes ( 'department'), Bytes.toBytes (cause [1 ])) context.write (key, put)}}

이 코드 조각을 통해 감속기의 값을 저장할 정확한 행과 열을 결정할 수 있습니다. 여기서 우리는 empid를 고유 한 row key로 만들었 기 때문에 각 empid를 별도의 행에 저장합니다. 각 행에서 'name'및 'department'열에 각각 'official_info'열 패밀리 아래에 직원의 공식 정보를 저장합니다.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ( 'official_info'), Bytes.toBytes ( 'name'), Bytes.toBytes (cause [0])) put.add (Bytes. toBytes ( 'official_info'), Bytes.toBytes ( 'department'), Bytes.toBytes (cause [1])) context.write (key, put)

Hbase에서 전송 된 데이터 :

직원 스캔

우리는 비즈니스 데이터를 관계형 SQL DB에서 NoSQL DB로 성공적으로 마이그레이션하는 작업을 완료 할 수있었습니다.

다음 블로그에서는 다른 입력 및 출력 형식에 대한 코드를 작성하고 실행하는 방법에 대해 알아 봅니다.

의견, 질문 또는 피드백을 계속 게시하십시오. 나는 당신의 의견을 듣고 싶습니다.

질문이 있으십니까? 의견란에 언급 해 주시면 연락 드리겠습니다.

관련 게시물:

오버로딩과 오버라이드의 차이