數據分析師的 HDFS 教程被關係數據庫困擾

已發表: 2022-03-11

介紹

到目前為止,您可能已經聽說過 Hadoop 分佈式文件系統 (HDFS),尤其是如果您是數據分析師或負責將數據從一個系統移動到另一個系統的人。 但是,HDFS 相對於關係數據庫有什麼好處呢?

HDFS 是一種可擴展的開源解決方案,用於存儲和處理大量數據。 HDFS 已在許多現代數據中心中被證明是可靠和高效的。

HDFS 利用商用硬件和開源軟件來降低每字節存儲的總成本。

憑藉其內置的複制和對磁盤故障的恢復能力,HDFS 是存儲和處理數據以進行分析的理想系統。 它不需要像傳統關係數據庫系統那樣支持事務原子性、一致性、隔離性和持久性 (ACID) 的基礎和開銷。

此外,與 Oracle 等企業和商業數據庫相比,使用 Hadoop 作為分析平台可以避免任何額外的許可成本。

許多人在第一次了解 HDFS 時提出的問題之一是:如何將現有數據放入 HDFS?

在本文中,我們將研究如何將 PostgreSQL 數據庫中的數據導入 HDFS。 我們將使用 Apache Sqoop,它是目前最高效的開源解決方案,用於在 HDFS 和關係數據庫系統之間傳輸數據。 Apache Sqoop 旨在將數據從關係數據庫批量加載到 HDFS(導入),並將數據從 HDFS 批量寫入到關係數據庫(導出)。

高密度文件系統

通過將數據遷移到 HDFS 來加速分析。
鳴叫

本教程中的步驟是為具有執行 SQL 查詢的基本知識和 HDFS 命令的基本知識的人編寫的。

使用的數據庫系統是 PostgreSQL 9.5 for Windows,HDFS 版本是 Centos 6.4 Linux 虛擬機上的 Cloudera Hadoop 2.5.0-cdh5.2.0。

Apache Sqoop 依賴於特定於關係數據庫供應商和數據庫版本的 JDBC 驅動程序 JAR 文件。

要執行本文中顯示的步驟,用戶將需要遠程連接到 PostgreSQL 數據庫的權限、關係數據庫的SELECT權限、HDFS 的寫入權限以及 Sqoop 可執行文件的執行權限。

出於本教程的目的,我們創建了一個 PostgreSQL 數據庫,將其命名為Toptal ,並使其可通過端口 5432 訪問。

PostgreSQL 數據源

首先,在我們的 PostgreSQL Toptal數據庫中,我們將創建一個名為sales的測試數據表。 我們將假設 OpenSSL 證書和私鑰文件已經存在於 PostgreSQL 服務器上。

 Server [localhost]: Database [postgres]: Toptal Port [5432]: Username [postgres]: Password for user postgres: psql (9.5.3) Toptal=# create table sales Toptal-# ( Toptal(# pkSales integer constraint salesKey primary key, Toptal(# saleDate date, Toptal(# saleAmount money, Toptal(# orderID int not null, Toptal(# itemID int not null Toptal(# ); CREATE TABLE

