Ein HDFS-Tutorial für Datenanalysten, die mit relationalen Datenbanken nicht weiterkommen

Veröffentlicht: 2022-03-11

Einführung

Inzwischen haben Sie wahrscheinlich schon vom Hadoop Distributed File System (HDFS) gehört, insbesondere wenn Sie Datenanalyst oder jemand sind, der für das Verschieben von Daten von einem System auf ein anderes verantwortlich ist. Welche Vorteile hat HDFS jedoch gegenüber relationalen Datenbanken?

HDFS ist eine skalierbare Open-Source-Lösung zum Speichern und Verarbeiten großer Datenmengen. HDFS hat sich in vielen modernen Rechenzentren als zuverlässig und effizient erwiesen.

HDFS verwendet handelsübliche Hardware zusammen mit Open-Source-Software, um die Gesamtkosten pro Speicherbyte zu senken.

Mit seiner integrierten Replikation und Widerstandsfähigkeit gegenüber Festplattenausfällen ist HDFS ein ideales System zum Speichern und Verarbeiten von Daten für Analysen. Es erfordert nicht die Grundlagen und den Overhead, um Transaktionsatomizität, Konsistenz, Isolierung und Dauerhaftigkeit (ACID) zu unterstützen, wie dies bei herkömmlichen relationalen Datenbanksystemen erforderlich ist.

Darüber hinaus vermeidet die Verwendung von Hadoop als Analyseplattform im Vergleich zu Unternehmens- und kommerziellen Datenbanken wie Oracle zusätzliche Lizenzkosten.

Eine der Fragen, die sich viele Leute stellen, wenn sie zum ersten Mal etwas über HDFS lernen, lautet: Wie bekomme ich meine vorhandenen Daten in das HDFS?

In diesem Artikel untersuchen wir, wie Sie Daten aus einer PostgreSQL-Datenbank in HDFS importieren. Wir werden Apache Sqoop verwenden, die derzeit effizienteste Open-Source-Lösung, um Daten zwischen HDFS und relationalen Datenbanksystemen zu übertragen. Apache Sqoop dient zum Massenladen von Daten aus einer relationalen Datenbank in das HDFS (Import) und zum Massenschreiben von Daten aus dem HDFS in eine relationale Datenbank (Export).

HDFS

Beschleunigen Sie Analysen, indem Sie Ihre Daten in das HDFS migrieren.
Twittern

Die Schritte in diesem Lernprogramm sind für jemanden geschrieben, der über Grundkenntnisse in der Ausführung von SQL-Abfragen und elementare Kenntnisse von HDFS-Befehlen verfügt.

Das verwendete Datenbanksystem ist PostgreSQL 9.5 für Windows, und die HDFS-Version ist Cloudera Hadoop 2.5.0-cdh5.2.0 auf einer virtuellen Centos 6.4-Linux-Maschine.

Apache Sqoop stützt sich auf die JAR-Dateien des JDBC-Treibers, die für den Anbieter und die Datenbankversion der relationalen Datenbank spezifisch sind.

Um die in diesem Artikel gezeigten Schritte auszuführen, benötigt der Benutzer Berechtigungen für die Remoteverbindung mit der PostgreSQL-Datenbank, SELECT Berechtigungen für die relationale Datenbank, Schreibberechtigungen für das HDFS und Ausführungsberechtigungen für die ausführbare Sqoop-Datei.

Für dieses Tutorial haben wir eine PostgreSQL-Datenbank mit dem Namen Toptal erstellt und über Port 5432 zugänglich gemacht.

PostgreSQL-Datenquelle

Zu Beginn erstellen wir in unserer PostgreSQL Toptal Datenbank eine Testdatentabelle mit dem Namen sales . Wir gehen davon aus, dass das OpenSSL-Zertifikat und die privaten Schlüsseldateien bereits auf dem PostgreSQL-Server vorhanden sind.

 Server [localhost]: Database [postgres]: Toptal Port [5432]: Username [postgres]: Password for user postgres: psql (9.5.3) Toptal=# create table sales Toptal-# ( Toptal(# pkSales integer constraint salesKey primary key, Toptal(# saleDate date, Toptal(# saleAmount money, Toptal(# orderID int not null, Toptal(# itemID int not null Toptal(# ); CREATE TABLE

