首页 > 代码库 > FileSystem实例化过程
FileSystem实例化过程
HDFS案例代码
Configuration configuration = new Configuration();FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop000:8020"), configuration); InputStream in = fileSystem.open(new Path(HDFS_PATH+"/hdfsapi/test/log4j.properties"));OutputStream out = new FileOutputStream(new File("log4j_download.properties"));IOUtils.copyBytes(in, out, 4096, true); //最后一个参数表示完成拷贝之后关闭输入/出流
FileSystem.java
// Cache of FileSystem instances consulted by the static get(URI, Configuration) below.
static final Cache CACHE = new Cache();

/**
 * Entry point: returns the FileSystem for the given URI by delegating to CACHE.
 */
public static FileSystem get(URI uri, Configuration conf) throws IOException {
    String scheme = uri.getScheme(); // "hdfs" for the example URI hdfs://hadoop000:8020
    String authority = uri.getAuthority(); // "hadoop000:8020" for the example URI
    return CACHE.get(uri, conf);
}

// NOTE(review): non-static get; presumably a member of Cache, since it is invoked
// as CACHE.get(...) above and uses `map` and `Key` - confirm against the full source.
FileSystem get(URI uri, Configuration conf) throws IOException{
    Key key = new Key(uri, conf);
    return getInternal(uri, conf, key);
}

/**
 * Looks the key up in the map and returns the cached FileSystem if there is one;
 * otherwise calls createFileSystem to build a new instance.
 */
private FileSystem getInternal(URI uri, Configuration conf, Key key) throws IOException{
    FileSystem fs;
    synchronized (this) {
        fs = map.get(key);
    }
    // Obtain a FileSystem instance for the URI: when a cached instance exists it is
    // taken from the cache, otherwise createFileSystem is called to create a new one.
    if (fs != null) {
        return fs;
    }
    fs = createFileSystem(uri, conf);
    synchronized (this) {
        FileSystem oldfs = map.get(key);
        ... // (elided in this excerpt) put the new instance into CACHE
        return fs;
    }
}

/**
 * Resolves the implementation class for the URI's scheme, instantiates it
 * reflectively and initializes it with the URI and configuration.
 */
private static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
    Class<?> clazz = getFileSystemClass(uri.getScheme(), conf); // in this walkthrough: org.apache.hadoop.hdfs.DistributedFileSystem
    FileSystem fs = (FileSystem)ReflectionUtils.newInstance(clazz, conf);
    fs.initialize(uri, conf); // initializes the DistributedFileSystem
    return fs;
}

/**
 * Returns the FileSystem class for a scheme: first from the configuration key
 * "fs.<scheme>.impl", then from SERVICE_FILE_SYSTEMS.
 *
 * @throws IOException if neither source yields a class for the scheme
 */
public static Class<? extends FileSystem> getFileSystemClass(String scheme,Configuration conf) throws IOException {
    if (!FILE_SYSTEMS_LOADED) { // whether the file systems have been loaded yet; false at first
        loadFileSystems();
    }
    Class<? extends FileSystem> clazz = null;
    if (conf != null) {
        // Key is fs.hdfs.impl here; in this walkthrough the property is not configured
        // in core-default.xml or core-site.xml, so the lookup falls through.
        clazz = (Class<? extends FileSystem>) conf.getClass("fs." + scheme + ".impl", null);
    }
    if (clazz == null) {
        clazz = SERVICE_FILE_SYSTEMS.get(scheme); // in this walkthrough: class org.apache.hadoop.hdfs.DistributedFileSystem
    }
    if (clazz == null) {
        throw new IOException("No FileSystem for scheme: " + scheme);
    }
    return clazz;
}

/**
 * Fills SERVICE_FILE_SYSTEMS (scheme -> implementation class) from the FileSystem
 * implementations found by ServiceLoader. The flag is re-checked inside the
 * synchronized block before loading.
 */
private static void loadFileSystems() {
    synchronized (FileSystem.class) {
        if (!FILE_SYSTEMS_LOADED) {
            ServiceLoader<FileSystem> serviceLoader = ServiceLoader.load(FileSystem.class);
            for (FileSystem fs : serviceLoader) {
                SERVICE_FILE_SYSTEMS.put(fs.getScheme(), fs.getClass());
            }
            FILE_SYSTEMS_LOADED = true; // mark the file systems as already loaded
        }
    }
}
loadFileSystems后SERVICE_FILE_SYSTEMS存在如下值:
file=class org.apache.hadoop.fs.LocalFileSystem, ftp=class org.apache.hadoop.fs.ftp.FTPFileSystem, hdfs=class org.apache.hadoop.hdfs.DistributedFileSystem, hftp=class org.apache.hadoop.hdfs.web.HftpFileSystem, webhdfs=class org.apache.hadoop.hdfs.web.WebHdfsFileSystem, s3n=class org.apache.hadoop.fs.s3native.NativeS3FileSystem, viewfs=class org.apache.hadoop.fs.viewfs.ViewFileSystem, swebhdfs=class org.apache.hadoop.hdfs.web.SWebHdfsFileSystem, har=class org.apache.hadoop.fs.HarFileSystem, s3=class org.apache.hadoop.fs.s3.S3FileSystem, hsftp=class org.apache.hadoop.hdfs.web.HsftpFileSystem
DistributedFileSystem.java
// Key field: client/server interaction has to go through this DFSClient.
DFSClient dfs;