接下來,我們將向表中插入 20 行:

 Toptal=# insert into sales values (1, '2016-09-27', 1.23, 1, 1); INSERT 0 1 Toptal=# insert into sales values (2, '2016-09-27', 2.34, 1, 2); INSERT 0 1 Toptal=# insert into sales values (3, '2016-09-27', 1.23, 2, 1); INSERT 0 1 Toptal=# insert into sales values (4, '2016-09-27', 2.34, 2, 2); INSERT 0 1 Toptal=# insert into sales values (5, '2016-09-27', 3.45, 2, 3); INSERT 0 1 Toptal=# insert into sales values (6, '2016-09-28', 3.45, 3, 3); INSERT 0 1 Toptal=# insert into sales values (7, '2016-09-28', 4.56, 3, 4); INSERT 0 1 Toptal=# insert into sales values (8, '2016-09-28', 5.67, 3, 5); INSERT 0 1 Toptal=# insert into sales values (9, '2016-09-28', 1.23, 4, 1); INSERT 0 1 Toptal=# insert into sales values (10, '2016-09-28', 1.23, 5, 1); INSERT 0 1 Toptal=# insert into sales values (11, '2016-09-28', 1.23, 6, 1); INSERT 0 1 Toptal=# insert into sales values (12, '2016-09-29', 1.23, 7, 1); INSERT 0 1 Toptal=# insert into sales values (13, '2016-09-29', 2.34, 7, 2); INSERT 0 1 Toptal=# insert into sales values (14, '2016-09-29', 3.45, 7, 3); INSERT 0 1 Toptal=# insert into sales values (15, '2016-09-29', 4.56, 7, 4); INSERT 0 1 Toptal=# insert into sales values (16, '2016-09-29', 5.67, 7, 5); INSERT 0 1 Toptal=# insert into sales values (17, '2016-09-29', 6.78, 7, 6); INSERT 0 1 Toptal=# insert into sales values (18, '2016-09-29', 7.89, 7, 7); INSERT 0 1 Toptal=# insert into sales values (19, '2016-09-29', 7.89, 8, 7); INSERT 0 1 Toptal=# insert into sales values (20, '2016-09-30', 1.23, 9, 1); INSERT 0 1

讓我們選擇數據來驗證數據看起來是否正確:

 Toptal=# select * from sales; pksales | saledate | saleamount | orderid | itemid ---------+------------+------------+---------+-------- 1 | 2016-09-27 | $1.23 | 1 | 1 2 | 2016-09-27 | $2.34 | 1 | 2 3 | 2016-09-27 | $1.23 | 2 | 1 4 | 2016-09-27 | $2.34 | 2 | 2 5 | 2016-09-27 | $3.45 | 2 | 3 6 | 2016-09-28 | $3.45 | 3 | 3 7 | 2016-09-28 | $4.56 | 3 | 4 8 | 2016-09-28 | $5.67 | 3 | 5 9 | 2016-09-28 | $1.23 | 4 | 1 10 | 2016-09-28 | $1.23 | 5 | 1 11 | 2016-09-28 | $1.23 | 6 | 1 12 | 2016-09-29 | $1.23 | 7 | 1 13 | 2016-09-29 | $2.34 | 7 | 2 14 | 2016-09-29 | $3.45 | 7 | 3 15 | 2016-09-29 | $4.56 | 7 | 4 16 | 2016-09-29 | $5.67 | 7 | 5 17 | 2016-09-29 | $6.78 | 7 | 6 18 | 2016-09-29 | $7.89 | 7 | 7 19 | 2016-09-29 | $7.89 | 8 | 7 20 | 2016-09-30 | $1.23 | 9 | 1 (20 rows)

數據看起來不錯,所以讓我們繼續吧。

使用 Sqoop 導入 HDFS

定義好數據源後,我們現在可以將數據導入 HDFS。 下面列出了我們將檢查的sqoop命令,我們將在後面的要點中分解每個參數。 請注意,該命令應該在一個完整的行上,或者如下所示,在除最後一行之外的每一行的末尾都帶有反斜杠(Linux 命令行連續字符)。

 sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/Toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales'
  • sqoop import - 可執行文件名為sqoop ,我們指示它將數據從表或視圖從數據庫導入到 HDFS。
  • --connect - 使用--connect參數,我們傳入 PostgreSQL 的 JDBC 連接字符串。 在這種情況下,我們使用 IP 地址、端口號和數據庫名稱。 我們還需要指定正在使用 SSL,並且需要提供要使用的SSLSocketFactory類。
  • --username - 在此示例中,用戶名是 PostgreSQL 登錄名,而不是 Windows 登錄名。 用戶必須有權連接到指定的數據庫並從指定的表中進行選擇。
  • -P - 這將提示命令行用戶輸入密碼。 如果很少執行 Sqoop,這可能是一個不錯的選擇。 有多種其他方法可以自動將密碼傳遞給命令,但我們試圖在本文中保持簡單。
  • --table - 這是我們傳入 PostgreSQL 表名稱的地方。
  • --target-dir - 此參數指定要存儲數據的 HDFS 目錄。
  • --split-by - 我們必須為 Sqoop 提供一個唯一標識符,以幫助它分配工作負載。 稍後在作業輸出中,我們將看到 Sqoop 在哪裡選擇最小值和最大值以幫助設置分割邊界。