Als nächstes fügen wir 20 Zeilen in die Tabelle ein:

 Toptal=# insert into sales values (1, '2016-09-27', 1.23, 1, 1); INSERT 0 1 Toptal=# insert into sales values (2, '2016-09-27', 2.34, 1, 2); INSERT 0 1 Toptal=# insert into sales values (3, '2016-09-27', 1.23, 2, 1); INSERT 0 1 Toptal=# insert into sales values (4, '2016-09-27', 2.34, 2, 2); INSERT 0 1 Toptal=# insert into sales values (5, '2016-09-27', 3.45, 2, 3); INSERT 0 1 Toptal=# insert into sales values (6, '2016-09-28', 3.45, 3, 3); INSERT 0 1 Toptal=# insert into sales values (7, '2016-09-28', 4.56, 3, 4); INSERT 0 1 Toptal=# insert into sales values (8, '2016-09-28', 5.67, 3, 5); INSERT 0 1 Toptal=# insert into sales values (9, '2016-09-28', 1.23, 4, 1); INSERT 0 1 Toptal=# insert into sales values (10, '2016-09-28', 1.23, 5, 1); INSERT 0 1 Toptal=# insert into sales values (11, '2016-09-28', 1.23, 6, 1); INSERT 0 1 Toptal=# insert into sales values (12, '2016-09-29', 1.23, 7, 1); INSERT 0 1 Toptal=# insert into sales values (13, '2016-09-29', 2.34, 7, 2); INSERT 0 1 Toptal=# insert into sales values (14, '2016-09-29', 3.45, 7, 3); INSERT 0 1 Toptal=# insert into sales values (15, '2016-09-29', 4.56, 7, 4); INSERT 0 1 Toptal=# insert into sales values (16, '2016-09-29', 5.67, 7, 5); INSERT 0 1 Toptal=# insert into sales values (17, '2016-09-29', 6.78, 7, 6); INSERT 0 1 Toptal=# insert into sales values (18, '2016-09-29', 7.89, 7, 7); INSERT 0 1 Toptal=# insert into sales values (19, '2016-09-29', 7.89, 8, 7); INSERT 0 1 Toptal=# insert into sales values (20, '2016-09-30', 1.23, 9, 1); INSERT 0 1

Lassen Sie uns die Daten auswählen, um zu überprüfen, ob die Daten korrekt aussehen:

 Toptal=# select * from sales; pksales | saledate | saleamount | orderid | itemid ---------+------------+------------+---------+-------- 1 | 2016-09-27 | $1.23 | 1 | 1 2 | 2016-09-27 | $2.34 | 1 | 2 3 | 2016-09-27 | $1.23 | 2 | 1 4 | 2016-09-27 | $2.34 | 2 | 2 5 | 2016-09-27 | $3.45 | 2 | 3 6 | 2016-09-28 | $3.45 | 3 | 3 7 | 2016-09-28 | $4.56 | 3 | 4 8 | 2016-09-28 | $5.67 | 3 | 5 9 | 2016-09-28 | $1.23 | 4 | 1 10 | 2016-09-28 | $1.23 | 5 | 1 11 | 2016-09-28 | $1.23 | 6 | 1 12 | 2016-09-29 | $1.23 | 7 | 1 13 | 2016-09-29 | $2.34 | 7 | 2 14 | 2016-09-29 | $3.45 | 7 | 3 15 | 2016-09-29 | $4.56 | 7 | 4 16 | 2016-09-29 | $5.67 | 7 | 5 17 | 2016-09-29 | $6.78 | 7 | 6 18 | 2016-09-29 | $7.89 | 7 | 7 19 | 2016-09-29 | $7.89 | 8 | 7 20 | 2016-09-30 | $1.23 | 9 | 1 (20 rows)

Die Daten sehen gut aus, also machen wir weiter.

