Samouczek HDFS dla analityków danych, który utknął z relacyjnymi bazami danych

Opublikowany: 2022-03-11

Wstęp

Do tej pory prawdopodobnie słyszałeś o rozproszonym systemie plików Hadoop (HDFS), zwłaszcza jeśli jesteś analitykiem danych lub osobą odpowiedzialną za przenoszenie danych z jednego systemu do drugiego. Jakie są jednak zalety HDFS w stosunku do relacyjnych baz danych?

HDFS to skalowalne rozwiązanie typu open source do przechowywania i przetwarzania dużych ilości danych. HDFS okazał się niezawodny i wydajny w wielu nowoczesnych centrach danych.

HDFS wykorzystuje standardowy sprzęt wraz z oprogramowaniem typu open source, aby obniżyć całkowity koszt na bajt pamięci masowej.

Dzięki wbudowanej replikacji i odporności na awarie dysków, HDFS jest idealnym systemem do przechowywania i przetwarzania danych do celów analitycznych. Nie wymaga podstaw i narzutów, aby obsługiwać atomowość transakcji, spójność, izolację i trwałość (ACID), co jest konieczne w przypadku tradycyjnych relacyjnych systemów baz danych.

Co więcej, w porównaniu z korporacyjnymi i komercyjnymi bazami danych, takimi jak Oracle, wykorzystanie Hadoop jako platformy analitycznej pozwala uniknąć dodatkowych kosztów licencji.

Jednym z pytań, które wiele osób zadaje, gdy po raz pierwszy dowiadują się o HDFS, jest: Jak mogę przenieść istniejące dane do HDFS?

W tym artykule zbadamy, jak importować dane z bazy danych PostgreSQL do HDFS. Wykorzystamy Apache Sqoop, który jest obecnie najwydajniejszym rozwiązaniem open source do przesyłania danych między HDFS a relacyjnymi systemami baz danych. Apache Sqoop jest przeznaczony do zbiorczego ładowania danych z relacyjnej bazy danych do systemu HDFS (import) oraz do zbiorczego zapisu danych z systemu HDFS do relacyjnej bazy danych (eksport).

HDFS

Przyspiesz analizy, migrując dane do HDFS.
Ćwierkać

Kroki w tym samouczku zostały napisane dla kogoś, kto ma podstawową wiedzę na temat wykonywania zapytań SQL i elementarną wiedzę na temat poleceń HDFS.

Używany system bazy danych to PostgreSQL 9.5 dla Windows, a wersja HDFS to Cloudera Hadoop 2.5.0-cdh5.2.0 na maszynie wirtualnej Centos 6.4 Linux.

Apache Sqoop opiera się na plikach JAR sterownika JDBC, które są specyficzne dla dostawcy relacyjnej bazy danych i wersji bazy danych.

Aby wykonać kroki przedstawione w tym artykule, użytkownik będzie potrzebował uprawnień do zdalnego łączenia się z bazą danych PostgreSQL, uprawnień SELECT do relacyjnej bazy danych, uprawnień do zapisu na HDFS i uprawnień do wykonywania w pliku wykonywalnym Sqoop.

Na potrzeby tego samouczka stworzyliśmy bazę danych PostgreSQL o nazwie Toptal i udostępniliśmy ją przez port 5432.

Źródło danych PostgreSQL

Aby rozpocząć, w naszej bazie danych PostgreSQL Toptal tabelę danych testowych o nazwie sales . Założymy, że certyfikat OpenSSL i pliki kluczy prywatnych już istnieją na serwerze PostgreSQL.

 Server [localhost]: Database [postgres]: Toptal Port [5432]: Username [postgres]: Password for user postgres: psql (9.5.3) Toptal=# create table sales Toptal-# ( Toptal(# pkSales integer constraint salesKey primary key, Toptal(# saleDate date, Toptal(# saleAmount money, Toptal(# orderID int not null, Toptal(# itemID int not null Toptal(# ); CREATE TABLE

