Jak číst a zapisovat data tabulky v PySpark

Jak Cist A Zapisovat Data Tabulky V Pyspark



Zpracování dat v PySpark je rychlejší, pokud jsou data načítána ve formě tabulky. S tímto, pomocí SQL Expressions, bude zpracování rychlé. Takže převedení PySpark DataFrame/RDD na tabulku před odesláním ke zpracování je lepší přístup. Dnes uvidíme, jak načíst data tabulky do PySpark DataFrame, zapsat PySpark DataFrame do tabulky a vložit nový DataFrame do existující tabulky pomocí vestavěných funkcí. Pojďme!

Pyspark.sql.DataFrameWriter.saveAsTable()

Nejprve uvidíme, jak zapsat existující PySpark DataFrame do tabulky pomocí funkce write.saveAsTable(). Zápis DataFrame do tabulky vyžaduje název tabulky a další volitelné parametry, jako jsou režimy, partionBy atd. Je uložen jako parketový soubor.

Syntax:







dataframe_obj.write.saveAsTable(cesta/název_tabulky,režim,oddílBy,…)
  1. Table_name je název tabulky, která je vytvořena z dataframe_obj.
  2. Data tabulky můžeme připojit/přepsat pomocí parametru mode.
  3. PartitionBy používá jeden/více sloupců k vytvoření oddílů na základě hodnot v těchto poskytnutých sloupcích.

Příklad 1:

Vytvořte PySpark DataFrame s 5 řádky a 4 sloupci. Zapište tento datový rámec do tabulky s názvem „Agri_Table1“.



importovat pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# farmářské údaje s 5 řádky a 5 sloupci

agri =[{ 'Soil_Type' : 'Černá' , 'Irigation_availability' : 'Ne' , 'Acres' : 2500 , 'Soil_status' : 'Schnout' ,
'Země' : 'USA' },

{ 'Soil_Type' : 'Černá' , 'Irigation_availability' : 'Ano' , 'Acres' : 3500 , 'Soil_status' : 'mokrý' ,
'Země' : 'Indie' },

{ 'Soil_Type' : 'Červené' , 'Irigation_availability' : 'Ano' , 'Acres' : 210 , 'Soil_status' : 'Schnout' ,
'Země' : 'SPOJENÉ KRÁLOVSTVÍ' },

{ 'Soil_Type' : 'Jiný' , 'Irigation_availability' : 'Ne' , 'Acres' : 1000 , 'Soil_status' : 'mokrý' ,
'Země' : 'USA' },

{ 'Soil_Type' : 'Písek' , 'Irigation_availability' : 'Ne' , 'Acres' : 500 , 'Soil_status' : 'Schnout' ,
'Země' : 'Indie' }]



# vytvořte datový rámec z výše uvedených dat

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Zapište výše uvedený DataFrame do tabulky.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Table1' )

Výstup:







Vidíme, že jeden soubor parket je vytvořen s předchozími daty PySpark.



Příklad 2:

Zvažte předchozí DataFrame a zapište „Agri_Table2“ do tabulky rozdělením záznamů na základě hodnot ve sloupci „Country“.

# Zapište výše uvedený DataFrame do tabulky s parametrem partitionBy

agri_df.write.saveAsTable( 'Agri_Table2' ,partitionBy=[ 'Země' ])

Výstup:

Ve sloupci „Country“ jsou tři jedinečné hodnoty – „Indie“, „UK“ a „USA“. Jsou tedy vytvořeny tři oddíly. Každý oddíl obsahuje parketové soubory.

Pyspark.sql.DataFrameReader.table()

Načteme tabulku do PySpark DataFrame pomocí funkce spark.read.table(). Vyžaduje pouze jeden parametr, kterým je cesta/název tabulky. Přímo načte tabulku do PySpark DataFrame a všechny funkce SQL, které jsou aplikovány na PySpark DataFrame, lze také aplikovat na tento načtený DataFrame.

Syntax:

spark_app.read.table(cesta/‘název_tabulky’)

V tomto scénáři používáme předchozí tabulku, která byla vytvořena z PySpark DataFrame. Ujistěte se, že ve svém prostředí potřebujete implementovat fragmenty kódu předchozího scénáře.

Příklad:

Načtěte tabulku „Agri_Table1“ do DataFrame s názvem „loaded_data“.

loading_data = linuxhint_spark_app.read.table( 'Agri_Table1' )

načtená_data.show()

Výstup:

Vidíme, že tabulka je načtena do PySpark DataFrame.

Provádění SQL dotazů

Nyní provedeme některé SQL dotazy na načteném DataFrame pomocí funkce spark.sql().

# Pomocí příkazu SELECT zobrazte všechny sloupce z výše uvedené tabulky.

linuxhint_spark_app.sql( 'SELECT * z Agri_Table1' ).ukázat()

# Klauzule KDE

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Soil_status='Suchý'' ).ukázat()

linuxhint_spark_app.sql( 'SELECT * from Agri_Table1 WHERE Acres > 2000' ).ukázat()

Výstup:

  1. První dotaz zobrazí všechny sloupce a záznamy z DataFrame.
  2. Druhý dotaz zobrazí záznamy založené na sloupci „Soil_status“. Existují pouze tři záznamy s prvkem „Dry“.
  3. Poslední dotaz vrátí dva záznamy s „akry“, které jsou větší než 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Pomocí funkce insertInto() můžeme přidat DataFrame do existující tabulky. Tuto funkci můžeme použít spolu s selectExpr() k definování názvů sloupců a poté je vložit do tabulky. Tato funkce také bere jako parametr název_tabulky.

Syntax:

DataFrame_obj.write.insertInto(’název_tabulky’)

V tomto scénáři používáme předchozí tabulku, která byla vytvořena z PySpark DataFrame. Ujistěte se, že ve svém prostředí potřebujete implementovat fragmenty kódu předchozího scénáře.

Příklad:

Vytvořte nový DataFrame se dvěma záznamy a vložte je do tabulky „Agri_Table1“.

importovat pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

# farmářské údaje se 2 řádky

agri =[{ 'Soil_Type' : 'Písek' , 'Irigation_availability' : 'Ne' , 'Acres' : 2500 , 'Soil_status' : 'Schnout' ,
'Země' : 'USA' },

{ 'Soil_Type' : 'Písek' , 'Irigation_availability' : 'Ne' , 'Acres' : 1200 , 'Soil_status' : 'mokrý' ,
'Země' : 'Japonsko' }]

# vytvořte datový rámec z výše uvedených dat

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Akry' , 'Země' , 'Dostupnost_zavlažování' , 'Soil_Type' ,
'Stav_půdy' ).write.insertInto( 'Agri_Table1' )

# Zobrazte konečnou Agri_Table1

linuxhint_spark_app.sql( 'SELECT * z Agri_Table1' ).ukázat()

Výstup:

Nyní je celkový počet řádků přítomných v DataFrame 7.

Závěr

Nyní chápete, jak zapsat PySpark DataFrame do tabulky pomocí funkce write.saveAsTable(). Přebírá název tabulky a další volitelné parametry. Poté jsme tuto tabulku načetli do PySpark DataFrame pomocí funkce spark.read.table(). Vyžaduje pouze jeden parametr, kterým je cesta/název tabulky. Pokud chcete přidat nový DataFrame do existující tabulky, použijte funkci insertInto().