Translate

неділю, 31 січня 2016 р.

Apache Spark for Newbie


Вступ (Лінивим можна пропустити)

    Врамках Big Data тренінгу я почав вивчати роботу з Apach Spark computing framework. В інтернеті є достатня кількість прикладів побудови пайплайнів на різних мовах програмування (наприклад тут), проте я ніде не зміг найти хорошого гайду чи прикладу як написати проект з "нуля" і запустити його виконання на кластері. Усі що я знаходив приклади використовували локальний (тестовий) режим мастера , таким чином ніякої дистрибуції логіки не відбувалося і відчути що таке Apache Spark в повній мірі мені не вдавалося. До дня коли я прослухав тренінг людини, що працювала з цим інструментом в продакшені, саме він вніс ясність в те як Apache Spark працює (дякую тобі Тарас Матяшовський). Отже, після того як я в цьому розібрався я вирішив залишити ці знання на просторах Java User Group для всіх кому цікаво і для самого себе, щоб не забути ;-) Також, надіюся з заголовку статті зрозуміло що тут буде йтись лише про поверхневе ознайомлення з спарком і основна мета це зробити усе правильно. Тут буде використаний Stand Alone Spark Master який не потрібно використувувати в продакшені, для цого ви може використати Hadoop Yarn.

Опис задачі

    Задача взята з документації ApacheSpark - підрахунок слів у текстовому файлі. Проте зробимо це по законам жанру:
1. Дистрибутивна бібліотека (саме та яка розлетиться по воркерах кластеру і буде робити усю роботу);
2. Spark Driver (іншими словами це і буде наш клієнт через який ми доступатимемось до мастера і де ми будемо будувати наші пайплайни);
3. Web application (Наша веб аплікація - ендпоінти які ми будемо викликати щоб виконати ту чи іншу роботу, ну і щоб Spark Application UI був доступний і ми змогли подивитись статистику виконання роботи).

    Надіюся усе зрозуміло , проте якщо й ні , то далі постараюся усе розписати як найкраще. Поїхали !

Створення архітектури проектів

    Архітектура нашого проекту буде складатися з трьох модулів, а саме :
1) Distributed JAR
2) Spark Driver
3) Web Application
Загальна, архітектура зображена нижче :

    Для збірки проектів я буду використовувати Apache Maven, і почнемо з того що створемо батьківський проект який назвемо "spark-for-newbie" і додамо туди pom.xml файл. Батьківський pom.xml файл повинен включати усі вище згадані модулі :

<modules>
        <module>distributed-library</module>
        <module>web-api</module>
        <module>spark-driver</module>
</modules>

    Тепер підключаємо до нього основні залежності :
- Залежності на наші бібліотечні модулі :
<dependency>
        <groupId>org.ar.spark.newbie</groupId>
        <artifactId>spark-client</artifactId>
        <version>${project.version}</version>
</dependency>
<dependency>
        <groupId>org.ar.spark.newbie</groupId>
        <artifactId>distributed-library</artifactId>
        <version>${project.version}</version>
</dependency>

- Також залежність на останню (на момент публікації) версію Apache Spark :
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.0</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Тепер створюємо наші модулі в середині батьківського проекту:

distributed-library
    Додаємо до цього проекту лише залежність на Apache Spark , так як ця бібліотека міститиме класи функцій з його пакету, що будуть виконуватись на різних воркерах нашого кластеру.
<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <scope>provided</scope>
</dependency>

spark-driver
    Цей проект також буде бібліотекою яку ми використаємо у наступному web-api модулі. Він міститиме Spark Conext та залежність на функції, що будуть виконувати логіку нашого обчислення, тому нам потрібні наступні залежності :
<dependency>
      <groupId>org.ar.spark.newbie</groupId>
      <artifactId>distributed-library</artifactId>
</dependency>
<dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
</dependency>