Następnie wstawimy do tabeli 20 wierszy:

 Toptal=# insert into sales values (1, '2016-09-27', 1.23, 1, 1); INSERT 0 1 Toptal=# insert into sales values (2, '2016-09-27', 2.34, 1, 2); INSERT 0 1 Toptal=# insert into sales values (3, '2016-09-27', 1.23, 2, 1); INSERT 0 1 Toptal=# insert into sales values (4, '2016-09-27', 2.34, 2, 2); INSERT 0 1 Toptal=# insert into sales values (5, '2016-09-27', 3.45, 2, 3); INSERT 0 1 Toptal=# insert into sales values (6, '2016-09-28', 3.45, 3, 3); INSERT 0 1 Toptal=# insert into sales values (7, '2016-09-28', 4.56, 3, 4); INSERT 0 1 Toptal=# insert into sales values (8, '2016-09-28', 5.67, 3, 5); INSERT 0 1 Toptal=# insert into sales values (9, '2016-09-28', 1.23, 4, 1); INSERT 0 1 Toptal=# insert into sales values (10, '2016-09-28', 1.23, 5, 1); INSERT 0 1 Toptal=# insert into sales values (11, '2016-09-28', 1.23, 6, 1); INSERT 0 1 Toptal=# insert into sales values (12, '2016-09-29', 1.23, 7, 1); INSERT 0 1 Toptal=# insert into sales values (13, '2016-09-29', 2.34, 7, 2); INSERT 0 1 Toptal=# insert into sales values (14, '2016-09-29', 3.45, 7, 3); INSERT 0 1 Toptal=# insert into sales values (15, '2016-09-29', 4.56, 7, 4); INSERT 0 1 Toptal=# insert into sales values (16, '2016-09-29', 5.67, 7, 5); INSERT 0 1 Toptal=# insert into sales values (17, '2016-09-29', 6.78, 7, 6); INSERT 0 1 Toptal=# insert into sales values (18, '2016-09-29', 7.89, 7, 7); INSERT 0 1 Toptal=# insert into sales values (19, '2016-09-29', 7.89, 8, 7); INSERT 0 1 Toptal=# insert into sales values (20, '2016-09-30', 1.23, 9, 1); INSERT 0 1

Wybierzmy dane, aby sprawdzić, czy dane wyglądają poprawnie:

 Toptal=# select * from sales; pksales | saledate | saleamount | orderid | itemid ---------+------------+------------+---------+-------- 1 | 2016-09-27 | $1.23 | 1 | 1 2 | 2016-09-27 | $2.34 | 1 | 2 3 | 2016-09-27 | $1.23 | 2 | 1 4 | 2016-09-27 | $2.34 | 2 | 2 5 | 2016-09-27 | $3.45 | 2 | 3 6 | 2016-09-28 | $3.45 | 3 | 3 7 | 2016-09-28 | $4.56 | 3 | 4 8 | 2016-09-28 | $5.67 | 3 | 5 9 | 2016-09-28 | $1.23 | 4 | 1 10 | 2016-09-28 | $1.23 | 5 | 1 11 | 2016-09-28 | $1.23 | 6 | 1 12 | 2016-09-29 | $1.23 | 7 | 1 13 | 2016-09-29 | $2.34 | 7 | 2 14 | 2016-09-29 | $3.45 | 7 | 3 15 | 2016-09-29 | $4.56 | 7 | 4 16 | 2016-09-29 | $5.67 | 7 | 5 17 | 2016-09-29 | $6.78 | 7 | 6 18 | 2016-09-29 | $7.89 | 7 | 7 19 | 2016-09-29 | $7.89 | 8 | 7 20 | 2016-09-30 | $1.23 | 9 | 1 (20 rows)

Dane wyglądają dobrze, więc przejdźmy dalej.

Importuj do HDFS za pomocą Sqoop

