PYSPARK RDD - Transformationen

PYSPARK RDD - Transformationen
In Python ist PySpark ein Spark -Modul, das eine ähnliche Art von Verarbeitung wie Spark bietet.

RDD steht für widerstandsfähige verteilte Datensätze. Wir können RDD als grundlegende Datenstruktur in Apache Spark bezeichnen.

Wir müssen RDD aus dem PySpark importieren.RDD -Modul.

In PYSPARK, um eine RDD zu erstellen, können wir die parallelize () -Methode verwenden.

Syntax:

Spark_App.SparkContext.parallelisieren (Daten)

Wo,

Daten können eine eindimensionale (lineare Daten) oder zwei dimensionale Daten (Zeilen-Säulen-Daten) sein.

RDD -Transformationen:

Eine Transformations -RDD ist eine Operation, die auf eine RDD angewendet wird, um neue Daten aus der vorhandenen RDD zu erstellen. Verwenden von Transformationen können wir die RDD durch Anwenden einiger Transformationen filtern.

Lassen Sie uns die Transformationen sehen, die auf der gegebenen RDD durchgeführt werden.

Wir werden sie nacheinander besprechen.

1. Karte()

MAP () Transformation wird verwendet, um einen Wert den in der RDD vorhandenen Elementen abzubilden. Es dauert eine anonyme Funktion als Parameter wie Lambda und transformiert die Elemente in einer RDD.

Syntax:

Rdd_data.Karte (anonymous_function)

Parameter:

anonymous_function sieht aus wie:

Lambda -Element: Operation

Zum Beispiel besteht die Operation darin, alle Elemente mit einem neuen Element hinzuzufügen/subtrahieren.

Lassen Sie uns die Beispiele sehen, um diese Transformation besser zu verstehen.

Beispiel 1:

In diesem Beispiel erstellen wir eine RDD mit dem Namen student_marks mit 20 Elementen und anwenden Sie MAP () -Transformation, indem Sie jedes Element mit 20 hinzufügen und diese mit Collect () -Aktion anzeigen.

#Amportieren Sie das PYSPARK -Modul
pysspark importieren
#import SparkSession für die Erstellung einer Sitzung
aus pysspark.SQL Import SparkSession
# RDD aus PYSPARK importieren.RDD
aus pysspark.RDD Import RDD
#create eine App namens LinuxHint
Spark_app = SparkSession.Erbauer.AppName ('LinuxHint').Getorcreate ()
# Erstellen Sie Daten für Schülermarken mit 20 Elementen
Student_marks = Spark_App.SparkContext.parallelisieren ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
#Display -Daten in RDD
print ("tatsächliche Daten in RDD:", Student_marks.Karte (Lambda -Element: Element).sammeln())
#Apply map () Transformation, indem Sie 20 zu jedem Element in RDD addieren
Drucken ("Nach 20 zu jedem Element in RDD hinzugefügt:", Student_marks.Karte (Lambda -Element: Element+ 20).sammeln())

Ausgang:

Tatsächliche Daten in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Nachdem 20 zu jedem Element in RDD hinzugefügt wurde: [109, 96, 98, 109, 110, 120, 54, 76, 74, 42, 65, 63, 43, 76, 98, 41, 54, 54, 76, 54]

Aus der obigen Ausgabe können wir sehen, dass Element 20 zu jedem Element in RDD über die Lambda -Funktion mit MAP () -Transformation hinzugefügt wird.

Beispiel 2:

In diesem Beispiel erstellen wir eine RDD mit dem Namen student_marks mit 20 Elementen und anwenden Sie MAP () -Transformation, indem Sie jedes Element mit 15 subtrahieren und diese mit Collect () -Aktion anzeigen.

#Amportieren Sie das PYSPARK -Modul
pysspark importieren
#import SparkSession für die Erstellung einer Sitzung
aus pysspark.SQL Import SparkSession
# RDD aus PYSPARK importieren.RDD
aus pysspark.RDD Import RDD
#create eine App namens LinuxHint
Spark_app = SparkSession.Erbauer.AppName ('LinuxHint').Getorcreate ()
# Erstellen Sie Daten für Schülermarken mit 20 Elementen
Student_marks = Spark_App.SparkContext.parallelisieren ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
#Display -Daten in RDD
print ("tatsächliche Daten in RDD:", Student_marks.Karte (Lambda -Element: Element).sammeln())
#Apply map () Transformation durch Subtrahieren von 15 von jedem Element in RDD
print ("Nach dem Abziehen von 15 von jedem Element in RDD:", Student_marks.Karte (Lambda Element: Element-15).sammeln())