Importieren Sie mit Sqoop in das HDFS

Nachdem die Datenquelle definiert ist, können wir nun die Daten in das HDFS importieren. Der sqoop Befehl, den wir untersuchen werden, ist unten aufgeführt, und wir werden jedes Argument in den folgenden Aufzählungspunkten aufschlüsseln. Beachten Sie, dass der Befehl in einer vollständigen Zeile oder, wie unten gezeigt, mit dem umgekehrten Schrägstrich (dem Fortsetzungszeichen der Linux-Befehlszeile) am Ende jeder Zeile außer der letzten stehen sollte.

 sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/Toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales'
  • sqoop import – Die ausführbare Datei heißt sqoop und wir weisen sie an, die Daten aus einer Tabelle oder Ansicht aus einer Datenbank in das HDFS zu importieren.
  • --connect – Mit dem Argument --connect wir die JDBC-Verbindungszeichenfolge für PostgreSQL. In diesem Fall verwenden wir die IP-Adresse, die Portnummer und den Datenbanknamen. Außerdem müssen wir angeben, dass SSL verwendet wird, und die zu verwendende SSLSocketFactory -Klasse bereitstellen.
  • --username – In diesem Beispiel ist der Benutzername ein PostgreSQL-Login, kein Windows-Login. Der Benutzer muss über Berechtigungen zum Herstellen einer Verbindung mit der angegebenen Datenbank und zum Auswählen aus der angegebenen Tabelle verfügen.
  • -P - Dies fordert den Befehlszeilenbenutzer zur Eingabe des Passworts auf. Wenn Sqoop selten ausgeführt wird, könnte dies eine gute Option sein. Es gibt mehrere andere Möglichkeiten, das Passwort automatisch an den Befehl zu übergeben, aber wir versuchen, es für diesen Artikel einfach zu halten.
  • --table – Hier übergeben wir den Namen der PostgreSQL-Tabelle.
  • --target-dir – Dieses Argument gibt das HDFS-Verzeichnis an, in dem die Daten gespeichert werden sollen.
  • --split-by - Wir müssen Sqoop einen eindeutigen Bezeichner zuweisen, um ihm zu helfen, die Arbeitslast zu verteilen. Später in der Jobausgabe werden wir sehen, wo Sqoop die minimalen und maximalen Werte auswählt, um die Festlegung von Split-Grenzen zu unterstützen.

Es ist eine gute Idee, den Befehl zu Wiederholbarkeits- und Bearbeitungszwecken in ein Skript einzufügen, wie unten gezeigt:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales' [hdfs@localhost:/sqoop]$