Po zdefiniowaniu źródła danych jesteśmy teraz gotowi do zaimportowania danych do HDFS. Polecenie sqoop , które zbadamy, jest wymienione poniżej, a każdy argument zostanie rozbity w kolejnych punktach. Zauważ, że polecenie powinno znajdować się w jednym pełnym wierszu lub, jak pokazano poniżej, z odwrotnym ukośnikiem (znakiem kontynuacji wiersza poleceń Linuksa) na końcu każdego wiersza, z wyjątkiem ostatniego.

 sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/Toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales'
  • sqoop import — plik wykonywalny nazywa się sqoop i nakazujemy mu zaimportować dane z tabeli lub widoku z bazy danych do systemu HDFS.
  • --connect — za pomocą argumentu --connect przekazujemy łańcuch połączenia JDBC dla PostgreSQL. W tym przypadku używamy adresu IP, numeru portu i nazwy bazy danych. Musimy również określić, że używany jest protokół SSL i musimy podać klasę SSLSocketFactory , która ma być używana.
  • --username - W tym przykładzie nazwa użytkownika jest loginem PostgreSQL, a nie loginem Windows. Użytkownik musi mieć uprawnienia do łączenia się z określoną bazą danych i wybierania z określonej tabeli.
  • -P - To poprosi użytkownika wiersza poleceń o hasło. Jeśli Sqoop jest rzadko wykonywany, może to być dobra opcja. Istnieje wiele innych sposobów automatycznego przekazywania hasła do polecenia, ale w tym artykule staramy się, aby było to proste.
  • --table — W tym miejscu przekazujemy nazwę tabeli PostgreSQL.
  • --target-dir — ten argument określa katalog HDFS, w którym mają być przechowywane dane.
  • --split-by — musimy zapewnić Sqoop unikalny identyfikator, aby pomóc w dystrybucji obciążenia. W dalszej części wyników zadania zobaczymy, gdzie Sqoop wybiera wartości minimalne i maksymalne, aby pomóc w ustaleniu granic podziału.

Dobrym pomysłem jest umieszczenie polecenia w skrypcie dla celów powtarzalności i edycji, jak pokazano poniżej:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --table 'sales' \ --target-dir 'sales' \ --split-by 'pksales' [hdfs@localhost:/sqoop]$

Teraz nadszedł czas na wykonanie powyższego skryptu poleceń Sqoop. Dane wyjściowe polecenia Sqoop pokazano poniżej.

 [hdfs@localhost:/sqoop]$ ./sqoopCommand.sh 16/10/02 18:58:34 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5-cdh5.2.0 Enter password: 16/10/02 18:58:40 INFO manager.SqlManager: Using default fetchSize of 1000 16/10/02 18:58:40 INFO tool.CodeGenTool: Beginning code generation 16/10/02 18:58:41 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM "sales" AS t LIMIT 1 16/10/02 18:58:41 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-0.20-mapreduce Note: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.java uses or overrides a deprecated API. Note: Recompile with -Xlint:deprecation for details. 16/10/02 18:58:43 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-training/compile/77f9452788024792770d52da72ae871f/sales.jar 16/10/02 18:58:43 WARN manager.PostgresqlManager: It looks like you are importing from postgresql. 16/10/02 18:58:43 WARN manager.PostgresqlManager: This transfer can be faster! Use the --direct 16/10/02 18:58:43 WARN manager.PostgresqlManager: option to exercise a postgresql-specific fast path. 16/10/02 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of sales 16/10/02 18:58:45 WARN mapred.JobClient: Use GenericOptionsParser for parsing the arguments. Applications should implement Tool for the same. 16/10/02 18:58:46 INFO db.DBInputFormat: Using read commited transaction isolation 16/10/02 18:58:46 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN("pksales"), MAX("pksales") FROM "sales" 16/10/02 18:58:47 INFO mapred.JobClient: Running job: job_201609280401_0005 16/10/02 18:58:48 INFO mapred.JobClient: map 0% reduce 0% 16/10/02 18:59:04 INFO mapred.JobClient: map 50% reduce 0% 16/10/02 18:59:14 INFO mapred.JobClient: map 75% reduce 0% 16/10/02 18:59:15 INFO mapred.JobClient: map 100% reduce 0% 16/10/02 18:59:18 INFO mapred.JobClient: Job complete: job_201609280401_0005 16/10/02 18:59:18 INFO mapred.JobClient: Counters: 23 16/10/02 18:59:18 INFO mapred.JobClient: File System Counters 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes read=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of bytes written=1190344 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: FILE: Number of write operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes read=438 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of bytes written=451 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of read operations=4 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of large read operations=0 16/10/02 18:59:18 INFO mapred.JobClient: HDFS: Number of write operations=4 16/10/02 18:59:18 INFO mapred.JobClient: Job Counters 16/10/02 18:59:18 INFO mapred.JobClient: Launched map tasks=4 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps in occupied slots (ms)=48877 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces in occupied slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0 16/10/02 18:59:18 INFO mapred.JobClient: Map-Reduce Framework 16/10/02 18:59:18 INFO mapred.JobClient: Map input records=20 16/10/02 18:59:18 INFO mapred.JobClient: Map output records=20 16/10/02 18:59:18 INFO mapred.JobClient: Input split bytes=438 16/10/02 18:59:18 INFO mapred.JobClient: Spilled Records=0 16/10/02 18:59:18 INFO mapred.JobClient: CPU time spent (ms)=3980 16/10/02 18:59:18 INFO mapred.JobClient: Physical memory (bytes) snapshot=481574912 16/10/02 18:59:18 INFO mapred.JobClient: Virtual memory (bytes) snapshot=2949685248 16/10/02 18:59:18 INFO mapred.JobClient: Total committed heap usage (bytes)=127401984 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Transferred 451 bytes in 33.7555 seconds (13.3608 bytes/sec) 16/10/02 18:59:18 INFO mapreduce.ImportJobBase: Retrieved 20 records. [hdfs@localhost:/sqoop]$

