首页 > 代码库 > Storm中的LocalState 代码解析
Storm中的LocalState 代码解析
官方的解释这个类为:
/** * A simple, durable, atomic K/V database. *Very inefficient*, should only be * used for occasional reads/writes. Every read/write hits disk. */
简单来理解就是这个类每次读写都会将一个Map<Object, Object>的对象序列化存储到磁盘中,读的时候将其反序列化。
构造函数指定的参数就是你在磁盘中存储的目录,同时也作为VersionedStore的构造函数的参数。
这些文件在目录中是以一个long类型的id进行命名
public LocalState(String backingDir) throws IOException { _vs = new VersionedStore(backingDir); }
snapshot函数,找到最近的版本,将其反序列化
public synchronized Map<Object, Object> snapshot() throws IOException { int attempts = 0; while (true) { String latestPath = _vs.mostRecentVersionPath(); //获取最近的版本 if (latestPath == null) return new HashMap<Object, Object>(); try { return (Map<Object, Object>) Utils.deserialize(FileUtils .readFileToByteArray(new File(latestPath))); } catch (IOException e) { attempts++; if (attempts >= 10) { throw e; } } } }
public Object get(Object key) throws IOException { return snapshot().get(key); } public synchronized void put(Object key, Object val) throws IOException { put(key, val, true); } public synchronized void put(Object key, Object val, boolean cleanup) throws IOException { Map<Object, Object> curr = snapshot(); curr.put(key, val); persist(curr, cleanup); //persist会将其写入到磁盘中 } public synchronized void remove(Object key) throws IOException { remove(key, true); } public synchronized void remove(Object key, boolean cleanup) throws IOException { Map<Object, Object> curr = snapshot(); curr.remove(key); persist(curr, cleanup); } public synchronized void cleanup(int keepVersions) throws IOException { _vs.cleanup(keepVersions); }
可以看到,基本暴露的接口都通过synchronized关键字来保证串行化的操作,同时多次调用了以下的persist方法,
private void persist(Map<Object, Object> val, boolean cleanup) throws IOException { byte[] toWrite = Utils.serialize(val); String newPath = _vs.createVersion(); //创建一个新的版本号 FileUtils.writeByteArrayToFile(new File(newPath), toWrite); _vs.succeedVersion(newPath); //如果写入成功,那么会生成 id.version 文件来声明该文件写入成功 if (cleanup) _vs.cleanup(4); //默认保留4个版本 }
接下来看看VersionedStore这个类,它是进行实际存储操作的类,提供了接口给LocalState
public void succeedVersion(String path) throws IOException { long version = validateAndGetVersion(path); //验证一下这个文件是否存在 // should rewrite this to do a file move createNewFile(tokenPath(version)); //创建对应的 id.version 文件说明写入成功 }
path的值是一个long类型的id,表示对应的文件
private long validateAndGetVersion(String path) { Long v = parseVersion(path); if (v == null) throw new RuntimeException(path + " is not a valid version"); return v; }
//解析出版本号,如果以.version结尾的,去掉.version
private Long parseVersion(String path) { String name = new File(path).getName(); if (name.endsWith(FINISHED_VERSION_SUFFIX)) { name = name.substring(0, name.length() - FINISHED_VERSION_SUFFIX.length()); } try { return Long.parseLong(name); } catch (NumberFormatException e) { return null; } }
createNewFile(tokenPath(version)); //创建对应的 id.version 文件说明写入成功
token file就是一种标志文件,用于标志对应的文件已经写入成功,以.version 结尾
private String tokenPath(long version) { return new File(_root, "" + version + FINISHED_VERSION_SUFFIX) .getAbsolutePath(); }
private void createNewFile(String path) throws IOException { new File(path).createNewFile(); }
cleanup函数,保留versionsToKeep版本,清除其他的版本
public void cleanup(int versionsToKeep) throws IOException { List<Long> versions = getAllVersions(); //获取所有的版本,这个返回的是以倒序排列的,最新的版本在最前面 if (versionsToKeep >= 0) { versions = versions.subList(0, Math.min(versions.size(), versionsToKeep)); //所以可以用subList来得到需要的版本 } HashSet<Long> keepers = new HashSet<Long>(versions); //存在HashSet中方便快速存取 for (String p : listDir(_root)) { Long v = parseVersion(p); if (v != null && !keepers.contains(v)) { deleteVersion(v); //删除其他的版本 } } }
getAllVersions,注意这里是获取所有以version结尾的文件,也就是说所有写入成功的文件,不包括某些还没写成功的文件
/** * Sorted from most recent to oldest */ public List<Long> getAllVersions() throws IOException { List<Long> ret = new ArrayList<Long>(); for (String s : listDir(_root)) { //获取该目录下的所有文件 if (s.endsWith(FINISHED_VERSION_SUFFIX)) { ret.add(validateAndGetVersion(s)); //验证该文件是否存在 } } Collections.sort(ret); Collections.reverse(ret); //逆序排列 return ret; }
删除对应的version文件和token文件
public void deleteVersion(long version) throws IOException { File versionFile = new File(versionPath(version)); File tokenFile = new File(tokenPath(version)); if (versionFile.exists()) { FileUtils.forceDelete(versionFile); } if (tokenFile.exists()) { FileUtils.forceDelete(tokenFile); } }
在最开始的地方,snapshot()函数调用了 mostRecentVersionPath() 来获取最近的版本,也就是调用getAllVersions,然后拿到最新的version
public String mostRecentVersionPath() throws IOException { Long v = mostRecentVersion(); if (v == null) return null; return versionPath(v); }
public Long mostRecentVersion() throws IOException { List<Long> all = getAllVersions(); if (all.size() == 0) return null; return all.get(0); }
如果提供了version号的话,可以看到是取出了比这个version号小的最大的version
public String mostRecentVersionPath(long maxVersion) throws IOException { Long v = mostRecentVersion(maxVersion); if (v == null) return null; return versionPath(v); }
public Long mostRecentVersion(long maxVersion) throws IOException { List<Long> all = getAllVersions(); for (Long v : all) { if (v <= maxVersion) //取出比maxVersion小的最大version return v; } return null; }
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。