PYSPARK - LAG -Funktion

PYSPARK - LAG -Funktion
Die LAG () -Funktion in pySpark ist im Fenstermodul verfügbar, mit dem die vorherigen Zeilenwerte an die aktuellen Zeilen zurückgegeben werden können. Firstl, die Funktion LAG () kehrt null für Top -Zeilen zurück. Es ist ein Offset -Parameter, der die Gesamtzahl der Zeilen darstellt, so dass die vorherigen Zeilenwerte an die nächsten Zeilen zurückgegeben werden. Für die ersten oberen Zeilen werden die (Offset) Nulls platziert.

Es ist möglich, die Zeilen im DataFrame mithilfe der Fensterfunktion zu partitionieren. Es ist in der erhältlich PYSPARK.sql.Fenster Modul.

Syntax:

DataFrame_OBJ.WithColumn ("lag_column", lag ("Spalte", Offset).über (Partition))

Es dauert zwei Parameter:

  1. Die Spalte ist der Spaltenname im PYSPARK -Datenrahmen, in dem die verzögerten Zeilenwerte basierend auf den Werten in dieser Spalte platziert werden.
  2. Der Offset gibt die Ganzzahl an, die diese Anzahl früherer Zeilen an die aktuellen Zeilenwerte zurückgibt.

Schritte:

  1. Erstellen Sie einen PYSPARK -Datenrahmen, der in mindestens einer Spalte einige ähnliche Werte aufweist.
  2. Partitionieren Sie die Daten mit der in der Fensterfunktion verfügbaren PartitionBy () -Methode und bestellen Sie sie basierend auf der Spalte mit der Funktion "OrderBy ()).

Syntax:

Partition = Fenster.partitionby ("Spalte").OrderBy ("Spalte")

Wir können die partitionierten Daten mit der partitionierten Spalte oder einer anderen Spalte bestellen.

Jetzt können Sie die LAG () -Funktion auf den partitionierten Zeilen verwenden, indem Sie die über() Funktion.

Wir fügen eine Spalte hinzu, um die Zeilennummer mit dem zu speichern with column () Funktion.

Syntax:

DataFrame_OBJ.WithColumn ("lag_column", lag ("Spalte", Offset).über (Partition))

Hier gibt der Name den Zeilennamen an und der DataFrame_OBJ ist unser PYSPARK -Datenrahmen.

Lassen Sie uns den Code implementieren.

Beispiel 1:

Hier erstellen wir einen pyspark -Datenrahmen mit 5 Spalten - ['Subjekt_ID', 'Name', 'Age', 'Technology1', 'Technology2'] mit 10 Zeilen und Partition der Zeilen basierend auf Technologie1 Verwenden der Fensterfunktion. Danach verzögern wir 1 Reihe.

