首页 > 代码库 > hadoop java接口及常用api
hadoop java接口及常用api
# java接口及常用api
package com.yting.hadoop.hdfs;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
/**
* 使用FileSystem
*
* eclipse创建方法快捷键 Shift+Alt+M
* eclipse创建局部变量名称快捷键 Shift+Alt+L
*
*/
public class HDFSJavaOperation {

    /** HDFS NameNode URI this demo connects to. */
    public static final String HDFS_PATH = "hdfs://hadoop0:9000";
    /** Directory created/used by the demo operations. */
    public static final String DIR_PATH = "/d1000";
    /** File path inside DIR_PATH that is uploaded/downloaded/deleted. */
    public static final String FILE_PATH = "/d1000/f1000";

    /**
     * Obtains a {@link FileSystem} client for {@link #HDFS_PATH} and runs one of
     * the demo operations below (uncomment the one to exercise). The client is
     * closed in a finally block so the connection is always released.
     */
    public static void main(String[] args) throws Exception {
        final FileSystem fileSystem = FileSystem.get(new URI(HDFS_PATH), new Configuration());
        try {
            // Create the directory
            //makeDirectory(fileSystem);
            // Upload a local file (was mislabeled "上次文件" in the original)
            uploadData(fileSystem);
            // Download the file and print it to stdout
            //downloadData(fileSystem);
            // Delete the file (or directory, recursively)
            //deleteFile(fileSystem);
        } finally {
            // Original code leaked this handle; close releases RPC connections.
            fileSystem.close();
        }
    }

    /**
     * Deletes {@link #FILE_PATH}. The {@code true} flag requests a recursive
     * delete, so the call also works if the path is a directory.
     */
    private static void deleteFile(final FileSystem fileSystem)
            throws IOException {
        fileSystem.delete(new Path(FILE_PATH), true);
    }

    /**
     * Streams {@link #FILE_PATH} from HDFS to {@code System.out}. The final
     * {@code true} tells IOUtils to close the input stream when done.
     */
    private static void downloadData(final FileSystem fileSystem)
            throws IOException {
        final FSDataInputStream in = fileSystem.open(new Path(FILE_PATH));
        IOUtils.copyBytes(in, System.out, 1024, true);
    }

    /** Creates {@link #DIR_PATH} (no-op if it already exists). */
    private static void makeDirectory(final FileSystem fileSystem)
            throws IOException {
        fileSystem.mkdirs(new Path(DIR_PATH));
    }

    /**
     * Uploads the local file {@code c:/log.txt} to {@link #FILE_PATH}.
     * The final {@code true} tells IOUtils to close both streams when done.
     */
    private static void uploadData(final FileSystem fileSystem)
            throws IOException, FileNotFoundException {
        final FSDataOutputStream out = fileSystem.create(new Path(FILE_PATH));
        final FileInputStream in = new FileInputStream("c:/log.txt");
        IOUtils.copyBytes(in, out, 1024, true);
    }
}
# ---------------------------加深拓展----------------------
# RPC调用
Client发起调用请求,请求调用Server端的对象的方法
# MyRpcServer类
package com.yting.hadoop.rpc;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
public class MyRpcServer {

    /** Address the RPC server binds to and listens on. */
    public static String BIND_ADDRESS = "localhost";
    /** Port the RPC server listens on for incoming connections. */
    public static int PORT = 1129;

    /**
     * Builds a Hadoop RPC server around a {@code MyInstance} implementation and
     * starts it. Every public method of the instance that is declared on the
     * {@code MyInstance} protocol interface becomes remotely callable.
     */
    public static void main(String[] args) throws Exception {
        final MyInstance exported = new MyInstanceImpl();
        final Server rpcServer =
                RPC.getServer(exported, BIND_ADDRESS, PORT, new Configuration());
        rpcServer.start();
    }
}
# MyRpcClient类
package com.yting.hadoop.rpc;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
public class MyRpcClient {

    /**
     * Connects to {@code MyRpcServer}, invokes {@code hello("world")} through a
     * client-side proxy that implements the {@code MyInstance} protocol, prints
     * the result, and releases the proxy.
     */
    public static void main(String[] args) throws Exception {
        // waitForProxy(protocol, clientVersion, addr, conf) blocks until the
        // server at the named address is reachable, then returns the proxy.
        final InetSocketAddress serverAddress =
                new InetSocketAddress(MyRpcServer.BIND_ADDRESS, MyRpcServer.PORT);
        final MyInstance proxy = (MyInstance) RPC.waitForProxy(
                MyInstance.class, MyInstance.versionID, serverAddress, new Configuration());
        final String retVal = proxy.hello("world");
        System.out.println("客户端调用结果:" + retVal);
        // Always release the proxy so the client-side connection is closed.
        RPC.stopProxy(proxy);
    }
}
# MyInstance接口
package com.yting.hadoop.rpc;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
 * RPC protocol exposed by the server. Hadoop RPC requires the remote object to
 * implement an interface extending {@link VersionedProtocol}; only methods
 * declared on this interface are callable by clients.
 */
public interface MyInstance extends VersionedProtocol {

    // Interface fields are implicitly public static final; the redundant
    // modifiers from the original were removed.
    /** Protocol version shared by client and server for version negotiation. */
    long versionID = 1234567L;

    /**
     * Demo remote method.
     *
     * @param name suffix appended to the greeting
     * @return the greeting string built by the server
     */
    String hello(String name);
}
# MyInstanceImpl 实现
package com.yting.hadoop.rpc;
import java.io.IOException;
/**
 * Server-side implementation of the {@code MyInstance} RPC protocol.
 */
public class MyInstanceImpl implements MyInstance {

    /**
     * Remote method invoked by clients: logs that it ran on the server side
     * and returns a greeting. Note the result intentionally has no space
     * ("hello" + name), matching the documented output {@code helloworld}.
     */
    @Override
    public String hello(String name) {
        System.out.println("我被调用了、、、");
        final String greeting = "hello" + name;
        return greeting;
    }

    /** Reports the protocol version this server speaks (see MyInstance). */
    @Override
    public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
        return MyInstance.versionID;
    }
}
# 运行结果
客户端的运行结果
客户端调用结果:helloworld
服务端的运行结果
14/03/02 15:35:42 INFO ipc.Server: Starting SocketReader
14/03/02 15:35:42 INFO ipc.Server: IPC Server Responder: starting
14/03/02 15:35:42 INFO ipc.Server: IPC Server listener on 1129: starting
14/03/02 15:35:42 INFO ipc.Server: IPC Server handler 0 on 1129: starting
我被调用了、、、
# 结论
1、RPC 即远程过程调用(Remote Procedure Call)
2、被调用的对象位于服务端,并且这个对象必须有接口(jdk反射要求),实现VersionedProtocol(api要求)
3、客户端调用的对象中的方法必须位于接口中
4、在本地运行jps看看
由此可以推断出hadoop中启动的5个进程,也就是RPC的服务端
# HDFS的分布式存储架构的源码分析
# HDFS的高可靠
FsImage 备份
SecondaryNameNode
# edits文件可以没有么?(必须有)下面是一个例子
F1 start transfer
F1 block1 shuff
F1 end transfer
Edits文件仅仅记录操作日志(确保事务的正确性)
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。