10 основних команд PySpark для обробки великих даних

10 основних команд PySpark для обробки великих даних

10 основних команд PySpark для обробки великих даних10 основних команд PySpark для обробки великих даних
Зображення автора | Ідеограма

PySpark об’єднує найкраще з двох світів: простоту мови та бібліотек Python і масштабованість Apache Spark.

У цій статті наведено — через приклади коду — 10 зручних команд PySpark для турбонаддуву конвеєрів обробки великих даних у проектах Python.

Початкове налаштування

Щоб проілюструвати використання 10 представлених команд, ми імпортуємо необхідні бібліотеки, ініціалізуємо сеанс Spark і завантажуємо загальнодоступний набір даних про пінгвінів, який описує зразки пінгвінів, що належать до трьох видів, використовуючи комбінацію числових і категоріальних характеристик. Набір даних спочатку завантажується в Pandas DataFrame.

!pip install pyspark pandas

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder \
    .appName("PenguinAnalysis") \
    .config("spark.driver.memory", "2g") \
    .getOrCreate()

pdf = pd.read_csv("https://raw.githubusercontent.com/gakudo-ai/open-datasets/refs/heads/main/penguins.csv")

10 основних команд PySpark для обробки великих даних

1. Завантаження даних у Spark DataFrame

Spark DataFrames відрізняються від Pandas DataFrames своєю розподіленістю, відкладеним обчисленням і незмінністю. Перетворити Pandas DataFrame так само просто, як використовувати createDataFrame команда.

df = spark.createDataFrame(pdf)

Ця структура даних більше підходить для паралельної обробки даних під сценами, зберігаючи багато характеристик звичайних DataFrames.

2. Виберіть і відфільтруйте дані

The вибрати і фільтр функції вибирають певні стовпці (функції) у Spark DataFrame та фільтрують рядки (екземпляри), де задана умова відповідає дійсності. У цьому прикладі відбираються та фільтруються пінгвіни роду Gentoo вагою понад 5 кг.

df.select("species", "body_mass_g").filter(df.species == "Gentoo").filter(df.body_mass_g > 5000).show()

3. Групові та зведені дані

Групування даних за категоріями зазвичай необхідне для виконання агрегацій, як-от отримання середніх значень або інших статистичних даних для кожної категорії. Цей приклад показує, як використовувати groupBy і укр команди разом, щоб отримати зведення середніх вимірювань для кожного виду.

df.groupBy("species").agg({"flipper_length_mm": "mean", "body_mass_g": "mean"}).show()

4. Функції вікна

Віконні функції виконують обчислення в рядках, пов’язаних із поточним рядком, наприклад ранжування чи поточні підсумки. У цьому прикладі показано, як створити віконну функцію, яка розбиває дані на види, а потім ранжує пінгвінів за масою тіла в межах кожного виду.

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col
windowSpec = Window.partitionBy("species").orderBy(col("body_mass_g").desc())
df.withColumn("mass_rank", rank().over(windowSpec)).show()

5. Об'єднати операції

Подібно до операцій об’єднання SQL для об’єднання таблиць, операції об’єднання PySpark об’єднують два DataFrame на основі визначених стовпців, використовуючи кілька політик, як-от ліве об’єднання, праве об’єднання, зовнішнє об’єднання тощо. У цьому прикладі створюється DataFrame, що містить загальну кількість для кожного виду, а потім застосовує ліве об’єднання на основі видів у «стовпці зв’язків», щоб додавати до кожного спостереження у вихідному наборі даних кількість, пов’язану з його видом.

species_stats = df.groupBy("species").count()
df.join(species_stats, "species", "left").show()

6. Функції, визначені користувачем

PySpark udf дозволяє створювати визначені користувачем функції, по суті спеціальні лямбда-функції, які після визначення можна використовувати для застосування складних перетворень до стовпців у DataFrame. Цей приклад визначає та застосовує визначену користувачем функцію для відображення маси тіла пінгвіна в нову категоріальну змінну, що описує розмір пінгвіна як великий або малий.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
size_category = udf(lambda x: "Large" if x > 4500 else "Small", StringType())
df.withColumn("size_class", size_category(df.body_mass_g)).show()

7. Зведені таблиці

Зведені таблиці використовуються для перетворення категорій у стовпці, наприклад сексу кілька стовпців, призначених для опису агрегацій або зведеної статистики для кожної категорії. Нижче ми створюємо зведену таблицю, яка для кожного виду обчислює та відображає середню довжину клюва окремо за статтю:

df.groupBy("species").pivot("sex").agg({"bill_length_mm": "avg"}).show()

8. Обробка відсутніх значень

The заповнити і водянка функції можна використовувати для введення відсутніх значень або очищення набору даних від рядків, що містять відсутні значення. Їх також можна використовувати разом, якщо ми хочемо призначити відсутні значення одного конкретного стовпця, але тоді ми хочемо видалити спостереження, які містять відсутні значення в будь-яких інших стовпцях після цього:

df.na.fill({"sex": "Unknown"}).dropna().show()

9. Збережіть оброблений набір даних

Звичайно, після того, як ми застосували деякі операції обробки до набору даних, ми можемо зберегти його в різних форматах, таких як parquet:

large_penguins = df.filter(df.body_mass_g > 4500)
large_penguins.write.mode("overwrite").parquet("large_penguins.parquet")

10. Виконуйте SQL запити

Остання команда в цьому списку дозволяє вставляти SQL-запити в команду PySpark і запускати їх у тимчасових представленнях Spark DataFrames. Цей підхід поєднує гнучкість SQL із розподіленою обробкою PySpark.

df.createOrReplaceTempView("penguins")
spark.sql("""
    SELECT species, island, 
           AVG(body_mass_g) as avg_mass,
           AVG(flipper_length_mm) as avg_flipper
    FROM penguins 
    GROUP BY species, island
    ORDER BY avg_mass DESC
""").show()

Ми настійно рекомендуємо вам потренуватися випробувати ці 10 прикладів команд одну за одною в блокноті Jupyter або Google Colab, щоб ви могли бачити кінцеві результати. Набір даних легко доступний для безпосереднього читання та завантаження у ваш код.

Іван Паломарес Карраскоса є лідером, автором, доповідачем і консультантом у сфері штучного інтелекту, машинного навчання, глибокого навчання та магістра права. Він навчає та направляє інших у використанні ШІ в реальному світі.