搜索
查看: 2466|: 0

hadoop中槽-slot是线程还是进程讨论

[复制链接]

152

主题

47

回帖

3015

积分

管理员

积分
3015
发表于 2014-2-5 13:43:31 | 显示全部楼层 |阅读模式
hadoop一个节点默认起两个map slot,这两个slot是多线程吗?

hadoop-0.21.0 源码中是这样的:

首先看看 org.apache.hadoop.mapred.TaskTracker 类:
=====================================================================================
> 两个类变量 maxMapSlots 和 maxReduceSlots:

    1. <li>maxMapSlots = conf.getInt(TT_MAP_SLOTS, 2);</li><li>    maxReduceSlots = conf.getInt(TT_REDUCE_SLOTS, 2);</li>
    复制代码

  其中

    1. <li>   public static final String TT_MAP_SLOTS =  "mapreduce.tasktracker.map.tasks.maximum";</li><li>    public static final String TT_REDUCE_SLOTS = "mapreduce.tasktracker.reduce.tasks.maximum";</li>
    复制代码

> 类方法 initializeMemoryManagement() 中 ,根据 slots 来决定申请内存的大小

    1. <li>totalMemoryAllottedForTasks =</li><li>        maxMapSlots * mapSlotMemorySizeOnTT + maxReduceSlots</li><li>            * reduceSlotSizeMemoryOnTT;</li>
    复制代码

> 类方法 TaskTracker.initialize() 中会起两个 TaskLauncher 线程,分别负责启动 Mapper 和 Reduce 任务:
  1. <ul class="litype_1" type="1"><li>    mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);</li><li>    reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
  2. </li></ul>
复制代码

=====================================================================================
再看看 org.apache.hadoop.mapred.TaskTracker.TaskLauncher 类,它负责启动 Mapper/Reducer 任务。

> 初始化 TaskLauncher 时,需要传入 slots 的数量:
-------------------------------------------

    1. <li>    public TaskLauncher(TaskType taskType, int numSlots) {</li><li>      ... ... </li><li>      this.maxSlots = numSlots;</li><li>      this.numFreeSlots = new IntWritable(numSlots);</li><li>      ... ... </li><li>   
    2. </li>
    复制代码

  特别要注意 numFreeSlots 这个类变量:
      private IntWritable numFreeSlots;
-------------------------------------------
> TaskLauncher.run() 中,循环地看是否有新的 Task 需要启动,并且看是否有足够的 slots 可用:
  1. <font color="rgb(102, 102, 102)"><font style="background-color:rgb(247, 247, 247)"><ul class="litype_1" type="1"><li>  while () {</li><li>    while (numFreeSlots.get() < task.getNumSlotsRequired()) {</li><li>       .......</li><li>    }</li><li>    numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired());   // 用完了就减掉</li><li>  }
  2. </li></ul></font></font>
复制代码

> Task 执行完了以后,需要释放 slots :

    1. <li>    public void addFreeSlots(int numSlots) {</li><li>      ... ...</li><li>      numFreeSlots.set(numFreeSlots.get() + numSlots);</li><li>      ... ...</li><li>   
    2. </li>
    复制代码

=====================================================================================

综合上面看, slots 只是一个逻辑值 ( org.apache.hadoop.mapred.TaskTracker.TaskLauncher.numFreeSlots ),而不是对应着一个线程或者进程。TaskLauncher 会维护这个值,以保证资源使用在控制范围内。


代码可见 : org.apache.hadoop.mapred.TaskTracker.TaskLauncher.run()  。

Mapper 和 Reducer 都是单独的进程,但是它们与 slots 的关系是这样的:

    1. <li>org.apache.hadoop.mapred.TaskTracker.TaskLauncher.run() {</li><li>          ... ...</li><li>          //got a free slot. launch the task</li><li>          startNewTask(tip);</li><li>          ... ...</li><li>
    2. </li>
    复制代码

这里的 slots 有点类似 “令牌” 的感觉:申请资源,先获得令牌;释放资源,交还令牌。

> mapper 和 reducer 都是单独的进程?好像有点不对,是单独的线程吧?


是单独的进程。

启动Mapper/Reducer的总的调用路径是:

    1. <li>org.apache.hadoop.mapred.TaskTracker.TaskLauncher.run()</li><li>-></li><li>org.apache.hadoop.mapred.TaskTracker.startNewTask()</li><li>-></li><li>org.apache.hadoop.mapred.TaskTracker.launchTaskForJob()</li><li>-></li><li>org.apache.hadoop.mapred.TaskTracker.TaskInProgress.launchTask() </li><li>-></li><li>org.apache.hadoop.mapred.Task.createRunner()  //  抽象方法,具体实现在子类 MapTask 和 ReduceTask 中</li><li>   |-> org.apache.hadoop.mapred.MapTask.createRunner()  // 创建 MapTaskRunner 类实例</li><li>   |-> org.apache.hadoop.mapred.ReduceTask.createRunner()  // 创建 ReduceTaskRunner 类实例</li>
    复制代码


最终,跟踪到了 MapTaskRunner 和 ReduceTaskRunner 这两个类。

至此,我们看看它们的父类 org.apache.hadoop.mapred.TaskRunner ,以下是类的说明:
  1. <ul class="litype_1" type="1"><li>/** Base class that runs a task in a separate process.  Tasks are run in a</li><li>* separate process in order to isolate the map/reduce system code from bugs in</li><li>* user supplied map and reduce functions.</li><li>*/
  2. </li></ul>
复制代码



TaskRunner 虽然 extends Thread (看起来是个线程),但是真正启动Mapper和Reduce进程的代码在函数 TaskRunner.run() 中:

    1. <li>  public final void run() {</li><li>      ... ... </li><li>      launchJvmAndWait(setup, vargs, stdout, stderr, logSize, workDir, env);</li><li>      ... ... </li><li>  }</li>
    复制代码



其调用了 TaskRunner.launchJvmAndWait() 方法(在此之前还有些创建文件夹、设置配置参数和环境变量等准备性的操作):

    1. <li>  void launchJvmAndWait(List<String> setup, Vector<String> vargs, File stdout,</li><li>      File stderr, long logSize, File workDir, Map<String, String> env)</li><li>      throws InterruptedException {</li><li>    jvmManager.launchJvm(this, jvmManager.constructJvmEnv(setup, vargs, stdout,</li><li>        stderr, logSize, workDir, env, conf));</li><li>    synchronized (lock) {</li><li>      while (!done) {</li><li>        lock.wait();</li><li>      }</li><li>    }</li><li>  }</li>
    复制代码

上面代码主要是 launch 一个 java虚拟机进程。这也是Hadoop启动代价很高的原因,因为launch虚拟机是比较耗资源的;于是又提供了Task JVM Reuse机制。

单独起进程的原因也说得很清楚,就是: isolate the map/reduce system code from bugs in user supplied map and reduce functions。其实就是,通过使用不同的进程空间,进行隔离,防止用户提供的代码中有bug死掉后,造成 TaskTracker  所在进程也死掉(这个死掉了,效果就跟阿凡达里面的发光树被毁了一样)。


Hadoop-0.20.2源码中的实现基本也是差不多的。

大数据中国(http://www.bigdatas.cn),以数据的力量改变生活!
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

大数据中国微信

QQ   

版权所有: Discuz! © 2001-2013 大数据.

GMT+8, 2025-1-28 00:53 , Processed in 0.173547 second(s), 24 queries .

快速回复 返回顶部 返回列表