PySpark Pandas_Udf()

Pyspark Pandas Udf



Transformace PySpark DataFrame je možná pomocí funkce pandas_udf(). Je to uživatelsky definovaná funkce, která se aplikuje na PySpark DataFrame se šipkou. Vektorizované operace můžeme provádět pomocí pandas_udf(). Lze ji implementovat předáním této funkce jako dekoratér. Pojďme se ponořit do této příručky, abychom poznali syntaxi, parametry a různé příklady.

Téma obsahu:

Pokud se chcete dozvědět o PySpark DataFrame a instalaci modulu, projděte si toto článek .







Pyspark.sql.functions.pandas_udf()

Pandas_udf () je k dispozici v modulu sql.functions v PySpark, který lze importovat pomocí klíčového slova „from“. Používá se k provádění vektorizovaných operací na našem PySpark DataFrame. Tato funkce je implementována jako dekorátor předáním tří parametrů. Poté můžeme vytvořit uživatelsky definovanou funkci, která vrací data ve vektorovém formátu (jako my k tomu používáme series/NumPy) pomocí šipky. V rámci této funkce jsme schopni vrátit výsledek.



Struktura a syntaxe:



Nejprve se podívejme na strukturu a syntaxi této funkce:

@pandas_udf(datatype)
def název_funkce(operace) -> convert_format:
návratový výpis

Název_funkce je zde název naší definované funkce. Datový typ určuje datový typ, který je vrácen touto funkcí. Výsledek můžeme vrátit pomocí klíčového slova „return“. Všechny operace se provádějí uvnitř funkce s přiřazením šipky.





Pandas_udf (funkce a návratový typ)

  1. Prvním parametrem je uživatelsky definovaná funkce, která je mu předána.
  2. Druhý parametr se používá k určení návratového datového typu z funkce.

Data:

V celé této příručce používáme pro demonstraci pouze jeden PySpark DataFrame. Všechny uživatelem definované funkce, které definujeme, jsou aplikovány na tento PySpark DataFrame. Ujistěte se, že tento DataFrame vytvoříte ve svém prostředí nejdříve po instalaci PySpark.



importovat pyspark

z pyspark.sql importujte SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Linux Hint' ).getOrCreate()

z pyspark.sql.functions import pandas_udf

import z pyspark.sql.types *

importovat pandy jako pandy

# detaily zeleniny

zelenina =[{ 'typ' : 'zelenina' , 'název' : 'rajče' , 'locate_country' : 'USA' , 'Množství' : 800 },

{ 'typ' : 'ovoce' , 'název' : 'banán' , 'locate_country' : 'ČÍNA' , 'Množství' : dvacet },

{ 'typ' : 'zelenina' , 'název' : 'rajče' , 'locate_country' : 'USA' , 'Množství' : 800 },

{ 'typ' : 'zelenina' , 'název' : 'Mango' , 'locate_country' : 'JAPONSKO' , 'Množství' : 0 },

{ 'typ' : 'ovoce' , 'název' : 'citrón' , 'locate_country' : 'INDIE' , 'Množství' : 1700 },

{ 'typ' : 'zelenina' , 'název' : 'rajče' , 'locate_country' : 'USA' , 'Množství' : 1200 },

{ 'typ' : 'zelenina' , 'název' : 'Mango' , 'locate_country' : 'JAPONSKO' , 'Množství' : 0 },

{ 'typ' : 'ovoce' , 'název' : 'citrón' , 'locate_country' : 'INDIE' , 'Množství' : 0 }

]

# z výše uvedených dat vytvořte datový rámec trhu

market_df = linuxhint_spark_app.createDataFrame(zelenina)

market_df.show()

Výstup:

Zde vytvoříme tento DataFrame se 4 sloupci a 8 řádky. Nyní použijeme pandas_udf() k vytvoření uživatelem definovaných funkcí a jejich aplikování na tyto sloupce.

Pandas_udf() s různými datovými typy

V tomto scénáři vytváříme některé uživatelem definované funkce pomocí pandas_udf() a aplikujeme je na sloupce a zobrazujeme výsledky pomocí metody select(). V každém případě používáme řadu pandas.Series, když provádíme vektorizované operace. To považuje hodnoty sloupce za jednorozměrné pole a operace se použije na sloupec. V samotném dekorátoru určíme návratový typ funkce.

Příklad 1: Pandas_udf() s typem řetězce

Zde vytvoříme dvě uživatelsky definované funkce s návratovým typem řetězce pro převod hodnot sloupců typu řetězec na velká a malá písmena. Nakonec tyto funkce aplikujeme na sloupce „type“ a „locate_country“.

# Převeďte sloupec typu na velká písmena pomocí pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

vrátit i.str.upper()

# Převeďte sloupec locate_country na malá písmena pomocí pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

return i.str.lower()

# Zobrazte sloupce pomocí select()

market_df.select( 'typ' ,type_upper_case( 'typ' ), 'locate_country' ,
země_malá_malá písmena( 'locate_country' )).ukázat()

