Převod PySpark DataFrame na CSV

Prevod Pyspark Dataframe Na Csv



Podívejme se na čtyři různé scénáře převodu PySpark DataFrame na CSV. Přímo používáme metodu write.csv() k převodu PySpark DataFrame na CSV. Pomocí funkce to_csv() převedeme PySpark Pandas DataFrame na CSV. Může to být také možné převedením na pole NumPy.

Téma obsahu:

Pokud se chcete dozvědět o PySpark DataFrame a instalaci modulu, projděte si toto článek .







PySpark DataFrame na CSV převodem na Pandas DataFrame

To_csv() je metoda dostupná v modulu Pandas, která převádí Pandas DataFrame na CSV. Nejprve musíme převést náš PySpark DataFrame na Pandas DataFrame. K tomu se používá metoda toPandas(). Podívejme se na syntaxi to_csv() spolu s jejími parametry.



Syntax:



pandas_dataframe_obj.to_csv(cesta/ 'název_souboru.csv' , záhlaví ,index,sloupce,režim...)
  1. Musíme zadat název souboru CSV. Pokud chcete stažený soubor CSV uložit na určité místo v počítači, můžete také zadat cestu spolu s názvem souboru.
  2. Sloupce jsou zahrnuty, pokud je záhlaví nastaveno na „True“. Pokud sloupce nepotřebujete, nastavte záhlaví na „False“.
  3. Indexy jsou specifikovány, pokud je index nastaven na „True“. Pokud indexy nepotřebujete, nastavte index na „False“.
  4. Parametr Columns přebírá seznam názvů sloupců, ve kterém můžeme specifikovat, které konkrétní sloupce jsou extrahovány do CSV souboru.
  5. Záznamy jsme schopni přidat do CSV pomocí parametru mode. Append – k tomu se používá „a“.

Příklad 1: S parametry záhlaví a indexu

Vytvořte „skills_df“ PySpark DataFrame se 3 řádky a 4 sloupci. Převeďte tento DataFrame na CSV tak, že jej nejprve převedete na Pandas DataFrame.





importovat pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# data dovedností se 3 řádky a 4 sloupci

dovednosti =[{ 'id' : 123 , 'osoba' : 'Miláček' , 'dovednost' : 'malování' , 'cena' : 25 000 },

{ 'id' : 112 , 'osoba' : 'Mouni' , 'dovednost' : 'tanec' , 'cena' : 2000 },

{ 'id' : 153 , 'osoba' : 'Tulasi' , 'dovednost' : 'čtení' , 'cena' : 1200 }

]

# vytvořte datový rámec dovedností z výše uvedených dat

skills_df = linuxhint_spark_app.createDataFrame(dovednosti)

skills_df.show()

# Převést skills_df na pandas DataFrame

pandas_skills_df= skills_df.toPandas()

print(pandas_skills_df)

# Převeďte tento DataFrame na csv s hlavičkou a indexem

pandas_skills_df.to_csv( 'pandas_skills1.csv' , záhlaví =True, index=True)

Výstup:



Můžeme vidět, že PySpark DataFrame je převeden na Pandas DataFrame. Podívejme se, zda je převeden na CSV s názvy sloupců a indexy:

Příklad 2: Připojte data do CSV

Vytvořte ještě jeden PySpark DataFrame s 1 záznamem a připojte jej k CSV, který je vytvořen jako součást našeho prvního příkladu. Ujistěte se, že musíme nastavit záhlaví na „False“ spolu s parametrem mode. Jinak jsou názvy sloupců také připojeny jako řádek.

importovat pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

dovednosti =[{ 'id' : 90 , 'osoba' : 'Bhargav' , 'dovednost' : 'čtení' , 'cena' : 12 000 }

]

# vytvořte datový rámec dovedností z výše uvedených dat

skills_df = linuxhint_spark_app.createDataFrame(dovednosti)

# Převést skills_df na pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Přidejte tento DataFrame do souboru pandas_skills1.csv

pandas_skills_df.to_csv( 'pandas_skills1.csv' , režim= 'A' , záhlaví = nepravda)

Výstup CSV:

