3.4 Ein Beispiel mit Spark

Überlegungen für das Beispiel in Spark

Wir werden in unserem Beispiel nur Python als Sprache verwenden. Warum?

  • Python ist deutlicher einfacher zu verstehen. Immerhin hast du hier in diesem Tutorial das Ziel, Hadoop zu verstehen und nicht noch andere Programmiersprachen und 
  • mit Python musst du nicht alles neu kompilieren, wie man es von Java kennt,
  • … aber Python läuft nur sehr langsam in Spark in Vergleich zu Java, weil Spark in der Programmiersprache Scala geschrieben ist und Scala den Code auch kompiliert, sodass Javacode schneller ausgeführt und interpretiert werden kann.

Nachfolgend dazu ein Vergleich zwischen Scala-Code in Spark und Python-Code in Spark. Wir vergleichen den Code, um die Quadratzahlen von einem vorgegebenen Array zu berechnen.

Python:

nummern = sc.parallelize([1,2,3,4])
quadratzahl = nummern.map(lambda x: x*x).collect()

Scala:

val nummern = sc.parallelize(LIST(1,2,3,4))
val quadratzahl = nummern.map(x => x*x).collect()

Beispiel: Die Filme mit der durchschnittlich schlechtesten Bewertung finden

Wir wollen nun mit Spark herausfinden, welche Filme im Durchschnitt die schlechtesten Bewertungen bekommen haben. Dazu schreiben wir uns nun ein Python-Skript. Das Python-Skript sieht folgendermaßen aus:

from pyspark import SparkConf, SparkContext

# Diese Funktion erstellt wie eine Art "Python Lexikon",  welches wir später verwenden können, 
# um eine movieID zu Filmnamen umwandeln zu können

def loadMovieNames():
    movieNames = {}
    with open("ml-100k/ratings.csv") as f:
        for line in f:
            fields = line.split(',')
            movieNames[int(fields[0])] = fields[1]
    return movieNames

# Der folgende Teil konvertiert jede Zeile der ratings.csv zu (movieID, (rating, 1.0)). 
# Auf diesem Weg können wir dann alle Bewertungen eines Filmes zusammenführen und 
# bekommen so die Anzahl der Bewertungen für jeden Film (, um den Durchschnitt berechnen zu können)

def parseInput(line):
    fields = line.split()
    return (int(fields[1]), (float(fields[2]), 1.0))

if __name__ == "__main__":
    # Das Hauptskript - Erstellen des SparkContext zum Bearbeiten
    conf = SparkConf().setAppName("WorstMovies")
    sc = SparkContext(conf = conf)

    # movieID in die Filmtiteltabelle laden
    movieNames = loadMovieNames()

    # Laden der ratings.csv
    lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/ratings.csv")

    # Konvertieren zu (movieID, (rating, 1.0))
    movieRatings = lines.map(parseInput)

    # Reduzieren auf (movieID, (sumOfRatings, totalRatings))
    ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: ( movie1[0] + movie2[0], movie1[1] + movie2[1] ) )

    # Zusammenführen auf (movieID, averageRating)
    averageRatings = ratingTotalsAndCount.mapValues(lambda totalAndCount : totalAndCount[0] / totalAndCount[1])

    # Nach durchschnittlicher Bewertung sortieren
    sortedMovies = averageRatings.sortBy(lambda x: x[1])

    # Die ersten 20 Resultate anzeigen lassen
    results = sortedMovies.take(20)

    # Ausgabe des Ergebnisses
    for result in results:
        print(movieNames[result[0]], result[1])

Öffne bitte anschließend Ambari über deinen Adminaccount unter localhost:8080, da wir noch eine Einstellungsänderung bei Spark vornehmen müssen. Gehe danach bitte zu „Spark“ –> „Configs“. Klicke dort bitte auf „Advanced spark2-log4j-properties“. Im Bild siehst du das auch nochmal:

Ändere bitte hier die Zeile `log4j.rootCategory=INFO, console` in

log4j.rootCategory=ERROR, console

Wir ändern damit das LogLevel bei Spark. Nachdem du den Eintrag verändert hast, gehe bitte auf „Save“ (grüner Button oben rechts). Bei der Frage nach der Änderung der Konfiguration kannst du in die Box eintragen, was du möchtest. Gehe danach nochmals auf „Save“.

Wenn Warnungen bei dir auftauchen, kannst du diese ignorieren und auf „Proceed anyway“ klicken.

Die Änderung sollte nun gespeichert sein, du bekommst dazu auch eine Erfolgsmeldung.

Gehe danach bitte auf „Actions“ (unten links) und dann auf „Restart All Required“, um alle Services, die durch diese Änderung neu gestartet werden müssen, neu zu starten. Dies musst du mit „Confirm Restart All“ nochmals bestätigen.

Du bekommst danach eine Zustandsanzeige. Bitte warte, bis diese mit der obersten Leiste auf 100% gegangen ist und sich grün färbt. Danach kannst du auf OK. Der Pfeil als Anzeige des notwendigen Neustarts sollte nun verschwunden sein. Falls du nun einen Fehler im Ambari mittels einer roten Zahl neben dem Service „Spark2“ zu sehen bekommst: Keine Sorge, dieser sollte nach ein paar Sekunden auch wieder verschwinden.

Leider kann Spark nicht mit dem Trennzeichen umgehen, welches bei unseren MovieLens-Daten vorliegt (das Komma). Aus diesem Grund stelle ich dir für das Spark-Kapitel eine angepasste Version zur Verfügung. Im Folgenden laden wir diesen Datensatz herunter und laden ihn in das HDFS. (Anmerkung: Der Datensatz ist etwas älter und reduziert, deswegen stimmen die Größen im Vergleich beider Dateien nicht überein.) 

Bitte logge dich nun mittels Putty in deiner VirtualBox ein. Bitte lade dir nun das Python-Skript, welches ich dir vor Kurzem vorgestellt habe, auf dein System herunter. Außerdem lädst du bitte den veränderten Datensatz herunter. Bitte führe dazu alle folgenden Befehle nacheinander aus. 

# Download des Python-Skriptes mit dem Spark-Programm
wget https://jasonadam.de/hadoop/dateien/SchlechtesterFilmSpark.py

# Download des angepassten movies.csv-Datensatzes
wget https://jasonadam.de/hadoop/dateien/angepassterDatensatz/movies_changed.csv

# Download des angepassten ratings.csv-Datensatzes
wget https://jasonadam.de/hadoop/dateien/angepassterDatensatz/ratings_changed.csv

# den angepassten Datensatz in HDFS hochladen
hadoop fs -copyFromLocal ratings_changed.csv ml-100k/ratings_changed.csv

Nachdem du nun alle Vorbereitungen durchgeführt hast, kannst du das Programm in Spark starten:

spark-submit SchlechtesterFilmSpark.py

Du siehst hier die schlechtesten Filme aus dem Datensatz von Oktober. Aus diesem Grund wird es auch Abweichungen zu den Daten in deinem Datensatz geben. Wenn du die Dauer zur Verarbeitung mal vergleichst mit denen zu Pig oder Map-Reduce, sind wir hier mit Spark deutlich schneller gewesen.


Weiter geht es mit: 4.1 Die Grundlagen von Hive