web-api
    Даний модуль буде нашою основною аплікацією, він повинен містити залежність на наш Spark Driver а також на дуже цікаву бібліотеку "Java Spark", завдяки якій ми зробимо REST Endpoint всього в одну лінійку коду. Spark Java, немає нічого спілького з Big Data та Apache Spark, також вона заслуговує окремої статті щоб описати її можливості.
<dependency>
      <groupId>org.ar.spark.newbie</groupId>
      <artifactId>spark-driver</artifactId>
      <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
      <groupId>com.sparkjava</groupId>
      <artifactId>spark-core</artifactId>
      <version>2.3</version>
</dependency>

Розробка дистрибутивної бібліотеки (distributed-library)

    Для того, щоб підрахувати кількість слів у текстовому файлі ми повинні зробити наступний пайплайн :


    Отже нам треба буде написати 4 функції, 3 map і 1 reduce а також клас в який ми будемо мапити key-value рузультати і повернемо результати з воркера назад в аплікацію.

    SeparateWordLinesFunction.java - її задача отримати на вхід об'єкт типу String котрий є вичитаною лінією тексту з текстового файлу і розбити на окремі слова , тобто повернути список об'єктів String. Для цього, клас функції робитиме наступне :

public class SeparateWordLinesFunction implements FlatMapFunction<String, String> {
  @Override
  public Iterable<String> call(String s) throws Exception {
    return Arrays.asList(s.split(" "));
  }
};

    MapWordsToKeyValueFunction.java - повинна замапити кожне слово до лічіильника, який по замовчуванню буде "1", таким чином ми отримаємо key-value структуру даних. Для того, щоб тримати таку структуру буде використано об'єкт Tuple2 з пакету Scala бібліотеки Apache Spark : 

public class MapWordsToKeyValueFunction implements PairFunction<String, String, Integer> {
  @Override
  public Tuple2<String, Integer> call(String s) throws Exception {
    return new Tuple2<String, Integer>(s, 1);
  }
};

    ReduceKeyValueWordsByKey.java - reduce операція по ключу нашої структури, тут ми повинні вирішити що будемо робити з значенням і в нашому випадку ми його просумуємо :

public class ReduceKeyValueWordsByKey implements Function2<Integer, Integer, Integer> {
  @Override
  public Integer call(Integer intVal1, Integer intVal2) throws Exception {
    return intVal1 + intVal2;
  }
}

    MapKeyValueWordsToWrapperObject.java - функція що перетворить key-value структуру в більш лояльний для подальшої роботи об'єкт :

public class MapKeyValueWordsToWrapperObject implements Function<Tuple2<String,Integer>, WordResult> {
  @Override
  public WordResult call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
    return new WordResult(stringIntegerTuple2._1, stringIntegerTuple2._2);
  }
}

   WordsResult.java - звичайний POJO об'єкт котрий містить поля для самого слова і його лічильника. Потрібно лише імплементувати інтерфейс Serializable (для надання можливості серіалізувати його при передачі по мережі) і особисто від себе я перевизначив метод toString так як буду виводи об'єкти напряму до користувача.

Розробка  Spark Driver бібліотеки (spark-driver)

    Тут ми створемо SparkContext через який отримаємо доступ до мастера і задекларуємо наш пайплайн.

    Створення  SparkContext :
Для цього необідно створити SparkConf - конфігураційних об'єкт якому ми передамо назву нашої аплікації, шлях до мастера та шлях до скомпільованого JAR файлу який Spark розповсюдить на воркери де будуть виконуватись етапи нашого пайплайну :

private String[] distributedJars = new String[]{"/<path_to_workspace>/distributed-library/target/distributed-library-1.0-SNAPSHOT.jar"};

private JavaSparkContext sparkContext;

public SparkDriver(){
    SparkConf sparkConf = new SparkConf().setAppName("SparkForNewbie")
.setJars(distributedJars).setMaster("spark://127.0.0.1:7077");
    this.sparkContext = new JavaSparkContext(sparkConf);
}

    Імплементація Pipeline :