Vidíme, že do souboru CSV je přidán nový řádek.

Příklad 3: S parametrem Columns

Mějme stejný DataFrame a převeďte jej do CSV se dvěma sloupci: „osoba“ a „cena“.

importovat pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# data dovedností se 3 řádky a 4 sloupci

dovednosti =[{ 'id' : 123 , 'osoba' : 'Miláček' , 'dovednost' : 'malování' , 'cena' : 25 000 },

{ 'id' : 112 , 'osoba' : 'Mouni' , 'dovednost' : 'tanec' , 'cena' : 2000 },

{ 'id' : 153 , 'osoba' : 'Tulasi' , 'dovednost' : 'čtení' , 'cena' : 1200 }

]

# vytvořte datový rámec dovedností z výše uvedených dat

skills_df = linuxhint_spark_app.createDataFrame(dovednosti)

# Převést skills_df na pandas DataFrame

pandas_skills_df= skills_df.toPandas()

# Převeďte tento DataFrame na csv s konkrétními sloupci

pandas_skills_df.to_csv( 'pandas_skills2.csv' , sloupce=[ 'osoba' , 'cena' ])

Výstup CSV:

Vidíme, že v souboru CSV existují pouze sloupce „osoba“ a „cena“.

PySpark Pandas DataFrame do CSV pomocí metody To_Csv().

To_csv() je metoda dostupná v modulu Pandas, která převádí Pandas DataFrame na CSV. Nejprve musíme převést náš PySpark DataFrame na Pandas DataFrame. K tomu se používá metoda toPandas(). Podívejme se na syntaxi to_csv() spolu s jejími parametry:

Syntax:

pyspark_pandas_dataframe_obj.to_csv(cesta/ 'název_souboru.csv' , záhlaví ,index,sloupce,...)
  1. Musíme zadat název souboru CSV. Pokud chcete stažený soubor CSV uložit na určité místo v počítači, můžete také zadat cestu spolu s názvem souboru.
  2. Sloupce jsou zahrnuty, pokud je záhlaví nastaveno na „True“. Pokud sloupce nepotřebujete, nastavte záhlaví na „False“.
  3. Indexy jsou specifikovány, pokud je index nastaven na „True“. Pokud indexy nepotřebujete, nastavte index na „False“.
  4. Parametr columns přebírá seznam názvů sloupců, ve kterém můžeme určit, které konkrétní sloupce jsou extrahovány do souboru CSV.

Příklad 1: S parametrem Columns

Vytvořte PySpark Pandas DataFrame se 3 sloupci a převeďte jej na CSV pomocí to_csv() se sloupci „osoba“ a „cena“.

z pyspark importovat pandy

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Miláček' , 'Mouni' , 'sám' , 'radha' ], 'cena' :[ 1 , 2 , 3 , 4 ]})

print(pyspark_pandas_dataframe)

# Převeďte tento DataFrame na csv s konkrétními sloupci

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas1' , sloupce=[ 'osoba' , 'cena' ])

Výstup:

Vidíme, že PySpark Pandas DataFrame je převeden na CSV se dvěma oddíly. Každý oddíl obsahuje 2 záznamy. Také sloupce v CSV jsou pouze „osoba“ a „cena“.

Soubor oddílu 1:

Soubor oddílu 2:

Příklad 2: S parametrem záhlaví

Použijte předchozí DataFrame a zadejte parametr záhlaví jeho nastavením na „True“.

z pyspark importovat pandy

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Miláček' , 'Mouni' , 'sám' , 'radha' ], 'cena' :[ 1 , 2 , 3 , 4 ]})

# Převeďte tento DataFrame na csv s hlavičkou.

pyspark_pandas_dataframe.to_csv( 'pyspark_pandas2' , záhlaví = Pravda)

Výstup CSV:

Vidíme, že PySpark Pandas DataFrame je převeden na CSV se dvěma oddíly. Každý oddíl obsahuje 2 záznamy s názvy sloupců.

Soubor oddílu 1:

Soubor oddílu 2:

PySpark Pandas DataFrame na CSV převodem na pole NumPy

