数据分析师的 HDFS 教程被关系数据库困扰

已发表: 2022-03-11

介绍

到目前为止,您可能已经听说过 Hadoop 分布式文件系统 (HDFS),尤其是如果您是数据分析师或负责将数据从一个系统移动到另一个系统的人。 但是,HDFS 相对于关系数据库有什么好处呢?

HDFS 是一种可扩展的开源解决方案,用于存储和处理大量数据。 HDFS 已在许多现代数据中心中被证明是可靠和高效的。

HDFS 利用商用硬件和开源软件来降低每字节存储的总成本。

凭借其内置的复制和对磁盘故障的恢复能力,HDFS 是存储和处理数据以进行分析的理想系统。 它不需要像传统关系数据库系统那样支持事务原子性、一致性、隔离性和持久性 (ACID) 的基础和开销。

此外,与 Oracle 等企业和商业数据库相比,使用 Hadoop 作为分析平台可以避免任何额外的许可成本。

许多人在第一次了解 HDFS 时提出的问题之一是:如何将现有数据放入 HDFS?

在本文中,我们将研究如何将 PostgreSQL 数据库中的数据导入 HDFS。 我们将使用 Apache Sqoop,它是目前最高效的开源解决方案,用于在 HDFS 和关系数据库系统之间传输数据。 Apache Sqoop 旨在将数据从关系数据库批量加载到 HDFS(导入),并将数据从 HDFS 批量写入到关系数据库(导出)。

高密度文件系统

通过将数据迁移到 HDFS 来加速分析。
鸣叫

本教程中的步骤是为具有执行 SQL 查询的基本知识和 HDFS 命令的基本知识的人编写的。

使用的数据库系统是 PostgreSQL 9.5 for Windows,HDFS 版本是 Centos 6.4 Linux 虚拟机上的 Cloudera Hadoop 2.5.0-cdh5.2.0。

Apache Sqoop 依赖于特定于关系数据库供应商和数据库版本的 JDBC 驱动程序 JAR 文件。

要执行本文中显示的步骤,用户将需要远程连接到 PostgreSQL 数据库的权限、关系数据库的SELECT权限、HDFS 的写入权限以及 Sqoop 可执行文件的执行权限。

出于本教程的目的,我们创建了一个 PostgreSQL 数据库,将其命名为Toptal ,并使其可通过端口 5432 访问。

PostgreSQL 数据源

首先,在我们的 PostgreSQL Toptal数据库中,我们将创建一个名为sales的测试数据表。 我们将假设 OpenSSL 证书和私钥文件已经存在于 PostgreSQL 服务器上。

 Server [localhost]: Database [postgres]: Toptal Port [5432]: Username [postgres]: Password for user postgres: psql (9.5.3) Toptal=# create table sales Toptal-# ( Toptal(# pkSales integer constraint salesKey primary key, Toptal(# saleDate date, Toptal(# saleAmount money, Toptal(# orderID int not null, Toptal(# itemID int not null Toptal(# ); CREATE TABLE

