Spark MySQL read/write

Two short Scala snippets: the first writes an RDD of (location, count) pairs into a MySQL table with foreachPartition, the second reads the table back into Spark with JdbcRDD.
// Write one partition of (location, count) pairs into MySQL.
// All records in the partition share a single connection (one conn per partition).
import java.sql.{Connection, Date, DriverManager, PreparedStatement}

val data2MySQL = (iterator: Iterator[(String, Int)]) => {
  var conn: Connection = null
  var ps: PreparedStatement = null
  val sql = "INSERT INTO location_info(location, counts, accesse_date) VALUES (?, ?, ?)"
  try {
    conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root")
    // Reuse one PreparedStatement for the whole partition instead of creating one per record.
    ps = conn.prepareStatement(sql)
    iterator.foreach(line => {
      ps.setString(1, line._1)
      ps.setInt(2, line._2)
      ps.setDate(3, new Date(System.currentTimeMillis()))
      ps.executeUpdate()
    })
  } catch {
    case e: Exception => println("MySQL Exception: " + e.getMessage)
  } finally {
    if (ps != null) ps.close()
    if (conn != null) conn.close()
  }
}
rddres2.foreachPartition(data2MySQL)
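For context, here is a minimal driver sketch showing one way rddres2 could be produced and the write triggered end to end. Only the (String, Int) pair shape, the data2MySQL function, and the foreachPartition call come from the snippets above; the input path and the word-count style aggregation are assumptions for illustration.

import org.apache.spark.{SparkConf, SparkContext}

object LocationCountToMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("LocationCountToMysql").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Hypothetical input: one visited location per line.
    val rddres2 = sc.textFile("file:///tmp/locations.txt")
      .map(location => (location, 1))
      .reduceByKey(_ + _)

    // data2MySQL is the partition-writing function defined above.
    rddres2.foreachPartition(data2MySQL)
    sc.stop()
  }
}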
import java.sql.DriverManager
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.JdbcRDD

def mysql2Spark(): Unit = {
  val conf = new SparkConf().setAppName("JdbcRDDDemo").setMaster("local[2]")
  val sc = new SparkContext(conf)

  // Connection factory; JdbcRDD calls it on the executors, one connection per partition.
  val connection = () => {
    Class.forName("com.mysql.jdbc.Driver").newInstance()
    DriverManager.getConnection("jdbc:mysql://localhost:3306/bigdata", "root", "root")
  }

  // JdbcRDD splits the id range [1, 4] into 2 partitions and binds each partition's
  // lower/upper bound into the two ? placeholders of the query.
  val jdbcRDD = new JdbcRDD(
    sc,
    connection,
    "SELECT id, location FROM location_info WHERE id >= ? AND id <= ?",
    1,  // lowerBound
    4,  // upperBound
    2,  // numPartitions
    r => {
      val id = r.getInt(1)
      val location = r.getString(2)
      (id, location)
    }
  )

  println(jdbcRDD.collect().toBuffer)
  sc.stop()
}
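If Spark SQL is available, the DataFrame JDBC data source is a common alternative to JdbcRDD for the same kind of read. The sketch below is not from the original post; it only reuses the connection settings shown above and assumes a SparkSession-based setup.

import org.apache.spark.sql.SparkSession

object JdbcDataFrameDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("JdbcDataFrameDemo").master("local[2]").getOrCreate()

    // Read the whole location_info table through the built-in JDBC data source.
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://localhost:3306/bigdata")
      .option("dbtable", "location_info")
      .option("user", "root")
      .option("password", "root")
      .load()

    df.show()
    spark.stop()
  }
}

Writing back works symmetrically through df.write.format("jdbc") with a save mode such as append.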