pysspark importieren
aus pysspark.SQL Import *
Spark_app = SparkSession.Erbauer.App Name('_').Getorcreate ()
Schüler = [(4, 'Sravan', 23, 'Php', 'Testing'),
(4, 'Sravan', 23, 'php', 'testing'),
(46, 'Mounika', 22, ','.Net ',' html '),
(4, "Deepika", 21, "Oracle", "Html"),
(46, 'Mounika', 22, 'Oracle', 'Testing'),
(12, 'Chandrika', 22, 'Hadoop', 'C#'),
(12, "Chandrika", 22, "Oracle", "Testing"),
(4, 'sravan', 23, oracle ',' c#'),
(4, 'Deepika', 21, 'Php', 'C#'),
(46, 'Mounika', 22, ','.Net ',' testing ')
]
DataFrame_OBJ = Spark_App.Createdataframe (Studenten, ['Subjekt_ID', 'Name', 'Alter', 'Technology1', 'Technology2'])
print ("---------- tatsächliche Datenfreque ----------")
DataFrame_OBJ.zeigen()
# Importieren Sie die Fensterfunktion
aus pysspark.sql.Fensterimportfenster
#Port der Verzögerung von PYSPARK.sql.Funktionen
aus pysspark.sql.Funktionen importieren Verzögerungen
#Partition den Datenrahmen basierend auf den Werten in der Spalte Technologie1 und
#Bestand die Zeilen in jeder Partition basierend auf der Spalte Subjekt_ID
Partition = Fenster.partitionby ("Technology1").OrderBy ('Subjekt_ID')
print ("---------- partitionierte Datenframe ----------")
#Now erwähnen Sie die Verzögerung mit Offset-1 basierend auf Subjekt_ID
DataFrame_OBJ.WithColumn ("Lag", Verzögerung ("Subjekt_ID", 1).über (Partition)).zeigen()

Ausgang:

Erläuterung:

In der ersten Ausgabe repräsentiert die tatsächlichen Daten im Datenrahmen. In der zweiten Ausgabe erfolgt die Partition basierend auf dem Technologie1 Spalte.

Die Gesamtzahl der Partitionen beträgt 4.

Partition 1:

Der .Das Netz trat zweimal in der ersten Partition auf. Da wir den Verlierer als 1 angegeben haben, die erste .Nettowert ist null und der nächste .Der Nettowert ist der vorherige Zeilen -Subjekt_ID -Wert - 46.

Partition 2:

Hadoop ereignete sich einmal in der zweiten Partition. Lag ist also null.

Partition 3:

Oracle trat viermal in der Dritten Partition auf.

Für das erste Orakel ist Lag null.

Für das zweite Orakel beträgt der Verzögerungswert 4 (da der vorherige Zeilenwert 4 beträgt 4).

Für das dritte Orakel beträgt der Verzögerungswert 4 (da der vorherige Zeilenwert 4 beträgt 4).

Für das vierte Orakel beträgt der Verzögerungswert 12 (da der Wert der vorherigen Zeile beträgt.

Partition 4:

PHP trat dreimal in der vierten Partition auf.

Der Verzögerungswert für den 1. PHP ist null.

Der Verzögerungswert für den 2. PHP beträgt 4 (da der vorherige Zeilenwert 4 beträgt 4).

Der Verzögerungswert für den 3. PHP beträgt 4 (da der vorherige Zeilenwert 4 beträgt 4).

Beispiel 2:

Verzögern Sie die Reihen um 2. Stellen Sie sicher, dass Sie den PYSPark -Datenfreame erstellt haben, wie in Beispiel 1 zu sehen.

# Importieren Sie die Fensterfunktion
aus pysspark.sql.Fensterimportfenster
#Port der Verzögerung von PYSPARK.sql.Funktionen
aus pysspark.sql.Funktionen importieren Verzögerungen
#Partition den Datenrahmen basierend auf den Werten in der Spalte Technologie1 und
#Bestand die Zeilen in jeder Partition basierend auf der Spalte Subjekt_ID
Partition = Fenster.partitionby ("Technology1").OrderBy ('Subjekt_ID')
print ("---------- partitionierte Datenframe ----------")
#Now erwähnen Sie Verzögerung mit Offset-2 basierend auf Subjekt_ID
DataFrame_OBJ.WithColumn ("Lag", Verzögerung ("Subjekt_ID", 2).über (Partition)).zeigen()

Ausgang:

Erläuterung:

Die Partition basiert auf dem Technologie1 Spalte. Die Gesamtzahl der Partitionen beträgt 4.

Partition 1:

Der .Das Netz trat zweimal in der ersten Partition auf. Da wir den Lag-Offset als 2 angegeben haben, ist der Offset für beide Werte null.

Partition 2:

Hadoop ereignete sich einmal in der zweiten Partition. Lag ist also null.

Partition 3:

Oracle trat viermal in der Dritten Partition auf.

Für das erste und zweite Orakel ist Lag null.

Für das dritte Orakel beträgt der Verzögerungswert 4 (da die vorherigen 2 Zeilen der Subjekt_ID 4 sind).

Für das vierte Orakel beträgt der Verzögerungswert 4 (da die vorherigen 2 Zeilen der Subjekt_ID 4 sind).

Partition 4:

PHP trat dreimal in der vierten Partition auf.

Der Verzögerungswert für den 1. und 2. PHP ist null.

Der Verzögerungswert für den 3. PHP beträgt 4 (da der vorherige Wert der letzten 2 Zeilen 4 beträgt 4).

Beispiel 3:

Verzögern Sie die Zeilen um 2 basierend auf der Altersspalte. Stellen Sie sicher, dass Sie den PYSPark -Datenfreame erstellt haben, wie in Beispiel 1 zu sehen.

# Importieren Sie die Fensterfunktion
aus pysspark.sql.Fensterimportfenster
#Port der Verzögerung von PYSPARK.sql.Funktionen
aus pysspark.sql.Funktionen importieren Verzögerungen
#Partition den Datenrahmen basierend auf den Werten in der Spalte Technologie1 und
#Bestand die Zeilen in jeder Partition basierend auf der Altersspalte
Partition = Fenster.partitionby ("Technology1").OrderBy ('Alter')
print ("---------- partitionierte Datenframe ----------")
#Now erwähnen Sie Verzögerungen mit Offset-2 basierend auf dem Alter
DataFrame_OBJ.With Column ("Lag", Verzögerung ("Alter", 2).über (Partition)).zeigen()

Ausgang:

Erläuterung:

Die Partition basiert auf dem Technologie1 Spalte und Verzögerung werden basierend auf der Altersspalte definiert. Die Gesamtzahl der Partitionen beträgt 4.

Partition 1:

Der .Das Netz trat zweimal in der ersten Partition auf. Da wir den Lag-Offset als 2 angegeben haben, ist der Offset für beide Werte null.

Partition 2:

Hadoop ereignete sich einmal in der zweiten Partition. Lag ist also null.

Partition 3:

Oracle trat viermal in der Dritten Partition auf.

Für das erste und zweite Orakel ist Lag null.

Für das dritte Orakel beträgt der Verzögerungswert 21 (der Alterswert aus den beiden vorherigen Zeilen beträgt 21).

Für das vierte Orakel beträgt der Verzögerungswert 22 (der Alterswert aus den beiden vorherigen Zeilen beträgt 22).

Partition 4:

PHP trat dreimal in der vierten Partition auf.

Der Verzögerungswert für den 1. und 2. PHP ist null.

Der Verzögerungswert für den 3. HP beträgt 21 (der Alterswert aus den beiden vorherigen Zeilen beträgt 21).

Abschluss

Wir haben gelernt, wie man die LAG -Werte im PYSPARK -Datenframe in partitionierten Zeilen bekommt. Die LAG () -Funktion in pySpark ist im Fenstermodul verfügbar, mit dem die vorherigen Zeilenwerte an die aktuellen Zeilen zurückgegeben werden können. Wir haben die verschiedenen Beispiele gelernt, indem wir die verschiedenen Offsets festgelegt haben.