Учебное пособие по HDFS для аналитиков данных, застрявших на реляционных базах данных

Опубликовано: 2022-03-11

Введение

К настоящему времени вы, вероятно, слышали о распределенной файловой системе Hadoop (HDFS), особенно если вы аналитик данных или кто-то, кто отвечает за перемещение данных из одной системы в другую. Однако каковы преимущества HDFS по сравнению с реляционными базами данных?

HDFS — это масштабируемое решение с открытым исходным кодом для хранения и обработки больших объемов данных. Надежность и эффективность HDFS доказана во многих современных центрах обработки данных.

HDFS использует стандартное оборудование вместе с программным обеспечением с открытым исходным кодом, чтобы снизить общую стоимость байта хранилища.

Благодаря встроенной репликации и устойчивости к сбоям диска HDFS является идеальной системой для хранения и обработки данных для аналитики. Он не требует поддержки и накладных расходов для поддержки атомарности транзакций, согласованности, изоляции и надежности (ACID), как это необходимо для традиционных систем реляционных баз данных.

Кроме того, по сравнению с корпоративными и коммерческими базами данных, такими как Oracle, использование Hadoop в качестве аналитической платформы позволяет избежать дополнительных затрат на лицензирование.

Один из вопросов, который многие люди задают при первом знакомстве с HDFS, звучит так: как перенести существующие данные в HDFS?

В этой статье мы рассмотрим, как импортировать данные из базы данных PostgreSQL в HDFS. Мы будем использовать Apache Sqoop, который в настоящее время является наиболее эффективным решением с открытым исходным кодом для передачи данных между HDFS и системами реляционных баз данных. Apache Sqoop предназначен для массовой загрузки данных из реляционной базы данных в HDFS (импорт) и массовой записи данных из HDFS в реляционную базу данных (экспорт).

HDFS

Ускорьте аналитику, перенеся свои данные в HDFS.
Твитнуть

Шаги в этом руководстве написаны для тех, у кого есть базовые знания о выполнении запросов SQL и элементарные знания о командах HDFS.

Используемая система базы данных — PostgreSQL 9.5 для Windows, а версия HDFS — Cloudera Hadoop 2.5.0-cdh5.2.0 на виртуальной машине Centos 6.4 Linux.

Apache Sqoop использует JAR-файлы драйвера JDBC, которые относятся к поставщику реляционной базы данных и версии базы данных.

Для выполнения действий, описанных в этой статье, пользователю потребуются разрешения на удаленное подключение к базе данных PostgreSQL, разрешения SELECT для реляционной базы данных, разрешения на запись в HDFS и разрешения на выполнение исполняемого файла Sqoop.

Для целей этого руководства мы создали базу данных PostgreSQL, назвали ее Toptal и сделали ее доступной через порт 5432.

Источник данных PostgreSQL

Для начала в нашей базе данных PostgreSQL Toptal мы создадим таблицу тестовых данных с именем sales . Предположим, что сертификат OpenSSL и файлы закрытого ключа уже существуют на сервере PostgreSQL.

 Server [localhost]: Database [postgres]: Toptal Port [5432]: Username [postgres]: Password for user postgres: psql (9.5.3) Toptal=# create table sales Toptal-# ( Toptal(# pkSales integer constraint salesKey primary key, Toptal(# saleDate date, Toptal(# saleAmount money, Toptal(# orderID int not null, Toptal(# itemID int not null Toptal(# ); CREATE TABLE

