首页 > 代码库 > Berkeley DB分布式探索

Berkeley DB分布式探索

明天回家就没有网络,今晚就将整个编写过程记录下来。顺带整理思路以解决未能解决的问题。

标题有点托大,想将Berkeley DB做成分布式存储,感觉很高端的样子,实际上就是通过ssh将Berkeley DB备份到其他网络地址上,查询的时候向多台电脑发送查询请求并返回结果,仅此而已。现在属于编写阶段,如果效果好会考虑用来做web项目的数据库也不一定。

还是这个图:

首先是数据来源:数据来源自己产生。现阶段是在本机产生随机数,实际应用是利用socket传数据进来程序。

在插入数据之前首先读取一些配置信息,包括节点名称,节点硬盘剩余量,节点环境个数,并按节点环境个数进行升序排序。这里个人暂时将数据存放在mysql中。

读取配置信息代码:

 1 package com.Common; 2  3 import java.sql.ResultSet; 4 import java.sql.SQLException; 5 import java.util.ArrayList; 6 import java.util.List; 7  8 import com.MySQL.MySQLDB; 9 import com.config.Config;10 import com.config.dbEnvironment;11 12 public class InitConfig {13 14     15     public InitConfig(){16         17         18     }19     20     //初始化21     public Config getConfig() throws SQLException{22         23         String sql = "select * from conf order by EnvironmentCount asc";        //升序查询,第一个为最优24         MySQLDB db = new MySQLDB();25         db.SetSQL(sql);26         ResultSet rs = db.excuteQuerry();27         Config conf = new Config();28         29         if( rs.next() ){30             31             32             String node = rs.getString("node");33             String d = rs.getString("d");34             String e = rs.getString("e");35             String g = rs.getString("g");36             37             String dr = rs.getString("dr");38             String er = rs.getString("er");39             String gr = rs.getString("gr");40             41             int environmentcount = rs.getInt("environmentcount");42             43             conf.setNode(node);44             45             conf.setD(d);46             conf.setE(e);47             conf.setG(g);48             49             conf.setDr(dr);50             conf.setEr(er);51             conf.setGr(gr);52             conf.setEnvironmentcount(environmentcount);53             54         }55         return conf;56         57     }58     59     public List<dbEnvironment> getEnvironment() throws SQLException{60         61         String sql = "select * from dbEnvironment";        //升序查询,第一个为最优62         MySQLDB db = new MySQLDB();63         db.SetSQL(sql);64         ResultSet rs = db.excuteQuerry();65         66         List<dbEnvironment> dbes = new ArrayList<dbEnvironment>();67         68         while(rs.next()){69             dbEnvironment dbe = new dbEnvironment();70             dbe.setEnvname(rs.getString("envname"));71             dbe.setEnvpath(rs.getString("path"));72             73             dbes.add(dbe);74             75         }76         77         return dbes;78     }79     80     81     82 }

 

上面利用了一个config类,config类封装大多数信息。以上有一点比较值得注意的是,linux环境下数据库表的属性不一样。参照下面自己做的一个分析

windows下数据库设计:所有相关都以该文件变量为准conf节点        D盘容量    E盘容量    G盘容量    D剩余        E剩余        G剩余        环境个数node    d        e        g        dr        er        gr        EnvironmentCountexample:node1    200G    200G    200G    100G    100G    100G    3dbEnvironment环境        路径env        pathexample:env1    node1:/G:/dbenv1---------------------------------------------------------------------------Linux下数据库设计:主要是路径的问题,linux路径没有windows这么麻烦,只要提供环境的名称即可conf节点        总容量        剩余容量    环境个数node    total    rest    EnvironmentCountexample:node1    200G    100G    3dbEnvironment表不用设置

 

config类封装配置信息

 1 package com.config; 2  3 public class Config { 4      5     private String node; 6     private String d; 7     private String e; 8     private String g; 9     private String dr;10     private String er;11     private String gr;12     private int EnvironmentCount;13     public String getNode() {14         return node;15     }16     public void setNode(String node) {17         this.node = node;18     }19     public String getD() {20         return d;21     }22     public void setD(String d) {23         this.d = d;24     }25     public String getE() {26         return e;27     }28     public void setE(String e) {29         this.e = e;30     }31     public String getG() {32         return g;33     }34     public void setG(String g) {35         this.g = g;36     }37     public String getDr() {38         return dr;39     }40     public void setDr(String dr) {41         this.dr = dr;42     }43     public String getEr() {44         return er;45     }46     public void setEr(String er) {47         this.er = er;48     }49     public String getGr() {50         return gr;51     }52     public void setGr(String gr) {53         this.gr = gr;54     }55     public int getEnvironmentcount() {56         return EnvironmentCount;57     }58     public void setEnvironmentcount(int environmentcount2) {59         this.EnvironmentCount = environmentcount2;60     }61     62     63 }

 

上面做了这么多实际上仅仅是冰山一角而已。从上面获得了conf类,里面包含了节点信息,节点硬盘使用信息和环境个数。供给后续部队使用

可以看看入口在哪里

  1 package com.entry;  2   3 import java.io.BufferedWriter;  4 import java.io.File;  5 import java.io.IOException;  6 import java.io.OutputStreamWriter;  7 import java.io.PrintWriter;  8 import java.lang.management.ManagementFactory;  9 import java.lang.management.RuntimeMXBean; 10 import java.net.Socket; 11 import java.net.UnknownHostException; 12 import java.sql.ResultSet; 13 import java.sql.SQLException; 14 import java.util.ArrayList; 15 import java.util.List; 16  17 import com.config.*; 18 import com.BerkeleyDB.BDBInsert; 19 import com.BerkeleyDB.BackUp; 20 import com.Common.InitConfig; 21 import com.MySQL.MySQLDB; 22 import com.sleepycat.je.Database; 23 import com.sleepycat.je.DatabaseConfig; 24 import com.sleepycat.je.Environment; 25 import com.sleepycat.je.EnvironmentConfig; 26  27 public class Entry { 28      29     //将类通用的环境,数据库,文件路径,打开数据库等都提取出来,然后作为参数传进去就行了 30      31     private static Environment dbEnvironment = null; 32     private static Database db = null; 33     private static File des = null; 34     private static String node = null;        //给备份类使用 35     private static String envname = null;    //环境名称,备份类使用 36     private static String envpath = "d:/dbEnv/";        //一直使用这个环境 37      38     //OpenBDB 39          40     public static  void OpenBDB(){ 41         EnvironmentConfig envConfig = new EnvironmentConfig(); 42          43         envConfig.setAllowCreate(true); 44         envConfig.setCacheSize(32*1024*1024); 45         envConfig.setLocking(false); 46         //envConfig.setLockingVoid(false); 47          48          49         envConfig.setAllowCreate(true); 50         dbEnvironment = new Environment( new File(envpath), envConfig ); 51          52          DatabaseConfig dbConfig = new DatabaseConfig(); 53           54          dbConfig.setAllowCreate(true); 55          dbConfig.setSortedDuplicates(true); 56          dbConfig.setDeferredWrite(true); 57          db = dbEnvironment.openDatabase(null,"BDB", dbConfig);  58           59          System.out.println("打开数据库成功"); 60     } 61      62     //CloseBDB 63          64     public static void CloseBDB(){ 65         if(db != null){ 66             db.close(); 67         } 68         if(dbEnvironment != null){ 69             dbEnvironment.close(); 70         } 71     } 72     private static int getPid(){ 73              74        RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); 75         String name = runtime.getName(); // format: "pid@hostname" 76         try { 77             return Integer.parseInt(name.substring(0, name.indexOf(‘@‘))); 78         } catch (Exception e) { 79             return -1; 80         } 81                  82     } 83     //将信息通过socket发送到ListenBerkeleyBDB 84     public static void SendMessage() throws UnknownHostException, IOException{ 85          86         Socket socket = new Socket("localhost",20141); 87         PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())), true); 88  89          90         out.println(Entry.getPid()); 91         out.println(node); 92          93         out.close(); 94         socket.close(); 95     } 96  97     //要确定db所在节点,确定db环境的名称 98      99     public static void Init() throws SQLException{100         Config conf = new InitConfig().getConfig();101         //确定节点所在位置102         node = conf.getNode();103         int envcount = conf.getEnvironmentcount();104         envname = "dbEnv" + Integer.valueOf(envcount+1).toString() + "/";105         106     }107     108     public static void main(String args[]) throws UnknownHostException, IOException{109         110         111         //环境默认为d:/dbenv112         113         Entry.SendMessage();        //跟负责监听本线程的ListenThread进程对接,将pid发给他114         Entry.OpenBDB();            //打开BDB115         116         new BDBInsert(db,dbEnvironment).start();        //启动插入,117         new BackUp(db,dbEnvironment,node,envname).start();    //启动备份,需要备份目标的相关信息,目标节点,目标环境名称118         119         //插入数据的基本结构算完了120         121         122         123         124     }125 126 }

 

从主函数看,Entry.SendMessage()是通过socket向本地的监听port的服务器端程序发送pid和目标节点的名称。该服务器端程序其实是一个监听进程,监听我们的Entry是否挂掉。当然他还有其他的任务,等分析到这个服务器进程的时候再来一览。

OpenBDB不用多说,相对前两篇文章来说出了把openbdb提取出来供大家使用之外没有其他不同。后面的BDBInsert和BackUp为插入数据的线程和备份数据的线程。

OK,可以看看监听进程了

监听进程:

  1 package com.ListenThread;  2   3 import java.io.BufferedReader;  4 import java.io.File;  5 import java.io.IOException;  6 import java.io.InputStream;  7 import java.io.InputStreamReader;  8 import java.net.ServerSocket;  9 import java.net.Socket; 10 import java.sql.ResultSet; 11 import java.sql.SQLException; 12 import java.util.Timer; 13 import java.util.TimerTask; 14  15 import com.MySQL.MySQLDB; 16 import com.sleepycat.je.Database; 17 import com.sleepycat.je.Environment; 18  19  20 public class ListenBerkeleyBDB { 21      22      23     public static void main(String args[]) throws IOException{ 24          25         int port = 20141; 26          27         ServerSocket serversocket = new ServerSocket(port); 28          29         //我只等待一次 30         Socket socket = serversocket.accept(); 31          32         BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream())); 33          34         String pid = in.readLine(); 35         String desnode = in.readLine(); 36          37         socket.close(); 38         serversocket.close(); 39          40         MyTask myTask = new MyTask(pid,desnode); 41             42         Timer timer = new Timer(); 43                 44         timer.schedule(myTask, 1000, 2*6*10000);    //一分钟监听一次 45     } 46  47 } 48  49  50  51 class MyTask extends TimerTask{ 52  53      54     //通过文件流来拷贝数据 55     private static String pid; 56     private static String desnode; 57     public MyTask(String pid,String node){ 58         this.pid = pid; 59         this.desnode = node; 60     } 61     //监听BDB进程 62     public static boolean ListenToBDB() throws IOException{ 63          64         Process p = Runtime.getRuntime().exec("tasklist"); 65          66         BufferedReader br = new BufferedReader(new InputStreamReader(p 67                 .getInputStream())); 68         String line = null; 69         while((line = br.readLine())!=null){ 70             if(line.contains(pid)){ 71  72                 System.out.println("当前进程没有挂"); 73                 return false; 74             } 75         } 76         return true; 77     } 78     //监听自己硬盘,监听对方硬盘 79     public static boolean ListenToHardDisk() throws SQLException, IOException{ 80         if(getNodeRestHardDisk() < 50){        //node的硬盘小于50G,用50作为临界点 81             return true; 82         } 83         if(getMyRestHardDisk()<50){ 84             return true; 85         } 86         return false; 87     } 88     //处理进程挂掉问题 89     public static void doWithBerkeleyDBException(){ 90          91         Reboot(); 92     } 93     //处理硬盘问题 94     public static void doWithHardDiskException() throws SQLException{ 95          96         Reboot(); 97     } 98      99     //监听自己的HardDisk,本身硬盘容量可以绝对准确的获取到100     public static long getMyRestHardDisk() throws IOException{101         StringBuffer sb=new StringBuffer();  102         File[] roots = File.listRoots(); 103         long result = 0;104         for (File file : roots) {  105              long usableSpace=file.getUsableSpace(); 106              result = result + usableSpace;107         }           108         return result;109     }110     111     //监听节点HardDisk112     public static int getNodeRestHardDisk() throws SQLException{113         String sql = "select rest from conf where node = \"" + desnode;114         MySQLDB db = new MySQLDB();115         116         db.SetSQL(sql);117         ResultSet rs = db.excuteQuerry();118         119         int rest = 0;120         121         if(rs.next()){122             rest = rs.getInt("rest");123         }124         125         return rest;126     }127     public static void Reboot(){128         129         //暂时决定将打包成jar的文件用runtime去执行。不可能用runtime去执行Eclipse程序130         131         System.out.println("重启,待处理");132     }133     public MyTask(Database db, Environment dbEnvironment){134         135         136     }137     138     139     @Override140     public void run() {141         // TODO Auto-generated method stub142         //监听硬盘和进程并处理异常143         try {144             if(ListenToBDB()){145                 doWithBerkeleyDBException();146             }147         } catch (IOException e) {148             // TODO Auto-generated catch block149             e.printStackTrace();150         }151         try {152             if(ListenToHardDisk()){153                 doWithHardDiskException();154             }155         } catch (SQLException e) {156             // TODO Auto-generated catch block157             e.printStackTrace();158         } catch (IOException e) {159             // TODO Auto-generated catch block160             e.printStackTrace();161         }162         163         164         165         166     }167 }

 

main首先监听端口,等待连接,获取连接的数据,我们的Entry.SendMessage传数据过来就是给上面使用的,node和pid。通过pid我们可以看看进程是否挂了,通过node我们可以获取数据库中的剩余磁盘大小。

监听线程的作用就是处理数据插入过程中的异常,这种异常包括自己硬盘使用完了,node硬盘使用完了,Entry挂了三种情况。node使用这个问题有点小纠结。到底是在本机上看好还是在node上看好。在node上看的话还得定时给上面的程序发送数据。在本机上看的话数据可能不准确,所以就设置了一下,在本地上看,如果剩余磁盘空间小于50GB,那我就有理由认为node硬盘快不行了。强行切换一下。至于硬盘剩余数据量的问题,从mysql中读取就行了。Berkeley DB备份的时候修改一下mysql数据库中的剩余容量。至于pid的问题,tasklist看下pid是否存在就行了。

监听线程大概就做这么多,感觉这里写监听写了挺久,有两三个个小时了。现在想想也简单。

数据库插入就比较简单,之前的两篇记录也有

数据库备份,这里使用的是备份类备份。通过scp拷贝到目标节点

数据库备份:

package com.BerkeleyDB;import java.io.File;import java.io.FileInputStream;import java.io.FileNotFoundException;import java.io.FileOutputStream;import java.io.IOException;import java.nio.channels.FileChannel;import java.util.Timer;import java.util.TimerTask;import com.sleepycat.je.CheckpointConfig;import com.sleepycat.je.Database;import com.sleepycat.je.DatabaseConfig;import com.sleepycat.je.Environment;import com.sleepycat.je.EnvironmentConfig;import com.sleepycat.je.util.DbBackup;public class BackUp extends Thread{            private Environment dbEnvironment = null;    private Database db = null;    private String desnode = null;    private String desdbenvname = null;        public BackUp(Database db, Environment dbEnvironment,String desnode,String desdbenvname) {        // TODO Auto-generated constructor stub        this.db = db;        this.dbEnvironment = dbEnvironment;        this.desnode = desnode;        this.desdbenvname = desdbenvname;    }    //线程入口    public void run(){        MyTask myTask = new MyTask(db,dbEnvironment,desnode,desdbenvname);                           Timer timer = new Timer();                       timer.schedule(myTask, 1000, 2*6*10000);    }        }class MyTask extends TimerTask{    private Database db = null;    private Environment dbEnvironment = null;    private final String SRCPATH = "d:/dbEnv1/";        //linux平台下要修改的路径,但是不用磁盘分区    private File des = null;    private static DbBackup backupHelper = null;    private static long lastFileCopiedInPrevBackup = -1;        private String desnode;    private String desdbenvname;            public void copy(String file) throws IOException{                Runtime rt = Runtime.getRuntime();        rt.exec("scp " + SRCPATH + file + desnode + desdbenvname );        //scp发送文件            }        public MyTask(Database db, Environment dbEnvironment,String desnode,String desdbenvname){        this.db = db;                        this.dbEnvironment = dbEnvironment;                this.desnode = desnode;        this.desdbenvname = desdbenvname;                backupHelper = new DbBackup(dbEnvironment,-1);                    }        @Override    public void run() {        // TODO Auto-generated method stub                Start();                    }        public void Start(){                        backupHelper.startBackup();                //每备份一次修改一次数据库Rest        try{            String[] filesForBackup = backupHelper.getLogFilesInBackupSet(lastFileCopiedInPrevBackup);            for(int i = 0;i<filesForBackup.length; i++){                                File f = new File(filesForBackup[i]);                                copy(filesForBackup[i]);                                        System.out.println(filesForBackup[i]);                            }                        lastFileCopiedInPrevBackup = backupHelper.getLastFileInBackupSet();                        backupHelper.endBackup();                    }catch(Exception e){            e.printStackTrace();        }//        CloseBDB();                    }    }

 

不知道备份类使用是否正确,备份的效果倒是真能做到。网上利用备份类备份的也貌似也比较少。由于copy函数是自己写的,所以我就直接scpcopy到desnode上了。之前自己写过一个类似备份类的,不过看了下备份类的源代码,清理日志这里之前的倒是没有处理过。

至于查询的问题,我觉得最简单的就是查询了。当然看到小伙伴们写了个单环境查询激动半天我就不好意思跟小伙伴说最简单的就是查询。如果不考虑多环境的话查询就so easy了。考虑多环境的话我就把环境个数通过socket发送过去,然后遍历下dbEnv[i]这个数据库环境就ok了。

先看看如何发送查询。查询是另外放在一个节点上进行,为减轻A的负担。

 发送查询代码

 

package com.sendQuery;import java.io.BufferedReader;import java.io.BufferedWriter;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.OutputStream;import java.io.OutputStreamWriter;import java.io.PrintWriter;import java.net.Socket;import java.net.UnknownHostException;import java.sql.ResultSet;import java.sql.SQLException;import java.util.Scanner;import com.MySQL.MySQLDB;public class SendQuery {        public static void main(String args[]) throws SQLException, UnknownHostException, IOException{                Scanner cin = new Scanner(System.in);                String queryKey = cin.next();        int port = 20141;        String sql = "select node,EnvironmentCount from conf";        MySQLDB mysql = new MySQLDB();        mysql.SetSQL(sql);                ResultSet rs = mysql.excuteQuerry();                //往所有        while(rs.next()){            String node = rs.getString("node");            int environmentcount = rs.getInt("EnvironmentCount");            new QueryThread(node,queryKey,environmentcount).start();                                }            }}

多线程目的是可以多查询,现在还没做图形界面甚至web界面,效果自然不出来,做了之后多个查询提交没有问题。我们把node和EnvironmentCount发过去给目标节点使用,发送数据通过socket去发。

package com.sendQuery;import java.io.BufferedReader;import java.io.BufferedWriter;import java.io.IOException;import java.io.InputStreamReader;import java.io.OutputStreamWriter;import java.io.PrintWriter;import java.net.Socket;import java.net.UnknownHostException;public class QueryThread extends Thread{        private String node = null;    private static int port = 20141;    private String queryKey = null;    private int environmentcount = 0;    public QueryThread(String node,String queryKey,int environmentcount){        this.node = node;        this.queryKey = queryKey;        this.environmentcount = environmentcount;    }        public void run(){                Socket socket = null;        try {            socket = new Socket(node,port);                        PrintWriter out = new PrintWriter(new BufferedWriter(new                 OutputStreamWriter(socket.getOutputStream())), true);                out.println(queryKey);        out.println(environmentcount);                BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));                String result = in.readLine();                System.out.println(result);        in.close();        socket.close();                }catch (IOException e) {                // TODO Auto-generated catch block            e.printStackTrace();        }    }}

 

查询响应:

跟之前几乎一致,获取数据,执行查询的线程。

获取数据

package com.QueryResponse;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.ServerSocket;import java.net.Socket;public class QueryResponse {    private static int port = 20141;    public static void main(String args[]) throws IOException{                ServerSocket serversocket = new ServerSocket(port);                Socket socket = serversocket.accept();                //利用线程可以解决多个用户发过来的查询,每个查询对应一个线程                new ResponseThread(socket).start();                            }}package com.QueryResponse;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStreamReader;import java.net.Socket;//负责查询public class ResponseThread extends Thread{        Socket socket = null;        public ResponseThread(Socket socket){        this.socket = socket;    }            public void run(){        BufferedReader in = null;        try {            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));        } catch (IOException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }                try {            String queryKey = in.readLine();            int environmentcount = Integer.valueOf(in.readLine());            //是开线程呢还是遍历去查询                    } catch (IOException e) {            // TODO Auto-generated catch block            e.printStackTrace();        }    }}

查询暂且用getSearchKey来弄弄。

-------------------------------------------------------------------------------------------------------

后记:

  总的来说,话三四十分钟在这上面还是有点意义的。弄清了那些问题没有解决。学生党明天回家,今晚就写完它吧。把结构弄出来,然后用C去写,C的代码写起来最有意思。不打算将代码发给任何人,免得跟人发生矛盾,自己也乐得清闲,有时间自己慢慢琢磨,把细节弄清楚自己实现一遍。

 

Berkeley DB分布式探索