搜索
查看: 2826|: 1

Hadoop添加节点的方法

[复制链接]

191

主题

18

回帖

694

积分

高级会员

积分
694
发表于 2014-2-26 00:21:54 | 显示全部楼层 |阅读模式
自己实际添加节点过程:
1. 先在slave上配置好环境,包括ssh,jdk,相关config,lib,bin等的拷贝;
2. 将新的datanode的host加到集群namenode及其他datanode中去;
3. 将新的datanode的ip加到master的conf/slaves中;
4. 重启cluster,在cluster中看到新的datanode节点;
5. 运行bin/start-balancer.sh,这个会很耗时间
备注:
1. 如果不balance,那么cluster会把新的数据都存放在新的node上,这样会降低mr的工作效率;
2. 也可调用bin/start-balancer.sh 命令执行,也可加参数 -threshold 5
   threshold 是平衡阈值,默认是10%,值越低各节点越平衡,但消耗时间也更长。
3. balancer也可以在有mr job的cluster上运行,默认dfs.balance.bandwidthPerSec很低,为1M/s。在没有mr job时,可以提高该设置加快负载均衡时间。

其他备注:
1. 必须确保slave的firewall已关闭;
2. 确保新的slave的ip已经添加到master及其他slaves的/etc/hosts中,反之也要将master及其他slave的ip添加到新的slave的/etc/hosts中
mapper及reducer个数
url地址: http://wiki.apache.org/hadoop/HowManyMapsAndReduces
HowManyMapsAndReduces
Partitioning your job into maps and reduces
Picking the appropriate size for the tasks for your job can radically change the performance of Hadoop. Increasing the number of tasks increases the framework overhead, but increases load balancing and lowers the cost of failures. At one extreme is the 1 map/1 reduce case where nothing is distributed. The other extreme is to have 1,000,000 maps/ 1,000,000 reduces where the framework runs out of resources for the overhead.
Number of Maps
The number of maps is usually driven by the number of DFS blocks in the input files. Although that causes people to adjust their DFS block size to adjust the number of maps. The right level of parallelism for maps seems to be around 10-100 maps/node, although we have taken it up to 300 or so for very cpu-light map tasks. Task setup takes awhile, so it is best if the maps take at least a minute to execute.
Actually controlling the number of maps is subtle. The mapred.map.tasks parameter is just a hint to the InputFormat for the number of maps. The default InputFormat behavior is to split the total number of bytes into the right number of fragments. However, in the default case the DFS block size of the input files is treated as an upper bound for input splits. A lower bound on the split size can be set via mapred.min.split.size. Thus, if you expect 10TB of input data and have 128MB DFS blocks, you'll end up with 82k maps, unless your mapred.map.tasks is even larger. Ultimately the [WWW] InputFormat determines the number of maps.
The number of map tasks can also be increased manually using the JobConf's conf.setNumMapTasks(int num). This can be used to increase the number of map tasks, but will not set the number below that which Hadoop determines via splitting the input data.
Number of Reduces
The right number of reduces seems to be 0.95 or 1.75 * (nodes * mapred.tasktracker.tasks.maximum). At 0.95 all of the reduces can launch immediately and start transfering map outputs as the maps finish. At 1.75 the faster nodes will finish their first round of reduces and launch a second round of reduces doing a much better job of load balancing.
Currently the number of reduces is limited to roughly 1000 by the buffer size for the output files (io.buffer.size * 2 * numReduces << heapSize). This will be fixed at some point, but until it is it provides a pretty firm upper bound.
The number of reduces also controls the number of output files in the output directory, but usually that is not important because the next map/reduce step will split them into even smaller splits for the maps.
The number of reduce tasks can also be increased in the same way as the map tasks, via JobConf's conf.setNumReduceTasks(int num).
自己的理解:
mapper个数的设置:跟input file 有关系,也跟filesplits有关系,filesplits的上线为dfs.block.size,下线可以通过mapred.min.split.size设置,最后还是由InputFormat决定。

较好的建议:
The right number of reduces seems to be 0.95 or 1.75 multiplied by (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum).increasing the number of reduces increases the framework overhead, but increases load balancing and lowers the cost of failures.
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>2</value>
  <description>The maximum number of reduce tasks that will be run
  simultaneously by a task tracker.
  </description>
</property>

