Un tutoriel HDFS pour les analystes de données coincés avec des bases de données relationnelles
Publié: 2022-03-11introduction
À ce jour, vous avez probablement entendu parler du système de fichiers distribués Hadoop (HDFS), en particulier si vous êtes analyste de données ou quelqu'un qui est responsable du transfert de données d'un système à un autre. Cependant, quels sont les avantages de HDFS par rapport aux bases de données relationnelles ?
HDFS est une solution open source évolutive pour le stockage et le traitement de gros volumes de données. HDFS s'est avéré fiable et efficace dans de nombreux centres de données modernes.
HDFS utilise du matériel de base ainsi que des logiciels open source pour réduire le coût global par octet de stockage.
Avec sa réplication intégrée et sa résilience aux pannes de disque, HDFS est un système idéal pour stocker et traiter les données à des fins d'analyse. Il ne nécessite pas les fondements et les frais généraux pour prendre en charge l'atomicité, la cohérence, l'isolation et la durabilité des transactions (ACID) comme cela est nécessaire avec les systèmes de bases de données relationnelles traditionnels.
De plus, par rapport aux bases de données d'entreprise et commerciales, telles qu'Oracle, l'utilisation de Hadoop comme plate-forme d'analyse évite tout coût de licence supplémentaire.
L'une des questions que beaucoup de gens se posent lorsqu'ils découvrent HDFS pour la première fois est la suivante : comment puis-je intégrer mes données existantes dans HDFS ?
Dans cet article, nous examinerons comment importer des données d'une base de données PostgreSQL dans HDFS. Nous utiliserons Apache Sqoop, qui est actuellement la solution open source la plus efficace pour transférer des données entre HDFS et les systèmes de bases de données relationnelles. Apache Sqoop est conçu pour charger en masse des données d'une base de données relationnelle vers le HDFS (importation) et pour écrire en masse des données du HDFS vers une base de données relationnelle (exportation).
Les étapes de ce didacticiel sont écrites pour une personne ayant une connaissance de base de l'exécution de requêtes SQL et une connaissance élémentaire des commandes HDFS.
Le système de base de données utilisé est PostgreSQL 9.5 pour Windows et la version HDFS est Cloudera Hadoop 2.5.0-cdh5.2.0 sur une machine virtuelle Centos 6.4 Linux.
Apache Sqoop s'appuie sur les fichiers JAR du pilote JDBC qui sont spécifiques au fournisseur de la base de données relationnelle et à la version de la base de données.
Pour exécuter les étapes présentées dans cet article, l'utilisateur aura besoin des autorisations pour se connecter à distance à la base de données PostgreSQL, des autorisations SELECT
sur la base de données relationnelle, des autorisations d'écriture sur le HDFS et des autorisations d'exécution sur l'exécutable Sqoop.
Pour les besoins de ce didacticiel, nous avons créé une base de données PostgreSQL, nommée Toptal , et l'avons rendue accessible via le port 5432.
Source de données PostgreSQL
Pour commencer, dans notre base de données PostgreSQL Toptal
, nous allons créer une table de données de test nommée sales
. Nous supposerons que le certificat OpenSSL et les fichiers de clé privée existent déjà sur le serveur PostgreSQL.
Server [localhost]: Database [postgres]: Toptal Port [5432]: Username [postgres]: Password for user postgres: psql (9.5.3) Toptal=# create table sales Toptal-# ( Toptal(# pkSales integer constraint salesKey primary key, Toptal(# saleDate date, Toptal(# saleAmount money, Toptal(# orderID int not null, Toptal(# itemID int not null Toptal(# ); CREATE TABLE
Ensuite, nous allons insérer 20 lignes dans le tableau :
Toptal=# insert into sales values (1, '2016-09-27', 1.23, 1, 1); INSERT 0 1 Toptal=# insert into sales values (2, '2016-09-27', 2.34, 1, 2); INSERT 0 1 Toptal=# insert into sales values (3, '2016-09-27', 1.23, 2, 1); INSERT 0 1 Toptal=# insert into sales values (4, '2016-09-27', 2.34, 2, 2); INSERT 0 1 Toptal=# insert into sales values (5, '2016-09-27', 3.45, 2, 3); INSERT 0 1 Toptal=# insert into sales values (6, '2016-09-28', 3.45, 3, 3); INSERT 0 1 Toptal=# insert into sales values (7, '2016-09-28', 4.56, 3, 4); INSERT 0 1 Toptal=# insert into sales values (8, '2016-09-28', 5.67, 3, 5); INSERT 0 1 Toptal=# insert into sales values (9, '2016-09-28', 1.23, 4, 1); INSERT 0 1 Toptal=# insert into sales values (10, '2016-09-28', 1.23, 5, 1); INSERT 0 1 Toptal=# insert into sales values (11, '2016-09-28', 1.23, 6, 1); INSERT 0 1 Toptal=# insert into sales values (12, '2016-09-29', 1.23, 7, 1); INSERT 0 1 Toptal=# insert into sales values (13, '2016-09-29', 2.34, 7, 2); INSERT 0 1 Toptal=# insert into sales values (14, '2016-09-29', 3.45, 7, 3); INSERT 0 1 Toptal=# insert into sales values (15, '2016-09-29', 4.56, 7, 4); INSERT 0 1 Toptal=# insert into sales values (16, '2016-09-29', 5.67, 7, 5); INSERT 0 1 Toptal=# insert into sales values (17, '2016-09-29', 6.78, 7, 6); INSERT 0 1 Toptal=# insert into sales values (18, '2016-09-29', 7.89, 7, 7); INSERT 0 1 Toptal=# insert into sales values (19, '2016-09-29', 7.89, 8, 7); INSERT 0 1 Toptal=# insert into sales values (20, '2016-09-30', 1.23, 9, 1); INSERT 0 1
Sélectionnons les données pour vérifier qu'elles semblent correctes :
Toptal=# select * from sales; pksales | saledate | saleamount | orderid | itemid ---------+------------+------------+---------+-------- 1 | 2016-09-27 | $1.23 | 1 | 1 2 | 2016-09-27 | $2.34 | 1 | 2 3 | 2016-09-27 | $1.23 | 2 | 1 4 | 2016-09-27 | $2.34 | 2 | 2 5 | 2016-09-27 | $3.45 | 2 | 3 6 | 2016-09-28 | $3.45 | 3 | 3 7 | 2016-09-28 | $4.56 | 3 | 4 8 | 2016-09-28 | $5.67 | 3 | 5 9 | 2016-09-28 | $1.23 | 4 | 1 10 | 2016-09-28 | $1.23 | 5 | 1 11 | 2016-09-28 | $1.23 | 6 | 1 12 | 2016-09-29 | $1.23 | 7 | 1 13 | 2016-09-29 | $2.34 | 7 | 2 14 | 2016-09-29 | $3.45 | 7 | 3 15 | 2016-09-29 | $4.56 | 7 | 4 16 | 2016-09-29 | $5.67 | 7 | 5 17 | 2016-09-29 | $6.78 | 7 | 6 18 | 2016-09-29 | $7.89 | 7 | 7 19 | 2016-09-29 | $7.89 | 8 | 7 20 | 2016-09-30 | $1.23 | 9 | 1 (20 rows)
Les données semblent bonnes, alors continuons.
Importer dans le HDFS à l'aide de Sqoop
Une fois la source de données définie, nous sommes maintenant prêts à importer les données dans le HDFS. La commande sqoop
que nous examinerons est répertoriée ci-dessous et nous décomposerons chaque argument dans les puces qui suivent. Notez que la commande est censée être sur une ligne complète ou, comme indiqué ci-dessous, avec la barre oblique inverse (le caractère de continuation de la ligne de commande Linux) à la fin de chaque ligne sauf la dernière.
sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/Toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales'
-
sqoop import
- L'exécutable est nommésqoop
et nous lui demandons d'importer les données d'une table ou d'une vue d'une base de données vers le HDFS. -
--connect
- Avec l'argument--connect
, nous transmettons la chaîne de connexion JDBC pour PostgreSQL. Dans ce cas, nous utilisons l'adresse IP, le numéro de port et le nom de la base de données. Nous devons également spécifier que SSL est utilisé et fournir la classeSSLSocketFactory
à utiliser. -
--username
- Dans cet exemple, le nom d'utilisateur est une connexion PostgreSQL, pas une connexion Windows. L'utilisateur doit disposer des autorisations pour se connecter à la base de données spécifiée et effectuer une sélection dans la table spécifiée. -
-P
- Cela demandera à l'utilisateur de la ligne de commande le mot de passe. Si Sqoop est rarement exécuté, cela peut être une bonne option. Il existe plusieurs autres façons de transmettre automatiquement le mot de passe à la commande, mais nous essayons de rester simple pour cet article. -
--table
- C'est ici que nous passons le nom de la table PostgreSQL. -
--target-dir
- Cet argument spécifie le répertoire HDFS où les données doivent être stockées. -
--split-by
- Nous devons fournir à Sqoop un identifiant unique pour l'aider à répartir la charge de travail. Plus tard dans la sortie du travail, nous verrons où Sqoop sélectionne les valeurs minimales et maximales pour aider à définir les limites de fractionnement.
C'est une bonne idée de mettre la commande dans un script à des fins de répétabilité et d'édition, comme indiqué ci-dessous :
[hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales' [hdfs@localhost:/sqoop]$
Il est maintenant temps d'exécuter le script de commande Sqoop ci-dessus. La sortie de la commande Sqoop est illustrée ci-dessous.
[hdfs@localhost:/sqoop]$ ./sqoopCommand.sh 16/10/02 18:58:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0 Enter password: 16/10/02 18:58:40 INFO manager.SqlManager: Using default fetchSize of 1000 16/10/02 18:58:40 INFO tool.CodeGenTool: Beginning code generation 16/10/02 18:58:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "sales" AS t LIMIT 1 16/10/02 18:58:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce Note: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 16/10/02 18:58:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.jar 16/10/02 18:58:43 WARN manager.PostgresqlManager: It looks like you are importing from postgresql. 16/10/02 18:58:43 WARN manager.PostgresqlManager: This transfer can be faster! Use the --direct 16/10/02 18:58:43 WARN manager.PostgresqlManager: option to exercise a postgresql-specific fast path. 16/10/02 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of sales 16/10/02 18:58:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 16/10/02 18:58:46 INFO db.DBInputFormat: Using read commited transaction isolation 16/10/02 18:58:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("pksales"), MAX("pksales") FROM "sales" 16/10/02 18:58:47 INFO mapred.JobClient: Running job: job_201609280401_0005 16/10/02 18:58:48 INFO mapred.JobClient: map 0% reduce 0% 16/10/02 18:59:04 INFO mapred.JobClient: map 50% reduce 0% 16/10/02 18:59:14 INFO mapred.JobClient: map 75% reduce 0% 16/10/02 18:59:15 INFO mapred.JobClient: map 100% reduce 0% 16/10/02 18:59:18 INFO mapred.JobClient: Job complete: job_201609280401_0005 16/10/02 18:59:18 INFO mapred.JobClient: Counters: 23 16/10/02 18:59:18 INFO mapred.JobClient: File System Counters 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes read=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes written=1190344 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of write operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes read=438 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes written=451 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of read operations=4 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of write operations=4 16/10/02 18:59:18 INFO mapred.JobClient: Job Counters 16/10/02 18:59:18 INFO mapred.JobClient: Launched map tasks=4 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=48877 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Map-Reduce Framework 16/10/02 18:59:18 INFO mapred.JobClient: Map input records=20 16/10/02 18:59:18 INFO mapred.JobClient: Map output records=20 16/10/02 18:59:18 INFO mapred.JobClient: Input split bytes=438 16/10/02 18:59:18 INFO mapred.JobClient: Spilled Records=0 16/10/02 18:59:18 INFO mapred.JobClient: CPU time spent (ms)=3980 16/10/02 18:59:18 INFO mapred.JobClient: Physical memory (bytes) snapshot=481574912 16/10/02 18:59:18 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2949685248 16/10/02 18:59:18 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Transferred 451 bytes in 33.7555 seconds (13.3608 bytes/sec) 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Retrieved 20 records. [hdfs@localhost:/sqoop]$
Notez que la dernière ligne de sortie ci-dessus montre que 20 enregistrements ont été récupérés, ce qui correspond aux 20 enregistrements de la table sur la base de données PostgreSQL.

Après avoir exécuté la commande Sqoop, nous pouvons exécuter la hdfs dfs -ls
pour voir le répertoire qui a été créé par défaut avec le nom de la table sur le HDFS.
[hdfs@localhost:/sqoop]$ hdfs dfs -ls Found 1 items drwxrwxrwx - toptal data 0 2016-10-02 18:59 sales [hdfs@localhost:/sqoop]$
Nous pouvons à nouveau utiliser la hdfs dfs -ls
pour répertorier le contenu du répertoire des sales
. Si vous regardez sur le HDFS, vous pouvez remarquer que les données sont partitionnées et réparties sur quatre fichiers par défaut, et pas seulement contenues dans un seul.
[hdfs@localhost:/sqoop]$ hdfs dfs -ls sales Found 6 items -rw-rw-rw- 1 toptal data 0 2016-10-02 18:59 sales/_SUCCESS drwxrwxrwx - toptal data 0 2016-10-02 18:58 sales/_logs -rw-rw-rw- 1 toptal data 110 2016-10-02 18:59 sales/part-m-00000 -rw-rw-rw- 1 toptal data 111 2016-10-02 18:59 sales/part-m-00001 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00002 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00003 [hdfs@localhost:/sqoop]$
La hdfs dfs -cat
affichera tous les enregistrements de la première partition des données de vente sur le HDFS.
[hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00000 1,2016-09-27,1.23,1,1 2,2016-09-27,2.34,1,2 3,2016-09-27,1.23,2,1 4,2016-09-27,2.34,2,2 5,2016-09-27,3.45,2,3 [hdfs@localhost:/sqoop]$
Notez que le délimiteur de fichier par défaut est une virgule. Notez également qu'il n'y a que cinq lignes dans chaque partition, car les 20 lignes de la source ont été réparties de manière égale sur les quatre partitions.
Pour limiter le nombre de lignes qui sont sorties à l'écran, nous pouvons diriger la sortie de la commande cat
vers la commande head
comme indiqué ci-dessous, pour vérifier le contenu des trois autres partitions.
L'argument -n 5
de la commande head
limite la sortie d'écran aux cinq premières lignes.
(Notez que dans notre cas, cela n'est pas nécessaire car il n'y a que cinq lignes dans chaque partition pour commencer. En pratique, cependant, vous aurez probablement beaucoup plus de lignes que cela dans chaque partition et vous voudrez simplement vérifier les premières pour assurez-vous qu'ils ont l'air correct, ceci vous montre comment faire.)
[hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00001 |head -n 5 6,2016-09-28,3.45,3,3 7,2016-09-28,4.56,3,4 8,2016-09-28,5.67,3,5 9,2016-09-28,1.23,4,1 10,2016-09-28,1.23,5,1 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00002 |head -n 5 11,2016-09-28,1.23,6,1 12,2016-09-29,1.23,7,1 13,2016-09-29,2.34,7,2 14,2016-09-29,3.45,7,3 15,2016-09-29,4.56,7,4 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00003 |head -n 5 16,2016-09-29,5.67,7,5 17,2016-09-29,6.78,7,6 18,2016-09-29,7.89,7,7 19,2016-09-29,7.89,8,7 20,2016-09-30,1.23,9,1 [hdfs@localhost:/sqoop]$
Si vous devez exécuter une requête pour extraire des données de plusieurs tables dans la base de données PostgreSQL, cela peut être accompli avec la commande suivante :
[hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --target-dir 'creditCardOrders' \ --split-by 'pksales' \ --query "select s.pksales, s.saledate, s.saleamount, o.shippingtype, o.methodofpayment from sales s inner join orders o on s.orderid=o.orderid where o.methodofpayment='credit card' and \$CONDITIONS" [hdfs@localhost:/sqoop]$
Dans la commande ci-dessus, nous utilisons certains des mêmes arguments pour la commande Sqoop, mais ils prennent une importance différente lorsqu'ils sont utilisés avec une commande SQL.
-
--target-dir
- Le répertoire cible indique à Sqoop dans quel répertoire du HDFS stocker les données sélectionnées. Cet argument est requis par Sqoop lors de l'utilisation d'une requête de forme libre. -
--split-by
- Même si nous sélectionnons la clé primaire de la table des ventes, nous devons toujours fournir à Sqoop un identifiant unique pour l'aider à répartir la charge de travail. -
--query
- C'est l'argument dans lequel nous fournissons la requête SQL. La requête ci-dessus est entourée de guillemets doubles. Notez qu'il n'y a pas de barre oblique inverse (le caractère de continuation de ligne) dans les multiples lignes contenant la requête. Notez également lesand \$CONDITIONS
à la fin de la clauseWHERE
. Ceci est requis par Sqoop car Sqoop remplacera automatiquement le jeton$CONDITIONS
par une expression unique.
Problèmes ou pas de problèmes : vous devriez envisager HDFS
HDFS présente de nombreux avantages par rapport aux bases de données relationnelles. Si vous effectuez une analyse de données, vous devriez envisager de migrer vos données vers HDFS dès aujourd'hui.
Avec les compétences acquises ici, l'importation de données d'un système de base de données relationnelle dans HDFS est un processus simple et direct qui peut être accompli avec une seule commande. Bien que ces exemples aient un petit nombre de lignes, le mécanisme d'importation de gros volumes de données vers HDFS à partir d'une table de base de données PostgreSQL reste le même.
Vous pouvez même expérimenter l'importation de grandes tables et divers délimiteurs de stockage. L'utilisation d'Apache Sqoop est plus efficace que l'exportation des données de la base de données vers un fichier, le transfert du fichier du serveur de base de données vers le HDFS, puis le chargement du fichier vers le HDFS.