Harp Project

Recently designed big data processing tools focus on abstracting data and the associated computation flows. For example, Hadoop MapReduce defines data as key-value pairs and computation as Map and Reduce tasks. Pregel/Giraph describes data as vertices and edges in graphs, and computation as iterations in the BSP (Bulk Synchronous Parallel) style. Spark abstracts data as RDDs with transformation operations on top of them. However, these tools do not abstract or define communication patterns. In contrast, traditional distributed data processing tools, represented by MPI, do abstract communication patterns, in the form of collective communication. This abstraction is limited, though: collective communication in MPI is still based on arrays and buffers. MPI has no abstraction for more complex data such as the key-values, vertices, and edges used by the tools mentioned above. As a result, communication patterns over these data abstractions, such as shuffling key-values or graph communication based on edges and vertices, do not exist either.

To improve expressiveness and performance in big data processing, we present Harp, which provides data abstractions and communication abstractions built on top of them. It can be plugged into the Hadoop runtime to enable efficient in-memory communication and avoid HDFS reads and writes. Harp has the following features: hierarchical data abstraction, a collective communication model, pool-based memory management, BSP-style computation parallelism, and fault tolerance support with checkpointing.

Fig 1: Hierarchical Data Abstraction
  1. Hierarchical data abstraction
    The data abstraction has three categories horizontally and three levels vertically (see Figure 1). Horizontally, data is abstracted as arrays, key-values, or the vertices, edges, and messages of graphs. Vertically, there are two basic types at the lowest level: arrays and objects. At the middle level, they are wrapped as partitions, such as array, key-value, and graph partitions. At the top level, partitions form tables. Tables are identified by table IDs. Partitions that belong to tables with the same ID on different workers are considered one dataset. In this way, a table serves as a description of how a dataset is distributed. Collective communication operations are defined on tables and partitions.
  2. Collective communication model
    Collective communication is defined as the movement of partitions within tables. Currently, three types of collective communication patterns are supported. The first type is inherited from MPI collective communication operations, such as broadcast and allgather. The second type is inherited from Hadoop MapReduce communication patterns, namely regroup with support for combining and reducing. Allreduce from the MPI collectives is also implemented, as regroup + allgather. The third type is abstracted from graph communication, such as sending messages to vertices and moving edges to messages. This type is close to regrouping, but the regroup destination is determined by the distribution of another dataset.
  3. Memory management
    Serialization and deserialization are two expensive operations, performed before data is sent and after it is received. Java's default ByteArrayOutputStream is slow for serialization, so both operations are done directly on byte arrays. Because byte arrays are used frequently in sending and receiving, they are cached in a resource pool.
  4. BSP style computation parallelism
    Collective communication requires synchronization. The Hadoop scheduler is modified to schedule tasks in BSP style.
  5. Fault tolerance with checkpointing
    Depending on the expected execution time, a job with a large number of iterations is split into a small number of jobs, each running several iterations, which are then submitted to the cluster.
  6. Hadoop Plugin
    Harp works as a plugin for Hadoop. Currently it supports Hadoop-1.2.1 and Hadoop-2.2.0. To run Map-Collective jobs, users only need to put the Harp jar package into the directory and configure it in the Hadoop configuration files. The goal is for a Hadoop cluster to schedule original MapReduce jobs and Map-Collective jobs at the same time.
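
The table and partition model in items 1 and 2 can be illustrated with a small single-process simulation. This is only a sketch under stated assumptions, not the Harp API: the class AllreduceSketch and its methods are hypothetical names, a table is modeled as a map from partition ID to a double[] partition, each list element stands for one worker, the regroup destination is taken to be partition ID modulo the number of workers, and the combiner is an element-wise sum. It shows how allreduce could be composed as regroup followed by allgather.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not the Harp API. Each list element is the table held
// by one simulated worker; a table maps partition ID -> array partition.
final class AllreduceSketch {

    // Combiner applied when two partitions with the same ID meet: element-wise sum.
    static double[] combine(double[] a, double[] b) {
        double[] out = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            out[i] = a[i] + b[i];
        }
        return out;
    }

    // Regroup: move each partition to worker (ID mod number of workers, an
    // assumed placement rule) and combine partitions that share an ID.
    static List<Map<Integer, double[]>> regroup(List<Map<Integer, double[]>> tables) {
        int n = tables.size();
        List<Map<Integer, double[]>> result = new ArrayList<>();
        for (int w = 0; w < n; w++) {
            result.add(new TreeMap<>());
        }
        for (Map<Integer, double[]> table : tables) {
            for (Map.Entry<Integer, double[]> e : table.entrySet()) {
                int dest = Math.floorMod(e.getKey(), n);
                result.get(dest).merge(e.getKey(), e.getValue(), AllreduceSketch::combine);
            }
        }
        return result;
    }

    // Allgather: every worker receives a copy of every partition. This assumes
    // each partition ID lives on one worker only, which holds after regroup.
    static List<Map<Integer, double[]>> allgather(List<Map<Integer, double[]>> tables) {
        Map<Integer, double[]> all = new TreeMap<>();
        for (Map<Integer, double[]> table : tables) {
            all.putAll(table);
        }
        List<Map<Integer, double[]>> result = new ArrayList<>();
        for (int w = 0; w < tables.size(); w++) {
            Map<Integer, double[]> copy = new TreeMap<>();
            for (Map.Entry<Integer, double[]> e : all.entrySet()) {
                copy.put(e.getKey(), e.getValue().clone());
            }
            result.add(copy);
        }
        return result;
    }

    // Allreduce composed from the two operations above.
    static List<Map<Integer, double[]>> allreduce(List<Map<Integer, double[]>> tables) {
        return allgather(regroup(tables));
    }

    // Two workers, each holding partitions 0 and 1 of the same table.
    static List<Map<Integer, double[]>> demoTables() {
        Map<Integer, double[]> worker0 = new TreeMap<>();
        worker0.put(0, new double[] {1, 2});
        worker0.put(1, new double[] {3, 4});
        Map<Integer, double[]> worker1 = new TreeMap<>();
        worker1.put(0, new double[] {10, 20});
        worker1.put(1, new double[] {30, 40});
        List<Map<Integer, double[]>> tables = new ArrayList<>();
        tables.add(worker0);
        tables.add(worker1);
        return tables;
    }

    public static void main(String[] args) {
        List<Map<Integer, double[]>> reduced = allreduce(demoTables());
        for (int w = 0; w < reduced.size(); w++) {
            for (Map.Entry<Integer, double[]> e : reduced.get(w).entrySet()) {
                System.out.println("worker " + w + ", partition " + e.getKey()
                        + ": " + Arrays.toString(e.getValue()));
            }
        }
    }
}
```

After the regroup step each partition ID is owned by exactly one worker, so the allgather step only has to copy partitions and never has to combine them. A real implementation would move serialized partitions over the network rather than share objects in one process.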
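
The memory management idea in item 3 (serialize directly on byte arrays and cache those arrays in a pool) can be sketched in the same spirit. Everything here is an assumption made for illustration: ByteArrayPool, acquire, release, writeDoubles and readDoubles are hypothetical names, rounding capacities up to a power of two is one possible pooling policy, and java.nio.ByteBuffer.wrap is used as a thin view over the array. Harp's actual pool and wire format may differ.

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not Harp's implementation: a pool that caches byte
// arrays by capacity so that send/receive buffers are reused, not reallocated.
final class ByteArrayPool {
    private final Map<Integer, Deque<byte[]>> free = new HashMap<>();

    // Round a requested size up to a power of two so that requests of
    // similar size can share the same cached arrays (an assumed policy).
    static int roundUp(int size) {
        if (size > (1 << 30)) {
            throw new IllegalArgumentException("size too large: " + size);
        }
        if (size <= 1) {
            return 1;
        }
        return Integer.highestOneBit(size - 1) << 1;
    }

    // Return a cached array of sufficient capacity, or allocate a new one.
    synchronized byte[] acquire(int size) {
        int capacity = roundUp(size);
        Deque<byte[]> queue = free.get(capacity);
        if (queue != null && !queue.isEmpty()) {
            return queue.pop();
        }
        return new byte[capacity];
    }

    // Hand an array back to the pool; it is filed under its own length.
    synchronized void release(byte[] array) {
        free.computeIfAbsent(array.length, k -> new ArrayDeque<>()).push(array);
    }

    // Serialize directly into the byte array: a length prefix, then the values.
    // Returns the number of bytes written.
    static int writeDoubles(double[] values, byte[] buf) {
        ByteBuffer bb = ByteBuffer.wrap(buf);
        bb.putInt(values.length);
        for (double v : values) {
            bb.putDouble(v);
        }
        return bb.position();
    }

    // Deserialize what writeDoubles produced.
    static double[] readDoubles(byte[] buf) {
        ByteBuffer bb = ByteBuffer.wrap(buf);
        double[] values = new double[bb.getInt()];
        for (int i = 0; i < values.length; i++) {
            values[i] = bb.getDouble();
        }
        return values;
    }

    public static void main(String[] args) {
        ByteArrayPool pool = new ByteArrayPool();
        double[] partition = {1.5, -2.0, 3.25};
        byte[] buf = pool.acquire(Integer.BYTES + Double.BYTES * partition.length);
        int written = writeDoubles(partition, buf);
        System.out.println("capacity=" + buf.length + ", written=" + written);
        System.out.println(java.util.Arrays.toString(readDoubles(buf)));
        pool.release(buf);
        // A later request of similar size gets the same array back.
        System.out.println("reused=" + (pool.acquire(20) == buf));
    }
}
```

Rounding capacities trades some wasted space for a higher hit rate in the pool. A production pool would also need to bound how many arrays it retains.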

Funding and Leadership

This project is supported in part by National Science Foundation Grant OCI-1149432 and supervised by Judy Qiu, Assistant Professor of Computer Science, School of Informatics and Computing at Indiana University Bloomington.