Jak implementovat streamování dat v reálném čase v Pythonu

Jak Implementovat Streamovani Dat V Realnem Case V Pythonu



Zvládnutí implementace streamování dat v reálném čase v Pythonu funguje jako základní dovednost v dnešním světě plném dat. Tato příručka zkoumá základní kroky a základní nástroje pro využití streamování dat v reálném čase s autentičností v Pythonu. Od výběru vhodného rámce, jako je Apache Kafka nebo Apache Pulsar, až po psaní kódu v Pythonu pro snadnou spotřebu dat, zpracování a efektivní vizualizaci, získáme potřebné dovednosti pro konstrukci agilních a efektivních datových kanálů v reálném čase.

Příklad 1: Implementace streamování dat v reálném čase v Pythonu

Implementace streamování dat v reálném čase v Pythonu je v dnešní době a světě založeném na datech zásadní. V tomto podrobném příkladu projdeme procesem budování systému streamování dat v reálném čase pomocí Apache Kafka a Pythonu v Google Colab.







Pro inicializaci příkladu, než začneme kódovat, je nezbytné vytvořit konkrétní prostředí v Google Colab. První věc, kterou musíme udělat, je nainstalovat potřebné knihovny. Pro integraci Kafka používáme knihovnu „kafka-python“.



! pip Nainstalujte kafka-python


Tento příkaz nainstaluje knihovnu „kafka-python“, která poskytuje funkce Pythonu a vazby pro Apache Kafka. Dále importujeme požadované knihovny pro náš projekt. Import požadovaných knihoven včetně „KafkaProducer“ a „KafkaConsumer“ jsou třídy z knihovny „kafka-python“, které nám umožňují komunikovat s makléři Kafka. JSON je knihovna Pythonu pro práci s daty JSON, která používáme k serializaci a deserializaci zpráv.



od kafka import KafkaProducer, KafkaConsumer
import json


Tvorba Kafkova producenta





To je důležité, protože producent Kafka posílá data do tématu Kafka. V našem příkladu vytváříme producenta, který odesílá simulovaná data v reálném čase k tématu zvanému „téma v reálném čase“.

Vytvoříme instanci „KafkaProducer“, která specifikuje adresu brokera Kafka jako „localhost:9092“. Potom použijeme „value_serializer“, funkci, která serializuje data před jejich odesláním do Kafky. V našem případě lambda funkce kóduje data jako JSON kódovaný UTF-8. Nyní simulujme některá data v reálném čase a pošleme je do tématu Kafka.



producent = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( v ) .zakódovat ( 'utf-8' ) )
# Simulovaná data v reálném čase
údaje = { 'id_senzoru' : 1 , 'teplota' : 25.5 , 'vlhkost vzduchu' : 60,2 }
# Odeslání dat k tématu
výrobce.poslat ( 'téma v reálném čase' , údaje )


V těchto řádcích definujeme „datový“ slovník, který představuje simulovaná data senzoru. Poté použijeme metodu „odeslat“ k publikování těchto dat do „tématu v reálném čase“.

Potom chceme vytvořit spotřebitele Kafky a spotřebitel Kafka čte data z tématu Kafka. Vytváříme spotřebitele, který konzumuje a zpracovává zprávy v „témě v reálném čase“. Vytvoříme instanci „KafkaConsumer“, která specifikuje téma, které chceme konzumovat, např. (téma v reálném čase) a adresu brokera Kafka. Potom „value_deserializer“ je funkce, která deserializuje data přijatá od Kafky. V našem případě lambda funkce dekóduje data jako JSON s kódováním UTF-8.

spotřebitel = KafkaConsumer ( 'téma v reálném čase' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )


K neustálému přijímání a zpracovávání zpráv z tématu používáme iterační smyčku.

# Čtení a zpracování dat v reálném čase
pro zpráva v spotřebitel:
data = zpráva.hodnota
tisk ( F 'Přijatá data: {data}' )


Načteme hodnotu každé zprávy a naše simulovaná data senzoru uvnitř smyčky a vytiskneme je do konzoly. Spuštění výrobce a spotřebitele Kafka zahrnuje spuštění tohoto kódu ve službě Google Colab a samostatné spouštění buněk kódu. Výrobce odešle simulovaná data do Kafkova tématu a spotřebitel přijatá data přečte a vytiskne.


Analýza výstupu během běhu kódu

Budeme sledovat data v reálném čase, která se vyrábí a spotřebovávají. Formát dat se může lišit v závislosti na naší simulaci nebo skutečném zdroji dat. V tomto podrobném příkladu pokrýváme celý proces nastavení systému streamování dat v reálném čase pomocí Apache Kafka a Pythonu v Google Colab. Vysvětlíme každý řádek kódu a jeho význam při budování tohoto systému. Streamování dat v reálném čase je výkonná funkce a tento příklad slouží jako základ pro složitější aplikace v reálném světě.

Příklad 2: Implementace streamování dat v reálném čase v Pythonu pomocí dat akciového trhu

