首页 > 代码库 > Storm集群上的开发 ,任务计算输出到mysql数据库,集成jdbc(十)

Storm集群上的开发 ,任务计算输出到mysql数据库,集成jdbc(十)

storm集成jdbc,把计算结果保存到mysql中。

首先在mysql中建表 ,表的字段与输出的tuple的schema一致:

create table result(

    word varchar(20),
    total int

);

编写一个连接提供器,用于获取mysql数据库连接:

需要引入jar :/usr/local/apps/apache-storm-1.0.3/external/storm-jdbc 的 storm-jdbc-1.0.3.jar

技术分享
package mystorm.wordcount;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.storm.jdbc.common.ConnectionProvider;


/**
 * storm集成jdbc的 连接提供者
 * @author Administrator
 *
 */

//为jdbcBolt组件提供对应的数据连接
public class MyConnectionProvider implements ConnectionProvider {

    private static String driver = "com.mysql.jdbc.driver";
    private static String url = "jdbc:mysql://192.168.2.1:3306/test";
    private static String user = "root";
    private static String password = "123456";
    
    static{
        
        try {
            Class.forName(driver);
        } catch (ClassNotFoundException e) {
            // TODO Auto-generated catch block
            new ExceptionInInitializerError(e);
//            e.printStackTrace();
        }
        
    }
    
    @Override
    public void cleanup() {
        // TODO Auto-generated method stub

    }

    @Override
    public Connection getConnection() {
        // TODO Auto-generated method stub
        try {
            return DriverManager.getConnection(url,user,password);
        } catch (SQLException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void prepare() {
        // TODO Auto-generated method stub

    }

}
View Code

然后在前面我们编写的Topology任务添加一个Storm提供的Bolt组件,用于把数据写入mysql

    //创建一个新的jdbcbolt组件,把前一个bolt组件发送过来的数据插入到mysql数据库中
    
    // storm集成各种框架的jar包路径 : /usr/local/apps/apache-storm-1.0.3/external
    //集成jdbc路径 :/usr/local/apps/apache-storm-1.0.3/external/storm-jdbc
    /**
     * 
     * 
     * jar包 引入:
     * 1.external/sql/storm-sql-core/*.jar
     * 2.external/storm-jdbc
     * 3.mysql驱动
     * 4.commons-lang3-3.1.jar
     * 
     * @return
     */
    private static IRichBolt createJDBCBolt(){
        //创建一个connectionProvider
        MyConnectionProvider conectionProvider = new MyConnectionProvider();
        
        //创建一个mapper,填写表名
        JdbcMapper mapper = new SimpleJdbcMapper("result",conectionProvider);
        
        //通过mapper创建一个bolt组件
        
        return new JdbcInsertBolt(conectionProvider,mapper)
                .withTableName("result")
                .withQueryTimeoutSecs(30);
    }

然后把这个组件加到Topology任务的最后面  第三个Bolt组件:

技术分享
/**
 * 
 * 单词计数的Topology的入口,主程序
 * 
 * @author Administrator
 *
 */
public class WordCountTopology {
    
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        TopologyBuilder builder = new TopologyBuilder();
        
        //设置任务的spout组件
        builder.setSpout("wordcount_spout", new WordCountSpout());
        
        //设置任务的第一个bolt组件
        builder.setBolt("wordcount_bolt", new WordCountSplitBolt())
        // (随机分配策略接收spout的任务)
                .shuffleGrouping("wordcount_spout");
        
        //设置第二个bolt组件
        builder.setBolt("wordcount_countbolt", new WordCountBoltCount())
        //(接受第一个bolt组件的数据,按字段进行分组)
                .fieldsGrouping("wordcount_bolt", new Fields("word"));
        
        //设置第三个bolt组件(storm提供的),用与把记录保存到mysql数据库中。
        builder.setBolt("wordcount_jdbcbolt", createJDBCBolt())
                .shuffleGrouping("wordcount_countbolt");
        
        //创建topology的任务
        StormTopology wc = builder.createTopology();
        
        //配置参数信息
        Config conf = new Config();
        
        //1.本地模式提交
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology( "mywordcount", conf, wc);
        
        
        /**
         * 
         * 本地模式运行结果
         * 
         * 
            spout采集的数据是:beijing is the capital of china
            输出的结果:{beijing=1}
            输出的结果:{is=1, beijing=1}
            输出的结果:{the=1, is=1, beijing=1}
            输出的结果:{the=1, capital=1, is=1, beijing=1}
            输出的结果:{the=1, capital=1, of=1, is=1, beijing=1}
            输出的结果:{the=1, capital=1, china=1, of=1, is=1, beijing=1}
            spout采集的数据是:I love beijing
            输出的结果:{the=1, capital=1, china=1, of=1, I=1, is=1, beijing=1}
            输出的结果:{the=1, love=1, capital=1, china=1, of=1, I=1, is=1, beijing=1}
            输出的结果:{the=1, love=1, capital=1, china=1, of=1, I=1, is=1, beijing=2}
            spout采集的数据是:I love china
            输出的结果:{the=1, love=1, capital=1, china=1, of=1, I=2, is=1, beijing=2}
            输出的结果:{the=1, love=2, capital=1, china=1, of=1, I=2, is=1, beijing=2}
            输出的结果:{the=1, love=2, capital=1, china=2, of=1, I=2, is=1, beijing=2}
         * 
         * 
         * 
         */
        
        //2集群模式提交
//        StormSubmitter.submitTopology(args[0], conf, wc);
        
    }
View Code

 

打成jar包,放到storm集群中运行,会宝如下错误 :

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/commons/lang/Validate
    at org.apache.storm.jdbc.mapper.SimpleJdbcMapper.<init>(SimpleJdbcMapper.java:39)
    at mystorm.wordcount.WordCountTopology.createJDBCBolt(WordCountTopology.java:95)
    at mystorm.wordcount.WordCountTopology.main(WordCountTopology.java:42)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.lang.Validate
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 3 more

 

还有一些jar没有集成进来 ,需要集成storm的jar ,放到storm集群HOME的lib下,复制到集群的其他机器:

     * 1.external/sql/storm-sql-core/*.jar
     * 2.external/storm-jdbc
     * 3.mysql驱动
     * 4.commons-lang3-3.1.jar

 

Storm集群上的开发 ,任务计算输出到mysql数据库,集成jdbc(十)