Výstup:

Vysvětlení:

Funkce StringType() je dostupná v modulu pyspark.sql.types. Tento modul jsme již importovali při vytváření PySpark DataFrame.

  1. Za prvé, UDF (uživatelem definovaná funkce) vrací řetězce psané velkými písmeny pomocí funkce str.upper(). Str.upper() je k dispozici ve Strukturě dat řady (když převádíme na řadu se šipkou uvnitř funkce), která převádí daný řetězec na velká písmena. Nakonec je tato funkce aplikována na sloupec „type“, který je zadán uvnitř metody select(). Dříve byly všechny řetězce ve sloupci type psány malými písmeny. Nyní jsou změněna na velká písmena.
  2. Za druhé, UDF vrátí řetězce velkými písmeny pomocí funkce str.lower(). Str.lower() je k dispozici ve Strukturě dat řady, která převádí daný řetězec na malá písmena. Nakonec je tato funkce aplikována na sloupec „type“, který je zadán uvnitř metody select(). Dříve byly všechny řetězce ve sloupci type psány velkými písmeny. Nyní jsou změněna na malá písmena.

Příklad 2: Pandas_udf() s celočíselným typem

Vytvořme UDF, který převede celočíselný sloupec PySpark DataFrame na řadu Pandas a ke každé hodnotě přidá 100. Předejte sloupec „množství“ této funkci uvnitř metody select().

# Přidejte 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

vrátit i+ 100

# Předejte sloupec množství výše uvedené funkci a zobrazení.

market_df.select( 'Množství' ,add_100( 'Množství' )).ukázat()

Výstup:

Vysvětlení:

Uvnitř UDF iterujeme všechny hodnoty a převedeme je na Series. Poté ke každé hodnotě v řadě přidáme 100. Nakonec této funkci předáme sloupec „množství“ a vidíme, že ke všem hodnotám se přičte 100.

Pandas_udf() s různými typy dat pomocí Groupby() a Agg()

Podívejme se na příklady předání UDF do agregovaných sloupců. Zde jsou hodnoty sloupců seskupeny nejprve pomocí funkce groupby() a agregace se provádí pomocí funkce agg(). Náš UDF předáváme do této agregační funkce.

Syntax:

pyspark_dataframe_object.groupby( 'sloupec_seskupení' ).agg(UDF
(pyspark_dataframe_object[ 'sloupec' ]))

Zde jsou nejprve seskupeny hodnoty ve sloupci seskupení. Poté se provede agregace na každém seskupeném datu s ohledem na náš UDF.

Příklad 1: Pandas_udf() s Aggregate Mean()

Zde vytvoříme uživatelsky definovanou funkci s návratovým typem float. Uvnitř funkce vypočítáme průměr pomocí funkce mean(). Tento UDF je předán do sloupce „množství“, kde se získá průměrné množství pro každý typ.

# vrátí průměr/průměr

@pandas_udf( 'plovák' )

def average_function(i: panda.Series) -> float:

vrátit i.mean()

# Předejte sloupec množství funkci seskupením sloupce typu.

market_df.groupby( 'typ' ).agg(průměrná_funkce(trh_df[ 'Množství' ])).ukázat()

Výstup:

Seskupujeme na základě prvků ve sloupci „typ“. Vznikají dvě skupiny – „ovoce“ a „zelenina“. Pro každou skupinu se vypočítá a vrátí průměr.

Příklad 2: Pandas_udf() s Aggregate Max() a Min()

Zde vytvoříme dvě uživatelsky definované funkce s návratovým typem integer (int). První UDF vrací minimální hodnotu a druhý UDF maximální hodnotu.

# pandas_udf, které vrací minimální hodnotu

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

vrátit i.min()

# pandas_udf, které vrací maximální hodnotu

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

návrat i.max()

# Předejte sloupec množství do min_ pandas_udf seskupením locate_country.

market_df.groupby( 'locate_country' ).agg(min_(market_df[ 'Množství' ])).ukázat()

# Předejte sloupec množství do max_ pandas_udf seskupením locate_country.

market_df.groupby( 'locate_country' ).agg(max_(market_df[ 'Množství' ])).ukázat()

Výstup:

K vrácení minimálních a maximálních hodnot využíváme funkce min() a max() v návratovém typu UDF. Nyní data seskupíme do sloupce „locate_country“. Jsou vytvořeny čtyři skupiny („ČÍNA“, „INDIE“, „JAPONSKO“, „USA“). Pro každou skupinu vracíme maximální množství. Podobně vracíme minimální množství.

Závěr

V zásadě se pandas_udf () používá k provádění vektorizovaných operací na našem PySpark DataFrame. Viděli jsme, jak vytvořit pandas_udf() a aplikovat jej na PySpark DataFrame. Pro lepší pochopení jsme diskutovali o různých příkladech s ohledem na všechny datové typy (string, float a integer). Je možné použít pandas_udf() s groupby() prostřednictvím funkce agg().