接下来,我们将向表中插入 20 行:

 Toptal=# insert into sales values (1, '2016-09-27', 1.23, 1, 1); INSERT 0 1 Toptal=# insert into sales values (2, '2016-09-27', 2.34, 1, 2); INSERT 0 1 Toptal=# insert into sales values (3, '2016-09-27', 1.23, 2, 1); INSERT 0 1 Toptal=# insert into sales values (4, '2016-09-27', 2.34, 2, 2); INSERT 0 1 Toptal=# insert into sales values (5, '2016-09-27', 3.45, 2, 3); INSERT 0 1 Toptal=# insert into sales values (6, '2016-09-28', 3.45, 3, 3); INSERT 0 1 Toptal=# insert into sales values (7, '2016-09-28', 4.56, 3, 4); INSERT 0 1 Toptal=# insert into sales values (8, '2016-09-28', 5.67, 3, 5); INSERT 0 1 Toptal=# insert into sales values (9, '2016-09-28', 1.23, 4, 1); INSERT 0 1 Toptal=# insert into sales values (10, '2016-09-28', 1.23, 5, 1); INSERT 0 1 Toptal=# insert into sales values (11, '2016-09-28', 1.23, 6, 1); INSERT 0 1 Toptal=# insert into sales values (12, '2016-09-29', 1.23, 7, 1); INSERT 0 1 Toptal=# insert into sales values (13, '2016-09-29', 2.34, 7, 2); INSERT 0 1 Toptal=# insert into sales values (14, '2016-09-29', 3.45, 7, 3); INSERT 0 1 Toptal=# insert into sales values (15, '2016-09-29', 4.56, 7, 4); INSERT 0 1 Toptal=# insert into sales values (16, '2016-09-29', 5.67, 7, 5); INSERT 0 1 Toptal=# insert into sales values (17, '2016-09-29', 6.78, 7, 6); INSERT 0 1 Toptal=# insert into sales values (18, '2016-09-29', 7.89, 7, 7); INSERT 0 1 Toptal=# insert into sales values (19, '2016-09-29', 7.89, 8, 7); INSERT 0 1 Toptal=# insert into sales values (20, '2016-09-30', 1.23, 9, 1); INSERT 0 1

让我们选择数据来验证数据看起来是否正确:

 Toptal=# select * from sales; pksales | saledate | saleamount | orderid | itemid ---------+------------+------------+---------+-------- 1 | 2016-09-27 | $1.23 | 1 | 1 2 | 2016-09-27 | $2.34 | 1 | 2 3 | 2016-09-27 | $1.23 | 2 | 1 4 | 2016-09-27 | $2.34 | 2 | 2 5 | 2016-09-27 | $3.45 | 2 | 3 6 | 2016-09-28 | $3.45 | 3 | 3 7 | 2016-09-28 | $4.56 | 3 | 4 8 | 2016-09-28 | $5.67 | 3 | 5 9 | 2016-09-28 | $1.23 | 4 | 1 10 | 2016-09-28 | $1.23 | 5 | 1 11 | 2016-09-28 | $1.23 | 6 | 1 12 | 2016-09-29 | $1.23 | 7 | 1 13 | 2016-09-29 | $2.34 | 7 | 2 14 | 2016-09-29 | $3.45 | 7 | 3 15 | 2016-09-29 | $4.56 | 7 | 4 16 | 2016-09-29 | $5.67 | 7 | 5 17 | 2016-09-29 | $6.78 | 7 | 6 18 | 2016-09-29 | $7.89 | 7 | 7 19 | 2016-09-29 | $7.89 | 8 | 7 20 | 2016-09-30 | $1.23 | 9 | 1 (20 rows)

数据看起来不错,所以让我们继续吧。

使用 Sqoop 导入 HDFS

定义好数据源后,我们现在可以将数据导入 HDFS。 下面列出了我们将检查的sqoop命令,我们将在后面的要点中分解每个参数。 请注意,该命令应该在一个完整的行上,或者如下所示,在除最后一行之外的每一行的末尾都带有反斜杠(Linux 命令行连续字符)。

 sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/Toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales'
  • sqoop import - 可执行文件名为sqoop ,我们指示它将数据从表或视图从数据库导入到 HDFS。
  • --connect - 使用--connect参数,我们传入 PostgreSQL 的 JDBC 连接字符串。 在这种情况下,我们使用 IP 地址、端口号和数据库名称。 我们还需要指定正在使用 SSL,并且需要提供要使用的SSLSocketFactory类。
  • --username - 在此示例中,用户名是 PostgreSQL 登录名,而不是 Windows 登录名。 用户必须有权连接到指定的数据库并从指定的表中进行选择。
  • -P - 这将提示命令行用户输入密码。 如果很少执行 Sqoop,这可能是一个不错的选择。 有多种其他方法可以自动将密码传递给命令,但我们试图在本文中保持简单。
  • --table - 这是我们传入 PostgreSQL 表名称的地方。
  • --target-dir - 此参数指定要存储数据的 HDFS 目录。
  • --split-by - 我们必须为 Sqoop 提供一个唯一标识符,以帮助它分配工作负载。 稍后在作业输出中,我们将看到 Sqoop 在哪里选择最小值和最大值以帮助设置分割边界。

