Un tutorial HDFS per analisti di dati bloccati con database relazionali

Pubblicato: 2022-03-11

introduzione

A questo punto, probabilmente hai sentito parlare di Hadoop Distributed File System (HDFS), specialmente se sei un analista di dati o qualcuno che è responsabile dello spostamento dei dati da un sistema all'altro. Tuttavia, quali sono i vantaggi di HDFS rispetto ai database relazionali?

HDFS è una soluzione scalabile e open source per l'archiviazione e l'elaborazione di grandi volumi di dati. HDFS ha dimostrato di essere affidabile ed efficiente in molti data center moderni.

HDFS utilizza hardware di base insieme a software open source per ridurre il costo complessivo per byte di storage.

Grazie alla replica integrata e alla resilienza ai guasti del disco, HDFS è un sistema ideale per l'archiviazione e l'elaborazione dei dati per l'analisi. Non richiede le basi e le spese generali per supportare l'atomicità, la coerenza, l'isolamento e la durabilità delle transazioni (ACID) come è necessario con i tradizionali sistemi di database relazionali.

Inoltre, rispetto ai database aziendali e commerciali, come Oracle, l'utilizzo di Hadoop come piattaforma di analisi evita costi di licenza aggiuntivi.

Una delle domande che molte persone si pongono quando apprendono per la prima volta l'HDFS è: come faccio a trasferire i miei dati esistenti nell'HDFS?

In questo articolo, esamineremo come importare dati da un database PostgreSQL in HDFS. Useremo Apache Sqoop, che è attualmente la soluzione open source più efficiente per trasferire dati tra HDFS e sistemi di database relazionali. Apache Sqoop è progettato per caricare in blocco i dati da un database relazionale nell'HDFS (importazione) e per scrivere in blocco i dati dall'HDFS in un database relazionale (esportazione).

HDFS

Accelera l'analisi migrando i tuoi dati nell'HDFS.
Twitta

I passaggi di questo tutorial sono scritti per qualcuno con una conoscenza di base dell'esecuzione di query SQL e una conoscenza elementare dei comandi HDFS.

Il sistema di database utilizzato è PostgreSQL 9.5 per Windows e la versione HDFS è Cloudera Hadoop 2.5.0-cdh5.2.0 su una macchina virtuale Linux Centos 6.4.

Apache Sqoop si basa sui file JAR del driver JDBC specifici per il fornitore del database relazionale e la versione del database.

Per eseguire i passaggi mostrati in questo articolo, l'utente avrà bisogno delle autorizzazioni per connettersi in remoto al database PostgreSQL, SELECT autorizzazioni sul database relazionale, scrivere autorizzazioni su HDFS ed eseguire autorizzazioni sull'eseguibile Sqoop.

Ai fini di questo tutorial, abbiamo creato un database PostgreSQL, chiamato Toptal e reso accessibile tramite la porta 5432.

Origine dati PostgreSQL

Per iniziare, nel nostro database PostgreSQL Toptal creeremo una tabella di dati di test denominata sales . Assumiamo che il certificato OpenSSL e i file della chiave privata esistano già sul server PostgreSQL.

 Server [localhost]: Database [postgres]: Toptal Port [5432]: Username [postgres]: Password for user postgres: psql (9.5.3) Toptal=# create table sales Toptal-# ( Toptal(# pkSales integer constraint salesKey primary key, Toptal(# saleDate date, Toptal(# saleAmount money, Toptal(# orderID int not null, Toptal(# itemID int not null Toptal(# ); CREATE TABLE

