首页 > 代码库 > hadoop java接口及常用api

hadoop java接口及常用api

# java接口及常用api
package com.yting.hadoop.hdfs;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

/**
 * Sample program that uses the Hadoop FileSystem API.
 * 
 * Eclipse shortcut for creating a method: Shift+Alt+M
 * Eclipse shortcut for creating a local variable name: Shift+Alt+L
 *
 */
public class HDFSJavaOperation {
// URI handed to FileSystem.get in main.
public static final String HDFS_PATH = "hdfs://hadoop0:9000";
// Directory path used by makeDirectory.
public static final String DIR_PATH = "/d1000";
// File path used by uploadData, downloadData and deleteFile.
public static final String FILE_PATH = "/d1000/f1000";

/**
 * Obtains a FileSystem for {@link #HDFS_PATH} and runs the sample operations.
 * Only the upload step is enabled; the other steps are left commented out so
 * they can be switched on one at a time.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the URI is malformed or a file-system call fails
 */
public static void main(String[] args) throws Exception {
    final FileSystem fileSystem = FileSystem.get(new URI(HDFS_PATH), new Configuration());
    try {
        // Create the directory
        //makeDirectory(fileSystem);

        // Upload a file
        uploadData(fileSystem);

        // Download the file
        //downloadData(fileSystem);

        // Delete the file (or directory)
        //deleteFile(fileSystem);
    } finally {
        // FileSystem is Closeable; release it even when an operation fails.
        fileSystem.close();
    }
}

/**
 * Deletes {@link #FILE_PATH} from the given file system, passing
 * {@code true} for the recursive flag.
 *
 * @param fileSystem the file system to delete from
 * @throws IOException if the delete call fails
 */
private static void deleteFile(final FileSystem fileSystem)
        throws IOException {
    final Path target = new Path(FILE_PATH);
    final boolean recursive = true;
    fileSystem.delete(target, recursive);
}

/**
 * Opens {@link #FILE_PATH} on the given file system and copies its content
 * to standard output.
 *
 * @param fileSystem the file system to read from
 * @throws IOException if the file cannot be opened or copied
 */
private static void downloadData(final FileSystem fileSystem)
        throws IOException {
    final FSDataInputStream in = fileSystem.open(new Path(FILE_PATH));
    try {
        // close=false: copyBytes must not close System.out, otherwise every
        // later write to the console would be silently discarded.
        IOUtils.copyBytes(in, System.out, 1024, false);
        System.out.flush();
    } finally {
        // Only the HDFS input stream belongs to this method.
        IOUtils.closeStream(in);
    }
}

/**
 * Creates the directory {@link #DIR_PATH} on the given file system.
 *
 * @param fileSystem the file system to create the directory on
 * @throws IOException if the mkdirs call fails
 */
private static void makeDirectory(final FileSystem fileSystem)
        throws IOException {
    final Path directory = new Path(DIR_PATH);
    fileSystem.mkdirs(directory);
}

/**
 * Copies the local file {@code c:/log.txt} to {@link #FILE_PATH} on the
 * given file system.
 *
 * The local file is opened before the remote file is created, so a missing
 * local file fails fast without creating an empty remote file or leaving an
 * unclosed output stream behind.
 *
 * @param fileSystem the file system to write to
 * @throws FileNotFoundException if the local file cannot be opened
 * @throws IOException if the remote file cannot be created or the copy fails
 */
private static void uploadData(final FileSystem fileSystem)
        throws IOException, FileNotFoundException {
    final FileInputStream in = new FileInputStream("c:/log.txt");

    boolean handedOff = false;
    try {
        final FSDataOutputStream out = fileSystem.create(new Path(FILE_PATH));
        handedOff = true;
        // close=true: copyBytes closes both streams, on success and on failure.
        IOUtils.copyBytes(in, out, 1024, true);
    } finally {
        if (!handedOff) {
            // create() failed, so copyBytes never took over the local stream.
            IOUtils.closeStream(in);
        }
    }
}

# ---------------------------加深拓展----------------------
# RPC调用
Client发起调用请求,请求调用Server端的对象的方法

# MyRpcServer类
package com.yting.hadoop.rpc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;

public class MyRpcServer {
public static String BIND_ADDRESS = "localhost";   // 绑定地址
public static int PORT = 1129;                    // 绑定端口

/** Construct an RPC server. 构造一个RPC的Server
     * @param instance the instance whose methods will be called 实例中的方法被客户端调用的实例
     * @param conf the configuration to use 使用的配置
     * @param bindAddress the address to bind on to listen for connection 绑定的地址用于监听链接的到来
     * @param port the port to listen for connections on 端口也是用于监听链接的到来
* @throws Exception 
     */
public static void main(String[] args) throws Exception {
MyInstance myInstance = new MyInstanceImpl();
final Server server = RPC.getServer(myInstance, BIND_ADDRESS, PORT, new Configuration());
server.start();
}
}
# MyRpcClient类
package com.yting.hadoop.rpc;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;

/**
 * RPC client that calls {@code hello} on the object served by {@link MyRpcServer}.
 */
public class MyRpcClient {
    /**
     * Obtains a client-side proxy for {@link MyInstance}, calls
     * {@code hello("world")} on it and prints the result.
     *
     * The arguments passed to {@code RPC.waitForProxy} are, in order: the
     * protocol interface, the client version, the server address, and the
     * configuration.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the proxy cannot be obtained or the call fails
     */
    public static void main(String[] args) throws Exception {
        final InetSocketAddress serverAddress =
                new InetSocketAddress(MyRpcServer.BIND_ADDRESS, MyRpcServer.PORT);
        final MyInstance proxy = (MyInstance) RPC.waitForProxy(
                MyInstance.class, MyInstance.versionID, serverAddress, new Configuration());
        try {
            String retVal = proxy.hello("world");
            System.out.println("客户端调用结果:" + retVal);
        } finally {
            // Stop the proxy even when the remote call throws.
            RPC.stopProxy(proxy);
        }
    }
}
# MyInstance接口
package com.yting.hadoop.rpc;

import org.apache.hadoop.ipc.VersionedProtocol;

/**
 * RPC protocol shared by {@code MyRpcServer} and {@code MyRpcClient}.
 */
public interface MyInstance extends VersionedProtocol {
    /** Protocol version passed by the client and returned by the implementation. */
    long versionID = 1234567L;

