首页 > 代码库 > Hadoop Day2
Hadoop Day2
1.分布式文件系统与HDFS(****了解***)
? 思考:windows的文件存储目录结构?
? 什么是分布式文件系统?(***了解***)
当数据量越来越多,在一个操作系统管辖的范围存不下了,那么就分配到更多的操作系统管理的磁盘中,但是不方便管理和维护,因此迫切需要一种系统来管理多台机器上的文件,这就是分布式文件管理系统 (Distributed File System)。
l 分布式文件系统的定义:
是一种允许文件通过网络在多台主机上分享的文件的系统,可让多机器上的多用户分享文件和存储空间。
l 分布式文件系统特点:
n 通透性。
让实际上是通过网络来访问文件的动作,由程序与用户看来,就像是访问本地的磁盘一般。
n 容错。
即使系统中有某些节点脱机,整体来说系统仍然可以持续运作而不会有数据损失。
注意:分布式文件管理系统很多,hdfs只是其中一种。适用于一次写入多次查询的情况,不支持并发写情况,小文件不合适。
2.HDFS体系结构与基本概念(***必须掌握****)
? 思考:分布式文件系统的设计思想(****理解***)
? HDFS是什么?(***知道****)
Hadoop Distributed File System(简称HDFS)是Hadoop分布式文件系统。
HDFS有着高容错性(fault-tolerant)的特点,并且设计用来部署在低廉的(low-cost)硬件上。而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求(requirements)这样可以实现流的形式访问(streaming access)文件系统中的数据。
? HDFS体系结构(***理解****)
l NameNode:名字节点
u NameNode是整个文件系统的管理节点。它维护着整个文件系统的文件目录树,文件/目录的元信息和每个文件对应的数据块列表。接收用户的操作请求。
u 文件包括:hdfs-site.xml的dfs.name.dir属性
2 fsimage:元数据镜像文件。存储某一时段NameNode内存元数据信息。
2 edits:操作日志文件。
2 fstime:保存最近一次checkpoint的时间
u 以上这些文件是保存在linux的文件系统中。
l DataNode:数据节点
DateNode提供真实文件数据的存储服务。
说明:DataNode是文件系统的工作节点,他们根据客户端或者是namenode的调度存储和检索数据,并且定期向namenode发送他们所存储的块(block)的列表。
n 文件块(block):最基本的存储单位。对于文件内容而言,一个文件的长度大小是size,那么从文件的0偏移开始,按照固定的大小,顺序对文件进行划分并编号,划分好的每一个块称一个Block。HDFS默认Block大小是128MB,以一个256MB文件,共有256/128=2个Block.
可以在配置里指定dfs.block.size属性的大小.
n 不同于普通文件系统的是,HDFS中,如果一个文件小于一个数据块的大小,并不占用整个数据块存储空间
n Replication指定多副本。默认是三个。
hdfs-site.xml的dfs.replication属性。
l Secondary NameNode:HA的一个解决方案。
n Secondary NameNode有两个作用:
一是镜像备份
二是日志与镜像的定期合并
注意:以上两个过程同时进行,称为checkpoint.
备注:
镜像备份的作用:
备份fsimage(fsimage是元数据发送检查点时写入文件);
日志与镜像的定期合并的作用:
将Namenode中edits日志和fsimage合并,防止(如果Namenode节点故障,namenode下次启动的时候,会把fsimage加载到内存中,应用edit log,edit log往往很大,导致操作往往很耗时。)
n Secondary NameNode的工作流程
1.secondary通知namenode切换edits文件
2.secondary从namenode获得fsimage和` (通过http)
3.secondary将fsimage载入内存,然后开始合并edits
4.secondary将新的fsimage发回给namenode
5.namenode用新的fsimage替换旧的fsimage
n 什么时候checkpiont
1.fs.checkpoint.period 指定两次checkpoint的最大时间间隔,默认3600秒。
2.fs.checkpoint.size规定edits文件的最大值,一旦超过这个值则强制checkpoint,不管是否到达最大时间间隔。默认大小是64M。
l NameNode与Secondary NameNode
图说明:HDFS是一个的主从结构,一个HDFS集群是由一个名字节点,它是一个管理文件命名空间和调节客户端访问文件的主服务器,当然还有一些数据节点,通常是一个节点一个机器,它来管理对应节点的存储。HDFS对外开放文件命名空间并允许用户数据以文件形式存储。
内部机制是将一个文件分割成一个或多个块,这些块被存储在一组数据节点中。名字节点用来操作文件命名空间的文件或目录操作,如打开,关闭,重命名等等。它同时确定块与数据节点的映射。数据节点来负责来自文件系统客户的读写请求。数据节点同时还要执行块的创建,删除,和来自名字节点的块复制指令。
名字节点和数据节点都是运行在普通的机器之上的软件,机器典型的都是GNU/Linux,HDFS是用java编写的,任何支持java的机器都可以运行名字节点或数据节点,利用java语言的超轻便型,很容易将HDFS部署到大范围的机器上。典型的部署是由一个专门的机器来运行名字节点软件,集群中的其他每台机器运行一个数据节点实例。体系结构不排斥在一个机器上运行多个数据节点的实例,但是实际的部署不会有这种情况。
集群中只有一个名字节点极大地简单化了系统的体系结构。名字节点是仲裁者和所有HDFS元数据的仓库,用户的实际数据不经过名字节点。
3.HDFS的shell操作(***必须掌握***)
l 调用文件系统(FS)Shell命令应使用 bin/hadoop fs 的形式。
l 所有的FS shell命令使用URI路径作为参数。
URI格式是scheme://authority/path。HDFS的scheme是hdfs,对本地文件系统,scheme是file。其中scheme和authority参数都是可选的,如果未加指定,就会使用配置中指定的默认scheme。
例如:/parent/child可以表示成hdfs://namenode:namenodePort/parent/child,或者更简单的/parent/child(假设配置文件是namenode:namenodePort)
l 大多数FS Shell命令的行为和对应的Unix Shell命令类似。
? HDFS常见的shell操作有以下三种:
命令格式:
hadoop fs {args}
hadoop dfs {args}
hdfs dfs {args}
详细使用见附件:hadoop-shell.pdf
说明:
hadoop fs:使用面最广,可以操作任何文件系统。
hadoop dfs与hdfs dfs:只能操作HDFS文件系统相关(包括与Local FS间的操作),前者已经Deprecated,一般使用后者。
? 列举一些常用的HDFS的shell
/home/hadoop/app/hadoop-2.4.1/bin start-all.sh
start-dfs.sh start-yarn.sh
文件块位置
/home/hadoop/app/hadoop-2.4.1/data/dfs/data/current/BP-1241655283-192.168.174.100-1469943242838/current/finalized
格式:hdfs dfs 【选项】【参数】
常用的选项如下:
-help [cmd] //显示命令的帮助信息
-ls(r) <path> //显示当前目录下所有文件
-du(s) <path> //显示目录中所有文件大小
-count[-q] <path> //显示目录中文件数量
-mv <src> <dst> //移动多个文件到目标目录
-cp <src> <dst> //复制多个文件到目标目录
-rm(r) //删除文件(夹)
-put <localsrc> <dst> //本地文件复制到hdfs
-copyFromLocal //同put
-moveFromLocal //从本地文件移动到hdfs
-get [-ignoreCrc] <src> <localdst> //复制文件到本地,可以忽略crc校验
-getmerge <src> <localdst> //将源目录中的所有文件排序合并到一个文件中
-cat <src> //在终端显示文件内容
-text <src> //在终端显示文件内容
-copyToLocal [-ignoreCrc] <src> <localdst> //复制到本地
-moveToLocal <src> <localdst>
-mkdir <path> //创建文件夹
-touchz <path> //创建一个空文件
hdfs dfs –ls /aa
? Shell命令练习:验证DataNode存储的块的大小
l 上传一个大文件,验证块的大小。
[hadoop@mk0 finalized]$ hadoop fs -put /home/hadoop/jdk-7u65-linux-i586.tar.gz hdfs://mk0:9000/
[hadoop@mk0 finalized]$ ls –lh
将数据块下载到本地
[hadoop@mk0 finalized]$ touch jdk
[hadoop@mk0 finalized]$ cat blk_1073741825 >> jdk
[hadoop@mk0 finalized]$ cat blk_1073741826 >> jdk
[hadoop@mk0 finalized]$ tar -zxvf jdk
4.java接口及常用api(***必须掌握***)
? 上传eclipse到linux系统
使用secureCRT或者FileZilla上传eclipse IDE开发工具。
? 导入hadoop相关的jar包
导入$HADOOP_HOME/share/hadoop/hdfs/lib/* 和核心包hadoop-hdfs-2.7.0.jar
导入$$HADOOP_HOME/share/hadoop/common/lib/* 和hadoop-common-2.7.0.jar
? 创建java工程编写测试类
一、测试HDFS文件下载:
方法一:(比较底层的写法)
//获得filesystem对象
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
//FileSystem fs = FileSystem.get(new URI(“hdfs://itcast01:9000”),conf);
//打开输入流
Path src = http://www.mamicode.com/new Path("hdfs://itcast01:9000/jdk-7u79-linux-i586.tar.gz");
FSDataInputStream in = fs.open(src);
//设置输出流
FileOutputStream out = new FileOutputStream("/itcast/jdk-7u79-linux-i586.tar.gz");
//调用IOUtils的copy方法,复制文件
IOUtils.copy(in, out);
提示错误:
必须把$HADOOP_HOME/share/etc/hadoop/下的core-site.xml 和 hdfs-site.xml 导入工程的src下,或者在conf.set(“fs.defaultFS”,”hdfs://itcast01:9000/”)
方法二:调用copyToLocalFile()方法(封装好的写法)
fs.copyToLocalFile(new Path("hdfs://itcast01:9000/jdk-7u79-linux-i586.tar.gz"),new Path("hdfs://itcast01:9000/jdk-7u79-linux-i586.tar.gz"));
二、测试HDFS文件上传
方法一:自定义输入输出流,(比较底层的写法)
//获得filesystem对象
Configuration conf = new Configuration();
FileSystem fs= FileSystem.get(conf);
//设置输出流
Path path = new Path("hdfs://itcast01:9000/aa/words.txt");
FSDataOutputStream out = fs.create(path);
//设置输入流
FileInputStream in = new FileInputStream("/root/words.txt");
//调用IOUtils的copy方法,复制文件
IOUtils.copy(in, out);
方法二:直接调用hadoop hdfs 的copyFromLocalFile()方法(封装好的写法)
注意:如果权限问题,可以修改目录权限.
或者修改运行参数:VM arguments:-DHADOOP_USER_NAME=hadoop
或者在程序中设置:
fs =FileSystem.get(new URI(“hdfs://itcast01:9000/”),conf,”hadoop”)
三、测试HDFS创建目录
fs.mkdirs(new Path("/a/b/c"));
四、测试HDFS删除目录和文件
fs.delete(new Path(“/a”),true);
五、测试HDFS显示目录下的文件
public void listFiles() throws Exception{
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
System.out.println(listFiles.next().getPath());
}
}
sudo yum install "@Chinese Support"
? 测试类总结
说明:在hdfs中有多个管理对象用于管理不同的文件系统对应的信息。
如DistributedFileSystem,FTPFileSystem等。
FileSystem顾名思义是一个实现了文件系统的抽象类,继承自org.apache.hadoop.conf.Configured,并实现了Closeable接口,可以适用于多种文件系统,如本地文件系统file://,ftp,hdfs等
5.HADOOP的RPC机制(***必须掌握***)
? 什么是Hadoop的RPC?
Remote Procedure Call(简称:RPC):远程过程调用协议。
RPC是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
? RPC采用的模式?
RPC采用客户机/服务器模式。
请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
? hadoop的整个体系结构就是构建在RPC之上
? 不同进程间方法调用RPC例子:
ClientProtocal.java
public interface ClientProtocal {
public static final long versionID=123123l;
public String findMetaDataByName(String filename);
}
RPCServer_NameNode.java
public class RPCServer_NameNode implements ClientProtocal{
public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
Server server = new RPC.Builder(new Configuration()).setInstance(new RPCServer_NameNode()).setProtocol(ClientProtocal.class).setBindAddress("192.168.0.200").setPort(9123).build();
server.start();
}
public String findMetaDataByName(String filename){
System.out.println("正在从内存中找"+filename+"的元数据信息");
return filename+"找到后元数据信息";
}
}
PRCClient_HDFSClient.java
public class PRCClient_HDFSClient {
public static void main(String[] args) throws IOException {
ClientProtocal proxy = RPC.getProxy(ClientProtocal.class, 123123l, new InetSocketAddress("192.168.0.200", 9123), new Configuration());
String findMetaDataByName = proxy.findMetaDataByName("/words.txt");
System.out.println(findMetaDataByName);
String findMetaDataByName2 = proxy.findMetaDataByName("/words2.txt");
System.out.println(findMetaDataByName2);
RPC.stopProxy(proxy);
}
}
6.HDFS源码分析(***理解***)
? HDFS读过程
1.初始化FileSystem,然后客户端(client)用FileSystem的open()函数打开文件
2.FileSystem用RPC调用元数据节点,得到文件的数据块信息,对于每一个数据块,元数据节点返回保存数据块的数据节点的地址。
3.FileSystem返回FSDataInputStream给客户端,用来读取数据,客户端调用stream的read()函数开始读取数据。
4.DFSInputStream连接保存此文件第一个数据块的最近的数据节点,data从数据节点读到客户端(client)
5.当此数据块读取完毕时,DFSInputStream关闭和此数据节点的连接,然后连接此文件下一个数据块的最近的数据节点。
6.当客户端读取完毕数据的时候,调用FSDataInputStream的close函数。
7.在读取数据的过程中,如果客户端在与数据节点通信出现错误,则尝试连接包含此数据块的下一个数据节点。
8.失败的数据节点将被记录,以后不再连接。
? HDFS写过程
1.初始化FileSystem,客户端调用create()来创建文件
2.FileSystem用RPC调用元数据节点,在文件系统的命名空间中创建一个新的文件,元数据节点首先确定文件原来不存在,并且客户端有创建文件的权限,然后创建新文件。
3.FileSystem返回DFSOutputStream,客户端用于写数据,客户端开始写入数据。
4.DFSOutputStream将数据分成块,写入data queue。data queue由Data Streamer读取,并通知元数据节点分配数据节点,用来存储数据块(每块默认复制3块)。分配的数据节点放在一个pipeline里。Data Streamer将数据块写入pipeline中的第一个数据节点。第一个数据节点将数据块发送给第二个数据节点。第二个数据节点将数据发送给第三个数据节点。
5.DFSOutputStream为发出去的数据块保存了ack queue,等待pipeline中的数据节点告知数据已经写入成功。
6.当客户端结束写入数据,则调用stream的close函数。此操作将所有的数据块写入pipeline中的数据节点,并等待ack queue返回成功。最后通知元数据节点写入完毕。
7.如果数据节点在写入的过程中失败,关闭pipeline,将ack queue中的数据块放入data queue的开始,当前的数据块在已经写入的数据节点中被元数据节点赋予新的标示,则错误节点重启后能够察觉其数据块是过时的,会被删除。失败的数据节点从pipeline中移除,另外的数据块则写入pipeline中的另外两个数据节点。元数据节点则被通知此数据块是复制块数不足,将来会再创建第三份备份。
7.分析分布式文件系统操作类(***深度扩展***)
FileSystem是一个实现了文件系统的抽象类,继承自org.apache.hadoop.conf.Configured,并实现了Closeable接口,可以适用于多种文件系统,如本地文件系统file://,ftp,hdfs等。如果要自己实现一个系统可以通过继承这个类(hadoop中DistributeFileSystem就是这样的),做相应的配置,并实现相应的抽象方法。
? 静态管理对象
FileSystem是一个具有一般文件系统特性的类。在hdfs中有多个管理对象用于管理不同的文件系统对应的信息。
一、cache机制
在FileSystem中有一个静态cache类,内含一个 final Map<Key, FileSystem> map(为了能够快速获取FileSystem,这个map采用的是HashMap),用于存储和管理由{scheme,authority,username}作为Key,而以FileSystem引用作为值的键值对。当给定uri调用get获取指定的FileSystem时,最终还是调用cache.get。cache.get会查找相应的键值对,不存在时调用createFileSystem新建一个FileSystem并将其插入map中。当本FS关闭时,要将CACHE中对应的键值对删除。
由于同一时刻可能要多个client请求访问map,所以需要对访问map的操作进行同步synchronized(这些都要保证是thread-safe)。
当关闭FS的时候(用户手动关闭或JVM在程序运行结束,ClientFinalizer对文件系统进行关闭),首先会调用cache的closeAll方法,对map进行清空(先清空cache中其他FS的键值对,再清空本FS的键值对);然后再调用FS的processDeleteOnExit方法对一些temp目录进行清空。
二、statisticsTable统计信息管理映射表
statisticsTable是一个IdentityHashMap<Class<? extends FileSystem>, Statistics>,用于保存并管理每个文件对应的统计信息Statistics。
三、deleteOnExit临时文件集合
FileSystem中deleteOnExit用于保存所有需要在本FS关闭时或JVM退出时需要删掉的文件集合,通过调用processDeleteOnExit将这些路径进行清空
? 文件系统的信息
一、Statistics类一个FileSystem的统计信息
由于要允许并行化处理,所以Statistics中定义了2个原子变量bytesRead(读的字节数)、bytesWritten(写的字节数)。每个文件系统在刚创建时会调用initialize(URI, Configuration)将自己的statistics添加到statisticsTable中。
二、FileStatus文件元数据类(目录也可以视为文件)
FileSystem中定义了FileStatus用于存放文件和目录的元数据(Permission,是否目录,长度、备份数、块大小、最近访问时间、最近修改时间、是否为目录)。可以getFileStatus获取指定的文件的FileStatus。同时在FileStatus中实现了org.apache.hadoop.io.Writable接口,是可序列化的(之所以需要序列化,是因为我们可以把FileStatus看成一个数据类型(hadoop中数据类型是需要进行序列化,因为要在网络中传输)--保存了文件的元数据)。
三、Key
这个类用于作为一个特定文件系统的标识,由(uri,username)构成。
四、文件块类BlockLocation
|
基于备份冗余机制,一个文件块可能有多个备份,存放在不同的位置。
由于可以视为一个数据类型,故需要实现序列化
? FileSystem各种用于操作文件的方法
一、创建文件方法create:
|
用于获取一个写入文件的输入流,只支持从文件末尾进行写入。
其中提供了多个create的重载函数。
值得注意的是这个函数返回的是一个输出流FSDataOutputStream(用于将本地的输出到FS(FileSystem,下同)(即FileSystem)上),这个流继承自DataOutputStream,可以对文件进行写入。同时其内部还包含了一个静态类PositionCache(用于保存一个文件的已写入数据的统计类和写入的位置
二、读取文件的内容方法open
public abstract FSDataInputStream open(Path f, int bufferSize)throws IOException;
FileSystem的open方法返回一个FSDataInputStream。
实现了Seekable、PositionedReadable这两个接口,支持随机访问读取文件。
三、目录创建mkdirs
可以指定文件的权限或使用默认的路径。对于指定的path,会创建不存在的父目录(可以用exits来判断路径是否存在)。
四、文件的复制、移动()
FileSystem中有两种复制:copyFromLocalFile和copyToLocalFile。
文件的移动可以看成是文件的复制再将源文件删除。故FileSystem中存在两种相应的文件移动操作moveFromLocalFile和moveToLocalFile
? 抽象方法(基本的文件操作方法)
不同的文件系统有不同的特性。在FileSystem中有多个抽象方法需要继承这个类时根据其功能进行实现:
|
8.远程debug(****了解*****)
? JPDA 简介
Sun Microsystem 的 Java Platform Debugger Architecture (JPDA) 技术是一个多层架构,使您能够在各种环境中轻松调试 Java 应用程序。JPDA 由两个接口(分别是 JVM Tool Interface 和 JDI)、一个协议(Java Debug Wire Protocol)和两个用于合并它们的软件组件(后端和前端)组成。它的设计目的是让调试人员在任何环境中都可以进行调试。
更详细的介绍,您可以参考使用 Eclipse 远程调试 Java 应用程序
? JDWP 设置
JVM本身就支持远程调试,Eclipse也支持JDWP,只需要在各模块的JVM启动时加载以下参数:
dt_socket表示使用套接字传输。
address=8000
JVM在8000端口上监听请求,这个设定为一个不冲突的端口即可。
server=y
y表示启动的JVM是被调试者。如果为n,则表示启动的JVM是调试器。
suspend=y
y表示启动的JVM会暂停等待,直到调试器连接上才继续执行。suspend=n,则JVM不会暂停等待。
- 需要在$HADOOP_HOME/etc/hadoop/hadoop-env.sh文件的最后添加你想debug的进程
#远程调试namenode
export HADOOP_NAMENODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=8888,server=y,suspend=y"
#远程调试datanode
export HADOOP_DATANODE_OPTS="-agentlib:jdwp=transport=dt_socket,address=9888,server=y,suspend=y"
#远程调试RM
export YARN_RESOURCEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=10888,server=y,suspend=y"
#远程调试NM
export YARN_NODEMANAGER_OPTS="-agentlib:jdwp=transport=dt_socket,address=10888,server=y,suspend=y"
- 添加完 启动namenode 或者 datanode
- 到eclipse中搜索namenode 然后attach source
- 关联源码
- 关联源码后,记得在要调试的地方打上断点 然后搜索namenode 找到它的main方法 右键debug configuration 双击
remote Java application 修改host port 要上上述一致 点击apply
namenode
datanode
当进入到自己设置的断点处 说明调试正确 ,再根据自己的需求进行详细调试就OK啦
Hadoop Day2