Successivamente, inseriremo 20 righe nella tabella:

 Toptal=# insert into sales values (1, '2016-09-27', 1.23, 1, 1); INSERT 0 1 Toptal=# insert into sales values (2, '2016-09-27', 2.34, 1, 2); INSERT 0 1 Toptal=# insert into sales values (3, '2016-09-27', 1.23, 2, 1); INSERT 0 1 Toptal=# insert into sales values (4, '2016-09-27', 2.34, 2, 2); INSERT 0 1 Toptal=# insert into sales values (5, '2016-09-27', 3.45, 2, 3); INSERT 0 1 Toptal=# insert into sales values (6, '2016-09-28', 3.45, 3, 3); INSERT 0 1 Toptal=# insert into sales values (7, '2016-09-28', 4.56, 3, 4); INSERT 0 1 Toptal=# insert into sales values (8, '2016-09-28', 5.67, 3, 5); INSERT 0 1 Toptal=# insert into sales values (9, '2016-09-28', 1.23, 4, 1); INSERT 0 1 Toptal=# insert into sales values (10, '2016-09-28', 1.23, 5, 1); INSERT 0 1 Toptal=# insert into sales values (11, '2016-09-28', 1.23, 6, 1); INSERT 0 1 Toptal=# insert into sales values (12, '2016-09-29', 1.23, 7, 1); INSERT 0 1 Toptal=# insert into sales values (13, '2016-09-29', 2.34, 7, 2); INSERT 0 1 Toptal=# insert into sales values (14, '2016-09-29', 3.45, 7, 3); INSERT 0 1 Toptal=# insert into sales values (15, '2016-09-29', 4.56, 7, 4); INSERT 0 1 Toptal=# insert into sales values (16, '2016-09-29', 5.67, 7, 5); INSERT 0 1 Toptal=# insert into sales values (17, '2016-09-29', 6.78, 7, 6); INSERT 0 1 Toptal=# insert into sales values (18, '2016-09-29', 7.89, 7, 7); INSERT 0 1 Toptal=# insert into sales values (19, '2016-09-29', 7.89, 8, 7); INSERT 0 1 Toptal=# insert into sales values (20, '2016-09-30', 1.23, 9, 1); INSERT 0 1

Selezioniamo i dati per verificare che i dati siano corretti:

 Toptal=# select * from sales; pksales | saledate | saleamount | orderid | itemid ---------+------------+------------+---------+-------- 1 | 2016-09-27 | $1.23 | 1 | 1 2 | 2016-09-27 | $2.34 | 1 | 2 3 | 2016-09-27 | $1.23 | 2 | 1 4 | 2016-09-27 | $2.34 | 2 | 2 5 | 2016-09-27 | $3.45 | 2 | 3 6 | 2016-09-28 | $3.45 | 3 | 3 7 | 2016-09-28 | $4.56 | 3 | 4 8 | 2016-09-28 | $5.67 | 3 | 5 9 | 2016-09-28 | $1.23 | 4 | 1 10 | 2016-09-28 | $1.23 | 5 | 1 11 | 2016-09-28 | $1.23 | 6 | 1 12 | 2016-09-29 | $1.23 | 7 | 1 13 | 2016-09-29 | $2.34 | 7 | 2 14 | 2016-09-29 | $3.45 | 7 | 3 15 | 2016-09-29 | $4.56 | 7 | 4 16 | 2016-09-29 | $5.67 | 7 | 5 17 | 2016-09-29 | $6.78 | 7 | 6 18 | 2016-09-29 | $7.89 | 7 | 7 19 | 2016-09-29 | $7.89 | 8 | 7 20 | 2016-09-30 | $1.23 | 9 | 1 (20 rows)

I dati sembrano buoni, quindi procediamo.

Importa in HDFS usando Sqoop

Con l'origine dati definita, siamo ora pronti per importare i dati nell'HDFS. Il comando sqoop che esamineremo è elencato di seguito e analizzeremo ogni argomento nei punti elenco che seguono. Si noti che il comando dovrebbe trovarsi su una riga completa o, come mostrato di seguito, con la barra rovesciata (il carattere di continuazione della riga di comando di Linux) alla fine di ogni riga tranne l'ultima.

 sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/Toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales'
  • sqoop import - L'eseguibile si chiama sqoop e gli stiamo indicando di importare i dati da una tabella o visualizzare da un database nell'HDFS.
  • --connect - Con l'argomento --connect , stiamo passando la stringa di connessione JDBC per PostgreSQL. In questo caso, utilizziamo l'indirizzo IP, il numero di porta e il nome del database. È inoltre necessario specificare che SSL viene utilizzato e che è necessario fornire la classe SSLSocketFactory da utilizzare.
  • --username - In questo esempio, il nome utente è un login PostgreSQL, non un login Windows. L'utente deve disporre delle autorizzazioni per connettersi al database specificato e per selezionare dalla tabella specificata.
  • -P - Questo richiederà all'utente della riga di comando la password. Se Sqoop viene eseguito raramente, questa potrebbe essere una buona opzione. Esistono molti altri modi per passare automaticamente la password al comando, ma stiamo cercando di mantenerlo semplice per questo articolo.
  • --table - Qui è dove passiamo il nome della tabella PostgreSQL.
  • --target-dir - Questo argomento specifica la directory HDFS in cui devono essere archiviati i dati.
  • --split-by - Dobbiamo fornire a Sqoop un identificatore univoco per aiutarlo a distribuire il carico di lavoro. Più avanti nell'output del lavoro, vedremo dove Sqoop seleziona i valori minimo e massimo per aiutare a impostare i limiti di divisione.

