Un tutorial de HDFS para analistas de datos atascados con bases de datos relacionales
Publicado: 2022-03-11Introducción
A estas alturas, probablemente haya oído hablar del sistema de archivos distribuidos de Hadoop (HDFS), especialmente si es analista de datos o alguien responsable de mover datos de un sistema a otro. Sin embargo, ¿cuáles son los beneficios que tiene HDFS sobre las bases de datos relacionales?
HDFS es una solución escalable de código abierto para almacenar y procesar grandes volúmenes de datos. Se ha demostrado que HDFS es confiable y eficiente en muchos centros de datos modernos.
HDFS utiliza hardware básico junto con software de código abierto para reducir el costo total por byte de almacenamiento.
Con su replicación integrada y resistencia a fallas de disco, HDFS es un sistema ideal para almacenar y procesar datos para análisis. No requiere los fundamentos ni la sobrecarga para respaldar la atomicidad, la consistencia, el aislamiento y la durabilidad (ACID) de las transacciones, como es necesario con los sistemas de bases de datos relacionales tradicionales.
Además, en comparación con las bases de datos empresariales y comerciales, como Oracle, utilizar Hadoop como plataforma de análisis evita cualquier costo de licencia adicional.
Una de las preguntas que mucha gente se hace cuando se entera por primera vez de HDFS es: ¿Cómo introduzco mis datos existentes en HDFS?
En este artículo, examinaremos cómo importar datos de una base de datos PostgreSQL a HDFS. Usaremos Apache Sqoop, que actualmente es la solución de código abierto más eficiente para transferir datos entre HDFS y sistemas de bases de datos relacionales. Apache Sqoop está diseñado para cargar datos de forma masiva desde una base de datos relacional al HDFS (importación) y para escribir datos de forma masiva desde el HDFS a una base de datos relacional (exportación).
Los pasos de este tutorial están escritos para alguien con un conocimiento básico de la ejecución de consultas SQL y un conocimiento elemental de los comandos HDFS.
El sistema de base de datos utilizado es PostgreSQL 9.5 para Windows y la versión HDFS es Cloudera Hadoop 2.5.0-cdh5.2.0 en una máquina virtual Centos 6.4 Linux.
Apache Sqoop se basa en los archivos JAR del controlador JDBC que son específicos del proveedor de la base de datos relacional y la versión de la base de datos.
Para ejecutar los pasos que se muestran en este artículo, el usuario necesitará permisos para conectarse de forma remota a la base de datos de PostgreSQL, permisos de SELECT
en la base de datos relacional, permisos de escritura en HDFS y permisos de ejecución en el ejecutable de Sqoop.
A los fines de este tutorial, creamos una base de datos PostgreSQL, la llamamos Toptal y la hicimos accesible a través del puerto 5432.
Fuente de datos PostgreSQL
Para comenzar, en nuestra base de datos PostgreSQL Toptal
, crearemos una tabla de datos de prueba llamada sales
. Asumiremos que el certificado OpenSSL y los archivos de clave privada ya existen en el servidor PostgreSQL.
Server [localhost]: Database [postgres]: Toptal Port [5432]: Username [postgres]: Password for user postgres: psql (9.5.3) Toptal=# create table sales Toptal-# ( Toptal(# pkSales integer constraint salesKey primary key, Toptal(# saleDate date, Toptal(# saleAmount money, Toptal(# orderID int not null, Toptal(# itemID int not null Toptal(# ); CREATE TABLE
A continuación, insertaremos 20 filas en la tabla:
Toptal=# insert into sales values (1, '2016-09-27', 1.23, 1, 1); INSERT 0 1 Toptal=# insert into sales values (2, '2016-09-27', 2.34, 1, 2); INSERT 0 1 Toptal=# insert into sales values (3, '2016-09-27', 1.23, 2, 1); INSERT 0 1 Toptal=# insert into sales values (4, '2016-09-27', 2.34, 2, 2); INSERT 0 1 Toptal=# insert into sales values (5, '2016-09-27', 3.45, 2, 3); INSERT 0 1 Toptal=# insert into sales values (6, '2016-09-28', 3.45, 3, 3); INSERT 0 1 Toptal=# insert into sales values (7, '2016-09-28', 4.56, 3, 4); INSERT 0 1 Toptal=# insert into sales values (8, '2016-09-28', 5.67, 3, 5); INSERT 0 1 Toptal=# insert into sales values (9, '2016-09-28', 1.23, 4, 1); INSERT 0 1 Toptal=# insert into sales values (10, '2016-09-28', 1.23, 5, 1); INSERT 0 1 Toptal=# insert into sales values (11, '2016-09-28', 1.23, 6, 1); INSERT 0 1 Toptal=# insert into sales values (12, '2016-09-29', 1.23, 7, 1); INSERT 0 1 Toptal=# insert into sales values (13, '2016-09-29', 2.34, 7, 2); INSERT 0 1 Toptal=# insert into sales values (14, '2016-09-29', 3.45, 7, 3); INSERT 0 1 Toptal=# insert into sales values (15, '2016-09-29', 4.56, 7, 4); INSERT 0 1 Toptal=# insert into sales values (16, '2016-09-29', 5.67, 7, 5); INSERT 0 1 Toptal=# insert into sales values (17, '2016-09-29', 6.78, 7, 6); INSERT 0 1 Toptal=# insert into sales values (18, '2016-09-29', 7.89, 7, 7); INSERT 0 1 Toptal=# insert into sales values (19, '2016-09-29', 7.89, 8, 7); INSERT 0 1 Toptal=# insert into sales values (20, '2016-09-30', 1.23, 9, 1); INSERT 0 1
Seleccionemos los datos para verificar que los datos se ven correctos:
Toptal=# select * from sales; pksales | saledate | saleamount | orderid | itemid ---------+------------+------------+---------+-------- 1 | 2016-09-27 | $1.23 | 1 | 1 2 | 2016-09-27 | $2.34 | 1 | 2 3 | 2016-09-27 | $1.23 | 2 | 1 4 | 2016-09-27 | $2.34 | 2 | 2 5 | 2016-09-27 | $3.45 | 2 | 3 6 | 2016-09-28 | $3.45 | 3 | 3 7 | 2016-09-28 | $4.56 | 3 | 4 8 | 2016-09-28 | $5.67 | 3 | 5 9 | 2016-09-28 | $1.23 | 4 | 1 10 | 2016-09-28 | $1.23 | 5 | 1 11 | 2016-09-28 | $1.23 | 6 | 1 12 | 2016-09-29 | $1.23 | 7 | 1 13 | 2016-09-29 | $2.34 | 7 | 2 14 | 2016-09-29 | $3.45 | 7 | 3 15 | 2016-09-29 | $4.56 | 7 | 4 16 | 2016-09-29 | $5.67 | 7 | 5 17 | 2016-09-29 | $6.78 | 7 | 6 18 | 2016-09-29 | $7.89 | 7 | 7 19 | 2016-09-29 | $7.89 | 8 | 7 20 | 2016-09-30 | $1.23 | 9 | 1 (20 rows)
Los datos se ven bien, así que procedamos.
Importar a HDFS usando Sqoop
Con la fuente de datos definida, ahora estamos listos para importar los datos al HDFS. El comando sqoop
que examinaremos se enumera a continuación, y desglosaremos cada argumento en las viñetas que siguen. Tenga en cuenta que se supone que el comando debe estar en una línea completa o, como se muestra a continuación, con la barra invertida (el carácter de continuación de la línea de comandos de Linux) al final de cada línea excepto la última.
sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/Toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales'
-
sqoop import
: el ejecutable se llamasqoop
y le indicamos que importe los datos de una tabla o una vista de una base de datos al HDFS. -
--connect
: con el argumento--connect
, estamos pasando la cadena de conexión JDBC para PostgreSQL. En este caso, usamos la dirección IP, el número de puerto y el nombre de la base de datos. También debemos especificar que se está utilizando SSL y proporcionar la claseSSLSocketFactory
que se utilizará. -
--username
: en este ejemplo, el nombre de usuario es un inicio de sesión de PostgreSQL, no un inicio de sesión de Windows. El usuario debe tener permisos para conectarse a la base de datos especificada y seleccionar de la tabla especificada. -
-P
: esto solicitará la contraseña al usuario de la línea de comandos. Si rara vez se ejecuta Sqoop, esta podría ser una buena opción. Hay muchas otras formas de pasar la contraseña al comando automáticamente, pero estamos tratando de mantenerlo simple para este artículo. -
--table
- Aquí es donde pasamos el nombre de la tabla de PostgreSQL. -
--target-dir
: este argumento especifica el directorio HDFS donde se almacenarán los datos. -
--split-by
: debemos proporcionar a Sqoop un identificador único para ayudarlo a distribuir la carga de trabajo. Más adelante en el resultado del trabajo, veremos dónde Sqoop selecciona los valores mínimo y máximo para ayudar a establecer límites de división.
Es una buena idea poner el comando en una secuencia de comandos para fines de edición y repetibilidad, como se muestra a continuación:
[hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales' [hdfs@localhost:/sqoop]$
Ahora es el momento de ejecutar el script de comando Sqoop anterior. El resultado del comando Sqoop se muestra a continuación.
[hdfs@localhost:/sqoop]$ ./sqoopCommand.sh 16/10/02 18:58:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0 Enter password: 16/10/02 18:58:40 INFO manager.SqlManager: Using default fetchSize of 1000 16/10/02 18:58:40 INFO tool.CodeGenTool: Beginning code generation 16/10/02 18:58:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "sales" AS t LIMIT 1 16/10/02 18:58:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce Note: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 16/10/02 18:58:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.jar 16/10/02 18:58:43 WARN manager.PostgresqlManager: It looks like you are importing from postgresql. 16/10/02 18:58:43 WARN manager.PostgresqlManager: This transfer can be faster! Use the --direct 16/10/02 18:58:43 WARN manager.PostgresqlManager: option to exercise a postgresql-specific fast path. 16/10/02 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of sales 16/10/02 18:58:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 16/10/02 18:58:46 INFO db.DBInputFormat: Using read commited transaction isolation 16/10/02 18:58:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("pksales"), MAX("pksales") FROM "sales" 16/10/02 18:58:47 INFO mapred.JobClient: Running job: job_201609280401_0005 16/10/02 18:58:48 INFO mapred.JobClient: map 0% reduce 0% 16/10/02 18:59:04 INFO mapred.JobClient: map 50% reduce 0% 16/10/02 18:59:14 INFO mapred.JobClient: map 75% reduce 0% 16/10/02 18:59:15 INFO mapred.JobClient: map 100% reduce 0% 16/10/02 18:59:18 INFO mapred.JobClient: Job complete: job_201609280401_0005 16/10/02 18:59:18 INFO mapred.JobClient: Counters: 23 16/10/02 18:59:18 INFO mapred.JobClient: File System Counters 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes read=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes written=1190344 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of write operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes read=438 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes written=451 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of read operations=4 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of write operations=4 16/10/02 18:59:18 INFO mapred.JobClient: Job Counters 16/10/02 18:59:18 INFO mapred.JobClient: Launched map tasks=4 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=48877 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Map-Reduce Framework 16/10/02 18:59:18 INFO mapred.JobClient: Map input records=20 16/10/02 18:59:18 INFO mapred.JobClient: Map output records=20 16/10/02 18:59:18 INFO mapred.JobClient: Input split bytes=438 16/10/02 18:59:18 INFO mapred.JobClient: Spilled Records=0 16/10/02 18:59:18 INFO mapred.JobClient: CPU time spent (ms)=3980 16/10/02 18:59:18 INFO mapred.JobClient: Physical memory (bytes) snapshot=481574912 16/10/02 18:59:18 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2949685248 16/10/02 18:59:18 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Transferred 451 bytes in 33.7555 seconds (13.3608 bytes/sec) 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Retrieved 20 records. [hdfs@localhost:/sqoop]$
Observe que la última línea de salida anterior muestra que se recuperaron 20 registros, lo que corresponde a los 20 registros en la tabla de la base de datos PostgreSQL.

Después de ejecutar el comando Sqoop, podemos ejecutar el comando hdfs dfs -ls
para ver el directorio que se creó de forma predeterminada con el nombre de la tabla en el HDFS.
[hdfs@localhost:/sqoop]$ hdfs dfs -ls Found 1 items drwxrwxrwx - toptal data 0 2016-10-02 18:59 sales [hdfs@localhost:/sqoop]$
Podemos usar el comando hdfs dfs -ls
nuevamente para listar el contenido del directorio de sales
. Si observa el HDFS, puede notar que los datos están particionados y distribuidos en cuatro archivos de forma predeterminada, no solo contenidos en uno.
[hdfs@localhost:/sqoop]$ hdfs dfs -ls sales Found 6 items -rw-rw-rw- 1 toptal data 0 2016-10-02 18:59 sales/_SUCCESS drwxrwxrwx - toptal data 0 2016-10-02 18:58 sales/_logs -rw-rw-rw- 1 toptal data 110 2016-10-02 18:59 sales/part-m-00000 -rw-rw-rw- 1 toptal data 111 2016-10-02 18:59 sales/part-m-00001 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00002 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00003 [hdfs@localhost:/sqoop]$
El comando hdfs dfs -cat
mostrará todos los registros en la primera partición de los datos de ventas en el HDFS.
[hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00000 1,2016-09-27,1.23,1,1 2,2016-09-27,2.34,1,2 3,2016-09-27,1.23,2,1 4,2016-09-27,2.34,2,2 5,2016-09-27,3.45,2,3 [hdfs@localhost:/sqoop]$
Tenga en cuenta que el delimitador de archivo predeterminado es una coma. Además, observe que solo hay cinco filas en cada partición, porque las 20 filas en el origen se distribuyeron por igual entre las cuatro particiones.
Para limitar la cantidad de filas que se muestran en la pantalla, podemos canalizar la salida del comando cat
al comando head
como se muestra a continuación, para verificar el contenido de las otras tres particiones.
El argumento -n 5
del comando head
limita la salida de la pantalla a las primeras cinco filas.
(Tenga en cuenta que, en nuestro caso, esto no es necesario ya que, para empezar, solo hay cinco filas en cada partición. Sin embargo, en la práctica, probablemente tendrá muchas más filas que esta en cada partición y solo querrá verificar las primeras para asegúrese de que se vean bien, así que esto le muestra cómo hacerlo).
[hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00001 |head -n 5 6,2016-09-28,3.45,3,3 7,2016-09-28,4.56,3,4 8,2016-09-28,5.67,3,5 9,2016-09-28,1.23,4,1 10,2016-09-28,1.23,5,1 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00002 |head -n 5 11,2016-09-28,1.23,6,1 12,2016-09-29,1.23,7,1 13,2016-09-29,2.34,7,2 14,2016-09-29,3.45,7,3 15,2016-09-29,4.56,7,4 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00003 |head -n 5 16,2016-09-29,5.67,7,5 17,2016-09-29,6.78,7,6 18,2016-09-29,7.89,7,7 19,2016-09-29,7.89,8,7 20,2016-09-30,1.23,9,1 [hdfs@localhost:/sqoop]$
Si necesita ejecutar una consulta para extraer datos de varias tablas en la base de datos de PostgreSQL, puede hacerlo con el siguiente comando:
[hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --target-dir 'creditCardOrders' \ --split-by 'pksales' \ --query "select s.pksales, s.saledate, s.saleamount, o.shippingtype, o.methodofpayment from sales s inner join orders o on s.orderid=o.orderid where o.methodofpayment='credit card' and \$CONDITIONS" [hdfs@localhost:/sqoop]$
En el comando anterior, usamos algunos de los mismos argumentos para el comando Sqoop, pero adquieren una importancia diferente cuando se usan con un comando SQL.
-
--target-dir
: el directorio de destino le dice a Sqoop en qué directorio del HDFS almacenar los datos seleccionados. Sqoop requiere este argumento cuando se utiliza una consulta de formato libre. -
--split-by
: aunque estamos seleccionando la clave principal de la tabla de ventas, todavía tenemos que proporcionarle a Sqoop un identificador único para ayudarlo a distribuir la carga de trabajo. -
--query
- Este es el argumento en el que proporcionamos la consulta SQL. La consulta anterior está entre comillas dobles. Tenga en cuenta que no hay una barra invertida (el carácter de continuación de línea) en las múltiples líneas que contienen la consulta. Observe también lasand \$CONDITIONS
al final de la cláusulaWHERE
. Sqoop lo requiere porque Sqoop reemplazará automáticamente el token$CONDITIONS
con una expresión única.
Problemas o ningún problema: debe considerar HDFS
HDFS tiene muchas ventajas sobre las bases de datos relacionales. Si está realizando análisis de datos, debería considerar migrar sus datos a HDFS hoy mismo.
Con las habilidades aprendidas aquí, importar datos de un sistema de base de datos relacional a HDFS es un proceso simple y directo que se puede lograr con un solo comando. Si bien estos ejemplos tienen una pequeña cantidad de filas, la mecánica de importar grandes volúmenes de datos a HDFS desde una tabla de base de datos de PostgreSQL sigue siendo la misma.
Incluso puede experimentar con la importación de tablas grandes y diferentes delimitadores de almacenamiento. Usar Apache Sqoop es más eficiente que exportar los datos de la base de datos a un archivo, transferir el archivo desde el servidor de la base de datos al HDFS y luego cargar el archivo al HDFS.