单个node新加硬盘
1.修改需要新加硬盘的node的dfs.data.dir,用逗号分隔新、旧文件目录
2.重启dfs

同步hadoop 代码
hadoop-env.sh
# host:path where hadoop code should be rsync'd from.  Unset by default.
# export HADOOP_MASTER=master:/home/$USER/src/hadoop

用命令合并HDFS小文件
hadoop fs -getmerge <src> <dest>

重启reduce job方法
Introduced recovery of jobs when JobTracker restarts. This facility is off by default.
Introduced config parameters "mapred.jobtracker.restart.recover", "mapred.jobtracker.job.history.block.size", and "mapred.jobtracker.job.history.buffer.size".
还未验证过。

IO写操作出现问题
0-1246359584298, infoPort=50075, ipcPort=50020):Got exception while serving blk_-5911099437886836280_1292 to /172.16.100.165:
java.net.SocketTimeoutException: 480000 millis timeout while waiting for channel to be ready for write. ch : java.nio.channels.SocketChannel[connected local=/
172.16.100.165:50010 remote=/172.16.100.165:50930]
        at org.apache.hadoop.net.SocketIOWithTimeout.waitForIO(SocketIOWithTimeout.java:185)
        at org.apache.hadoop.net.SocketOutputStream.waitForWritable(SocketOutputStream.java:159)
        at org.apache.hadoop.net.SocketOutputStream.transferToFully(SocketOutputStream.java:198)
        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendChunks(BlockSender.java:293)
        at org.apache.hadoop.hdfs.server.datanode.BlockSender.sendBlock(BlockSender.java:387)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.readBlock(DataXceiver.java:179)
        at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:94)
        at java.lang.Thread.run(Thread.java:619)

It seems there are many reasons that it can timeout, the example given in
HADOOP-3831 is a slow reading client.

解决办法:在hadoop-site.xml中设置dfs.datanode.socket.write.timeout=0试试;
My understanding is that this issue should be fixed in Hadoop 0.19.1 so that
we should leave the standard timeout. However until then this can help
resolve issues like the one you're seeing.

HDFS退服节点的方法
目前版本的dfsadmin的帮助信息是没写清楚的,已经file了一个bug了,正确的方法如下:
1. 将 dfs.hosts 置为当前的 slaves,文件名用完整路径,注意,列表中的节点主机名要用大名,即 uname -n 可以得到的那个。
2. 将 slaves 中要被退服的节点的全名列表放在另一个文件里,如 slaves.ex,使用 dfs.host.exclude 参数指向这个文件的完整路径
3. 运行命令 bin/hadoop dfsadmin -refreshNodes
4. web界面或 bin/hadoop dfsadmin -report 可以看到退服节点的状态是 Decomission in progress,直到需要复制的数据复制完成为止
5. 完成之后,从 slaves 里(指 dfs.hosts 指向的文件)去掉已经退服的节点

附带说一下 -refreshNodes 命令的另外三种用途:
2. 添加允许的节点到列表中(添加主机名到 dfs.hosts 里来)
3. 直接去掉节点,不做数据副本备份(在 dfs.hosts 里去掉主机名)
4. 退服的逆操作——停止 exclude 里面和 dfs.hosts 里面都有的,正在进行 decomission 的节点的退服,也就是把 Decomission in progress 的节点重新变为 Normal (在 web 界面叫 in service)

Hadoop添加节点的方法
自己实际添加节点过程:
1. 先在slave上配置好环境,包括ssh,jdk,相关config,lib,bin等的拷贝;
2. 将新的datanode的host加到集群namenode及其他datanode中去;
3. 将新的datanode的ip加到master的conf/slaves中;
4. 重启cluster,在cluster中看到新的datanode节点;
5. 运行bin/start-balancer.sh,这个会很耗时间
备注:
1. 如果不balance,那么cluster会把新的数据都存放在新的node上,这样会降低mr的工作效率;
2. 也可调用bin/start-balancer.sh 命令执行,也可加参数 -threshold 5
   threshold 是平衡阈值,默认是10%,值越低各节点越平衡,但消耗时间也更长。
3. balancer也可以在有mr job的cluster上运行,默认dfs.balance.bandwidthPerSec很低,为1M/s。在没有mr job时,可以提高该设置加快负载均衡时间。