為了可重複性和編輯目的,將命令放在腳本中是個好主意,如下所示:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales' [hdfs@localhost:/sqoop]$

現在,是時候執行上面的 Sqoop 命令腳本了。 Sqoop 命令的輸出如下所示。

 [hdfs@localhost:/sqoop]$ ./sqoopCommand.sh 16/10/02 18:58:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0 Enter password: 16/10/02 18:58:40 INFO manager.SqlManager: Using default fetchSize of 1000 16/10/02 18:58:40 INFO tool.CodeGenTool: Beginning code generation 16/10/02 18:58:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "sales" AS t LIMIT 1 16/10/02 18:58:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce Note: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 16/10/02 18:58:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.jar 16/10/02 18:58:43 WARN manager.PostgresqlManager: It looks like you are importing from postgresql. 16/10/02 18:58:43 WARN manager.PostgresqlManager: This transfer can be faster! Use the --direct 16/10/02 18:58:43 WARN manager.PostgresqlManager: option to exercise a postgresql-specific fast path. 16/10/02 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of sales 16/10/02 18:58:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 16/10/02 18:58:46 INFO db.DBInputFormat: Using read commited transaction isolation 16/10/02 18:58:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("pksales"), MAX("pksales") FROM "sales" 16/10/02 18:58:47 INFO mapred.JobClient: Running job: job_201609280401_0005 16/10/02 18:58:48 INFO mapred.JobClient: map 0% reduce 0% 16/10/02 18:59:04 INFO mapred.JobClient: map 50% reduce 0% 16/10/02 18:59:14 INFO mapred.JobClient: map 75% reduce 0% 16/10/02 18:59:15 INFO mapred.JobClient: map 100% reduce 0% 16/10/02 18:59:18 INFO mapred.JobClient: Job complete: job_201609280401_0005 16/10/02 18:59:18 INFO mapred.JobClient: Counters: 23 16/10/02 18:59:18 INFO mapred.JobClient: File System Counters 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes read=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes written=1190344 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of write operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes read=438 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes written=451 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of read operations=4 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of write operations=4 16/10/02 18:59:18 INFO mapred.JobClient: Job Counters 16/10/02 18:59:18 INFO mapred.JobClient: Launched map tasks=4 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=48877 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Map-Reduce Framework 16/10/02 18:59:18 INFO mapred.JobClient: Map input records=20 16/10/02 18:59:18 INFO mapred.JobClient: Map output records=20 16/10/02 18:59:18 INFO mapred.JobClient: Input split bytes=438 16/10/02 18:59:18 INFO mapred.JobClient: Spilled Records=0 16/10/02 18:59:18 INFO mapred.JobClient: CPU time spent (ms)=3980 16/10/02 18:59:18 INFO mapred.JobClient: Physical memory (bytes) snapshot=481574912 16/10/02 18:59:18 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2949685248 16/10/02 18:59:18 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Transferred 451 bytes in 33.7555 seconds (13.3608 bytes/sec) 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Retrieved 20 records. [hdfs@localhost:/sqoop]$

注意上面輸出的最後一行顯示檢索到了 20 條記錄,這對應於 PostgreSQL 數據庫表中的 20 條記錄。

執行完Sqoop命令後,我們可以執行hdfs dfs -ls命令查看HDFS上默認創建的目錄,表名是這個目錄。

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls Found 1 items drwxrwxrwx - toptal data 0 2016-10-02 18:59 sales [hdfs@localhost:/sqoop]$

我們可以再次使用hdfs dfs -ls命令列出sales目錄的內容。 如果您查看 HDFS,您會注意到默認情況下數據被分區並分佈在四個文件中,而不僅僅是包含在一個文件中。

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls sales Found 6 items -rw-rw-rw- 1 toptal data 0 2016-10-02 18:59 sales/_SUCCESS drwxrwxrwx - toptal data 0 2016-10-02 18:58 sales/_logs -rw-rw-rw- 1 toptal data 110 2016-10-02 18:59 sales/part-m-00000 -rw-rw-rw- 1 toptal data 111 2016-10-02 18:59 sales/part-m-00001 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00002 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00003 [hdfs@localhost:/sqoop]$