È una buona idea inserire il comando in uno script per scopi di ripetibilità e modifica, come mostrato di seguito:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales' [hdfs@localhost:/sqoop]$

Ora è il momento di eseguire lo script di comando Sqoop sopra. L'output del comando Sqoop è mostrato di seguito.

 [hdfs@localhost:/sqoop]$ ./sqoopCommand.sh 16/10/02 18:58:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0 Enter password: 16/10/02 18:58:40 INFO manager.SqlManager: Using default fetchSize of 1000 16/10/02 18:58:40 INFO tool.CodeGenTool: Beginning code generation 16/10/02 18:58:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "sales" AS t LIMIT 1 16/10/02 18:58:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce Note: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 16/10/02 18:58:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.jar 16/10/02 18:58:43 WARN manager.PostgresqlManager: It looks like you are importing from postgresql. 16/10/02 18:58:43 WARN manager.PostgresqlManager: This transfer can be faster! Use the --direct 16/10/02 18:58:43 WARN manager.PostgresqlManager: option to exercise a postgresql-specific fast path. 16/10/02 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of sales 16/10/02 18:58:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 16/10/02 18:58:46 INFO db.DBInputFormat: Using read commited transaction isolation 16/10/02 18:58:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("pksales"), MAX("pksales") FROM "sales" 16/10/02 18:58:47 INFO mapred.JobClient: Running job: job_201609280401_0005 16/10/02 18:58:48 INFO mapred.JobClient: map 0% reduce 0% 16/10/02 18:59:04 INFO mapred.JobClient: map 50% reduce 0% 16/10/02 18:59:14 INFO mapred.JobClient: map 75% reduce 0% 16/10/02 18:59:15 INFO mapred.JobClient: map 100% reduce 0% 16/10/02 18:59:18 INFO mapred.JobClient: Job complete: job_201609280401_0005 16/10/02 18:59:18 INFO mapred.JobClient: Counters: 23 16/10/02 18:59:18 INFO mapred.JobClient: File System Counters 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes read=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes written=1190344 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of write operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes read=438 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes written=451 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of read operations=4 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of write operations=4 16/10/02 18:59:18 INFO mapred.JobClient: Job Counters 16/10/02 18:59:18 INFO mapred.JobClient: Launched map tasks=4 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=48877 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Map-Reduce Framework 16/10/02 18:59:18 INFO mapred.JobClient: Map input records=20 16/10/02 18:59:18 INFO mapred.JobClient: Map output records=20 16/10/02 18:59:18 INFO mapred.JobClient: Input split bytes=438 16/10/02 18:59:18 INFO mapred.JobClient: Spilled Records=0 16/10/02 18:59:18 INFO mapred.JobClient: CPU time spent (ms)=3980 16/10/02 18:59:18 INFO mapred.JobClient: Physical memory (bytes) snapshot=481574912 16/10/02 18:59:18 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2949685248 16/10/02 18:59:18 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Transferred 451 bytes in 33.7555 seconds (13.3608 bytes/sec) 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Retrieved 20 records. [hdfs@localhost:/sqoop]$

Si noti che l'ultima riga di output sopra mostra che sono stati recuperati 20 record, che corrispondono ai 20 record nella tabella sul database PostgreSQL.

Dopo aver eseguito il comando Sqoop, possiamo eseguire il comando hdfs dfs -ls per vedere la directory che è stata creata per impostazione predefinita con il nome della tabella su HDFS.

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls Found 1 items drwxrwxrwx - toptal data 0 2016-10-02 18:59 sales [hdfs@localhost:/sqoop]$

Possiamo usare di nuovo il comando hdfs dfs -ls per elencare i contenuti della directory di sales . Se guardi su HDFS, puoi notare che i dati sono partizionati e distribuiti su quattro file per impostazione predefinita, non solo contenuti in uno.

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls sales Found 6 items -rw-rw-rw- 1 toptal data 0 2016-10-02 18:59 sales/_SUCCESS drwxrwxrwx - toptal data 0 2016-10-02 18:58 sales/_logs -rw-rw-rw- 1 toptal data 110 2016-10-02 18:59 sales/part-m-00000 -rw-rw-rw- 1 toptal data 111 2016-10-02 18:59 sales/part-m-00001 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00002 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00003 [hdfs@localhost:/sqoop]$

