搜索
查看: 1783|: 0

Hadoop教程 第二章:Hadoop分布式文件系统[3]

[复制链接]

322

主题

0

回帖

1208

积分

网站编辑

积分
1208
发表于 2014-9-26 14:01:26 | 显示全部楼层 |阅读模式
MapReduce中使用HDFS

对于MapReduce HDFS是一个功能强大的工具,通过设置fs.default.name来配置名字结点的信息,Hadoop MapReduce会自动在HDFS找它的输入文件,使用FileInputFormat子类,Hadoop会自动从输入文件中得到数据,并以一种智能的方式来使得数据块本地性更高,并以此分配工作到集群。

在编程中使用HDFS

HDFS可以由显式的命令操作,也可以隐式地作为一个Hadoop MapReduce job的输入输出,你可以在你的java应用中使用HDFS

这节会给出一个基于javaHDFS的简单介绍,我们示例代码如下:

1:  import java.io.File;

2:  import java.io.IOException;

3:

4:  import org.apache.hadoop.conf.Configuration;

5:  import org.apache.hadoop.fs.FileSystem;

6:  import org.apache.hadoop.fs.FSDataInputStream;

7:  import org.apache.hadoop.fs.FSDataOutputStream;

8:  import org.apache.hadoop.fs.Path;

9:

10: public class HDFSHelloWorld {

11:

12:   public static final String theFilename = "hello.txt";

13:   public static final String message = "Hello, world!\n";

14:

15:   public static void main (String [] args) throws IOException {

16:

17:     Configuration conf = new Configuration();

18:     FileSystem fs = FileSystem.get(conf);

19:

20:     Path filenamePath = new Path(theFilename);

21:

22:     try {

23:       if (fs.exists(filenamePath)) {

24:         // remove the file first

25:         fs.delete(filenamePath);

26:       }

27:

28:       FSDataOutputStream out = fs.create(filenamePath);

29:       out.writeUTF(message);

30:       out.close();

31:

32:       FSDataInputStream in = fs.open(filenamePath);

33:       String messageIn = in.readUTF();

34:       System.out.print(messageIn);

35:       in.close();

46:     } catch (IOException ioe) {

47:       System.err.println("IOException during operation: " + ioe.toString());

48:       System.exit(1);

49:     }

40:   }

41: }

这个程序创建了一个名为hello.txt的文件,并写入一串字符,然后再从文件中读出,并在屏幕上显示,如果文件已经存在就先删除。

首先我们要得到一个文件FileSystem对象,它是由应用配置所决定的,Configuration对象用默认构造函数创建。

17:     Configuration conf = new Configuration();

18:     FileSystem fs = FileSystem.get(conf);

FileSystem接口实际上是为在不同文件系统中使用提供的抽象,通过Hadoop配置程序可以使用HDFS或本地文件系统或是其它文件系统,如果这个测试程序用普通的“java classname”命令运行,它不会查找con/hadoop-site.xml,并会使用本地文件系统,为确保它使用合适的Hadoop配置,将你的程序打成一个jar包,并用下面的命令启动:

$HADOOP_HOME/bin/hadoop jar yourjar HDFSHelloWorld

不管你是如何启动这个程序,也不管它连接的是哪个文件系统,它都以同样的方式写入:

28:       FSDataOutputStream out = fs.create(filenamePath);

29:       out.writeUTF(message);

30:       out.close();

首先我们要利用fs.create()创建一个文件,它会返回一个FSDataOutputStream对象,这个对象用于将数据写入文件,然后我们用writeUTF将信息写入文件,FSDataOutputStream继承自java.io.DataOutputStream类,当我们不再操作文件时,我们用close()函数关闭流。

如果文件已经存在,调用fs.create()函数会覆盖这个文件,但作为示例,程序将显示地删除这个文件,测试文件是否存在并删除的代码在23-26行。

23:       if (fs.exists(filenamePath)) {

24:         // remove the file first

25:         fs.delete(filenamePath);

26:       }

Path对象上的其它操作如拷贝,移动,删除也都一样的直观。

最后,我们重新打开文件,并从文件中读出内容,将它们转成UTF编码,显示到屏幕上。

fs.open()方法返回一个FSDataInputStream对象,它的readUTF可以从流中读取数据,当我们处理完流后,就应调用close()释放与文件相关的句柄。

更多信息

HDFS API完整的JavaDoc在:

http://hadoop.apache.org/common/docs/r0.20.2/api/index.html

一个直接链接到FileSystem接口的地址是:

http://hadoop.apache.org/common/docs/r0.20.2/api/org/apache/hadoop/fs/FileSystem.html