Далее мы вставим в таблицу 20 строк:

 Toptal=# insert into sales values (1, '2016-09-27', 1.23, 1, 1); INSERT 0 1 Toptal=# insert into sales values (2, '2016-09-27', 2.34, 1, 2); INSERT 0 1 Toptal=# insert into sales values (3, '2016-09-27', 1.23, 2, 1); INSERT 0 1 Toptal=# insert into sales values (4, '2016-09-27', 2.34, 2, 2); INSERT 0 1 Toptal=# insert into sales values (5, '2016-09-27', 3.45, 2, 3); INSERT 0 1 Toptal=# insert into sales values (6, '2016-09-28', 3.45, 3, 3); INSERT 0 1 Toptal=# insert into sales values (7, '2016-09-28', 4.56, 3, 4); INSERT 0 1 Toptal=# insert into sales values (8, '2016-09-28', 5.67, 3, 5); INSERT 0 1 Toptal=# insert into sales values (9, '2016-09-28', 1.23, 4, 1); INSERT 0 1 Toptal=# insert into sales values (10, '2016-09-28', 1.23, 5, 1); INSERT 0 1 Toptal=# insert into sales values (11, '2016-09-28', 1.23, 6, 1); INSERT 0 1 Toptal=# insert into sales values (12, '2016-09-29', 1.23, 7, 1); INSERT 0 1 Toptal=# insert into sales values (13, '2016-09-29', 2.34, 7, 2); INSERT 0 1 Toptal=# insert into sales values (14, '2016-09-29', 3.45, 7, 3); INSERT 0 1 Toptal=# insert into sales values (15, '2016-09-29', 4.56, 7, 4); INSERT 0 1 Toptal=# insert into sales values (16, '2016-09-29', 5.67, 7, 5); INSERT 0 1 Toptal=# insert into sales values (17, '2016-09-29', 6.78, 7, 6); INSERT 0 1 Toptal=# insert into sales values (18, '2016-09-29', 7.89, 7, 7); INSERT 0 1 Toptal=# insert into sales values (19, '2016-09-29', 7.89, 8, 7); INSERT 0 1 Toptal=# insert into sales values (20, '2016-09-30', 1.23, 9, 1); INSERT 0 1

Давайте выберем данные, чтобы убедиться, что данные выглядят правильно:

 Toptal=# select * from sales; pksales | saledate | saleamount | orderid | itemid ---------+------------+------------+---------+-------- 1 | 2016-09-27 | $1.23 | 1 | 1 2 | 2016-09-27 | $2.34 | 1 | 2 3 | 2016-09-27 | $1.23 | 2 | 1 4 | 2016-09-27 | $2.34 | 2 | 2 5 | 2016-09-27 | $3.45 | 2 | 3 6 | 2016-09-28 | $3.45 | 3 | 3 7 | 2016-09-28 | $4.56 | 3 | 4 8 | 2016-09-28 | $5.67 | 3 | 5 9 | 2016-09-28 | $1.23 | 4 | 1 10 | 2016-09-28 | $1.23 | 5 | 1 11 | 2016-09-28 | $1.23 | 6 | 1 12 | 2016-09-29 | $1.23 | 7 | 1 13 | 2016-09-29 | $2.34 | 7 | 2 14 | 2016-09-29 | $3.45 | 7 | 3 15 | 2016-09-29 | $4.56 | 7 | 4 16 | 2016-09-29 | $5.67 | 7 | 5 17 | 2016-09-29 | $6.78 | 7 | 6 18 | 2016-09-29 | $7.89 | 7 | 7 19 | 2016-09-29 | $7.89 | 8 | 7 20 | 2016-09-30 | $1.23 | 9 | 1 (20 rows)

Данные выглядят хорошо, так что давайте продолжим.

Импорт в HDFS с помощью Sqoop

Теперь, когда источник данных определен, мы готовы импортировать данные в HDFS. Команда sqoop , которую мы рассмотрим, указана ниже, и мы разберем каждый аргумент в следующих пунктах. Обратите внимание, что команда должна находиться в одной полной строке или, как показано ниже, с обратной косой чертой (символом продолжения командной строки Linux) в конце каждой строки, кроме последней.

 sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/Toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales'
  • sqoop import — исполняемый файл называется sqoop , и мы указываем ему импортировать данные из таблицы или представления из базы данных в HDFS.
  • --connect — с помощью аргумента --connect мы передаем строку подключения JDBC для PostgreSQL. В этом случае мы используем IP-адрес, номер порта и имя базы данных. Нам также необходимо указать, что используется SSL, и необходимо предоставить используемый класс SSLSocketFactory .
  • --username — в этом примере имя пользователя — это логин PostgreSQL, а не логин Windows. Пользователь должен иметь разрешения на подключение к указанной базе данных и выбор из указанной таблицы.
  • -P — это запросит у пользователя командной строки пароль. Если Sqoop редко выполняется, это может быть хорошим вариантом. Есть несколько других способов автоматической передачи пароля команде, но в этой статье мы постараемся упростить его.
  • --table — здесь мы передаем имя таблицы PostgreSQL.
  • --target-dir — этот аргумент указывает каталог HDFS, в котором должны храниться данные.
  • --split-by — мы должны предоставить Sqoop уникальный идентификатор, чтобы помочь ему распределить нагрузку. Позже в выходных данных задания мы увидим, где Sqoop выбирает минимальное и максимальное значения, чтобы помочь установить границы разделения.