为了可重复性和编辑目的,将命令放在脚本中是个好主意,如下所示:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales' [hdfs@localhost:/sqoop]$

现在,是时候执行上面的 Sqoop 命令脚本了。 Sqoop 命令的输出如下所示。

 [hdfs@localhost:/sqoop]$ ./sqoopCommand.sh 16/10/02 18:58:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0 Enter password: 16/10/02 18:58:40 INFO manager.SqlManager: Using default fetchSize of 1000 16/10/02 18:58:40 INFO tool.CodeGenTool: Beginning code generation 16/10/02 18:58:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "sales" AS t LIMIT 1 16/10/02 18:58:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce Note: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 16/10/02 18:58:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.jar 16/10/02 18:58:43 WARN manager.PostgresqlManager: It looks like you are importing from postgresql. 16/10/02 18:58:43 WARN manager.PostgresqlManager: This transfer can be faster! Use the --direct 16/10/02 18:58:43 WARN manager.PostgresqlManager: option to exercise a postgresql-specific fast path. 16/10/02 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of sales 16/10/02 18:58:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 16/10/02 18:58:46 INFO db.DBInputFormat: Using read commited transaction isolation 16/10/02 18:58:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("pksales"), MAX("pksales") FROM "sales" 16/10/02 18:58:47 INFO mapred.JobClient: Running job: job_201609280401_0005 16/10/02 18:58:48 INFO mapred.JobClient: map 0% reduce 0% 16/10/02 18:59:04 INFO mapred.JobClient: map 50% reduce 0% 16/10/02 18:59:14 INFO mapred.JobClient: map 75% reduce 0% 16/10/02 18:59:15 INFO mapred.JobClient: map 100% reduce 0% 16/10/02 18:59:18 INFO mapred.JobClient: Job complete: job_201609280401_0005 16/10/02 18:59:18 INFO mapred.JobClient: Counters: 23 16/10/02 18:59:18 INFO mapred.JobClient: File System Counters 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes read=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes written=1190344 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of write operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes read=438 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes written=451 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of read operations=4 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of write operations=4 16/10/02 18:59:18 INFO mapred.JobClient: Job Counters 16/10/02 18:59:18 INFO mapred.JobClient: Launched map tasks=4 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=48877 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Map-Reduce Framework 16/10/02 18:59:18 INFO mapred.JobClient: Map input records=20 16/10/02 18:59:18 INFO mapred.JobClient: Map output records=20 16/10/02 18:59:18 INFO mapred.JobClient: Input split bytes=438 16/10/02 18:59:18 INFO mapred.JobClient: Spilled Records=0 16/10/02 18:59:18 INFO mapred.JobClient: CPU time spent (ms)=3980 16/10/02 18:59:18 INFO mapred.JobClient: Physical memory (bytes) snapshot=481574912 16/10/02 18:59:18 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2949685248 16/10/02 18:59:18 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Transferred 451 bytes in 33.7555 seconds (13.3608 bytes/sec) 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Retrieved 20 records. [hdfs@localhost:/sqoop]$

注意上面输出的最后一行显示检索到了 20 条记录,这对应于 PostgreSQL 数据库表中的 20 条记录。

执行完Sqoop命令后,我们可以执行hdfs dfs -ls命令查看HDFS上默认创建的目录,表名是这个目录。

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls Found 1 items drwxrwxrwx - toptal data 0 2016-10-02 18:59 sales [hdfs@localhost:/sqoop]$

我们可以再次使用hdfs dfs -ls命令列出sales目录的内容。 如果您查看 HDFS,您会注意到默认情况下数据被分区并分布在四个文件中,而不仅仅是包含在一个文件中。

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls sales Found 6 items -rw-rw-rw- 1 toptal data 0 2016-10-02 18:59 sales/_SUCCESS drwxrwxrwx - toptal data 0 2016-10-02 18:58 sales/_logs -rw-rw-rw- 1 toptal data 110 2016-10-02 18:59 sales/part-m-00000 -rw-rw-rw- 1 toptal data 111 2016-10-02 18:59 sales/part-m-00001 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00002 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00003 [hdfs@localhost:/sqoop]$

