V této příručce se zaměříme především na čtení/načítání souboru parket do PySpark DataFrame/SQL pomocí funkce read.parquet(), která je dostupná ve třídě pyspark.sql.DataFrameReader.
Téma obsahu:
Přečtěte si soubor Parquet do PySpark DataFrame
Přečtěte si soubor Parquet do PySpark SQL
Pyspark.sql.DataFrameReader.parquet()
Tato funkce se používá ke čtení souboru parket a jeho načtení do PySpark DataFrame. Přebírá cestu/název souboru parketového souboru. Můžeme jednoduše použít funkci read.parquet(), protože se jedná o obecnou funkci.
Syntax:
Podívejme se na syntaxi read.parquet():
spark_app.read.parquet(název_souboru.parketa/cesta)Nejprve nainstalujte modul PySpark pomocí příkazu pip:
pip nainstalovat pyspark
Získejte soubor na parkety
Chcete-li číst soubor parket, potřebujete data, ve kterých je z těchto dat generován soubor parket. V této části uvidíme, jak vygenerovat soubor parket z PySpark DataFrame.
Vytvořme PySpark DataFrame s 5 záznamy a zapišme to do souboru parket „industry_parquet“.
importovat pysparkz pyspark.sql importujte SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# vytvořte datový rámec, který ukládá podrobnosti o odvětví
industry_df = linuxhint_spark_app.createDataFrame([Row(Type= 'Zemědělství' ,Oblast= 'USA' ,
Hodnocení = 'Horký' ,Total_employees= 100 ),
Řádek (Typ= 'Zemědělství' ,Oblast= 'Indie' ,Hodnocení= 'Horký' ,Total_employees= 200 ),
Řádek (Typ= 'Rozvoj' ,Oblast= 'USA' ,Hodnocení= 'Teplý' ,Total_employees= 100 ),
Řádek (Typ= 'Vzdělání' ,Oblast= 'USA' ,Hodnocení= 'Chladný' ,Total_employees= 400 ),
Řádek (Typ= 'Vzdělání' ,Plocha= 'USA' ,Hodnocení= 'Teplý' ,Total_employees= dvacet )
])
# Skutečný DataFrame
industry_df.show()
# Zapište industry_df do souboru parket
industry_df.coalesce( 1 ).pište.parkety( 'průmysl_parkety' )
Výstup:
Toto je DataFrame, který obsahuje 5 záznamů.
Pro předchozí DataFrame je vytvořen soubor parket. Zde je název našeho souboru s příponou „part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet“. Tento soubor používáme v celém tutoriálu.
Přečtěte si soubor Parquet do PySpark DataFrame
Máme soubor na parkety. Pojďme si tento soubor přečíst pomocí funkce read.parquet() a načíst jej do PySpark DataFrame.
importovat pysparkz pyspark.sql importujte SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# Načtěte soubor parket do objektu dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'část-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parkquet' )
# Zobrazte dataframe_from_parquet-DataFrame
dataframe_from_parquet.show()
Výstup:
DataFrame zobrazíme pomocí metody show(), která byla vytvořena ze souboru parket.
SQL dotazy se souborem Parquet
Po načtení do DataFrame je možné vytvořit SQL tabulky a zobrazit data, která jsou přítomna v DataFrame. Musíme vytvořit TEMPORARY VIEW a pomocí příkazů SQL vrátit záznamy z DataFrame, který je vytvořen ze souboru parket.
Příklad 1:
Vytvořte dočasné zobrazení s názvem „Sectors“ a pomocí příkazu SELECT zobrazte záznamy v DataFrame. Můžete odkazovat na toto tutorial který vysvětluje, jak vytvořit VIEW v Spark – SQL.
importovat pysparkz pyspark.sql importujte SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# Načtěte soubor parket do objektu dataframe_from_parquet.
dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'část-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parkquet' )
# Vytvořte pohled z výše uvedeného souboru parket s názvem - 'Sectors'
dataframe_from_parquet.createOrReplaceTempView( 'Sektory' )
# Dotaz pro zobrazení všech záznamů ze sektorů
linuxhint_spark_app.sql( 'vyberte * ze sektorů' ).ukázat()
Výstup:
Příklad 2:
Pomocí předchozího VIEW napište dotaz SQL:
- Zobrazení všech záznamů ze sektorů, které patří do „Indie“.
- Chcete-li zobrazit všechny záznamy ze sektorů se zaměstnancem, který je větší než 100.
linuxhint_spark_app.sql( 'vyberte * ze sektorů, kde Area='Indie'' ).ukázat()
# Dotaz pro zobrazení všech záznamů ze sektorů se zaměstnanci větším než 100
linuxhint_spark_app.sql( 'vyberte * ze sektorů, kde Total_employees>100' ).ukázat()
Výstup:
Existuje pouze jeden záznam s oblastí, která je „Indie“, a dva záznamy se zaměstnanci, která je větší než 100.
Přečtěte si soubor Parquet do PySpark SQL
Nejprve musíme vytvořit VIEW pomocí příkazu CREATE. Pomocí klíčového slova „path“ v rámci SQL dotazu můžeme načíst soubor parket do Spark SQL. Po cestě musíme zadat název souboru/umístění souboru.
Syntax:
spark_app.sql( 'VYTVOŘIT DOČASNÉ ZOBRAZENÍ view_name POMOCÍ MOŽNOSTÍ parket (cesta ' název_souboru.parkety ')' )Příklad 1:
Vytvořte dočasný pohled s názvem „Sector2“ a načtěte do něj soubor parket. Pomocí funkce sql() napište výběrový dotaz, aby se zobrazily všechny záznamy, které jsou v pohledu přítomny.
importovat pysparkz pyspark.sql importujte SparkSession,Row
linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()
# Přečtěte si soubor parket do Spark-SQL
linuxhint_spark_app.sql( 'VYTVOŘIT DOČASNÝ POHLED Sektor 2 POMOCÍ MOŽNOSTÍ parket (cesta ' část-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parkquet ')' )
# Dotaz pro zobrazení všech záznamů ze Sektoru2
linuxhint_spark_app.sql( 'vyberte * ze sektoru 2' ).ukázat()
Výstup:
Příklad 2:
Použijte předchozí VIEW a napište dotaz pro zobrazení všech záznamů s hodnocením „Hot“ nebo „Cool“.
# Dotaz pro zobrazení všech záznamů ze Sektoru2 s hodnocením - Hot nebo Cool.linuxhint_spark_app.sql( 'vyberte * ze sektoru 2, kde Rating='Hot' OR Rating='Cool'' ).ukázat()
Výstup:
Existují tři záznamy s hodnocením „Hot“ nebo „Cool“.
Závěr
V PySpark zapisuje funkce write.parquet() DataFrame do souboru parket. Funkce read.parquet() načte soubor parket do PySpark DataFrame nebo jakéhokoli jiného DataSource. Naučili jsme se, jak načíst soubor parket do PySpark DataFrame a do tabulky PySpark. V rámci tohoto tutoriálu jsme také probrali, jak vytvořit tabulky z PySpark DataFrame a filtrovat data pomocí klauzule WHERE.