其他备注:
1. 必须确保slave的firewall已关闭;
2. 确保新的slave的ip已经添加到master及其他slaves的/etc/hosts中,反之也要将master及其他slave的ip添加到新的slave的/etc/hosts中

hadoop 学习借鉴
1. 解决hadoop OutOfMemoryError问题:
<property>
   <name>mapred.child.java.opts</name>
   <value>-Xmx800M -server</value>
</property>
With the right JVM size in your hadoop-site.xml , you will have to copy this
to all mapred nodes and restart the cluster.
或者:hadoop jar jarfile [main class] -D mapred.child.java.opts=-Xmx800M

2. Hadoop java.io.IOException: Job failed! at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1232) while indexing.
when i use nutch1.0,get this error:
Hadoop java.io.IOException: Job failed! at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:1232) while indexing.
这个也很好解决:
可以删除conf/log4j.properties,然后可以看到详细的错误报告
我这儿出现的是out of memory
解决办法是在给运行主类org.apache.nutch.crawl.Crawl加上参数:-Xms64m -Xmx512m
你的或许不是这个问题,但是能看到详细的错误报告问题就好解决了

distribute cache使用
类似一个全局变量,但是由于这个变量较大,所以不能设置在config文件中,转而使用distribute cache
具体使用方法:(详见《the definitive guide》,P240)
1. 在命令行调用时:调用-files,引入需要查询的文件(可以是local file, HDFS file(使用hdfs://xxx?)), 或者 -archives (JAR,ZIP, tar等)
% hadoop jar job.jar MaxTemperatureByStationNameUsingDistributedCacheFile \
  -files input/ncdc/metadata/stations-fixed-width.txt input/ncdc/all output
2. 程序中调用:
   public void configure(JobConf conf) {
      metadata = new NcdcStationMetadata();
      try {
        metadata.initialize(new File("stations-fixed-width.txt"));
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
   }
另外一种间接的使用方法:在hadoop-0.19.0中好像没有
调用addCacheFile()或者addCacheArchive()添加文件,
使用getLocalCacheFiles() 或 getLocalCacheArchives() 获得文件

hadoop的job显示web
There are web-based interfaces to both the JobTracker (MapReduce master) and NameNode (HDFS master) which display status pages about the state of the entire system. By default, these are located at [WWW] http://job.tracker.addr:50030/ and [WWW] http://name.node.addr:50070/.

hadoop监控
OnlyXP(52388483) 131702
用nagios作告警,ganglia作监控图表即可

status of 255 error
错误类型:
java.io.IOException: Task process exit with nonzero status of 255.
        at org.apache.hadoop.mapred.TaskRunner.run(TaskRunner.java:424)

错误原因:
Set mapred.jobtracker.retirejob.interval and mapred.userlog.retain.hours to higher value. By default, their values are 24 hours. These might be the reason for failure, though I'm not sure

split size
FileInputFormat input splits: (详见 《the definitive guide》P190)
mapred.min.split.size: default=1, the smallest valide size in bytes for a file split.
mapred.max.split.size: default=Long.MAX_VALUE, the largest valid size.
dfs.block.size: default = 64M, 系统中设置为128M。
如果设置 minimum split size > block size, 会增加块的数量。(猜想从其他节点拿去数据的时候,会合并block,导致block数量增多)
如果设置maximum split size < block size, 会进一步拆分block。

split size = max(minimumSize, min(maximumSize, blockSize));
其中 minimumSize < blockSize < maximumSize.

sort by value
hadoop 不提供直接的sort by value方法,因为这样会降低mapreduce性能。
但可以用组合的办法来实现,具体实现方法见《the definitive guide》, P250
基本思想:
1. 组合key/value作为新的key;
2. 重载partitioner,根据old key来分割;
conf.setPartitionerClass(FirstPartitioner.class);
3. 自定义keyComparator:先根据old key排序,再根据old value排序;
conf.setOutputKeyComparatorClass(KeyComparator.class);
4. 重载GroupComparator, 也根据old key 来组合;  conf.setOutputValueGroupingComparator(GroupComparator.class);

small input files的处理
对于一系列的small files作为input file,会降低hadoop效率。
有3种方法可以将small file合并处理:
1. 将一系列的small files合并成一个sequneceFile,加快mapreduce速度。
详见WholeFileInputFormat及SmallFilesToSequenceFileConverter,《the definitive guide》, P194
2. 使用CombineFileInputFormat集成FileinputFormat,但是未实现过;
3. 使用hadoop archives(类似打包),减少小文件在namenode中的metadata内存消耗。(这个方法不一定可行,所以不建议使用)
   方法:
   将/my/files目录及其子目录归档成files.har,然后放在/my目录下
   bin/hadoop archive -archiveName files.har /my/files /my
   
   查看files in the archive:
   bin/hadoop fs -lsr har://my/files.har

skip bad records
JobConf conf = new JobConf(ProductMR.class);
conf.setJobName("ProductMR");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(Product.class);
conf.setMapperClass(Map.class);
conf.setReducerClass(Reduce.class);
conf.setMapOutputCompressorClass(DefaultCodec.class);
conf.setInputFormat(SequenceFileInputFormat.class);
conf.setOutputFormat(SequenceFileOutputFormat.class);
String objpath = "abc1";
SequenceFileInputFormat.addInputPath(conf, new Path(objpath));
SkipBadRecords.setMapperMaxSkipRecords(conf, Long.MAX_VALUE);
SkipBadRecords.setAttemptsToStartSkipping(conf, 0);
SkipBadRecords.setSkipOutputPath(conf, new Path("data/product/skip/"));
String output = "abc";
SequenceFileOutputFormat.setOutputPath(conf, new Path(output));
JobClient.runJob(conf);

For skipping failed tasks try : mapred.max.map.failures.percent

restart 单个datanode
如果一个datanode 出现问题,解决之后需要重新加入cluster而不重启cluster,方法如下:
bin/hadoop-daemon.sh start datanode
bin/hadoop-daemon.sh start jobtracker

reduce exceed 100%
"Reduce Task Progress shows > 100% when the total size of map outputs (for a
single reducer) is high "
造成原因:
在reduce的merge过程中,check progress有误差,导致status > 100%,在统计过程中就会出现以下错误:java.lang.ArrayIndexOutOfBoundsException: 3
        at org.apache.hadoop.mapred.StatusHttpServer$TaskGraphServlet.getReduceAvarageProgresses(StatusHttpServer.java:228)
        at org.apache.hadoop.mapred.StatusHttpServer$TaskGraphServlet.doGet(StatusHttpServer.java:159)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:689)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:802)
        at org.mortbay.jetty.servlet.ServletHolder.handle(ServletHolder.java:427)
        at org.mortbay.jetty.servlet.WebApplicationHandler.dispatch(WebApplicationHandler.java:475)
        at org.mortbay.jetty.servlet.ServletHandler.handle(ServletHandler.java:567)
        at org.mortbay.http.HttpContext.handle(HttpContext.java:1565)
        at org.mortbay.jetty.servlet.WebApplicationContext.handle(WebApplicationContext.java:635)
        at org.mortbay.http.HttpContext.handle(HttpContext.java:1517)
        at org.mortbay.http.HttpServer.service(HttpServer.java:954)