Máme možnost převést PySpark Pandas DataFrame na CSV převodem do pole Numpy. To_numpy() je metoda, která je dostupná v modulu PySpark Pandas a která převádí PySpark Pandas DataFrame na pole NumPy.

Syntax:

pyspark_pandas_dataframe_obj.to_numpy()

Nepotřebuje žádné parametry.

Pomocí metody Tofile().

Po převodu do pole NumPy můžeme pomocí metody tofile() převést NumPy na CSV. Zde ukládá každý záznam do nové buňky sloupcově v souboru CSV.

Syntax:

array_obj.to_numpy(název souboru/cesta,sep=’ ’)

Přebírá název souboru nebo cestu CSV a oddělovač.

Příklad:

Vytvořte PySpark Pandas DataFrame se 3 sloupci a 4 záznamy a převeďte jej na CSV tak, že jej nejprve převedete na pole NumPy.

z pyspark importovat pandy

pyspark_pandas_dataframe=pandas.DataFrame({ 'id' :[ 90 , 78 , 90 , 57 ], 'osoba' :[ 'Miláček' , 'Mouni' , 'sám' , 'radha' ], 'cena' :[ 1 , 2 , 3 , 4 ]})

# Převeďte výše uvedený DataFrame na numpy pole

převedeno = pyspark_pandas_dataframe.to_numpy()

tisknout (převedeno)

# Použití tofile()

convert.tofile( 'converted1.csv' , září = ',' )

Výstup:

[[ 90 'Miláček' 1 ]

[ 78 'Mouni' 2 ]

[ 90 'sám' 3 ]

[ 57 'radha' 4 ]]

Můžeme vidět, že PySpark Pandas DataFrame je převeden na pole NumPy (12 hodnot). Pokud vidíte data CSV, uloží každou hodnotu buňky do nového sloupce.

PySpark DataFrame do CSV pomocí metody Write.Csv().

Metoda write.csv() bere jako parametr název/cestu souboru, kam potřebujeme uložit soubor CSV.

Syntax:

dataframe_object.coalesce( 1 ).write.csv( 'název souboru' )

Ve skutečnosti je CSV uložen jako oddíly (více než jeden). Abychom se toho zbavili, sloučíme všechny rozdělené soubory CSV do jednoho. V tomto scénáři používáme funkci coalesce(). Nyní vidíme pouze jeden soubor CSV se všemi řádky z PySpark DataFrame.

Příklad:

Zvažte PySpark DataFrame se 4 záznamy se 4 sloupci. Zapište tento DataFrame do CSV se souborem s názvem „market_details“.

importovat pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# tržní data se 4 řádky a 4 sloupci

trh =[{ 'střední' : 'mz-001' , 'm_name' : 'ABC' , 'm_city' : 'delhi' , 'm_state' : 'delhi' },

{ 'střední' : 'mz-002' , 'm_name' : 'XYZ' , 'm_city' : 'patna' , 'm_state' : 'štěstí' },

{ 'střední' : 'mz-003' , 'm_name' : 'PQR' , 'm_city' : 'florida' , 'm_state' : 'jeden' },

{ 'střední' : 'mz-004' , 'm_name' : 'ABC' , 'm_city' : 'delhi' , 'm_state' : 'štěstí' }

]



# z výše uvedených dat vytvořte datový rámec trhu

market_df = linuxhint_spark_app.createDataFrame(trh)

# Aktuální tržní data

market_df.show()

# write.csv()

market_df.coalesce( 1 ).write.csv( 'podrobnosti o trhu' )

Výstup:

Zkontrolujeme soubor:

Chcete-li zobrazit záznamy, otevřete poslední soubor.

Závěr

Naučili jsme se čtyři různé scénáře, které převádějí PySpark DataFrame na CSV, s příklady při zvážení různých parametrů. Když pracujete s PySpark DataFrame, máte dvě možnosti, jak převést tento DataFrame na CSV: jedním způsobem je použití metody write() a druhým je použití metody to_csv() převedením na Pandas DataFrame. Pokud pracujete s PySpark Pandas DataFrame, můžete také využít to_csv() a tofile() převodem na pole NumPy.