Cadrul de date în Apache PySpark: tutorial cuprinzător [cu exemple]

Publicat: 2020-02-27

Astăzi, vom afla despre DataFrame în Apache PySpark . Pyspark este unul dintre instrumentele de top pentru știința datelor în 2020. Este denumit coloane dintr-o colecție distribuită de rânduri în Apache Spark. Este foarte asemănător cu Tabelele sau coloanele din Foile Excel și, de asemenea, similar cu tabelul bazei de date relaționale. PySpark DataFrame are, de asemenea, caracteristici similare ale RDD, care sunt:

Distribuit: natura DataFrame și RDD este ambele distribuite

Evaluări leneșe: Executarea sarcinii nu se face dacă acțiunea nu este efectuată

Natura imuabilului: O altă caracteristică similară a RDD / DataFrame este că nu poate fi schimbată odată ce a fost creat. Dar se poate aplica transformarea pentru a transforma RDD / DataFrame.

Cuprins

Avantajul DataFrames-ului

1. Acesta acceptă multe limbaje diferite, cum ar fi Java, Scala, R, Python, ceea ce este util în ceea ce privește suportul API. Suportul API pentru mai multe limbi ajută la lucrul cu multe limbaje de programare.

2. DataFrame acceptă o gamă largă de surse și formate de date, ceea ce ajută foarte mult la utilizarea convenabilă a unei surse diferite de date și a formatului acestora.

3. Una dintre cele mai bune părți ale DataFrame este că poate gestiona chiar și Petabytes de date, ceea ce este o capacitate remarcabilă de a gestiona astfel de date masive.

4. Apache Spark înțelege rapid schema DataFrame cu observația din Spark DataFrame. Sub coloane numite, observația Spark DataFrame este organizată. În acest fel, planul de execuție a interogărilor este optimizat.

5. Volumul masiv de date semi-structurate și structurate poate fi procesat rapid, deoarece este proiectat să o facă DataFrames.

Configurare Apache Spark

Apache Spark ar trebui să fie configurat în mașină înainte de a putea fi utilizat pentru operațiunile DataFrame. Datele pot fi operate cu suportul DataFrame, deoarece acceptă diferite operații DataFrame. Aici vom discuta câteva operațiuni comune DataFrame.

Crearea SparkContext este primul pas în programarea Apache. Pentru executarea operațiunilor într-un cluster, există o cerință SparkContext. Cum să accesați un cluster este spus de SparkContext. De asemenea, arată Spark-ul despre locația pentru a obține un cluster.

Citiți: Cadre de învățare profundă

Apoi conexiunea Apache Cluster este stabilită. Crearea sa este deja făcută dacă cineva folosește Spark Shell. Cealaltă modalitate, setarea de configurare, poate fi furnizată, inițializată și importată pentru crearea SparkContext.

Se poate folosi acest cod pentru creare:

din pyspark import SparkContext

sc = SparkContext()

Creare DataFrame din fișierul CSV

O nouă bibliotecă trebuie specificată în shell-ul lui Python, astfel încât fișierul CSV să poată fi citit. Pentru a face acest lucru, primul pas este să descărcați cea mai recentă versiune a pachetului Spark-CSV și să faceți extragerea pachetului din Home Directory Spark. După aceea, vom deschide shell-ul PySpark și pachetul trebuie să fie inclus.

$ ./bin/pyspark –pachete com.databricks:spark-csv_2.10:1.3.0

Acum DataFrame va fi creat după ce datele au fost citite din fișierul CSV.

train = sqlContext.load(source=”com.databricks.spark.csv”, cale = 'PATH/train.csv', antet = True, inferSchema = True)

test = sqlContext.load(source=”com.databricks.spark.csv”, cale = 'PATH/test-comb.csv', antet = True, inferSchema = True)

Fișierele CSV de testare și fișierele CSV de antrenament se află în locația folderului numită PATH. Dacă antetul este acolo în fișierul CSV, atunci va apărea ca True. Pentru a cunoaște tipul de date din fiecare coloană a cadrului de date, vom folosi opțiunea inferSchema = True. Folosind opțiunea inferSchema = True, Detectarea tipului de date al cadrului de date din fiecare coloană va fi detectată automat de contextul SQL. Toate coloanele vor fi citite ca șir dacă nu setăm inferSchema ca adevărat.

Citiți: Biblioteci Python pentru învățarea automată

Manipularea DataFrame

