搜索
大数据中国 首页 大数据技术 查看内容
Hadoop集群系列9:HDFS初探之旅
2014-4-19 17:17 |来自: 博客园| 查看: 5723| 评论: 0

5、HDFS常用操作

先说一下"hadoop fs 和hadoop dfs的区别",看两本Hadoop书上各有用到,但效果一样,求证与网络发现下面一解释比较中肯。

粗略的讲,fs是个比较抽象的层面,在分布式环境中,fs就是dfs,但在本地环境中,fs是local file system,这个时候dfs就不能用。

5.1 文件操作

1)列出HDFS文件

此处为你展示如何通过"-ls"命令列出HDFS下的文件:

hadoop fs -ls

执行结果如图5-1-1所示。在这里需要注意:在HDFS中未带参数的"-ls"命名没有返回任何值,它默认返回HDFS的"home"目录下的内容。在HDFS中,没有当前目录这样一个概念,也没有cd这个命令。

图5-1-1 列出HDFS文件

2)列出HDFS目录下某个文档中的文件

此处为你展示如何通过"-ls 文件名"命令浏览HDFS下名为"input"的文档中文件:

hadoop fs –ls input

执行结果如图5-1-2所示。

图5-1-2 列出HDFS下名为input的文档下的文件

3)上传文件到HDFS

此处为你展示如何通过"-put 文件1 文件2"命令将"Master.Hadoop"机器下的"/home/hadoop"目录下的file文件上传到HDFS上并重命名为test:

hadoop fs –put ~/file test

执行结果如图5-1-3所示。在执行"-put"时只有两种可能,即是执行成功和执行失败。在上传文件时,文件首先复制到DataNode上。只有所有的DataNode都成功接收完数据,文件上传才是成功的。其他情况(如文件上传终端等)对HDFS来说都是做了无用功。

图5-1-3 成功上传file到HDFS

4)将HDFS中文件复制到本地系统中

此处为你展示如何通过"-get 文件1 文件2"命令将HDFS中的"output"文件复制到本地系统并命名为"getout"。

hadoop fs –get output getout

执行结果如图5-1-4所示。

图5-1-4 成功将HDFS中output文件复制到本地系统

备注:与"-put"命令一样,"-get"操作既可以操作文件,也可以操作目录。

5)删除HDFS下的文档

此处为你展示如何通过"-rmr 文件"命令删除HDFS下名为"newoutput"的文档:

hadoop fs –rmr newoutput

执行结果如图5-1-5所示。

图5-1-5 成功删除HDFS下的newoutput文档

6)查看HDFS下某个文件

此处为你展示如何通过"-cat 文件"命令查看HDFS下input文件中内容:

hadoop fs -cat input/*

执行结果如图5-1-6所示。

图5-1-6 HDFS下input文件的内容

"hadoop fs"的命令远不止这些,本小节介绍的命令已可以在HDFS上完成大多数常规操作。对于其他操作,可以通过"-help commandName"命令所列出的清单来进一步学习与探索。

5.2 管理与更新

1)报告HDFS的基本统计情况

此处为你展示通过"-report"命令如何查看HDFS的基本统计信息:

hadoop dfsadmin -report

执行结果如图5-2-1所示。

图5-2-1 HDFS基本统计信息

2)退出安全模式

NameNode在启动时会自动进入安全模式。安全模式是NameNode的一种状态,在这个阶段,文件系统不允许有任何修改。安全模式的目的是在系统启动时检查各个DataNode上数据块的有效性,同时根据策略对数据块进行必要的复制或删除,当数据块最小百分比数满足的最小副本数条件时,会自动退出安全模式。

系统显示"Name node is in safe mode",说明系统正处于安全模式,这时只需要等待17秒即可,也可以通过下面的命令退出安全模式:

hadoop dfsadmin –safemode enter

成功退出安全模式结果如图5-2-2所示。

图5-2-2 成功退出安全模式

3)进入安全模式

在必要情况下,可以通过以下命令把HDFS置于安全模式:

hadoop dfsadmin –safemode enter

执行结果如图5-2-3所示。

图5-2-3 进入HDFS安全模式

4)添加节点

可扩展性是HDFS的一个重要特性,向HDFS集群中添加节点是很容易实现的。添加一个新的DataNode节点,首先在新加节点上安装好Hadoop,要和NameNode使用相同的配置(可以直接从NameNode复制),修改"/usr/hadoop/conf/master"文件,加入NameNode主机名。然后在NameNode节点上修改"/usr/hadoop/conf/slaves"文件,加入新节点主机名,再建立到新加点无密码的SSH连接,运行启动命令:

start-all.sh

5)负载均衡

HDFS的数据在各个DataNode中的分布肯能很不均匀,尤其是在DataNode节点出现故障或新增DataNode节点时。新增数据块时NameNode对DataNode节点的选择策略也有可能导致数据块分布的不均匀。用户可以使用命令重新平衡DataNode上的数据块的分布:

start-balancer.sh

执行命令前,DataNode节点上数据分布情况如图5-2-4所示。

负载均衡完毕后,DataNode节点上数据的分布情况如图5-2-5所示。

执行负载均衡命令如图5-2-6所示。

6、HDFS API详解

Hadoop中关于文件操作类基本上全部是在"org.apache.hadoop.fs"包中,这些API能够支持的操作包含:打开文件,读写文件,删除文件等。

Hadoop类库中最终面向用户提供的接口类是FileSystem,该类是个抽象类,只能通过来类的get方法得到具体类。get方法存在几个重载版本,常用的是这个:

static FileSystem get(Configuration conf);

该类封装了几乎所有的文件操作,例如mkdir,delete等。综上基本上可以得出操作文件的程序库框架:

  1. operator() 
  2.     得到Configuration对象 
  3.     得到FileSystem对象 
  4.     进行文件操作 

6.1 上传本地文件

通过"FileSystem.copyFromLocalFile(Path src,Patch dst)"可将本地文件上传到HDFS的制定位置上,其中src和dst均为文件的完整路径。具体事例如下

  1. package com.hebut.file; 
  2. import org.apache.hadoop.conf.Configuration; 
  3. import org.apache.hadoop.fs.FileStatus; 
  4. import org.apache.hadoop.fs.FileSystem; 
  5. import org.apache.hadoop.fs.Path; 
  6. public class CopyFile { 
  7.     public static void main(String[] args) throws Exception { 
  8.         Configuration conf=new Configuration(); 
  9.         FileSystem hdfs=FileSystem.get(conf); 
  10.          //本地文件 
  11.         Path src =new Path("D:\\HebutWinOS"); 
  12.         //HDFS为止 
  13.         Path dst =new Path("/"); 
  14.                hdfs.copyFromLocalFile(src, dst); 
  15.         System.out.println("Upload to"+conf.get("fs.default.name")); 
  16.         FileStatus files[]=hdfs.listStatus(dst); 
  17.         for(FileStatus file:files){ 
  18.             System.out.println(file.getPath()); 
  19.         } 
  20.     } 

运行结果可以通过控制台、项目浏览器和SecureCRT查看,如图6-1-1、图6-1-2、图6-1-3所示。

1)控制台结果

图6-1-1 运行结果(1)

2)项目浏览器

图6-1-2 运行结果(2)

3)SecureCRT结果

图6-1-3 运行结果(3)

6.2 创建HDFS文件

通过"FileSystem.create(Path f)"可在HDFS上创建文件,其中f为文件的完整路径。具体实现如下:

  1. package com.hebut.file; 
  2. import org.apache.hadoop.conf.Configuration; 
  3. import org.apache.hadoop.fs.FSDataOutputStream; 
  4. import org.apache.hadoop.fs.FileSystem; 
  5. import org.apache.hadoop.fs.Path; 
  6. public class CreateFile { 
  7.     public static void main(String[] args) throws Exception { 
  8.         Configuration conf=new Configuration(); 
  9.         FileSystem hdfs=FileSystem.get(conf); 
  10.         byte[] buff="hello hadoop world!\n".getBytes(); 
  11.         Path dfs=new Path("/test"); 
  12.         FSDataOutputStream outputStream=hdfs.create(dfs); 
  13.         outputStream.write(buff,0,buff.length); 
  14.     } 

运行结果如图6-2-1和图6-2-2所示。

1)项目浏览器

图6-2-1 运行结果(1)

2)SecureCRT结果

图6-2-2 运行结果(2)

6.3 创建HDFS目录

通过"FileSystem.mkdirs(Path f)"可在HDFS上创建文件夹,其中f为文件夹的完整路径。具体实现如下:

  1. package com.hebut.dir; 
  2. import org.apache.hadoop.conf.Configuration; 
  3. import org.apache.hadoop.fs.FileSystem; 
  4. import org.apache.hadoop.fs.Path; 
  5. public class CreateDir { 
  6.     public static void main(String[] args) throws Exception{ 
  7.         Configuration conf=new Configuration(); 
  8.         FileSystem hdfs=FileSystem.get(conf); 
  9.         Path dfs=new Path("/TestDir"); 
  10.                hdfs.mkdirs(dfs); 
  11.     } 

运行结果如图6-3-1和图6-3-2所示。

1)项目浏览器

图6-3-1 运行结果(1)

2)SecureCRT结果

图6-3-2 运行结果(2)

6.4 重命名HDFS文件

通过"FileSystem.rename(Path src,Path dst)"可为指定的HDFS文件重命名,其中src和dst均为文件的完整路径。具体实现如下:

  1. package com.hebut.file; 
  2. import org.apache.hadoop.conf.Configuration; 
  3. import org.apache.hadoop.fs.FileSystem; 
  4. import org.apache.hadoop.fs.Path; 
  5. public class Rename{ 
  6.     public static void main(String[] args) throws Exception { 
  7.         Configuration conf=new Configuration(); 
  8.         FileSystem hdfs=FileSystem.get(conf); 
  9.         Path frpaht=new Path("/test");    //旧的文件名 
  10.         Path topath=new Path("/test1");    //新的文件名 
  11.         boolean isRename=hdfs.rename(frpaht, topath); 
  12.         String result=isRename?"成功":"失败"
  13.         System.out.println("文件重命名结果为:"+result); 
  14.     } 

运行结果如图6-4-1和图6-4-2所示。

1)项目浏览器

图6-4-1 运行结果(1)

2)SecureCRT结果

图6-4-2 运行结果(2)

6.5 删除HDFS上的文件

通过"FileSystem.delete(Path f,Boolean recursive)"可删除指定的HDFS文件,其中f为需要删除文件的完整路径,recuresive用来确定是否进行递归删除。具体实现如下

  1. package com.hebut.file; 
  2. import org.apache.hadoop.conf.Configuration; 
  3. import org.apache.hadoop.fs.FileSystem; 
  4. import org.apache.hadoop.fs.Path; 
  5. public class DeleteFile { 
  6.     public static void main(String[] args) throws Exception { 
  7.         Configuration conf=new Configuration(); 
  8.         FileSystem hdfs=FileSystem.get(conf); 
  9.         Path delef=new Path("/test1"); 
  10.         boolean isDeleted=hdfs.delete(delef,false); 
  11.         //递归删除 
  12.         //boolean isDeleted=hdfs.delete(delef,true); 
  13.         System.out.println("Delete?"+isDeleted); 
  14.     } 

运行结果如图6-5-1和图6-5-2所示。

1)控制台结果

图6-5-1 运行结果(1)

2)项目浏览器

图6-5-2 运行结果(2)

6.6 删除HDFS上的目录

同删除文件代码一样,只是换成删除目录路径即可,如果目录下有文件,要进行递归删除。

6.7 查看某个HDFS文件是否存在

通过"FileSystem.exists(Path f)"可查看指定HDFS文件是否存在,其中f为文件的完整路径。具体实现如下:

  1. package com.hebut.file; 
  2. import org.apache.hadoop.conf.Configuration; 
  3. import org.apache.hadoop.fs.FileSystem; 
  4. import org.apache.hadoop.fs.Path; 
  5. public class CheckFile { 
  6.     public static void main(String[] args) throws Exception { 
  7.         Configuration conf=new Configuration(); 
  8.         FileSystem hdfs=FileSystem.get(conf); 
  9.         Path findf=new Path("/test1"); 
  10.         boolean isExists=hdfs.exists(findf); 
  11.         System.out.println("Exist?"+isExists); 
  12.     } 

运行结果如图6-7-1和图6-7-2所示。

1)控制台结果

图6-7-1 运行结果(1)

2)项目浏览器

图6-7-2 运行结果(2)

6.8 查看HDFS文件的最后修改时间

通过"FileSystem.getModificationTime()"可查看指定HDFS文件的修改时间。具体实现如下:

  1. package com.hebut.file; 
  2. import org.apache.hadoop.conf.Configuration; 
  3. import org.apache.hadoop.fs.FileStatus; 
  4. import org.apache.hadoop.fs.FileSystem; 
  5. import org.apache.hadoop.fs.Path; 
  6. public class GetLTime { 
  7.     public static void main(String[] args) throws Exception { 
  8.         Configuration conf=new Configuration(); 
  9.         FileSystem hdfs=FileSystem.get(conf);    
  10.        Path fpath =new Path("/user/hadoop/test/file1.txt"); 
  11.         FileStatus fileStatus=hdfs.getFileStatus(fpath); 
  12.         long modiTime=fileStatus.getModificationTime(); 
  13.         System.out.println("file1.txt的修改时间是"+modiTime); 
  14.     } 

运行结果如图6-8-1所示。

图6-8-1 控制台结果

6.9 读取HDFS某个目录下的所有文件

通过"FileStatus.getPath()"可查看指定HDFS中某个目录下所有文件。具体实现如下:

  1. package com.hebut.file; 
  2. import org.apache.hadoop.conf.Configuration; 
  3. import org.apache.hadoop.fs.FileStatus; 
  4. import org.apache.hadoop.fs.FileSystem; 
  5. import org.apache.hadoop.fs.Path; 
  6. public class ListAllFile { 
  7.     public static void main(String[] args) throws Exception { 
  8.         Configuration conf=new Configuration(); 
  9.         FileSystem hdfs=FileSystem.get(conf); 
  10.         Path listf =new Path("/user/hadoop/test"); 
  11.         FileStatus stats[]=hdfs.listStatus(listf); 
  12.         for(int i = 0; i < stats.length; ++i) 
  13.  { 
  14.  System.out.println(stats[i].getPath().toString()); 
  15.  } 
  16.         hdfs.close(); 
  17.     } 

运行结果如图6-9-1和图6-9-2所示。

1)控制台结果

图6-9-1 运行结果(1)

2)项目浏览器

图6-9-2 运行结果(2)

6.10 查找某个文件在HDFS集群的位置

通过"FileSystem.getFileBlockLocation(FileStatus file,long start,long len)"可查找指定文件在HDFS集群上的位置,其中file为文件的完整路径,start和len来标识查找文件的路径。具体实现如下:

  1. package com.hebut.file; 
  2. import org.apache.hadoop.conf.Configuration; 
  3. import org.apache.hadoop.fs.BlockLocation; 
  4. import org.apache.hadoop.fs.FileStatus; 
  5. import org.apache.hadoop.fs.FileSystem; 
  6. import org.apache.hadoop.fs.Path; 
  7. public class FileLoc { 
  8.     public static void main(String[] args) throws Exception { 
  9.         Configuration conf=new Configuration(); 
  10.         FileSystem hdfs=FileSystem.get(conf); 
  11.         Path fpath=new Path("/user/hadoop/cygwin"); 
  12.         FileStatus filestatus = hdfs.getFileStatus(fpath); 
  13.         BlockLocation[] blkLocations = hdfs.getFileBlockLocations(filestatus, 0, filestatus.getLen()); 
  14.         int blockLen = blkLocations.length; 
  15.         for(int i=0;i
  16.             String[] hosts = blkLocations[i].getHosts(); 
  17.             System.out.println("block_"+i+"_location:"+hosts[0]); 
  18.         } 
  19.     } 

运行结果如图6-10-1和6.10.2所示。

1)控制台结果

图6-10-1 运行结果(1)

2)项目浏览器

图6-10-2 运行结果(2)

6.11 获取HDFS集群上所有节点名称信息

通过"DatanodeInfo.getHostName()"可获取HDFS集群上的所有节点名称。具体实现如下:

  1. package com.hebut.file; 
  2. import org.apache.hadoop.conf.Configuration; 
  3. import org.apache.hadoop.fs.FileSystem; 
  4. import org.apache.hadoop.hdfs.DistributedFileSystem; 
  5. import org.apache.hadoop.hdfs.protocol.DatanodeInfo; 
  6. public class GetList { 
  7.     public static void main(String[] args) throws Exception { 
  8.         Configuration conf=new Configuration(); 
  9.         FileSystem fs=FileSystem.get(conf); 
  10.         DistributedFileSystem hdfs = (DistributedFileSystem)fs; 
  11.         DatanodeInfo[] dataNodeStats = hdfs.getDataNodeStats();        
  12.         for(int i=0;i
  13.  System.out.println("DataNode_"+i+"_Name:"+dataNodeStats[i].getHostName()); 
  14.         } 
  15.     } 

运行结果如图6-11-1所示。

图6-11-1 控制台结果

7、HDFS的读写数据流

7.1 文件的读取剖析

文件读取的过程如下:

1)解释一

  • 客户端(client)用FileSystem的open()函数打开文件。

  • DistributedFileSystem用RPC调用元数据节点,得到文件的数据块信息。

  • 对于每一个数据块,元数据节点返回保存数据块的数据节点的地址。

  • DistributedFileSystem返回FSDataInputStream给客户端,用来读取数据。

  • 客户端调用stream的read()函数开始读取数据。

  • DFSInputStream连接保存此文件第一个数据块的最近的数据节点。

  • Data从数据节点读到客户端(client)。

  • 当此数据块读取完毕时,DFSInputStream关闭和此数据节点的连接,然后连接此文件下一个数据块的最近的数据节点。

  • 当客户端读取完毕数据的时候,调用FSDataInputStream的close函数。

  • 在读取数据的过程中,如果客户端在与数据节点通信出现错误,则尝试连接包含此数据块的下一个数据节点。

  • 失败的数据节点将被记录,以后不再连接。

2)解释二

  • 使用HDFS提供的客户端开发库,向远程的Namenode发起RPC请求;

  • Namenode会视情况返回文件的部分或者全部block列表,对于每个block,Namenode都会返回有该block拷贝的datanode地址;

  • 客户端开发库会选取离客户端最接近的datanode来读取block;

  • 读取完当前block的数据后,关闭与当前的datanode连接,并为读取下一个block寻找最佳的datanode;

  • 当读完列表的block后,且文件读取还没有结束,客户端开发库会继续向Namenode获取下一批的block列表。

  • 读取完一个block都会进行checksum验证,如果读取datanode时出现错误,客户端会通知Namenode,然后再从下一个拥有该block拷贝的datanode继续读。

7.2 文件的写入剖析

写入文件的过程比读取较为复杂:

1)解释一

  • 客户端调用create()来创建文件

  • DistributedFileSystem用RPC调用元数据节点,在文件系统的命名空间中创建一个新的文件。

  • 元数据节点首先确定文件原来不存在,并且客户端有创建文件的权限,然后创建新文件。

  • DistributedFileSystem返回DFSOutputStream,客户端用于写数据。

  • 客户端开始写入数据,DFSOutputStream将数据分成块,写入data queue。

  • Data queue由Data Streamer读取,并通知元数据节点分配数据节点,用来存储数据块(每块默认复制3块)。分配的数据节点放在一个pipeline里。

  • Data Streamer将数据块写入pipeline中的第一个数据节点。第一个数据节点将数据块发送给第二个数据节点。第二个数据节点将数据发送给第三个数据节点。

  • DFSOutputStream为发出去的数据块保存了ack queue,等待pipeline中的数据节点告知数据已经写入成功。

  • 如果数据节点在写入的过程中失败:

    • 关闭pipeline,将ack queue中的数据块放入data queue的开始。

    • 当前的数据块在已经写入的数据节点中被元数据节点赋予新的标示,则错误节点重启后能够察觉其数据块是过时的,会被删除。

    • 失败的数据节点从pipeline中移除,另外的数据块则写入pipeline中的另外两个数据节点。

    • 元数据节点则被通知此数据块是复制块数不足,将来会再创建第三份备份。

  • 当客户端结束写入数据,则调用stream的close函数。此操作将所有的数据块写入pipeline中的数据节点,并等待ack queue返回成功。最后通知元数据节点写入完毕。

2)解释二

  • 使用HDFS提供的客户端开发库,向远程的Namenode发起RPC请求;

  • Namenode会检查要创建的文件是否已经存在,创建者是否有权限进行操作,成功则会为文件创建一个记录,否则会让客户端抛出异常;

  • 当客户端开始写入文件的时候,开发库会将文件切分成多个packets,并在内部以"data queue"的形式管理这些packets,并向Namenode申请新的blocks,获取用来存储replicas的合适的datanodes列表,列表的大小根据在Namenode中对replication的设置而定。

  • 开始以pipeline(管道)的形式将packet写入所有的replicas中。开发库把packet以流的方式写入第一个datanode,该datanode把该packet存储之后,再将其传递给在此pipeline中的下一个datanode,直到最后一个datanode,这种写数据的方式呈流水线的形式。

  • 最后一个datanode成功存储之后会返回一个ack packet,在pipeline里传递至客户端,在客户端的开发库内部维护着"ack queue",成功收到datanode返回的ack packet后会从"ack queue"移除相应的packet。

  • 如果传输过程中,有某个datanode出现了故障,那么当前的pipeline会被关闭,出现故障的datanode会从当前的pipeline中移除,剩余的block会继续剩下的datanode中继续以pipeline的形式传输,同时Namenode会分配一个新的datanode,保持replicas设定的数量。



免责声明: 除非特别声明,文章均为投稿或网络转载,仅代表作者观点,与大数据中国网无关。其原创性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容、文字的真实性、完整性、及时性本站不作任何保证或承诺,请读者仅作参考,并请自行核实相关内容。如果本文内容有侵犯你的权益,请发送信息至ab12-120@163.com,我们会及时删除
123

最新评论

关闭

站长推荐上一条 /1 下一条

大数据中国微信

QQ   

版权所有: Discuz! © 2001-2013 大数据.

GMT+8, 2024-11-25 00:53 , Processed in 0.175673 second(s), 23 queries .

返回顶部