Jetzt ist es an der Zeit, das obige Sqoop-Befehlsskript auszuführen. Die Ausgabe des Sqoop-Befehls ist unten dargestellt.

 [hdfs@localhost:/sqoop]$ ./sqoopCommand.sh 16/10/02 18:58:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0 Enter password: 16/10/02 18:58:40 INFO manager.SqlManager: Using default fetchSize of 1000 16/10/02 18:58:40 INFO tool.CodeGenTool: Beginning code generation 16/10/02 18:58:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "sales" AS t LIMIT 1 16/10/02 18:58:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce Note: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 16/10/02 18:58:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.jar 16/10/02 18:58:43 WARN manager.PostgresqlManager: It looks like you are importing from postgresql. 16/10/02 18:58:43 WARN manager.PostgresqlManager: This transfer can be faster! Use the --direct 16/10/02 18:58:43 WARN manager.PostgresqlManager: option to exercise a postgresql-specific fast path. 16/10/02 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of sales 16/10/02 18:58:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 16/10/02 18:58:46 INFO db.DBInputFormat: Using read commited transaction isolation 16/10/02 18:58:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("pksales"), MAX("pksales") FROM "sales" 16/10/02 18:58:47 INFO mapred.JobClient: Running job: job_201609280401_0005 16/10/02 18:58:48 INFO mapred.JobClient: map 0% reduce 0% 16/10/02 18:59:04 INFO mapred.JobClient: map 50% reduce 0% 16/10/02 18:59:14 INFO mapred.JobClient: map 75% reduce 0% 16/10/02 18:59:15 INFO mapred.JobClient: map 100% reduce 0% 16/10/02 18:59:18 INFO mapred.JobClient: Job complete: job_201609280401_0005 16/10/02 18:59:18 INFO mapred.JobClient: Counters: 23 16/10/02 18:59:18 INFO mapred.JobClient: File System Counters 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes read=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes written=1190344 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of write operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes read=438 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes written=451 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of read operations=4 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of write operations=4 16/10/02 18:59:18 INFO mapred.JobClient: Job Counters 16/10/02 18:59:18 INFO mapred.JobClient: Launched map tasks=4 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=48877 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Map-Reduce Framework 16/10/02 18:59:18 INFO mapred.JobClient: Map input records=20 16/10/02 18:59:18 INFO mapred.JobClient: Map output records=20 16/10/02 18:59:18 INFO mapred.JobClient: Input split bytes=438 16/10/02 18:59:18 INFO mapred.JobClient: Spilled Records=0 16/10/02 18:59:18 INFO mapred.JobClient: CPU time spent (ms)=3980 16/10/02 18:59:18 INFO mapred.JobClient: Physical memory (bytes) snapshot=481574912 16/10/02 18:59:18 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2949685248 16/10/02 18:59:18 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Transferred 451 bytes in 33.7555 seconds (13.3608 bytes/sec) 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Retrieved 20 records. [hdfs@localhost:/sqoop]$

Beachten Sie, dass die letzte Ausgabezeile oben zeigt, dass 20 Datensätze abgerufen wurden, was den 20 Datensätzen in der Tabelle in der PostgreSQL-Datenbank entspricht.

Nach dem Ausführen des Sqoop-Befehls können wir den hdfs dfs -ls ausführen, um das Verzeichnis anzuzeigen, das standardmäßig mit dem Tabellennamen auf dem HDFS erstellt wurde.

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls Found 1 items drwxrwxrwx - toptal data 0 2016-10-02 18:59 sales [hdfs@localhost:/sqoop]$

Wir können den hdfs dfs -ls erneut verwenden, um den Inhalt des sales aufzulisten. Wenn Sie sich das HDFS ansehen, können Sie feststellen, dass die Daten standardmäßig partitioniert und auf vier Dateien verteilt sind, nicht nur in einer.

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls sales Found 6 items -rw-rw-rw- 1 toptal data 0 2016-10-02 18:59 sales/_SUCCESS drwxrwxrwx - toptal data 0 2016-10-02 18:58 sales/_logs -rw-rw-rw- 1 toptal data 110 2016-10-02 18:59 sales/part-m-00000 -rw-rw-rw- 1 toptal data 111 2016-10-02 18:59 sales/part-m-00001 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00002 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00003 [hdfs@localhost:/sqoop]$

Der hdfs dfs -cat zeigt alle Datensätze in der ersten Partition der Verkaufsdaten im HDFS an.

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00000 1,2016-09-27,1.23,1,1 2,2016-09-27,2.34,1,2 3,2016-09-27,1.23,2,1 4,2016-09-27,2.34,2,2 5,2016-09-27,3.45,2,3 [hdfs@localhost:/sqoop]$

Beachten Sie, dass das standardmäßige Dateitrennzeichen ein Komma ist. Beachten Sie außerdem, dass jede Partition nur fünf Zeilen enthält, da die 20 Zeilen in der Quelle gleichmäßig auf die vier Partitionen verteilt wurden.

Um die Anzahl der Zeilen zu begrenzen, die auf dem Bildschirm ausgegeben werden, können wir die Ausgabe des cat -Befehls wie unten gezeigt an den head Befehl weiterleiten, um den Inhalt der anderen drei Partitionen zu überprüfen.

Das Argument -n 5 des Befehls head begrenzt die Bildschirmausgabe auf die ersten fünf Zeilen.