    /**
     * Remote method invoked by the client.
     *
     * @param name value supplied by the caller
     * @return the string produced by the implementation
     */
    String hello(String name);
}
# MyInstanceImpl 实现
package com.yting.hadoop.rpc;

import java.io.IOException;

public class MyInstanceImpl  implements MyInstance{
/* (non-Javadoc)
* @see com.yting.hadoop.rpc.MyInstance#hello(java.lang.String)
*/
@Override
public String hello(String name) {
System.out.println("我被调用了、、、");
return "hello" + name;
}

@Override
public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
return MyInstance.versionID;
}
}
# 运行结果
客户端的运行结果
客户端调用结果:helloworld
服务端的运行结果
14/03/02 15:35:42 INFO ipc.Server: Starting SocketReader
14/03/02 15:35:42 INFO ipc.Server: IPC Server Responder: starting
14/03/02 15:35:42 INFO ipc.Server: IPC Server listener on 1129: starting
14/03/02 15:35:42 INFO ipc.Server: IPC Server handler 0 on 1129: starting
我被调用了、、、
# 结论
1、RPC(Remote Procedure Call)即远程过程调用:客户端像调用本地方法一样,调用位于服务端的对象的方法
2、被调用的对象位于服务端,并且这个对象必须实现一个接口(JDK动态代理要求),该接口必须继承VersionedProtocol(Hadoop RPC的api要求)
3、客户端调用的对象中的方法必须位于接口中
4、在本地运行jps看看

由此可以推断出hadoop中启动的5个进程,也就是RPC的服务端
# HDFS的分布式存储架构的源码分析
# HDFS的高可靠
Fsimage备份
SecondaryNameNode
# edits文件可以没有么?(必须有)下面是一个例子
F1 start transfer
F1 block1 shuff
F1 end transfer
Edits文件仅仅记录操作日志(确保事务的正确性)