Zauważ, że ostatni wiersz danych wyjściowych powyżej pokazuje, że pobrano 20 rekordów, co odpowiada 20 rekordom w tabeli w bazie danych PostgreSQL.

Po wykonaniu polecenia Sqoop możemy wykonać polecenie hdfs dfs -ls , aby zobaczyć katalog, który został utworzony domyślnie z nazwą tabeli na HDFS.

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls Found 1 items drwxrwxrwx - toptal data 0 2016-10-02 18:59 sales [hdfs@localhost:/sqoop]$

Możemy ponownie użyć polecenia hdfs dfs -ls , aby wyświetlić zawartość katalogu sales . Jeśli spojrzysz na HDFS, możesz zauważyć, że dane są domyślnie podzielone na partycje i rozłożone na cztery pliki, a nie tylko w jednym.

 [hdfs@localhost:/sqoop]$ hdfs dfs -ls sales Found 6 items -rw-rw-rw- 1 toptal data 0 2016-10-02 18:59 sales/_SUCCESS drwxrwxrwx - toptal data 0 2016-10-02 18:58 sales/_logs -rw-rw-rw- 1 toptal data 110 2016-10-02 18:59 sales/part-m-00000 -rw-rw-rw- 1 toptal data 111 2016-10-02 18:59 sales/part-m-00001 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00002 -rw-rw-rw- 1 toptal data 115 2016-10-02 18:59 sales/part-m-00003 [hdfs@localhost:/sqoop]$

Polecenie hdfs dfs -cat wyświetli wszystkie rekordy z pierwszej partycji danych sprzedaży w systemie HDFS.

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00000 1,2016-09-27,1.23,1,1 2,2016-09-27,2.34,1,2 3,2016-09-27,1.23,2,1 4,2016-09-27,2.34,2,2 5,2016-09-27,3.45,2,3 [hdfs@localhost:/sqoop]$

Zauważ, że domyślnym separatorem pliku jest przecinek. Zauważ również, że w każdej partycji jest tylko pięć wierszy, ponieważ 20 wierszy w źródle zostało równomiernie rozłożonych na cztery partycje.

Aby ograniczyć liczbę wierszy, które są wyprowadzane na ekran, możemy przekazać wyjście polecenia cat do polecenia head , jak pokazano poniżej, aby sprawdzić zawartość pozostałych trzech partycji.

Argument -n 5 komendy head ogranicza wyjście ekranu do pierwszych pięciu wierszy.