public List<WordResult> countWordsFromFile(String filePath){
        JavaRDD<String> words = sparkContext.textFile(filePath);
        return words
            .flatMap(new SeparateWordLinesFunction())
            .mapToPair(new MapWordsToKeyValueFunction())
            .reduceByKey(new ReduceKeyValueWordsByKey())
            .map(new MapKeyValueWordsToWrapperObject())
            .collect();
}

    З імплементації пайплайну чітко видно нашу попередню схему , де я показував як він виглядатиме. Усі функції підтянуті через залежність і знаходяться в distributed-library.

Розробка Веб аплікації

    Цей модуль потрібен нам саме для взаємодії нашого проекту з кінцевим користувачем. Тут ми зробимо енд-поінт який користувач зможе викликати в браузері і запустити на виконання Spark Pipeline, проте найцікавіше в цьому модулі це використання Spark Java, за допомогою якої ми реалізуємо енд-поінт :

get("/data", (request, response) -> dataService.countWords())

    Угу, саме так, 1 рядок коду і при старті web-api Spark Java розгорне Embedded Jetty і реалізує "/data" енд-поінт.  Далі, при виклику нашого енд-поінту викликаємо сервіс який містить в собі інстанс SparkDriver класу і виконує наш пайплайн передаючи шлях до піддослідного файлу:

public class DataService {

  private static final String TEXT_FILE_PATH = "/workspace/projects/spark-for-newbie/test.txt";
  private SparkDriver sparkDriver = new SparkDriver();

  public List<WordResult> countWords(){
    return sparkDriver.countWordsFromFile(TEXT_FILE_PATH);
  }

}

    Ось і все , усі модулі нашого проекту розписані , і можна переходити до тестування.

Тестування

Запуск Apache Master (stand alone version)
    Для цього качаємо дистрибутив Apache Spark (я викачував spark-1.6.0-bin-hadoop2.6.tgz) і йдемо у папку "conf" що в середині.
Тут потрібно змінити розширення файлу "spark-env.sh.template" на "spark-env.sh", зайти в середину і додати кілька рядків для конфігурації :

export SPARK_LOCAL_IP=127.0.0.1

export SPARK_MASTER_IP=127.0.0.1
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8082

export SPARK_WORKER_MEMORY=2g
export SPARK_WORKER_CORES=2
export SPARK_WORKER_WEBUI_PORT=8083

(Опис усіх цих та інших можливих опцій присутні у цьомуж файлі)

    Тепер йдемо назад і переходимо у папку "sbin", тут виконуємо "./start-master.sh " і переходимо в браузері по шляху "http://localhost:8082/", тут має бути розгорутий ЮІ нашого мастера :


    Далі запускаємо воркер (так, так це не кластер адже все робиться на одній машині, але спарк про це не знає, він має все що необхідно для роботи у розподіленому режимі - мастер а мастер має воркер). Виконуємо "/start-slave.sh 127.0.0.1:7077" і переходимо по шляху "http://localhost:8083/" :

    Тепер у нас є все щоб запустити нашу аплікацію, йдемо в мейн метод модуля web-api та запускаємо його. В логах розгортання проекту ви повинні побачити як спарк розгорає ЮІ нашої аплікації :


    Тут буде відображатися найцікавіша інформація виконання нашого пайплайну.

    Переходимо по цьому шляху і повинні побачити :


    Щож, тепер усе готово, відкриваємо нову вкладку в браузері і переходимо на наш енд-поінт :

http://localhost:4567/data
    
    Якщо ж все було зроблено правильно ви повинні побачити результат нашого обчислення :



    P.S.

    Якщо ж у вас щось не вийшло або не спрацювало, ви можете знайти сорси мого проекту тут . Також ділюся з вами сорсами проекту Тараса, котрі я використовував в довідкових цілях коли знайомився з Apache Spark, вдачі !

 

Немає коментарів:

Дописати коментар