/**
 * Initializes this file system for the given URI and creates its DFSClient.
 *
 * @param uri  file system URI, e.g. hdfs://hadoop000:8020; must contain a host
 * @param conf configuration handed to the superclass, setConf and the new DFSClient
 * @throws IOException if the URI has no host, or if any of the called methods fails
 */
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
    super.initialize(uri, conf);
    setConf(conf);

    String host = uri.getHost(); // "hadoop000" for the example URI
    if (host == null) {
        // Fail fast with a clear message instead of passing a host-less URI to DFSClient.
        throw new IOException("Incomplete HDFS URI, no host: " + uri);
    }

    this.dfs = new DFSClient(uri, conf, statistics);
    // Keep only scheme://authority as this file system's own URI.
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.workingDir = getHomeDirectory();
}
DFSClient.java
// Key field: the RPC interface through which the client communicates with the NameNode.
final ClientProtocol namenode;

/**
 * Creates the NameNode proxy via NameNodeProxies.createProxy and stores the proxy
 * and its delegation-token service on this client.
 *
 * NOTE(review): abridged excerpt - rpcNamenode and stats are not used in the lines
 * shown, and the dtService field declaration is not visible here.
 */
public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats)throws IOException {
    NameNodeProxies.ProxyAndInfo<ClientProtocol> proxyInfo = NameNodeProxies.createProxy(conf, nameNodeUri,ClientProtocol.class);
    this.dtService = proxyInfo.getDelegationTokenService();
    this.namenode = proxyInfo.getProxy(); // in this walkthrough: org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB
}
NameNodeProxies.java
/**
 * Creates a NameNode proxy for the given interface. In the lines shown it always
 * delegates to createNonHAProxy with the current user and retries enabled.
 *
 * NOTE(review): failoverProxyProviderClass is computed but unused in this excerpt;
 * presumably the branch that uses it was omitted - confirm against the full source.
 */
public static <T> ProxyAndInfo<T> createProxy(Configuration conf, URI nameNodeUri, Class<T> xface) throws IOException {
    Class<FailoverProxyProvider<T>> failoverProxyProviderClass = getFailoverProxyProviderClass(conf, nameNodeUri, xface);
    return createNonHAProxy(conf, NameNode.getAddress(nameNodeUri), xface,UserGroupInformation.getCurrentUser(), true);
}

/**
 * Builds the proxy for the NameNode address together with its token service.
 * Only the ClientProtocol case is shown; other cases are elided ("...").
 */
public static <T> ProxyAndInfo<T> createNonHAProxy(Configuration conf, InetSocketAddress nnAddr, Class<T> xface, UserGroupInformation ugi, boolean withRetries) throws IOException {
    Text dtService = SecurityUtil.buildTokenService(nnAddr);
    T proxy;
    if (xface == ClientProtocol.class) {
        proxy = (T) createNNProxyWithClientProtocol(nnAddr, conf, ugi,withRetries);
    }
    ... // (elided in this excerpt)
    return new ProxyAndInfo<T>(proxy, dtService);
}

/**
 * Obtains a ClientNamenodeProtocolPB proxy from RPC.getProtocolProxy, optionally
 * wraps it with RetryProxy.create, and returns it wrapped in a
 * ClientNamenodeProtocolTranslatorPB.
 */
private static ClientProtocol createNNProxyWithClientProtocol( InetSocketAddress address, Configuration conf, UserGroupInformation ugi,boolean withRetries) throws IOException {
    // The RPC interface between the client and the NameNode.
    final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
    ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy( ClientNamenodeProtocolPB.class, version, address, ugi, conf, NetUtils.getDefaultSocketFactory(conf), org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy) .getProxy();
    if (withRetries) { // create the instance using a JDK dynamic proxy
        proxy = (ClientNamenodeProtocolPB) RetryProxy.create( ClientNamenodeProtocolPB.class,new DefaultFailoverProxyProvider<ClientNamenodeProtocolPB>( ClientNamenodeProtocolPB.class, proxy),methodNameToPolicyMap,defaultPolicy);
    }
    return new ClientNamenodeProtocolTranslatorPB(proxy);
}
RetryProxy.java
/**
 * Creates a JDK dynamic proxy that implements {@code iface} and routes every call
 * through a RetryInvocationHandler built from the given provider and retry policy.
 *
 * @param iface         the interface the returned proxy implements
 * @param proxyProvider provider handed to the invocation handler; the class loader of
 *                      its interface is used to define the proxy class
 * @param retryPolicy   retry policy handed to the invocation handler
 * @return the proxy instance, typed as Object
 */
public static <T> Object create(Class<T> iface,
                                FailoverProxyProvider<T> proxyProvider,
                                RetryPolicy retryPolicy) {
    final ClassLoader loader = proxyProvider.getInterface().getClassLoader();
    final Class<?>[] proxiedInterfaces = new Class<?>[] { iface };
    final RetryInvocationHandler<T> handler =
        new RetryInvocationHandler<T>(proxyProvider, retryPolicy);
    return Proxy.newProxyInstance(loader, proxiedInterfaces, handler);
}
获取FileSystem实例源码分析总结:
1、FileSystem.get通过反射实例化了一个DistributedFileSystem;
2、DistributedFileSystem中new DFSClient()并把它作为自己的成员变量;
3、在DFSClient构造方法里面,调用了createProxy使用RPC机制得到了一个NameNode的代理对象,就可以和NameNode进行通信;
4、整个流程:FileSystem.get()--> DistributedFileSystem.initialize() --> DFSClient(RPC.getProtocolProxy()) --> NameNode的代理。
FileSystem实例化过程
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。