Il comando hdfs dfs -cat visualizzerà tutti i record nella prima partizione dei dati di vendita sull'HDFS.

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00000 1,2016-09-27,1.23,1,1 2,2016-09-27,2.34,1,2 3,2016-09-27,1.23,2,1 4,2016-09-27,2.34,2,2 5,2016-09-27,3.45,2,3 [hdfs@localhost:/sqoop]$

Si noti che il delimitatore di file predefinito è una virgola. Si noti inoltre che ci sono solo cinque righe in ogni partizione, perché le 20 righe nell'origine sono state distribuite equamente tra le quattro partizioni.

Per limitare il numero di righe che vengono visualizzate sullo schermo, possiamo reindirizzare l'output del comando cat al comando head come mostrato di seguito, per controllare il contenuto delle altre tre partizioni.

L'argomento -n 5 del comando head limita l'output dello schermo alle prime cinque righe.

(Nota che nel nostro caso, questo non è necessario poiché ci sono solo cinque righe in ogni partizione per cominciare. In pratica, però, probabilmente avrai molte più righe di questa in ogni partizione e vorrai solo controllare le prime per assicurati che abbiano un aspetto corretto, quindi questo ti mostra come farlo.)

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00001 |head -n 5 6,2016-09-28,3.45,3,3 7,2016-09-28,4.56,3,4 8,2016-09-28,5.67,3,5 9,2016-09-28,1.23,4,1 10,2016-09-28,1.23,5,1 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00002 |head -n 5 11,2016-09-28,1.23,6,1 12,2016-09-29,1.23,7,1 13,2016-09-29,2.34,7,2 14,2016-09-29,3.45,7,3 15,2016-09-29,4.56,7,4 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00003 |head -n 5 16,2016-09-29,5.67,7,5 17,2016-09-29,6.78,7,6 18,2016-09-29,7.89,7,7 19,2016-09-29,7.89,8,7 20,2016-09-30,1.23,9,1 [hdfs@localhost:/sqoop]$

Se è necessario eseguire una query per estrarre dati da più tabelle nel database PostgreSQL, è possibile farlo con il comando seguente:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --target-dir 'creditCardOrders' \ --split-by 'pksales' \ --query "select s.pksales, s.saledate, s.saleamount, o.shippingtype, o.methodofpayment from sales s inner join orders o on s.orderid=o.orderid where o.methodofpayment='credit card' and \$CONDITIONS" [hdfs@localhost:/sqoop]$

Nel comando precedente, utilizziamo alcuni degli stessi argomenti del comando Sqoop, ma assumono un'importanza diversa se utilizzati con un comando SQL.

  • --target-dir - La directory di destinazione indica a Sqoop in quale directory sull'HDFS memorizzare i dati selezionati. Questo argomento è richiesto da Sqoop quando si utilizza una query in formato libero.
  • --split-by - Anche se stiamo selezionando la chiave primaria della tabella di vendita, dobbiamo comunque fornire a Sqoop un identificatore univoco per aiutarlo a distribuire il carico di lavoro.
  • --query - Questo è l'argomento in cui forniamo la query SQL. La query sopra è racchiusa tra virgolette. Si noti che non è presente una barra rovesciata (il carattere di continuazione della riga) nelle righe multiple che contengono la query. Notare anche and \$CONDITIONS alla fine della clausola WHERE . Questo è richiesto da Sqoop perché Sqoop sostituirà automaticamente il token $CONDITIONS con un'espressione univoca.

Problemi o nessun problema: dovresti considerare HDFS

HDFS ha molti vantaggi rispetto ai database relazionali. Se stai eseguendo l'analisi dei dati, dovresti considerare di migrare i tuoi dati su HDFS, oggi.

Con le competenze apprese qui, l'importazione di dati da un sistema di database relazionale in HDFS è un processo semplice e diretto che può essere eseguito con un solo comando. Sebbene questi esempi abbiano un numero ridotto di righe, i meccanismi di importazione di grandi volumi di dati in HDFS da una tabella di database PostgreSQL rimangono gli stessi.

Puoi anche sperimentare l'importazione di tabelle di grandi dimensioni e diversi delimitatori di archiviazione. L'utilizzo di Apache Sqoop è più efficiente dell'esportazione dei dati del database in un file, del trasferimento del file dal server del database all'HDFS e del caricamento del file nell'HDFS.

Correlati: potenzia i tuoi dati con R