Pyspark.sql.DataFrameWriter.saveAsTable()
Nejprve uvidíme, jak zapsat existující PySpark DataFrame do tabulky pomocí funkce write.saveAsTable(). Zápis DataFrame do tabulky vyžaduje název tabulky a další volitelné parametry, jako jsou režimy, partionBy atd. Je uložen jako parketový soubor.
Syntax:
dataframe_obj.write.saveAsTable(cesta/název_tabulky,režim,oddílBy,…)
- Table_name je název tabulky, která je vytvořena z dataframe_obj.
- Data tabulky můžeme připojit/přepsat pomocí parametru mode.
- PartitionBy používá jeden/více sloupců k vytvoření oddílů na základě hodnot v těchto poskytnutých sloupcích.
Příklad 1:
Vytvořte PySpark DataFrame s 5 řádky a 4 sloupci. Zapište tento datový rámec do tabulky s názvem „Agri_Table1“.
importovat pyspark
z pyspark.sql importujte SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# farmářské údaje s 5 řádky a 5 sloupci
agri =[{ 'Soil_Type' : 'Černá' , 'Irigation_availability' : 'Ne' , 'Acres' : 2500 , 'Soil_status' : 'Schnout' ,
'Země' : 'USA' },
{ 'Soil_Type' : 'Černá' , 'Irigation_availability' : 'Ano' , 'Acres' : 3500 , 'Soil_status' : 'mokrý' ,
'Země' : 'Indie' },
{ 'Soil_Type' : 'Červené' , 'Irigation_availability' : 'Ano' , 'Acres' : 210 , 'Soil_status' : 'Schnout' ,
'Země' : 'SPOJENÉ KRÁLOVSTVÍ' },
{ 'Soil_Type' : 'Jiný' , 'Irigation_availability' : 'Ne' , 'Acres' : 1000 , 'Soil_status' : 'mokrý' ,
'Země' : 'USA' },
{ 'Soil_Type' : 'Písek' , 'Irigation_availability' : 'Ne' , 'Acres' : 500 , 'Soil_status' : 'Schnout' ,
'Země' : 'Indie' }]
# vytvořte datový rámec z výše uvedených dat
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Zapište výše uvedený DataFrame do tabulky.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )
Výstup:
Vidíme, že jeden soubor parket je vytvořen s předchozími daty PySpark.
Příklad 2:
Zvažte předchozí DataFrame a zapište „Agri_Table2“ do tabulky rozdělením záznamů na základě hodnot ve sloupci „Country“.
# Zapište výše uvedený DataFrame do tabulky s parametrem partitionByagri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Země' ])
Výstup:
Ve sloupci „Country“ jsou tři jedinečné hodnoty – „Indie“, „UK“ a „USA“. Jsou tedy vytvořeny tři oddíly. Každý oddíl obsahuje parketové soubory.
Pyspark.sql.DataFrameReader.table()
Načteme tabulku do PySpark DataFrame pomocí funkce spark.read.table(). Vyžaduje pouze jeden parametr, kterým je cesta/název tabulky. Přímo načte tabulku do PySpark DataFrame a všechny funkce SQL, které jsou aplikovány na PySpark DataFrame, lze také aplikovat na tento načtený DataFrame.
Syntax:
spark_app.read.table(cesta/‘název_tabulky’)V tomto scénáři používáme předchozí tabulku, která byla vytvořena z PySpark DataFrame. Ujistěte se, že ve svém prostředí potřebujete implementovat fragmenty kódu předchozího scénáře.
Příklad:
Načtěte tabulku „Agri_Table1“ do DataFrame s názvem „loaded_data“.
loading_data = linuxhint_spark_app.read.table( 'Agri_Table1' )načtená_data.show()
Výstup:
Vidíme, že tabulka je načtena do PySpark DataFrame.
Provádění SQL dotazů
Nyní provedeme některé SQL dotazy na načteném DataFrame pomocí funkce spark.sql().
# Pomocí příkazu SELECT zobrazte všechny sloupce z výše uvedené tabulky.linuxhint_spark_app.sql( 'SELECT * z Agri_Table1' ).ukázat()
# Klauzule KDE
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Suchý'' ).ukázat()
linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).ukázat()
Výstup:
- První dotaz zobrazí všechny sloupce a záznamy z DataFrame.
- Druhý dotaz zobrazí záznamy založené na sloupci „Soil_status“. Existují pouze tři záznamy s prvkem „Dry“.
- Poslední dotaz vrátí dva záznamy s „akry“, které jsou větší než 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Pomocí funkce insertInto() můžeme přidat DataFrame do existující tabulky. Tuto funkci můžeme použít spolu s selectExpr() k definování názvů sloupců a poté je vložit do tabulky. Tato funkce také bere jako parametr název_tabulky.
Syntax:
DataFrame_obj.write.insertInto(’název_tabulky’)V tomto scénáři používáme předchozí tabulku, která byla vytvořena z PySpark DataFrame. Ujistěte se, že ve svém prostředí potřebujete implementovat fragmenty kódu předchozího scénáře.
Příklad:
Vytvořte nový DataFrame se dvěma záznamy a vložte je do tabulky „Agri_Table1“.
importovat pysparkz pyspark.sql importujte SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# farmářské údaje se 2 řádky
agri =[{ 'Soil_Type' : 'Písek' , 'Irigation_availability' : 'Ne' , 'Acres' : 2500 , 'Soil_status' : 'Schnout' ,
'Země' : 'USA' },
{ 'Soil_Type' : 'Písek' , 'Irigation_availability' : 'Ne' , 'Acres' : 1200 , 'Soil_status' : 'mokrý' ,
'Země' : 'Japonsko' }]
# vytvořte datový rámec z výše uvedených dat
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Akry' , 'Země' , 'Dostupnost_zavlažování' , 'Soil_Type' ,
'Stav_půdy' ).write.insertInto( 'Agri_Table1' )
# Zobrazte konečnou Agri_Table1
linuxhint_spark_app.sql( 'SELECT * z Agri_Table1' ).ukázat()
Výstup:
Nyní je celkový počet řádků přítomných v DataFrame 7.
Závěr
Nyní chápete, jak zapsat PySpark DataFrame do tabulky pomocí funkce write.saveAsTable(). Přebírá název tabulky a další volitelné parametry. Poté jsme tuto tabulku načetli do PySpark DataFrame pomocí funkce spark.read.table(). Vyžaduje pouze jeden parametr, kterým je cesta/název tabulky. Pokud chcete přidat nový DataFrame do existující tabulky, použijte funkci insertInto().