jira地址:https://issues.apache.org/jira/browse/HADOOP-5210

counters
3中counters:
1. built-in counters: Map input bytes, Map output records...
2. enum counters
   调用方式:
  enum Temperature {
    MISSING,
    MALFORMED
  }

reporter.incrCounter(Temperature.MISSING, 1)
   结果显示:
09/04/20 06:33:36 INFO mapred.JobClient:   Air Temperature Recor
09/04/20 06:33:36 INFO mapred.JobClient:     Malformed=3
09/04/20 06:33:36 INFO mapred.JobClient:     Missing=66136856
3. dynamic countes:
   调用方式:
   reporter.incrCounter("TemperatureQuality", parser.getQuality(),1);
   
   结果显示:
09/04/20 06:33:36 INFO mapred.JobClient:   TemperatureQuality
09/04/20 06:33:36 INFO mapred.JobClient:     2=1246032
09/04/20 06:33:36 INFO mapred.JobClient:     1=973422173
09/04/20 06:33:36 INFO mapred.JobClient:     0=1

22

主题

4

回帖

157

积分

版主

积分
157
发表于 2014-5-20 11:13:53 | 显示全部楼层
不错,赞一个
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

大数据中国微信

QQ   

版权所有: Discuz! © 2001-2013 大数据.

GMT+8, 2025-1-28 03:30 , Processed in 0.139926 second(s), 25 queries .

快速回复 返回顶部 返回列表