(Beachten Sie, dass dies in unserem Fall unnötig ist, da es zunächst nur fünf Zeilen in jeder Partition gibt. In der Praxis werden Sie jedoch wahrscheinlich viel mehr Zeilen als diese in jeder Partition haben und nur die ersten paar überprüfen wollen stellen Sie sicher, dass sie richtig aussehen, also zeigen wir Ihnen, wie das geht.)

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00001 |head -n 5 6,2016-09-28,3.45,3,3 7,2016-09-28,4.56,3,4 8,2016-09-28,5.67,3,5 9,2016-09-28,1.23,4,1 10,2016-09-28,1.23,5,1 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00002 |head -n 5 11,2016-09-28,1.23,6,1 12,2016-09-29,1.23,7,1 13,2016-09-29,2.34,7,2 14,2016-09-29,3.45,7,3 15,2016-09-29,4.56,7,4 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00003 |head -n 5 16,2016-09-29,5.67,7,5 17,2016-09-29,6.78,7,6 18,2016-09-29,7.89,7,7 19,2016-09-29,7.89,8,7 20,2016-09-30,1.23,9,1 [hdfs@localhost:/sqoop]$

Wenn Sie eine Abfrage ausführen müssen, um Daten aus mehreren Tabellen in der PostgreSQL-Datenbank zu extrahieren, können Sie dies mit dem folgenden Befehl erreichen:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --target-dir 'creditCardOrders' \ --split-by 'pksales' \ --query "select s.pksales, s.saledate, s.saleamount, o.shippingtype, o.methodofpayment from sales s inner join orders o on s.orderid=o.orderid where o.methodofpayment='credit card' and \$CONDITIONS" [hdfs@localhost:/sqoop]$

Im obigen Befehl verwenden wir einige der gleichen Argumente wie im Sqoop-Befehl, aber sie haben eine andere Bedeutung, wenn sie mit einem SQL-Befehl verwendet werden.

  • --target-dir – Das Zielverzeichnis teilt Sqoop mit, in welchem ​​Verzeichnis auf dem HDFS die ausgewählten Daten gespeichert werden sollen. Dieses Argument wird von Sqoop benötigt, wenn eine Freiformabfrage verwendet wird.
  • --split-by - Obwohl wir den Primärschlüssel der Verkaufstabelle auswählen, müssen wir Sqoop dennoch eine eindeutige Kennung zuweisen, um die Arbeitslast zu verteilen.
  • --query – Dies ist das Argument, in dem wir die SQL-Abfrage bereitstellen. Die obige Abfrage ist in doppelte Anführungszeichen eingeschlossen. Beachten Sie, dass in den mehreren Zeilen, die die Abfrage enthalten, kein umgekehrter Schrägstrich (das Zeilenfortsetzungszeichen) vorhanden ist. Beachten Sie auch die and \$CONDITIONS am Ende der WHERE -Klausel. Dies ist für Sqoop erforderlich, da Sqoop das $CONDITIONS Token automatisch durch einen eindeutigen Ausdruck ersetzt.

Probleme oder keine Probleme: Sie sollten HDFS in Betracht ziehen

HDFS hat viele Vorteile gegenüber den relationalen Datenbanken. Wenn Sie Datenanalysen durchführen, sollten Sie Ihre Daten noch heute auf HDFS migrieren.

Mit den hier erlernten Fähigkeiten ist das Importieren von Daten aus einem relationalen Datenbanksystem in HDFS ein einfacher und unkomplizierter Vorgang, der mit einem einzigen Befehl ausgeführt werden kann. Während diese Beispiele eine kleine Anzahl von Zeilen haben, bleibt die Mechanik zum Importieren großer Datenmengen in HDFS aus einer PostgreSQL-Datenbanktabelle gleich.

Sie können sogar mit dem Importieren großer Tabellen und unterschiedlichen Speichertrennzeichen experimentieren. Die Verwendung von Apache Sqoop ist effizienter als das Exportieren der Datenbankdaten in eine Datei, das Übertragen der Datei vom Datenbankserver auf das HDFS und das anschließende Laden der Datei in das HDFS.

Verwandt: Steigern Sie Ihr Data Munging mit R