Рекомендуется поместить команду в сценарий для повторения и редактирования, как показано ниже:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales' [hdfs@localhost:/sqoop]$

Теперь пришло время выполнить приведенный выше командный сценарий Sqoop. Вывод команды Sqoop показан ниже.

 [hdfs@localhost:/sqoop]$ ./sqoopCommand.sh 16/10/02 18:58:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0 Enter password: 16/10/02 18:58:40 INFO manager.SqlManager: Using default fetchSize of 1000 16/10/02 18:58:40 INFO tool.CodeGenTool: Beginning code generation 16/10/02 18:58:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "sales" AS t LIMIT 1 16/10/02 18:58:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce Note: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 16/10/02 18:58:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.jar 16/10/02 18:58:43 WARN manager.PostgresqlManager: It looks like you are importing from postgresql. 16/10/02 18:58:43 WARN manager.PostgresqlManager: This transfer can be faster! Use the --direct 16/10/02 18:58:43 WARN manager.PostgresqlManager: option to exercise a postgresql-specific fast path. 16/10/02 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of sales 16/10/02 18:58:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 16/10/02 18:58:46 INFO db.DBInputFormat: Using read commited transaction isolation 16/10/02 18:58:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("pksales"), MAX("pksales") FROM "sales" 16/10/02 18:58:47 INFO mapred.JobClient: Running job: job_201609280401_0005 16/10/02 18:58:48 INFO mapred.JobClient: map 0% reduce 0% 16/10/02 18:59:04 INFO mapred.JobClient: map 50% reduce 0% 16/10/02 18:59:14 INFO mapred.JobClient: map 75% reduce 0% 16/10/02 18:59:15 INFO mapred.JobClient: map 100% reduce 0% 16/10/02 18:59:18 INFO mapred.JobClient: Job complete: job_201609280401_0005 16/10/02 18:59:18 INFO mapred.JobClient: Counters: 23 16/10/02 18:59:18 INFO mapred.JobClient: File System Counters 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes read=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes written=1190344 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of write operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes read=438 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes written=451 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of read operations=4 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of write operations=4 16/10/02 18:59:18 INFO mapred.JobClient: Job Counters 16/10/02 18:59:18 INFO mapred.JobClient: Launched map tasks=4 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=48877 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Map-Reduce Framework 16/10/02 18:59:18 INFO mapred.JobClient: Map input records=20 16/10/02 18:59:18 INFO mapred.JobClient: Map output records=20 16/10/02 18:59:18 INFO mapred.JobClient: Input split bytes=438 16/10/02 18:59:18 INFO mapred.JobClient: Spilled Records=0 16/10/02 18:59:18 INFO mapred.JobClient: CPU time spent (ms)=3980 16/10/02 18:59:18 INFO mapred.JobClient: Physical memory (bytes) snapshot=481574912 16/10/02 18:59:18 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2949685248 16/10/02 18:59:18 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Transferred 451 bytes in 33.7555 seconds (13.3608 bytes/sec) 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Retrieved 20 records. [hdfs@localhost:/sqoop]$

Обратите внимание, что последняя строка вывода выше показывает, что было получено 20 записей, что соответствует 20 записям в таблице базы данных PostgreSQL.

После выполнения команды Sqoop мы можем выполнить команду hdfs dfs -ls , чтобы увидеть каталог, созданный по умолчанию, с именем таблицы в HDFS.

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls Found 1 items drwxrwxrwx - toptal data 0 2016-10-02 18:59 sales [hdfs@localhost:/sqoop]$

Мы можем снова использовать команду hdfs dfs -ls , чтобы просмотреть содержимое каталога sales . Если вы посмотрите на HDFS, вы заметите, что данные разделены и распределены по четырем файлам по умолчанию, а не просто содержатся в одном.

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls sales Found 6 items -rw-rw-rw- 1 toptal data 0 2016-10-02 18:59 sales/_SUCCESS drwxrwxrwx - toptal data 0 2016-10-02 18:58 sales/_logs -rw-rw-rw- 1 toptal data 110 2016-10-02 18:59 sales/part-m-00000 -rw-rw-rw- 1 toptal data 111 2016-10-02 18:59 sales/part-m-00001 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00002 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00003 [hdfs@localhost:/sqoop]$