其它HDFS任务块的重新负载均衡
新的结点可以以很直观的方式加入集群。在新的结点上,进行集群其它部分相同的配置,启动数据结点守护进程会使它与它与名字结点交互,加入集群。
但新的结点上没有数据,所以它没有减轻已存在结点的空间占用问题,虽然新的文件会被保存到这个新的结点,但为了优化,我们应该使所有结点负载相同。
为达到上述目标,可以用Hadoop的自动负载工具进行负载均衡,Balancer类可以智能地将数据进行负载均衡,它可以达到一个给定的负载差距的阈值,这个阈值用百分数表示(默认10%),设置更小的百分数可使结点更平衡,但就需要更多的运行时间,完美的负载均衡(0%)现实中不太容易达到。
负载均衡可以通过启动bin/start-balancer.sh脚本,脚本支持平衡比较参数,-threshold,比如bin/start-balancer.sh –threshold 5,这个脚本会在完成任务后自动退出,也会在发生错误或它找不到候选数据块移来来均衡负载时退出,负载均衡的过程可以通过超级用户运行bin/stop –balancer.sh停止。
负载均衡脚本可以在没有使用时使用,也可以在有许多任务在运行时使用,为防止重新重载过程消耗大量带宽,严重影响其它进程的效率,可以配置dfs.balance.bandwidthPerSec用限制负载均衡时每秒移动的字节数。
拷贝大量文件
当需要把大量文件从一个地点移到另一地点时(或是从一个HDFS集群到另一集群,或是从S3Amazon S3 (Simple Storage Service))到HDFS,或是反过来,等等)。这一任务应该被分到多个结点上,使它们可以共享带宽来进行这一任务。Hadoop有一个distcp的工具来完成这个任务。
通过调用bin/hadoop distcp src destHadoop会启动一个MapReduce任务,它将拷贝从src拷贝大量文件到dest的任务分开,命令中两个参数必须是完整的URL路径,比如hdfs://SomeNameNode:9000/foo/bar/hdfs://OtherNameNode:2000/baz/quux/,这将拷贝一个集群上的/foo/bar目录到另一集群的baz/quuz,路径都被认为是目录,并且会被递归地拷贝,S3URLs可以用类似s3://bucket-name/key来描述。
停止结点
Hadoop能将新结点在集群工作时加入到集群,它也能让集群中结点工作时移出,而不会有数据损失,但如果结点只是简单的被停止,并且这些结点上有文件中唯一的某些数据块时,将会发生数据丢失。
结点必须以一定的计算退出集群,这个计划可使HDFS保证没有数据块会只在将退出的数据结点上。
HDFS提供了停止结点的能力,它会保证这个过程是安全的,可以通过以下几步来进行。
第一步,集群配置。在让一些结点退出集群时,必须配置一个不包含文件(excludes file),方法是在你的conf/hadoop-site.xml中加入一个dfs.hosts.exclude键,它的值是包含不允许再与HDFS连接的机器组成的一个文件地址,这个地址是名字结点的本地文件路径。< xmlnamespace prefix ="o" ns ="urn:schemas-microsoft-comfficeffice" />
第二步,确定要停止的主机名。每个要停止的机器都要加入dfs.hosts.exclude指定的路径的文件中,它将会阻止它们与名字结点连接。
第三步,强制配置重新载入。运行bin/hadoop dfsadmin –refreshNodes命令,这个命令将强制让名字结点重新载入,它的配置包括新的(不包含文件)(excludes file),它将在一定时间内停止结点,在这段时间内,会将要停止的机器上的块复制到剩余的机器上。
第四步,关闭结点。当停止过程结束后,停止的结点可以安全地让机器关闭,bin/hadoop dfsadmin –report命令会描述哪些结点还在集群中。
第五步,重新编辑不包含文件。当机器停止后,它们可以从不包含文件中删除了,运行bin/hadoop dfsadmin –refreshNodes会重新将不包含文件读入名字结点,可以使数据结点重新加入集群或可使其它机器加入集群等等。
检查文件系统的健康
当停止结点,重新启动集群或在它运行时,你想确认文件系统是健康的,即文件没有损坏,或正在被复制,并且没有块丢失。
Hadoop提供的fsck命令就是用来完成这个任务的,它的命令格式如下:
bin/hadoop fsck [path] [options]
如果运行不带参数,它会打印出命令使用信息后退出,如果参数是/,它会检查整个文件系统的健康,并打印出结点,如果参数是一个目录或是文件,它就只检查这个目录或文件,如果给出了可选参数,但没给出路径,这会从文件根目录开始,options可以包含两种不同的选项。
Action选项指定找到崩溃文件时所采取的动作,可以是-move,用这个选项会在找到崩溃文件后将它们移到/lost+found,或用-delete选项,它会直接删除崩溃文件。
Information选项指定打印出的报告的详细度。-files选项会列出所有它检查的文件,再加上-blocks选项会列出每个文件的文件的块,加上-locations在这两个选项后,它会打印出保存这些块的数据结点,还可再加-racks选项,它会打印出每个结点的主机架拓扑信息,注意你不能单独使用后面的三个选项,命令不会默认将前面的选项加入,你必须一起使用。同样注意,Hadoop在不同的命令都用-files参数,比如dfsadminfsckdfs,等等。这意味着你省略在fsck中的path参数,它不会得到-files参数,这时你可用--files作为参数,以来区别普通命令和fsck参数,示例如下:
bin/hadoop fsck -- -files -blocks
如果你提供开始检查的路径,那么这个--不是必须的,你在-move写在前面时也不是必须的。
默认的fsck不会操作还在被客户端写入的文件,如果要列出这种文件的选项是-openforwrite
对主机架的考虑
在小型集群中,所有的服务器都由一个交换机连接,这种集群的位置只有两种,“在机器上”和“不在机器上”,当从数据结点本地文件系统载入数据到HDFS中时,名字结点会将一份拷贝拷至本地结点中,再从集群中随机选两个结点拷贝。
对于大的Hadoop集群,服务器 在多个主机架中,这时保证数据在多个主机架中是很重要的,因为只有如此,一个交换机的崩溃不会导致数据丢失。
HDFS可以通过使用脚本的方式对主机等问题进行考虑,这个脚本会让主结点来了解集群拓扑,虽然也可以用别的方式,但默认的方法是提供一个可执行脚本,它返回一个IP地址表的主机架地址。
网络拓扑脚本将一个或多个集群结点IP地址作为参数,它在标准输出上返回一个主机架名列表,列表每行对应一个IP地址,并且注意输入输出的顺序必须一致。
要设置主机架映射脚本 ,你要在conf/hadoop-site.xml脚本指定topology.script.file.name值,这个值是一个可执行脚本或程序的路径,这样Hadoop就可以用命令得到IP的主机架名,默认情况下,Hadoop将一组IP地址分开,依次将它们作为参数,你可以设置最多可接收的参数个数,设置topology.script.arg.number键。
Hadoop中,主机架ids是层次结构的,它们看起来像是路径名,默认的每个结点有一个主机架id/default-rack,你可将结点的主机架id设置任意路径,比如/foo/bar-rack,路径/foo就是层次中较高层,在大规模安装时,一个合理的结构应该是/top-switch-name/rack-nameHadoop主机架ids现在还不能处理特殊的拓扑,比如3D Torus网络,这种路由假设每个结点都与单个交换机连接,那么就是有一个upstream交换机,这通常不是个问题,但是真实的包路由将直接用这个拓扑,而Hadoop主机架ids是用来找要复制块存放较近和较远结点的。
下面的脚本是一个IP到主机架id的例子,IP地址是由络管理员定义的网络层次结构产生,这个脚本在一般的网络结构中可行,但在复杂的网络结构中,要进行一个基于文件或基于表的查找过程,要注意在结点移到不同的主机架中,文件或表要及时更新,下面的脚本要指定最多可接收的参数为1
#!/bin/bash
# Set rack id based on IP address.
# Assumes network administrator has complete control
# over IP addresses assigned to nodes and they are
# in the 10.x.y.z address space. Assumes that
# IP addresses are distributed hierarchically. e.g.,
# 10.1.y.z is one data center segment and 10.2.y.z is another;
# < xmlnamespace prefix ="st1" ns ="urn:schemas-microsoft-comffice:smarttags" />10.1.1.z is one rack, 10.1.2.z is another rack in
# the same segment, etc.)
#
# This is invoked with an IP address as its only argument
# get IP address from the input
ipaddr=$0
# select "x.y" and convert it to "x/y"
segments=`echo $ipaddr | cut --delimiter=. --fields=2-3 --output-delimiter=/`
echo /${segments}
HDFSweb接口
HDFS有一个web服务器,它提供了基本的状态监视和文件浏览操作,默认情况下它在名字结点上,所用端口号为50070,在web浏览器中键入地址http://namenode:50070浏览器就会返回集群的健康,可用容量,和使用容量的信息(与bin/hadoop dfsadmin -report返回的信息相似。
改变web接口的地址如端口可以在conf/hadoop-site.xml中设置dfs.http.address,它必须写成address:port形式,如果要接收所有地址用0.0.0.0
通过web接口,你可以用基本文件浏览接口来浏览HDFS,每个数据结点在端口50075上开放它的端口,你可以通过设置http.address来改变它的值,重写0.0.0.0:50075Hdoop守护进程产生的日志文件可以通过这个接口访问,这对于分布式debugging和错误定位是很有帮助的。

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

大数据中国微信

QQ   

版权所有: Discuz! © 2001-2013 大数据.

GMT+8, 2024-12-22 16:39 , Processed in 0.066803 second(s), 24 queries .

快速回复 返回顶部 返回列表