Acum, aici vom vedea cum să manipulăm cadrul de date:

  • Cunoașteți tipul de date al coloanelor

printSchema va fi folosit pentru a vedea tipul de coloană și tipul acesteia de date. Acum schema va fi tipărită în format arbore prin aplicarea printSchema().

train.printSchema()

Ieșire:

rădăcină

|– User_ID: întreg (nullable = adevărat)

|– ID_produs: șir (nullable = adevărat)

|– Gen: șir (nullable = adevărat)

|– Vârstă: șir (nullable = adevărat)

|– Ocupație: întreg (nullable = adevărat)

|– City_Category: șir (nullable = true)

|– Stay_In_Current_City_Years: string (nullable = true)

|– Stare_conjugală: întreg (nullable = adevărat)

|– Product_Category_1: număr întreg (nullable = adevărat)

|– Product_Category_2: număr întreg (nullabil = adevărat)

|– Product_Category_3: număr întreg (nullable = adevărat)

|– Cumpărare: întreg (nullable = adevărat)

După citirea fișierului csv, putem vedea că am obținut cu exactitate tipul de date sau schema fiecărei coloane din cadrul de date.

  • Afișând prima n observație

Pentru a vedea prima n observație, se poate folosi operația capului. Operațiunea capului lui Pandas este aceeași cu cea a funcționării capului lui PySpark.

tren.cap(5)

Ieșire:

[Rând(User_ID=1000001, Product_ID=u'P00069042′, Sex=u'F', Age=u'0-17′, Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2′=Status=Conjugal 0, Product_Category_1=3, Product_Category_2=Niciuna, Product_Category_3=Niciuna, Cumparare=8370),

Rând (User_ID=1000001, Product_ID=u'P00248942′, Sex=u'F', Age=u'0-17′, Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2′=0_Status) , Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),

Rând(User_ID=1000001, Product_ID=u'P00087842′, Sex=u'F', Age=u'0-17′, Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2′=Status=0_Status , Product_Category_1=12, Product_Category_2=Niciuna, Product_Category_3=Niciuna, Cumparare=1422),