Ausgang:

Tatsächliche Daten in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Nach Abzug von 15 von jedem Element in RDD: [74, 61, 63, 74, 75, 85, 19, 41, 39, 7, 30, 28, 8, 41, 63, 6, 19, 19, 41, 19]

Aus der obigen Ausgabe können wir erkennen, dass Element 15 durch die Lambda -Funktion unter Verwendung der MAP () -Transformation an jedem einzelnen Element in RDD abgezogen wird.

2. Filter()

Filter () Transformation wird verwendet, um Werte aus der RDD zu filtern. Es erfordert eine anonyme Funktion wie Lambda und gibt die Elemente zurück, indem sie Elemente von einer RDD filtern.

Syntax:

Rdd_data.Filter (anonymous_function)

Parameter:

anonymous_function sieht aus wie:

Lambda -Element: Zustand/Ausdruck

Beispielsweise wird die Bedingung verwendet, um die ausdrucksstarken Anweisungen zum Filtern der RDD anzugeben.

Lassen Sie uns Beispiele sehen, um diese Transformation besser zu verstehen.

Beispiel 1:

In diesem Beispiel erstellen wir eine RDD mit dem Namen student_marks mit 20 Elementen und verwenden Filter () -Transformation, indem wir nur Multiplikatoren von 5 filtern und diese mit Collect () Aktion anzeigen.

#Amportieren Sie das PYSPARK -Modul
pysspark importieren
#import SparkSession für die Erstellung einer Sitzung
aus pysspark.SQL Import SparkSession
# RDD aus PYSPARK importieren.RDD
aus pysspark.RDD Import RDD
#create eine App namens LinuxHint
Spark_app = SparkSession.Erbauer.AppName ('LinuxHint').Getorcreate ()
# Erstellen Sie Daten für Schülermarken mit 20 Elementen
Student_marks = Spark_App.SparkContext.parallelisieren ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
#Display -Daten in RDD
print ("tatsächliche Daten in RDD:", Student_marks.Karte (Lambda -Element: Element).sammeln())
#Apply filter () Transformation durch Rückgabe von intiely Multiples von 5.
print ("Vielfache von 5 von einem RDD:", Student_marks.Filter (Lambda -Element: Element%5 == 0).sammeln())
)

Ausgang:

Tatsächliche Daten in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Mehrfach 5 von einem RDD: [90, 100, 45]

Aus der obigen Ausgabe können wir sehen, dass Vielfalt von 5 Elementen aus der RDD filtriert werden.

Beispiel 2:

In diesem Beispiel erstellen wir eine RDD mit dem Namen student_marks mit 20 Elementen und anwenden Filter () -Transformation durch Filterelemente, die größer als 45 sind und sie mit Collect () -Aktion anzeigen.

#Amportieren Sie das PYSPARK -Modul
pysspark importieren
#import SparkSession für die Erstellung einer Sitzung
aus pysspark.SQL Import SparkSession
# RDD aus PYSPARK importieren.RDD
aus pysspark.RDD Import RDD
#create eine App namens LinuxHint
Spark_app = SparkSession.Erbauer.AppName ('LinuxHint').Getorcreate ()
# Erstellen Sie Daten für Schülermarken mit 20 Elementen
Student_marks = Spark_App.SparkContext.parallelisieren ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
#Display -Daten in RDD
print ("tatsächliche Daten in RDD:", Student_marks.Karte (Lambda -Element: Element).sammeln())
#Apply filter () Transformation durch Filterwerte größer als 45
print ("Werte größer als 45:", Student_marks.Filter (Lambda -Element: Element> 45).sammeln())

Ausgang:

Tatsächliche Daten in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Werte mehr als 45: [89, 76, 78, 89, 90, 100, 56, 54, 56, 78, 56]

Aus der obigen Ausgabe können wir sehen, dass diese Elemente von mehr als 45 vom RDD gefiltert werden.

3. Union()

Union () Transformation wird verwendet, um zwei RDDs zu kombinieren. Wir können diese Transformation auf zwei RDDs durchführen…

Syntax:

Rdd_data1.Union (rdd_data2)

Lassen Sie uns Beispiele sehen, um diese Transformation besser zu verstehen.

Beispiel 1:

In diesem Beispiel werden wir eine einzelne RDD mit Student Marks -Daten erstellen und zwei RDDs aus der einzelnen RDD erstellen, indem einige Werte mithilfe der Filter () -Transformation gefiltert werden. Danach können wir Union () Transformation auf den beiden gefilterten RDDs durchführen.

#Amportieren Sie das PYSPARK -Modul
pysspark importieren
#import SparkSession für die Erstellung einer Sitzung
aus pysspark.SQL Import SparkSession
# RDD aus PYSPARK importieren.RDD
aus pysspark.RDD Import RDD
#create eine App namens LinuxHint
Spark_app = SparkSession.Erbauer.AppName ('LinuxHint').Getorcreate ()
# Erstellen Sie Daten für Schülermarken mit 20 Elementen
Student_marks = Spark_App.SparkContext.parallelisieren ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
#Display -Daten in RDD
print ("tatsächliche Daten in RDD:", Student_marks.Karte (Lambda -Element: Element).sammeln())
First_filter = student_marks.Filter (Lambda -Element: Element> 90)
Second_filter = student_marks.Filter (Lambda -Element: Element <40)
#Display First gefilterte Transformation
Druck ("Elemente in RDD größer als 90", First_filter.sammeln())
#Display zweite gefilterte Transformation
Print ("Elemente in RDD weniger als 40", Second_Filter.sammeln())
#Apply Union () Transformation durch Ausführung der Union auf den oben genannten 2 Filtern
Print ("Union Transformation auf zwei gefilterten Daten", First_filter.Union (Second_Filter).sammeln())

Ausgang:

Tatsächliche Daten in RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Elemente in RDD über 90 [100]
Elemente in RDD weniger als 40 [34, 22, 23, 21, 34, 34, 34]
Union Transformation an zwei gefilterten Daten [100, 34, 22, 23, 21, 34, 34, 34]

Aus der obigen Ausgabe sehen Sie, dass wir Union auf First_Filter und Second_Filter durchgeführt haben.

First_Filter wird erhalten, indem Elemente von Studentenmarks RDD mehr als 90 und Second_filter erhalten werden.

Beispiel 2:

In diesem Beispiel werden wir zwei RDDs erstellen, so dass die erste RDD 20 Elemente hat und die zweite RDD 10 Elemente hat. Anschließend können wir eine Union () -Transformation auf diese beiden RDDs anwenden.

#Amportieren Sie das PYSPARK -Modul
pysspark importieren
#import SparkSession für die Erstellung einer Sitzung
aus pysspark.SQL Import SparkSession
# RDD aus PYSPARK importieren.RDD
aus pysspark.RDD Import RDD
#create eine App namens LinuxHint
Spark_app = SparkSession.Erbauer.AppName ('LinuxHint').Getorcreate ()
# Erstellen Sie Daten für Schülermarken mit 20 Elementen
Student_marks1 = Spark_App.SparkContext.parallelisieren ([89,76,78,89,90,100,34,56,54,22,45,43,23,56,78,21,34,34,56,34])
# Erstellen Sie die Daten für Schülermarkierungen mit 10 Elementen
Student_marks2 = Spark_App.SparkContext.Parallelisieren ([45,43,23,56,78,21,34,34,56,34])
#Display -Daten in RDD
Print ("Tatsächliche Daten in Studentenmarken 1 RDD:", Student_marks1.Karte (Lambda -Element: Element).sammeln())
#Display -Daten in RDD
print ("Tatsächliche Daten in Studentenmarken 2 RDD:", Student_marks2.Karte (Lambda -Element: Element).sammeln())
#Apply Union () Transformation durch Aufführung von Union auf den oben genannten 2. RDDs
print ("Gewerkschaftsstransformation auf zwei RDD", Student_marks1.Union (student_marks2).sammeln())

Ausgang:

Tatsächliche Daten in den Student Marks 1 RDD: [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Tatsächliche Daten in den Student Marks 2 RDD: [45, 43, 23, 56, 78, 21, 34, 34, 56, 34]
Union Transformation auf zwei RDD [89, 76, 78, 89, 90, 100, 34, 56, 54, 22, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34, 45, 43, 23, 56, 78, 21, 34, 34, 56, 34]

Wir können sehen, dass zwei RDDs mit Union () Transformation kombiniert werden.

Abschluss

Aus diesem PYSPARK -Tutorial sehen wir drei Transformationen, die auf RDD angewendet werden. MAP () Die Transformation wird verwendet, um die Elemente in einem RDD zu transformieren. Schließlich diskutierten wir Union () RDD, mit dem zwei RDDs kombiniert werden.