(Zauważ, że w naszym przypadku jest to niepotrzebne, ponieważ na początku w każdej partycji jest tylko pięć wierszy. W praktyce jednak prawdopodobnie będziesz mieć o wiele więcej wierszy w każdej partycji i będziesz chciał po prostu sprawdzić kilka pierwszych, aby upewnij się, że wyglądają dobrze, aby pokazać, jak to zrobić.)

 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00001 |head -n 5 6,2016-09-28,3.45,3,3 7,2016-09-28,4.56,3,4 8,2016-09-28,5.67,3,5 9,2016-09-28,1.23,4,1 10,2016-09-28,1.23,5,1 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00002 |head -n 5 11,2016-09-28,1.23,6,1 12,2016-09-29,1.23,7,1 13,2016-09-29,2.34,7,2 14,2016-09-29,3.45,7,3 15,2016-09-29,4.56,7,4 [hdfs@localhost:/sqoop]$ hdfs dfs -cat sales/part-m-00003 |head -n 5 16,2016-09-29,5.67,7,5 17,2016-09-29,6.78,7,6 18,2016-09-29,7.89,7,7 19,2016-09-29,7.89,8,7 20,2016-09-30,1.23,9,1 [hdfs@localhost:/sqoop]$

Jeśli potrzebujesz uruchomić zapytanie, aby wyodrębnić dane z wielu tabel w bazie danych PostgreSQL, możesz to zrobić za pomocą następującego polecenia:

 [hdfs@localhost:/sqoop]$ cat sqoopCommand.sh sqoop import --connect 'jdbc:postgresql://aaa.bbb.ccc.ddd:5432/toptal?ssl=true&sslfactory=org.postgresql.ssl.NonValidatingFactory' \ --username 'postgres' -P \ --target-dir 'creditCardOrders' \ --split-by 'pksales' \ --query "select s.pksales, s.saledate, s.saleamount, o.shippingtype, o.methodofpayment from sales s inner join orders o on s.orderid=o.orderid where o.methodofpayment='credit card' and \$CONDITIONS" [hdfs@localhost:/sqoop]$

W powyższym poleceniu używamy niektórych z tych samych argumentów co polecenie Sqoop, ale nabierają one innego znaczenia, gdy są używane z poleceniem SQL.

  • --target-dir — katalog docelowy informuje Sqoop, w którym katalogu na HDFS ma przechowywać wybrane dane. Ten argument jest wymagany przez Sqoop podczas korzystania z zapytania o dowolnej formie.
  • --split-by — Mimo że wybieramy klucz podstawowy tabeli sprzedaży, nadal musimy zapewnić Sqoopowi unikalny identyfikator, aby pomóc w rozłożeniu obciążenia.
  • --query — jest to argument, w którym podajemy zapytanie SQL. Powyższe zapytanie jest ujęte w cudzysłów. Zauważ, że w wielu wierszach zawierających zapytanie nie ma odwrotnego ukośnika (znaku kontynuacji wiersza). Zwróć także uwagę na and \$CONDITIONS na końcu klauzuli WHERE . Jest to wymagane przez Sqoop, ponieważ Sqoop automatycznie zastąpi token $CONDITIONS unikalnym wyrażeniem.

Problemy lub brak problemów: powinieneś rozważyć HDFS

HDFS ma wiele zalet w stosunku do relacyjnych baz danych. Jeśli przeprowadzasz analizę danych, powinieneś już dziś rozważyć migrację swoich danych do HDFS.

Dzięki zdobytym tutaj umiejętnościom importowanie danych z systemu relacyjnej bazy danych do systemu HDFS jest prostym i nieskomplikowanym procesem, który można wykonać za pomocą jednego polecenia. Chociaż te przykłady mają niewielką liczbę wierszy, mechanika importowania dużych ilości danych do HDFS z tabeli bazy danych PostgreSQL pozostaje taka sama.

Możesz nawet poeksperymentować z importowaniem dużych tabel i różnymi ogranicznikami pamięci. Korzystanie z Apache Sqoop jest bardziej wydajne niż eksportowanie danych z bazy danych do pliku, przesyłanie pliku z serwera bazy danych do systemu HDFS, a następnie ładowanie pliku do systemu HDFS.

Powiązane: Wzmocnij swoje dane za pomocą R