hdfs dfs -cat命令将显示 HDFS 上销售数据的第一个分区中的所有记录。

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00000 1,2016-09-27,1.23,1,1 2,2016-09-27,2.34,1,2 3,2016-09-27,1.23,2,1 4,2016-09-27,2.34,2,2 5,2016-09-27,3.45,2,3 [hdfs@localhost:/sqoop]$

请注意,默认文件分隔符是逗号。 此外,请注意每个分区中只有 5 行,因为源中的 20 行已平均分布在四个分区中。

为了限制输出到屏幕的行数,我们可以将cat命令的输出通过管道传送到head命令,如下所示,以检查其他三个分区的内容。

head命令的-n 5参数将屏幕输出限制为前五行。

(请注意,在我们的例子中,这是不必要的,因为开始时每个分区中只有五行。但实际上,每个分区中的行数可能比这多得多,并且只想检查前几行确保它们看起来正确,因此这将向您展示如何操作。)

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00001 |head -n 5 6,2016-09-28,3.45,3,3 7,2016-09-28,4.56,3,4 8,2016-09-28,5.67,3,5 9,2016-09-28,1.23,4,1 10,2016-09-28,1.23,5,1 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00002 |head -n 5 11,2016-09-28,1.23,6,1 12,2016-09-29,1.23,7,1 13,2016-09-29,2.34,7,2 14,2016-09-29,3.45,7,3 15,2016-09-29,4.56,7,4 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00003 |head -n 5 16,2016-09-29,5.67,7,5 17,2016-09-29,6.78,7,6 18,2016-09-29,7.89,7,7 19,2016-09-29,7.89,8,7 20,2016-09-30,1.23,9,1 [hdfs@localhost:/sqoop]$

如果您需要运行查询以从 PostgreSQL 数据库中的多个表中提取数据,可以使用以下命令完成:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --target-dir 'creditCardOrders' \ --split-by 'pksales' \ --query "select s.pksales, s.saledate, s.saleamount, o.shippingtype, o.methodofpayment from sales s inner join orders o on s.orderid=o.orderid where o.methodofpayment='credit card' and \$CONDITIONS" [hdfs@localhost:/sqoop]$

在上面的命令中,我们使用了一些与 Sqoop 命令相同的参数,但它们在与 SQL 命令一起使用时具有不同的重要性。

  • --target-dir - 目标目录告诉 Sqoop 在 HDFS 上的哪个目录中存储选定的数据。 Sqoop 在使用自由格式查询时需要此参数。
  • --split-by - 即使我们选择了 sales 表的主键,我们仍然必须为 Sqoop 提供一个唯一标识符来帮助它分配工作负载。
  • --query - 这是我们提供 SQL 查询的参数。 上面的查询用双引号括起来。 请注意,包含查询的多行中没有反斜杠(行继续符)。 还要注意WHERE子句末尾的and \$CONDITIONS 。 这是 Sqoop 要求的,因为 Sqoop 会自动将$CONDITIONS标记替换为唯一的表达式。

问题或没有问题:您应该考虑 HDFS

HDFS 与关系数据库相比具有许多优势。 如果您正在进行数据分析,您应该考虑将数据迁移到 HDFS,今天就开始。

借助此处学到的技能,将数据从关系数据库系统导入 HDFS 是一个简单直接的过程,只需一个命令即可完成。 尽管这些示例的行数很少,但将大量数据从 PostgreSQL 数据库表导入 HDFS 的机制保持不变。

您甚至可以尝试导入大型表和不同的存储分隔符。 使用 Apache Sqoop 比将数据库数据导出到文件,将文件从数据库服务器传输到 HDFS,然后将文件加载到 HDFS 更有效。

相关:使用 R 提升数据处理能力