Udělejme další jedinečný příklad implementace streamování dat v reálném čase v Pythonu pomocí jiného scénáře; tentokrát se zaměříme na data akciového trhu. Vytváříme systém streamování dat v reálném čase, který zachycuje změny cen akcií a zpracovává je pomocí Apache Kafka a Python v Google Colab. Jak je ukázáno v předchozím příkladu, začneme konfigurací našeho prostředí ve službě Google Colab. Nejprve nainstalujeme potřebné knihovny:

! pip Nainstalujte kafka-python yfinance


Zde přidáváme knihovnu „yfinance“, která nám umožňuje získat data akciového trhu v reálném čase. Dále importujeme potřebné knihovny. Pro interakci s Kafkou nadále používáme třídy „KafkaProducer“ a „KafkaConsumer“ z knihovny „kafka-python“. Importujeme JSON, abychom mohli pracovat s daty JSON. Také používáme „yfinance“ k získání údajů o akciovém trhu v reálném čase. Importujeme také „časovou“ knihovnu, abychom přidali časové zpoždění pro simulaci aktualizací v reálném čase.

od kafka import KafkaProducer, KafkaConsumer
import json
import yfinance tak jako yf
import čas


Nyní vytváříme producenta Kafka pro skladová data. Náš výrobce Kafka získává údaje o akciích v reálném čase a odesílá je do tématu Kafka s názvem „cena akcií“.

producent = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( v ) .zakódovat ( 'utf-8' ) )

zatímco Skutečný:
akcie = yf.Ticker ( 'AAPL' ) # Příklad: akcie společnosti Apple Inc
stock_data = stock.history ( doba = '1 d' )
last_price = stock_data [ 'Zavřít' ] .iloc [ - 1 ]
údaje = { 'symbol' : 'AAPL' , 'cena' : poslední cena }
výrobce.poslat ( 'tržní cena' , údaje )
čas.spánek ( 10 ) # Simulujte aktualizace v reálném čase každých 10 sekund


Vytvoříme instanci „KafkaProducer“ s adresou brokera Kafka v tomto kódu. Uvnitř smyčky používáme „yfinance“ k získání nejnovější ceny akcií společnosti Apple Inc. („AAPL“). Poté extrahujeme poslední závěrečnou cenu a odešleme ji do tématu „akciová cena“. Nakonec zavedeme časové zpoždění pro simulaci aktualizací v reálném čase každých 10 sekund.

Vytvořme spotřebitele Kafka, který bude číst a zpracovávat data o ceně akcií z tématu „cena akcií“.

spotřebitel = KafkaConsumer ( 'tržní cena' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

pro zpráva v spotřebitel:
stock_data = message.value
tisk ( F 'Přijaté údaje o skladech: {stock_data['symbol']} – Cena: {stock_data['price']}' )


Tento kód je podobný spotřebitelskému nastavení v předchozím příkladu. Průběžně čte a zpracovává zprávy z tématu „akciová cena“ a tiskne symbol akcií a cenu do konzole. Buňky kódu spouštíme postupně, např. jednu po druhé v Google Colab, abychom spustili producenta a spotřebitele. Výrobce získává a odesílá aktualizace cen akcií v reálném čase, zatímco spotřebitel tato data čte a zobrazuje.

! pip Nainstalujte kafka-python yfinance
od kafka import KafkaProducer, KafkaConsumer
import json
import yfinance tak jako yf
import čas
producent = KafkaProducer ( bootstrap_servers = 'localhost:9092' ,
value_serializer =lambda v: json.dumps ( v ) .zakódovat ( 'utf-8' ) )

zatímco Skutečný:
akcie = yf.Ticker ( 'AAPL' ) # Akcie společnosti Apple Inc
stock_data = stock.history ( doba = '1 d' )
last_price = stock_data [ 'Zavřít' ] .iloc [ - 1 ]

údaje = { 'symbol' : 'AAPL' , 'cena' : poslední cena }

výrobce.poslat ( 'tržní cena' , údaje )

čas.spánek ( 10 ) # Simulujte aktualizace v reálném čase každých 10 sekund
spotřebitel = KafkaConsumer ( 'tržní cena' ,
bootstrap_servers = 'localhost:9092' ,
value_deserializer =lambda x: json.loads ( x.decode ( 'utf-8' ) ) )

pro zpráva v spotřebitel:
stock_data = message.value
tisk ( F 'Přijaté údaje o skladech: {stock_data['symbol']} – Cena: {stock_data['price']}' )


V analýze výstupu po spuštění kódu budeme pozorovat aktualizace cen akcií společnosti Apple Inc. v reálném čase, které jsou vyráběny a spotřebovávány.

Závěr

V tomto unikátním příkladu jsme demonstrovali implementaci streamování dat v reálném čase v Pythonu pomocí Apache Kafka a knihovny „yfinance“ k zachycení a zpracování dat akciového trhu. Důkladně jsme vysvětlili každý řádek kódu. Streamování dat v reálném čase lze použít v různých oblastech pro vytváření aplikací v reálném světě ve financích, IoT a dalších.