Un tutorial HDFS pentru analiștii de date blocați cu baze de date relaționale
Publicat: 2022-03-11Introducere
Până acum, probabil ați auzit de sistemul de fișiere distribuit Hadoop (HDFS), mai ales dacă sunteți analist de date sau cineva care este responsabil pentru mutarea datelor de la un sistem la altul. Cu toate acestea, care sunt beneficiile pe care le are HDFS față de bazele de date relaționale?
HDFS este o soluție scalabilă, open-source, pentru stocarea și procesarea unor volume mari de date. HDFS s-a dovedit a fi fiabil și eficient în multe centre de date moderne.
HDFS utilizează hardware de bază împreună cu software open source pentru a reduce costul total pe octet de stocare.
Cu replicarea încorporată și rezistența la defecțiunile discului, HDFS este un sistem ideal pentru stocarea și procesarea datelor pentru analiză. Nu necesită elementele de bază și cheltuielile generale pentru a susține atomicitatea tranzacției, consistența, izolarea și durabilitatea (ACID), așa cum este necesar în cazul sistemelor tradiționale de baze de date relaționale.
În plus, în comparație cu bazele de date comerciale și de întreprinderi, cum ar fi Oracle, utilizarea Hadoop ca platformă de analiză evită orice costuri suplimentare de licențiere.
Una dintre întrebările pe care și le pun mulți oameni atunci când învață pentru prima dată despre HDFS este: Cum introduc datele mele existente în HDFS?
În acest articol, vom examina cum să importați date dintr-o bază de date PostgreSQL în HDFS. Vom folosi Apache Sqoop, care este în prezent cea mai eficientă soluție open source pentru a transfera date între HDFS și sistemele de baze de date relaționale. Apache Sqoop este conceput pentru a încărca în bloc datele dintr-o bază de date relațională în HDFS (import) și pentru a scrie în bloc datele din HDFS într-o bază de date relațională (export).
Pașii din acest tutorial sunt scrisi pentru cineva cu cunoștințe de bază despre executarea interogărilor SQL și cunoștințe elementare despre comenzile HDFS.
Sistemul de baze de date folosit este PostgreSQL 9.5 pentru Windows, iar versiunea HDFS este Cloudera Hadoop 2.5.0-cdh5.2.0 pe o mașină virtuală Centos 6.4 Linux.
Apache Sqoop se bazează pe fișierele JAR ale driverului JDBC care sunt specifice furnizorului bazei de date relaționale și versiunii bazei de date.
Pentru a executa pașii indicați în acest articol, utilizatorul va avea nevoie de permisiuni pentru a se conecta de la distanță la baza de date PostgreSQL, permisiuni SELECT
pentru baza de date relațională, permisiuni de scriere pe HDFS și permisiuni de executare pe executabilul Sqoop.
În scopul acestui tutorial, am creat o bază de date PostgreSQL, numită-o Toptal și am făcut-o accesibilă prin portul 5432.
Sursa de date PostgreSQL
Pentru a începe, în baza noastră de date PostgreSQL Toptal
, vom crea un tabel cu date de testare numit sales
. Vom presupune că certificatul OpenSSL și fișierele cheii private există deja pe serverul PostgreSQL.
Server [localhost]: Database [postgres]: Toptal Port [5432]: Username [postgres]: Password for user postgres: psql (9.5.3) Toptal=# create table sales Toptal-# ( Toptal(# pkSales integer constraint salesKey primary key, Toptal(# saleDate date, Toptal(# saleAmount money, Toptal(# orderID int not null, Toptal(# itemID int not null Toptal(# ); CREATE TABLE
În continuare, vom insera 20 de rânduri în tabel:
Toptal=# insert into sales values (1, '2016-09-27', 1.23, 1, 1); INSERT 0 1 Toptal=# insert into sales values (2, '2016-09-27', 2.34, 1, 2); INSERT 0 1 Toptal=# insert into sales values (3, '2016-09-27', 1.23, 2, 1); INSERT 0 1 Toptal=# insert into sales values (4, '2016-09-27', 2.34, 2, 2); INSERT 0 1 Toptal=# insert into sales values (5, '2016-09-27', 3.45, 2, 3); INSERT 0 1 Toptal=# insert into sales values (6, '2016-09-28', 3.45, 3, 3); INSERT 0 1 Toptal=# insert into sales values (7, '2016-09-28', 4.56, 3, 4); INSERT 0 1 Toptal=# insert into sales values (8, '2016-09-28', 5.67, 3, 5); INSERT 0 1 Toptal=# insert into sales values (9, '2016-09-28', 1.23, 4, 1); INSERT 0 1 Toptal=# insert into sales values (10, '2016-09-28', 1.23, 5, 1); INSERT 0 1 Toptal=# insert into sales values (11, '2016-09-28', 1.23, 6, 1); INSERT 0 1 Toptal=# insert into sales values (12, '2016-09-29', 1.23, 7, 1); INSERT 0 1 Toptal=# insert into sales values (13, '2016-09-29', 2.34, 7, 2); INSERT 0 1 Toptal=# insert into sales values (14, '2016-09-29', 3.45, 7, 3); INSERT 0 1 Toptal=# insert into sales values (15, '2016-09-29', 4.56, 7, 4); INSERT 0 1 Toptal=# insert into sales values (16, '2016-09-29', 5.67, 7, 5); INSERT 0 1 Toptal=# insert into sales values (17, '2016-09-29', 6.78, 7, 6); INSERT 0 1 Toptal=# insert into sales values (18, '2016-09-29', 7.89, 7, 7); INSERT 0 1 Toptal=# insert into sales values (19, '2016-09-29', 7.89, 8, 7); INSERT 0 1 Toptal=# insert into sales values (20, '2016-09-30', 1.23, 9, 1); INSERT 0 1
Să selectăm datele pentru a verifica dacă datele arată corect:
Toptal=# select * from sales; pksales | saledate | saleamount | orderid | itemid ---------+------------+------------+---------+-------- 1 | 2016-09-27 | $1.23 | 1 | 1 2 | 2016-09-27 | $2.34 | 1 | 2 3 | 2016-09-27 | $1.23 | 2 | 1 4 | 2016-09-27 | $2.34 | 2 | 2 5 | 2016-09-27 | $3.45 | 2 | 3 6 | 2016-09-28 | $3.45 | 3 | 3 7 | 2016-09-28 | $4.56 | 3 | 4 8 | 2016-09-28 | $5.67 | 3 | 5 9 | 2016-09-28 | $1.23 | 4 | 1 10 | 2016-09-28 | $1.23 | 5 | 1 11 | 2016-09-28 | $1.23 | 6 | 1 12 | 2016-09-29 | $1.23 | 7 | 1 13 | 2016-09-29 | $2.34 | 7 | 2 14 | 2016-09-29 | $3.45 | 7 | 3 15 | 2016-09-29 | $4.56 | 7 | 4 16 | 2016-09-29 | $5.67 | 7 | 5 17 | 2016-09-29 | $6.78 | 7 | 6 18 | 2016-09-29 | $7.89 | 7 | 7 19 | 2016-09-29 | $7.89 | 8 | 7 20 | 2016-09-30 | $1.23 | 9 | 1 (20 rows)
Datele arată bine, așa că să continuăm.
Importați în HDFS folosind Sqoop
Cu sursa de date definită, acum suntem gata să importam datele în HDFS. Comanda sqoop
pe care o vom examina este listată mai jos și vom descompune fiecare argument în punctele care urmează. Rețineți că comanda ar trebui să fie pe o linie completă sau, după cum se arată mai jos, cu bara oblică inversă (caracterul de continuare a liniei de comandă Linux) la sfârșitul fiecărei linii, cu excepția ultimei.
sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/Toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales'
-
sqoop import
- executabilul se numeștesqoop
și îi solicităm să importe datele dintr-un tabel sau dintr-o vizualizare dintr-o bază de date în HDFS. -
--connect
- Cu argumentul--connect
, transmitem șirul de conectare JDBC pentru PostgreSQL. În acest caz, folosim adresa IP, numărul portului și numele bazei de date. De asemenea, trebuie să specificăm că SSL este utilizat și trebuie să furnizăm clasaSSLSocketFactory
care urmează să fie utilizată. -
--username
- În acest exemplu, numele de utilizator este o autentificare PostgreSQL, nu o autentificare Windows. Utilizatorul trebuie să aibă permisiuni pentru a se conecta la baza de date specificată și pentru a selecta din tabelul specificat. -
-P
- Acest lucru va solicita utilizatorului liniei de comandă parola. Dacă Sqoop este rar executat, aceasta ar putea fi o opțiune bună. Există mai multe alte modalități de a transmite automat parola comenzii, dar încercăm să o menținem simplă pentru acest articol. -
--table
- Aici trecem numele tabelului PostgreSQL. -
--target-dir
- Acest argument specifică directorul HDFS în care urmează să fie stocate datele. -
--split-by
- Trebuie să oferim lui Sqoop un identificator unic pentru a-l ajuta să distribuie volumul de lucru. Mai târziu, în rezultatul jobului, vom vedea unde Sqoop selectează valorile minime și maxime pentru a ajuta la stabilirea limitelor împărțite.
Este o idee bună să puneți comanda într-un script pentru repetabilitate și editare, așa cum se arată mai jos:
[hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales' [hdfs@localhost:/sqoop]$
Acum, este timpul să executați scriptul de comandă Sqoop de mai sus. Ieșirea din comanda Sqoop este prezentată mai jos.
[hdfs@localhost:/sqoop]$ ./sqoopCommand.sh 16/10/02 18:58:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0 Enter password: 16/10/02 18:58:40 INFO manager.SqlManager: Using default fetchSize of 1000 16/10/02 18:58:40 INFO tool.CodeGenTool: Beginning code generation 16/10/02 18:58:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "sales" AS t LIMIT 1 16/10/02 18:58:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce Note: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 16/10/02 18:58:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.jar 16/10/02 18:58:43 WARN manager.PostgresqlManager: It looks like you are importing from postgresql. 16/10/02 18:58:43 WARN manager.PostgresqlManager: This transfer can be faster! Use the --direct 16/10/02 18:58:43 WARN manager.PostgresqlManager: option to exercise a postgresql-specific fast path. 16/10/02 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of sales 16/10/02 18:58:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 16/10/02 18:58:46 INFO db.DBInputFormat: Using read commited transaction isolation 16/10/02 18:58:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("pksales"), MAX("pksales") FROM "sales" 16/10/02 18:58:47 INFO mapred.JobClient: Running job: job_201609280401_0005 16/10/02 18:58:48 INFO mapred.JobClient: map 0% reduce 0% 16/10/02 18:59:04 INFO mapred.JobClient: map 50% reduce 0% 16/10/02 18:59:14 INFO mapred.JobClient: map 75% reduce 0% 16/10/02 18:59:15 INFO mapred.JobClient: map 100% reduce 0% 16/10/02 18:59:18 INFO mapred.JobClient: Job complete: job_201609280401_0005 16/10/02 18:59:18 INFO mapred.JobClient: Counters: 23 16/10/02 18:59:18 INFO mapred.JobClient: File System Counters 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes read=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes written=1190344 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of write operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes read=438 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes written=451 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of read operations=4 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of write operations=4 16/10/02 18:59:18 INFO mapred.JobClient: Job Counters 16/10/02 18:59:18 INFO mapred.JobClient: Launched map tasks=4 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=48877 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Map-Reduce Framework 16/10/02 18:59:18 INFO mapred.JobClient: Map input records=20 16/10/02 18:59:18 INFO mapred.JobClient: Map output records=20 16/10/02 18:59:18 INFO mapred.JobClient: Input split bytes=438 16/10/02 18:59:18 INFO mapred.JobClient: Spilled Records=0 16/10/02 18:59:18 INFO mapred.JobClient: CPU time spent (ms)=3980 16/10/02 18:59:18 INFO mapred.JobClient: Physical memory (bytes) snapshot=481574912 16/10/02 18:59:18 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2949685248 16/10/02 18:59:18 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Transferred 451 bytes in 33.7555 seconds (13.3608 bytes/sec) 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Retrieved 20 records. [hdfs@localhost:/sqoop]$
Observați că ultima linie de rezultate de mai sus arată că au fost preluate 20 de înregistrări, ceea ce corespunde celor 20 de înregistrări din tabelul din baza de date PostgreSQL.

După executarea comenzii Sqoop, putem executa hdfs dfs -ls
pentru a vedea directorul care a fost creat implicit cu numele tabelului pe HDFS.
[hdfs@localhost:/sqoop]$ hdfs dfs -ls Found 1 items drwxrwxrwx - toptal data 0 2016-10-02 18:59 sales [hdfs@localhost:/sqoop]$
Putem folosi din nou comanda hdfs dfs -ls
pentru a lista conținutul directorului de sales
. Dacă vă uitați pe HDFS, puteți observa că datele sunt partiționate și răspândite în patru fișiere în mod implicit, nu doar conținute într-unul singur.
[hdfs@localhost:/sqoop]$ hdfs dfs -ls sales Found 6 items -rw-rw-rw- 1 toptal data 0 2016-10-02 18:59 sales/_SUCCESS drwxrwxrwx - toptal data 0 2016-10-02 18:58 sales/_logs -rw-rw-rw- 1 toptal data 110 2016-10-02 18:59 sales/part-m-00000 -rw-rw-rw- 1 toptal data 111 2016-10-02 18:59 sales/part-m-00001 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00002 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00003 [hdfs@localhost:/sqoop]$
Comanda hdfs dfs -cat
va afișa toate înregistrările din prima partiție a datelor de vânzări de pe HDFS.
[hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00000 1,2016-09-27,1.23,1,1 2,2016-09-27,2.34,1,2 3,2016-09-27,1.23,2,1 4,2016-09-27,2.34,2,2 5,2016-09-27,3.45,2,3 [hdfs@localhost:/sqoop]$
Observați că delimitatorul implicit al fișierului este o virgulă. De asemenea, observați că există doar cinci rânduri în fiecare partiție, deoarece cele 20 de rânduri din sursă au fost distribuite în mod egal în cele patru partiții.
Pentru a limita numărul de rânduri care sunt afișate pe ecran, putem direcționa ieșirea comenzii cat
către comanda head
, așa cum se arată mai jos, pentru a verifica conținutul celorlalte trei partiții.
Argumentul -n 5
la comanda head
limitează ieșirea ecranului la primele cinci rânduri.
(Rețineți că, în cazul nostru, acest lucru este inutil, deoarece există doar cinci rânduri în fiecare partiție pentru început. În practică, totuși, veți avea probabil mult mai multe rânduri decât acestea în fiecare partiție și veți dori să verificați doar primele câteva pentru asigurați-vă că arată corect, astfel încât aceasta vă arată cum să faceți acest lucru.)
[hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00001 |head -n 5 6,2016-09-28,3.45,3,3 7,2016-09-28,4.56,3,4 8,2016-09-28,5.67,3,5 9,2016-09-28,1.23,4,1 10,2016-09-28,1.23,5,1 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00002 |head -n 5 11,2016-09-28,1.23,6,1 12,2016-09-29,1.23,7,1 13,2016-09-29,2.34,7,2 14,2016-09-29,3.45,7,3 15,2016-09-29,4.56,7,4 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00003 |head -n 5 16,2016-09-29,5.67,7,5 17,2016-09-29,6.78,7,6 18,2016-09-29,7.89,7,7 19,2016-09-29,7.89,8,7 20,2016-09-30,1.23,9,1 [hdfs@localhost:/sqoop]$
Dacă trebuie să rulați o interogare pentru a extrage date din mai multe tabele din baza de date PostgreSQL, acest lucru poate fi realizat cu următoarea comandă:
[hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --target-dir 'creditCardOrders' \ --split-by 'pksales' \ --query "select s.pksales, s.saledate, s.saleamount, o.shippingtype, o.methodofpayment from sales s inner join orders o on s.orderid=o.orderid where o.methodofpayment='credit card' and \$CONDITIONS" [hdfs@localhost:/sqoop]$
În comanda de mai sus, folosim unele dintre aceleași argumente pentru comanda Sqoop, dar ele capătă o importanță diferită atunci când sunt utilizate cu o comandă SQL.
-
--target-dir
- Directorul țintă îi spune lui Sqoop în ce director de pe HDFS să stocheze datele selectate. Acest argument este cerut de Sqoop atunci când se utilizează o interogare în formă liberă. -
--split-by
- Chiar dacă selectăm cheia primară a tabelului de vânzări, tot trebuie să furnizăm Sqoop un identificator unic pentru a-l ajuta să distribuie volumul de lucru. -
--query
- Acesta este argumentul în care furnizăm interogarea SQL. Interogarea de mai sus este cuprinsă între ghilimele duble. Observați că nu există o bară oblică inversă (caracterul de continuare a liniei) în liniile multiple care conțin interogarea. Observați, de asemenea,and \$CONDITIONS
la sfârșitul clauzeiWHERE
. Acest lucru este cerut de Sqoop, deoarece Sqoop va înlocui automat jetonul$CONDITIONS
cu o expresie unică.
Probleme sau fără probleme: ar trebui să luați în considerare HDFS
HDFS are multe avantaje față de bazele de date relaționale. Dacă faceți o analiză de date, ar trebui să luați în considerare migrarea datelor dvs. la HDFS, astăzi.
Cu abilitățile învățate aici, importarea datelor dintr-un sistem de baze de date relaționale în HDFS este un proces simplu și direct care poate fi realizat cu o singură comandă. În timp ce aceste exemple au un număr mic de rânduri, mecanica importului de volume mari de date în HDFS dintr-un tabel al bazei de date PostgreSQL rămâne aceeași.
Puteți chiar să experimentați cu importarea tabelelor mari și cu variații delimitatori de stocare. Utilizarea Apache Sqoop este mai eficientă decât exportarea datelor bazei de date într-un fișier, transferul fișierului de pe serverul bazei de date pe HDFS și apoi încărcarea fișierului în HDFS.