Команда hdfs dfs -cat отобразит все записи в первом разделе данных о продажах в HDFS.

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00000 1,2016-09-27,1.23,1,1 2,2016-09-27,2.34,1,2 3,2016-09-27,1.23,2,1 4,2016-09-27,2.34,2,2 5,2016-09-27,3.45,2,3 [hdfs@localhost:/sqoop]$

Обратите внимание, что разделителем файлов по умолчанию является запятая. Также обратите внимание, что в каждом разделе всего пять строк, потому что 20 строк в исходном коде были равномерно распределены по четырем разделам.

Чтобы ограничить количество строк, выводимых на экран, мы можем направить вывод команды cat в команду head , как показано ниже, чтобы проверить содержимое трех других разделов.

Аргумент -n 5 команды head ограничивает вывод на экран первыми пятью строками.

(Обратите внимание, что в нашем случае в этом нет необходимости, поскольку для начала в каждом разделе всего пять строк. Однако на практике у вас, вероятно, будет гораздо больше строк в каждом разделе, и вы захотите просто проверить первые несколько, чтобы убедитесь, что они выглядят правильно, так что это покажет вам, как это сделать.)

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00001 |head -n 5 6,2016-09-28,3.45,3,3 7,2016-09-28,4.56,3,4 8,2016-09-28,5.67,3,5 9,2016-09-28,1.23,4,1 10,2016-09-28,1.23,5,1 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00002 |head -n 5 11,2016-09-28,1.23,6,1 12,2016-09-29,1.23,7,1 13,2016-09-29,2.34,7,2 14,2016-09-29,3.45,7,3 15,2016-09-29,4.56,7,4 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00003 |head -n 5 16,2016-09-29,5.67,7,5 17,2016-09-29,6.78,7,6 18,2016-09-29,7.89,7,7 19,2016-09-29,7.89,8,7 20,2016-09-30,1.23,9,1 [hdfs@localhost:/sqoop]$

Если вам нужно выполнить запрос для извлечения данных из нескольких таблиц в базе данных PostgreSQL, это можно сделать с помощью следующей команды:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --target-dir 'creditCardOrders' \ --split-by 'pksales' \ --query "select s.pksales, s.saledate, s.saleamount, o.shippingtype, o.methodofpayment from sales s inner join orders o on s.orderid=o.orderid where o.methodofpayment='credit card' and \$CONDITIONS" [hdfs@localhost:/sqoop]$

В приведенной выше команде мы используем некоторые из тех же аргументов, что и команда Sqoop, но они приобретают разное значение при использовании с командой SQL.

  • --target-dir — целевой каталог сообщает Sqoop, в каком каталоге HDFS хранить выбранные данные. Этот аргумент требуется Sqoop при использовании запроса произвольной формы.
  • --split-by — несмотря на то, что мы выбираем первичный ключ таблицы продаж, нам все равно нужно предоставить Sqoop уникальный идентификатор, чтобы помочь ему распределить рабочую нагрузку.
  • --query — это аргумент, в котором мы указываем SQL-запрос. Приведенный выше запрос заключен в двойные кавычки. Обратите внимание, что в нескольких строках, содержащих запрос, нет обратной косой черты (символа продолжения строки). Также обратите внимание на and \$CONDITIONS в конце WHERE . Это требуется Sqoop, потому что Sqoop автоматически заменит токен $CONDITIONS уникальным выражением.

Проблемы или нет: вы должны рассмотреть HDFS

HDFS имеет много преимуществ перед реляционными базами данных. Если вы занимаетесь анализом данных, вам следует подумать о переносе данных в HDFS уже сегодня.

Благодаря полученным здесь навыкам импорт данных из системы реляционных баз данных в HDFS представляет собой простой и понятный процесс, который можно выполнить с помощью одной команды. Хотя эти примеры имеют небольшое количество строк, механизм импорта больших объемов данных в HDFS из таблицы базы данных PostgreSQL остается прежним.

Вы даже можете поэкспериментировать с импортом больших таблиц и различными разделителями памяти. Использование Apache Sqoop более эффективно, чем экспорт данных базы данных в файл, передача файла с сервера базы данных в HDFS и последующая загрузка файла в HDFS.

Связанный: Повысьте эффективность обработки данных с помощью R