hdfs dfs -cat命令將顯示 HDFS 上銷售數據的第一個分區中的所有記錄。

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00000 1,2016-09-27,1.23,1,1 2,2016-09-27,2.34,1,2 3,2016-09-27,1.23,2,1 4,2016-09-27,2.34,2,2 5,2016-09-27,3.45,2,3 [hdfs@localhost:/sqoop]$

請注意,默認文件分隔符是逗號。 此外,請注意每個分區中只有 5 行,因為源中的 20 行已平均分佈在四個分區中。

為了限制輸出到屏幕的行數,我們可以將cat命令的輸出通過管道傳送到head命令,如下所示,以檢查其他三個分區的內容。

head命令的-n 5參數將屏幕輸出限制為前五行。

(請注意,在我們的例子中,這是不必要的,因為開始時每個分區中只有五行。但實際上,每個分區中的行數可能比這多得多,並且只想檢查前幾行確保它們看起來正確,因此這將向您展示如何操作。)

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00001 |head -n 5 6,2016-09-28,3.45,3,3 7,2016-09-28,4.56,3,4 8,2016-09-28,5.67,3,5 9,2016-09-28,1.23,4,1 10,2016-09-28,1.23,5,1 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00002 |head -n 5 11,2016-09-28,1.23,6,1 12,2016-09-29,1.23,7,1 13,2016-09-29,2.34,7,2 14,2016-09-29,3.45,7,3 15,2016-09-29,4.56,7,4 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00003 |head -n 5 16,2016-09-29,5.67,7,5 17,2016-09-29,6.78,7,6 18,2016-09-29,7.89,7,7 19,2016-09-29,7.89,8,7 20,2016-09-30,1.23,9,1 [hdfs@localhost:/sqoop]$

如果您需要運行查詢以從 PostgreSQL 數據庫中的多個表中提取數據,可以使用以下命令完成:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --target-dir 'creditCardOrders' \ --split-by 'pksales' \ --query "select s.pksales, s.saledate, s.saleamount, o.shippingtype, o.methodofpayment from sales s inner join orders o on s.orderid=o.orderid where o.methodofpayment='credit card' and \$CONDITIONS" [hdfs@localhost:/sqoop]$

在上面的命令中,我們使用了一些與 Sqoop 命令相同的參數,但它們在與 SQL 命令一起使用時具有不同的重要性。

  • --target-dir - 目標目錄告訴 Sqoop 在 HDFS 上的哪個目錄中存儲選定的數據。 Sqoop 在使用自由格式查詢時需要此參數。
  • --split-by - 即使我們選擇了 sales 表的主鍵,我們仍然必須為 Sqoop 提供一個唯一標識符來幫助它分配工作負載。
  • --query - 這是我們提供 SQL 查詢的參數。 上面的查詢用雙引號括起來。 請注意,包含查詢的多行中沒有反斜杠(行繼續符)。 還要注意WHERE子句末尾的and \$CONDITIONS 。 這是 Sqoop 要求的,因為 Sqoop 會自動將$CONDITIONS標記替換為唯一的表達式。

問題或沒有問題:您應該考慮 HDFS

HDFS 與關係數據庫相比具有許多優勢。 如果您正在進行數據分析,您應該考慮將數據遷移到 HDFS,今天就開始。

借助此處學到的技能,將數據從關係數據庫系統導入 HDFS 是一個簡單直接的過程,只需一個命令即可完成。 儘管這些示例的行數很少,但將大量數據從 PostgreSQL 數據庫表導入 HDFS 的機制保持不變。

您甚至可以嘗試導入大型表和不同的存儲分隔符。 使用 Apache Sqoop 比將數據庫數據導出到文件,將文件從數據庫服務器傳輸到 HDFS,然後將文件加載到 HDFS 更有效。

相關:使用 R 提升數據處理能力