Rând(User_ID=1000001, Product_ID=u'P00085442′, Sex=u'F', Age=u'0-17′, Occupation=10, City_Category=u'A', Stay_In_Current_City_Years=u'2′=0_Status=0_Status , Product_Category_1=12, Product_Category_2=14, Product_Category_3=Niciunul, Cumparare=1057),

Rând(User_ID=1000002, Product_ID=u'P00285442′, Sex=u'M', Age=u'55+', Occupation=16, City_Category=u'C', Stay_In_Current_City_Years=u'4+', Conjugal=0_Status , Product_Category_1=8, Product_Category_2=Niciuna, Product_Category_3=Niciuna, Cumparare=7969)]

Acum vom folosi operația de afișare pentru a vedea rezultatul într-o manieră mai bună, deoarece rezultatele vor veni în format de rând. De asemenea, putem trunchia rezultatul folosind argumentul truncate = True.

train.show(2,truncate= Adevărat)

Ieșire:

+——-+———-+——+—-+———-+————-+————————–+————–+——————+ ——————+——————+——–+

|User_ID|Product_ID|Gex| Vârstă|Ocupație|Categoria_orașului|Statul_în_orașul_actual|Starea_conjugală|Categoria_de_produsului_1|Categoria_de_produsului_2|Categoria_de_produsului_3|Cumpărare|

+——-+———-+——+—-+———-+————-+————————–+————–+——————+ ——————+——————+——–+

|1000001| P00069042| F|0-17| 10| A| 2| 0| 3| nul| nul| 8370|

|1000001| P00248942| F|0-17| 10| A| 2| 0| 1| 6| 14| 15200|

+——-+———-+——+—-+———-+————-+————————–+————–+——————+ ——————+——————+——–+

arată doar primele 2 rânduri

  • Numărarea numărului de rânduri din DataFrame

Pentru a număra numărul rândurilor din cadrul de date, putem folosi operația de numărare. Acum vom număra numărul de rânduri de fișiere de testare și fișiere de tren prin aplicarea operației de numărare.

train.count(),test.count()

Ieșire:

(233598, 550069)

Avem 233598, 550069, rânduri în test și respectiv tren.

  • Obținerea numărului coloanei și a numelui coloanelor din fișierul test și tren

Similar cu funcționarea coloanei din DataFrame de panda, vom folosi operația coloane pentru a obține numele coloanei. Acum vom tipări mai întâi nr. a coloanei și numele coloanei din fișierul de testare și dosarul trenului.

len(tren.coloane), tren.coloane

Ieșire:

12 ['User_ID', 'Product_ID', 'Gender', 'Age', 'Occupation', 'City_Category', 'Stay_In_Current_City_Years', 'Conjunt_Status', 'Product_Category_1', 'Product_Category', 'Product_Category_Product_2_Category', 'Product_Category_3,'

Acum o facem în mod similar pentru fișierul de testare.

len(test.coloane), test.coloane

Ieșire:

13 [”, „User_ID”, „Product_ID”, „Gender”, „Age”, „Occupation”, „City_Category”, „Stay_In_Current_City_Years”, „Conjugal_Status”, „Product_Category_1”, „Product_Category_1”, „Product_Category,” Product_Category, „Product_Category,” „Product_Category,” ']

După rezultatul de mai sus, putem vedea că există 12 coloane în fișierul de antrenament și 13 coloane în fișierul de testare. Din rezultatul de mai sus, putem verifica că avem 13 coloane în fișierul de testare și 12 în fișierul de antrenament. Coloana „Comb” este singura coloană din fișierul de testare și nu există nicio „Achiziție” care nu este prezentă în fișierul de testare. Mai există o coloană în fișierul de testare despre care putem vedea că nu are niciun nume al coloanei.

  • Obținerea statisticilor rezumate, cum ar fi numărul, max, min, devianță standard, medie în coloanele numerice ale DataFrame

În DataFrame, vom folosi operația numită descrie operația. Putem face calculul coloanei numerice și obține un rezumat statistic folosind descrieți operația. Toate coloanele numerice vor fi calculate în DataFrame, nu există nici un nume de coloană specificat în calculul rezumatului statisticilor.

train.describe().show()

Ieșire:

+——-+——————+—————–+——————-+——————+——————+——————+—— ————+

|rezumat| User_ID| Ocupație| Stare_conjugală|Categoria_de_produs_1|Categoria_de_produs_2|Categoria_de_produs_3| Cumpărare|

+——-+——————+—————–+——————-+——————+——————+——————+—— ————+

| numără| 550068| 550068| 550068| 550068| 376430| 166821| 550068|

| medie|1003028.8424013031|8.076706879876669|0.40965298835780306| 5.404270017525106| 9,842329251122386|12,668243206790512| 9263.968712959126|

| stddev|1727.5915855308265|6.522660487341778| 0,4917701263173273|3,9362113692014082| 5,086589648693526| 4.125337631575267|5023.0653938206015|

| min| 1000001| 0| 0| 1| 2| 3| 12|

| max| 1006040| 20| 1| 20| 18| 18| 23961|

+——-+——————+—————–+——————-+——————+——————+——————+—— ————+

În operațiunea de descriere, aceasta este ceea ce obținem atunci când este specificat un nume de coloană șir sau un nume de coloană categoric.

train.describe('ID_Produs').show()

Ieșire:

+——-+———-+

|rezumat|ID_produs|

+——-+———-+

| numără| 550068|

| înseamnă| nul|

| stddev| nul|

| min| P00000142|

| max| P0099942|

+——-+———-+

Pe baza ASCII, valorile maxime și minime calculate. Operația Descriere este folosită pentru a lucra pe coloana de tip String.

  • Selectarea coloanei DataFrame

Vom folosi numele coloanelor în operația de selectare pentru a selecta coloana. Vom menționa numele coloanei cu separarea prin virgulă. Acum vom vedea cum se face selecția „Age” și „User_ID” din fișierul de antrenament.

  • train.select('ID_utilizator','Vârsta').show(5)
  • Ieșire:
  • +——-+—-+
  • |ID_utilizator| Varsta|
  • +——-+—-+
  • |1000001|0-17|
  • |1000001|0-17|
  • |1000001|0-17|
  • |1000001|0-17|
  • |1000002| 55+|
  • +——-+—-+
  • Produsul Finding Distinct nr. în fișierele de testare și fișierele de tren

Pentru a calcula numărul DataFrame-ului. de rânduri distincte, vom folosi operația distinctă . Acum, aici vom aplica operația distinctă pentru calculul nr. de produs distinct în dosarul de testare și tren.

train.select('ID_Produs').distinct().count(),test.select('ID_Produs').distinct().count()

Ieșire:

(3633, 3492)

Avem 3492 și 3633 de produse distincte în fișierul de testare și, respectiv, tren. Acum știm că în fișierul de antrenament avem mai multe valori distincte decât fișierul de test, așa cum putem învăța din rezultatul de ieșire. Acum vom folosi operația de scădere pentru a afla categoriile Product_ID care nu sunt prezente în fișierul de antrenament, dar sunt prezente în fișierul de testare. Același lucru se poate face și pentru toate caracteristicile categorice.

diff_cat_in_train_test=test.select('ID_Produs').subtract(train.select('ID_Produs'))

diff_cat_in_train_test.distinct().count()# Pentru numărare distinctă

Ieșire:

46

Deci, din rezultatul de mai sus, putem ști că există 47 de categorii diferite, care nu sunt prezente în fișierul de instruire, dar sunt prezente în fișierul de testare. Datele vor fi sărite sau colectate din fișierul de testare, care nu este prezent în dosarul trenului.

  • Calculul frecvenței perechi a coloanelor categorice?

Să facem calculul frecvenței perechi a coloanei în DataFrame utilizând operațiunea poate încrucișată. Acum haideți să calculăm coloanele „Sex” și „Vârsta” din DataFrame ale trenului, aplicând operația încrucișată .

train.crosstab('Vârsta', 'Sex').show()

Ieșire:

+———-+—–+——+

|Vârsta_Sex| F| M|

+———-+—–+——+

| 0-17| 5083| 10019|

| 46-50|13199| 32502|

| 18-25|24628| 75032|

| 36-45|27170| 82843|

| 55+| 5083| 16421|

| 51-55| 9894| 28607|

| 26-35|50752|168835|

+———-+—–+——+

Valoarea distinctă a Gender este numele coloanei, iar cantitatea diferită de Age este numele rândului, care poate fi văzut în rezultatul de mai sus. În tabel, numărul perechii va fi zero dacă nu a avut loc.

Cum să obțineți DataFrame cu rânduri unice?

Pentru a găsi rânduri unice și pentru a nu include rânduri duplicate, vom folosi operația dropDuplicates . Acesta va obține Dataframe-ul fără rânduri duplicate prin eliminarea rândurilor duplicate ale unui DataFrame. Vă rugăm să verificați aici pentru a afla cum este efectuată procedura dropDuplicates pentru a obține toate rândurile unice pentru coloane.

train.select('Vârsta','Sex').dropDuplicates().show()

Ieșire:

+—–+——+

| Varsta|Sex|

+—–+——+

|51-55| F|

|51-55| M|

|26-35| F|

|26-35| M|

|36-45| F|

|36-45| M|

|46-50| F|

|46-50| M|

| 55+| F|

| 55+| M|

|18-25| F|

| 0-17| F|

|18-25| M|

| 0-17| M|

+—–+——+

  • Cum să aruncați rândurile vor avea valoare nulă?

Dacă se dorește să arunce toate rândurile care au o valoare nulă, atunci putem folosi operația numită operațiune dropna . Pentru a elimina un rând din DataFrame, acesta ia în considerare trei opțiuni.

  • Subset – este lista tuturor numelor coloanelor de luat în considerare pentru operațiunea de eliminare a coloanei cu valori nule.
  • Thresh – acest lucru ajută la eliminarea rândurilor cu valori non-nule mai mici de prag. În mod implicit, nu este specificat nimic în acest lucru.
  • Cum – Poate fi folosit în două tipuri – toate sau oricare. Folosind oricare, va renunța la rând dacă orice valoare din rând este nulă. Folosind all, se va reduce rândul dacă valorile tuturor rândurilor sunt nule.

Acum, aici vom folosi toate aceste opțiuni una câte una pentru a elimina rândurile care sunt nule utilizând opțiuni implicite, cum ar fi subset, thresh, None for how, none, any.

train.dropna().count()

Ieșire:

166821

  • Cum să umpleți valorile nule ale DataFrame cu nr. constantă?

Pentru a completa valorile nule cu constanta nr. Vom folosi operația fillna aici. Există doi parametri care trebuie luați în considerare prin operația fillna pentru a umple valorile nule.

  • subset: Aici, trebuie să specificați coloanele care trebuie luate în considerare pentru valorile de umplere.
  • valoare: Aici putem menționa cantitatea de înlocuit cu ce valoare, care poate fi orice tip de date, cum ar fi string, float, int în toate coloanele.

Aici vom completa „-1” în locul valorilor nule din trenul DataFrame.

train.fillna(-1).show(2)

Ieșire:

+——-+———-+——+—-+———-+————-+————————–+————–+——————+ ——————+——————+——–+

|User_ID|Product_ID|Gex| Vârstă|Ocupație|Categoria_orașului|Statul_în_orașul_actual|Starea_conjugală|Categoria_de_produsului_1|Categoria_de_produsului_2|Categoria_de_produsului_3|Cumpărare|

+——-+———-+——+—-+———-+————-+————————–+————–+——————+ ——————+——————+——–+

|1000001| P00069042| F|0-17| 10| A| 2| 0| 3| -1| -1| 8370|

|1000001| P00248942| F|0-17| 10| A| 2| 0| 1| 6| 14| 15200|

+——-+———-+——+—-+———-+————-+————————–+————–+——————+ ——————+——————+——–+

arată doar primele 2 rânduri

Concluzie

PySpark câștigă amploare în lumea inteligenței artificiale și a învățării automate. PySpark este folosit pentru a rezolva problema de învățare automată din lumea reală. Puteți crea RDD din diferite surse de date atât externe, cât și existente și puteți face toate tipurile de transformări pe acesta. Sper că acest articol a fost informativ și a putut să vă ofere informații profunde despre cadrele de date PySpark.

Dacă sunteți curios să aflați despre PySpark și alte instrumente de știință a datelor, consultați Diploma PG în știința datelor de la IIIT-B și upGrad, care este creată pentru profesioniștii care lucrează și oferă peste 10 studii de caz și proiecte, ateliere practice practice, mentorat cu experți din industrie, 1-la-1 cu mentori din industrie, peste 400 de ore de învățare și asistență profesională cu firme de top.

Este PySpark mai eficient decât Pandas?

Da, PySpark este mai rapid decât Pandas și chiar depășește Pandas într-un test de benchmarking. În termeni de bază, Pandas efectuează operațiuni pe o singură mașină, în timp ce PySpark execută operațiuni pe mai multe mașini. Dacă lucrați la o aplicație Machine Learning cu un set de date uriaș, PySpark este opțiunea ideală, deoarece poate executa operațiuni de 100 de ori mai rapid decât Pandas. Datorită JVM, limbajul de programare Scala este de 10 ori mai rapid decât Python pentru analiza și procesarea datelor. Când codul de programare Python este utilizat pentru a efectua apeluri către bibliotecile Spark, rezultatele sunt mediocre.

Care sunt unele dintre dezavantajele utilizării Apache PySpark?

Spark nu are propriul sistem de gestionare a fișierelor. Datorită costului ridicat al memoriei suplimentare necesare pentru a opera Spark, calculul în memorie ar putea fi extrem de costisitor. Când folosesc Apache Spark cu Hadoop, dezvoltatorii întâmpină dificultăți cu fișierele compacte. Datele se repetă în loturi în Spark, fiecare iterație fiind planificată și procesată independent. În Apache Spark, datele sunt împărțite în loturi mai mici la un interval de timp predeterminat. Ca urmare, criteriile de fereastră bazate pe înregistrări nu vor fi acceptate de Apache. În schimb, oferă criterii de fereastră bazate pe timp.

Cum sunt seturile de date, DataFrame și RDD diferite unele de altele?

RDD este o colecție grupată de elemente de date care este dispersată pe mai multe computere. Datele sunt reprezentate prin RDD-uri, care sunt o colecție de obiecte Java sau Scala. Un DataFrame este o colecție de date structurate în coloane numite care este răspândită pe mai multe servere. Într-o bază de date relațională, este echivalentă conceptual cu un tabel. Setul de date este o extensie API de cadru de date care oferă interfața de programare orientată pe obiecte a API-ului RDD cu siguranță de tip. Un DataFrame este o colecție distribuită de date, similară unui RDD, și nu este mutabilă. Datele sunt structurate în coloane numite, similar unui tabel dintr-o bază de date relațională, mai degrabă decât un RDD. Când vine vorba de sarcini simple, cum ar fi gruparea datelor, RDD este mai lent decât atât cadrele de date, cât și seturile de date. Are un API simplu pentru realizarea sarcinilor agregate. Poate agrega date mai rapid decât RDD-urile și seturile de date.