Spark Streaming development example
This article is split into the following parts:
- Setting up the Spark development environment
- Creating a Spark project
- Writing a Streaming code example
- Debugging
Environment setup:
Spark itself is written in Scala. I use spark-1.4.1-bin-hadoop2.6, which, according to the official documentation, is built against Scala 2.10, so I use scala-2.10.1.
Download the scala-2.10.1 package and unpack it.
Set the SCALA_HOME environment variable.
Add %SCALA_HOME%\bin to PATH.
Creating the project:
The IDE I use is IntelliJ IDEA; to get Scala support, install the Scala plugin first.
Once the plugin is installed, create a new project and choose Scala.
Select the path of the installed Scala SDK.
Add Maven support.
Then create a new Maven module.
Edit the POM files.
Parent POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.rihai.spark</groupId>
    <artifactId>spark-streaming</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <modules>
        <module>hdfs-streaming</module>
    </modules>

    <properties>
        <scala.version>2.10.1</scala.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Child POM:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spark-streaming</artifactId>
        <groupId>com.rihai.spark</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>hdfs-streaming</artifactId>
</project>
Notes on the POM:
Compiling Scala with Maven requires a dedicated plugin, maven-scala-plugin, which can compile both Java and Scala sources.
Project structure:
Then create a scala folder under main, and mark it as a Sources root via File -> Project Structure.
That completes the Spark project setup.
Streaming example:
Put simply, Spark Streaming processes data from a source in small batches and writes the results out to an external sink; its biggest advantage is near-real-time processing at second-level latency.
Streaming sources include file systems (HDFS), Kafka, Flume, and so on; results can be written to file systems (HDFS), databases, and so on.
Two official diagrams illustrate this directly (not reproduced here):
Micro-batch processing:
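To make the batch model concrete in code, here is a minimal, self-contained sketch of the usual DStream programming pattern: build a StreamingContext with a batch interval, declare transformations on the input stream, then start the context and wait. It is not part of the project built below; the object name, directory path, and the word-count-style transformation are placeholders for illustration only.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MiniStreamingSketch {
  def main(args: Array[String]): Unit = {
    // A new batch of input is formed every 20 seconds and processed as one RDD.
    // The master is supplied at submit time (or via setMaster for local runs).
    val conf = new SparkConf().setAppName("mini.streaming.sketch")
    val ssc = new StreamingContext(conf, Seconds(20))

    // Monitor a directory (placeholder path); every new file lands in the next batch.
    val lines = ssc.textFileStream("/some/input/dir")

    // Ordinary RDD-style transformations, applied batch by batch.
    lines.flatMap(_.split(",")).map(field => (field, 1)).reduceByKey(_ + _).print()

    ssc.start()            // start scheduling batches
    ssc.awaitTermination() // block until the job is stopped
  }
}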
This example uses the file system as the data source and processes user page-visit logs in near real time.
Suppose the log format is:
user ID, page ID, a numeric value (unused for now), visit time
U86942038,P68658056,1,2016-09-27 15:17:01:137
U25452627,P27395813,5,2016-09-27 15:19:43:901
The logs in the data-source directory are updated continuously, and the streaming program periodically processes whatever new log files have appeared there.
The processing requirements are as follows (a short sketch of the window and state operations follows the list):
- Every 20 seconds, report the user-page visit data for that batch
- Using a window computation: every 40 seconds, report the user-page visit data for the preceding 40 seconds
- Using updateStateByKey: report the cumulative user-page visit data across all batches
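Before the full program, here is a hedged sketch of the two less-common operations, simplified to per-page hit counts; the names countSketch and pagePairs are illustrative only, and it uses reduceByKeyAndWindow rather than the window + groupByKey combination in the real code. Window operators recompute a result over the last N batches, while updateStateByKey folds every batch into a running state kept for the lifetime of the job, which is why the program below has to set a checkpoint directory.

import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.dstream.DStream

// Illustrative sketch only: pagePairs is assumed to be a DStream of (pageId, userId)
// built from the log lines, with the 20-second batch interval described above.
def countSketch(pagePairs: DStream[(String, String)]): Unit = {
  val pageHits = pagePairs.map { case (page, _) => (page, 1) }

  // Per-batch counts: hits for each page in the current 20-second batch.
  val perBatch = pageHits.reduceByKey(_ + _)

  // Windowed counts: hits over the last 40 seconds, recomputed every 40 seconds (2 batches).
  val windowed = pageHits.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(40), Seconds(40))

  // Cumulative counts: state carried across all batches; requires ssc.checkpoint(...) to be set.
  val updateFunc = (newValues: Seq[Int], state: Option[Int]) => Some(newValues.sum + state.getOrElse(0))
  val cumulative = pageHits.updateStateByKey[Int](updateFunc)

  perBatch.print()
  windowed.print()
  cumulative.print()
}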
Create a Scala class, as shown:
The streaming code:
package com.rihai.spark.hdfs

import org.apache.spark._
import org.apache.spark.streaming._

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
  * Created by rihaizhang on 2016/9/26.
  */
object PageLoggingStreaming {

  def createContext(appName: String, timeUnit: Duration, checkpointPath: String, dataPath: String): StreamingContext = {

    println("Creating new context !")
    val conf = new SparkConf().setAppName(appName)
    val ssc = new StreamingContext(conf, timeUnit)
    ssc.checkpoint(checkpointPath)

    // e.g.
    // U86942038,P68658056,1,2016-09-27 15:17:01:137
    val lines = ssc.textFileStream(dataPath)

    // (pageId, userId) pairs
    val page_user = lines.map(line => {
      val strs = line.split(",")
      (strs(1), strs(0))
    })

    // per-batch page-user pairs (users deduplicated per page)
    val p_u = page_user.groupByKey().flatMap(s => {
      val set = new mutable.HashSet[String]()
      for (user <- s._2) {
        set += user
      }
      val listBuffer = new ListBuffer[(String, String)]
      for (elem <- set) {
        listBuffer += ((s._1, elem))
      }
      listBuffer.toTraversable
    })
    // p_u.persist(StorageLevel.MEMORY_ONLY)

    // windowed page-user pairs: last 2 batches, recomputed every 2 batches
    val wp_u = p_u.window(timeUnit * 2, timeUnit * 2).groupByKey().flatMap(s => {
      val set = new mutable.HashSet[String]()
      for (user <- s._2) {
        set += user
      }
      val listBuffer = new ListBuffer[(String, String)]
      for (elem <- set) {
        listBuffer += ((s._1, elem))
      }
      listBuffer.toTraversable
    })

    // merge the users of the current batch into the accumulated user set of each page
    val updateFun = (newValues: Seq[Iterable[String]], prevValues: Option[Iterable[String]]) => {
      val set = new mutable.HashSet[String]()
      for (user <- prevValues.getOrElse(Iterable[String]())) {
        set += user
      }
      for (value <- newValues) {
        for (user <- value) {
          set += user
        }
      }
      Some(set.toIterable)
    }

    // stateful page-user pairs accumulated over all batches
    val sp_u = p_u.groupByKey().updateStateByKey[Iterable[String]](updateFun).flatMap(s => {
      val listBuffer = new ListBuffer[(String, String)]
      for (elem <- s._2) {
        listBuffer += ((s._1, elem))
      }
      listBuffer.toTraversable
    })

    sp_u.checkpoint(timeUnit * 8)

    // print the three result streams
    p_u.print()
    wp_u.print()
    sp_u.print()

    ssc
  }

  def main(args: Array[String]): Unit = {

    if (args.length < 2) {
      System.err.println("Usage: PageLoggingStreaming <checkpointPath> <dataPath>")
      System.exit(1)
    }

    val time_unit = Seconds(20)
    val checkpointPath = args(0)
    val dataPath = args(1)

    // recover from the checkpoint if one exists, otherwise build a fresh context
    val ssc = StreamingContext.getOrCreate(checkpointPath,
      () => createContext("page.logging.streaming", time_unit, checkpointPath, dataPath))
    // val ssc = createContext("page.logging.streaming", time_unit, checkpointPath, dataPath)
    ssc.start()

    // let the job run for about 200 seconds, then stop it gracefully
    for (i <- 1 to 10) {
      println("loop-" + i)
      Thread.sleep(1000 * 20)
    }

    ssc.stop(true, true)
    // ssc.awaitTermination()

    System.exit(0)
  }
}
Debugging:
Debug against a local directory first.
Configure the run configuration, setting the checkpoint directory and the data-source directory.
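One thing to watch when debugging in the IDE: createContext builds its SparkConf without a master, which is fine when the job is submitted to a cluster with spark-submit, but a local run will fail complaining that no master URL is set. For local debugging you can, for example, add a VM option such as -Dspark.master=local[2] to the run configuration, or apply a debug-only tweak like the following (an assumption, not part of the original code):

// Debug-only tweak (assumption): run with an in-process local master.
// "local[2]" runs Spark with two worker threads inside the IDE process;
// remove this (or make it conditional) before submitting to a real cluster.
val conf = new SparkConf()
  .setAppName(appName)
  .setMaster("local[2]")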
Run the program, then manually drop log files into the data directory, as shown below.
Spark Streaming automatically detects the new files and reads them.
Run results:
Creating new context !
loop-1
16/10/11 18:35:40 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
-------------------------------------------
Time: 1476182140000 ms
-------------------------------------------

-------------------------------------------
Time: 1476182140000 ms
-------------------------------------------

loop-2
-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)

-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)

-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)

loop-3
-------------------------------------------
Time: 1476182180000 ms
-------------------------------------------
(P68658056,U86142038)
(P27395813,U21453697)
(P27395813,U26941232)
(P27395814,U12142025)

-------------------------------------------
Time: 1476182180000 ms
-------------------------------------------
(P68658056,U86142038)
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
(P27395813,U26941232)
(P27395814,U12142025)

loop-4
-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------

-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------
(P68658056,U86142038)
(P27395813,U21453697)
(P27395813,U26941232)
(P27395814,U12142025)

-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------
(P68658056,U86142038)
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
(P27395813,U26941232)
(P27395814,U12142025)
Debugging against the local directory works, so next debug against an HDFS directory:
First write a log-generator program that periodically writes a log file into a temporary HDFS directory and then moves it into the final directory. The detour through the temporary directory keeps the operation atomic: textFileStream reads each new file in the monitored directory once, so the file must be complete the moment it appears there, and an HDFS rename is atomic while a direct write is not. For Hadoop development and debugging, see the earlier post "hadoop 开发&调试".
Temporary directory: "/user/rihai/logdata/tmp"; final directory: "/user/rihai/logdata/"
The code:
package com.rihai.hadoop.hdfs;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Created by rihaizhang on 9/7/2016.
 */
public class CreateLogging {

    private static List<String> usrIdList;
    private static List<String> pageList;
    private static int usrCount = 100;
    private static int pageCount = 1000;
    private static int scoreMax = 5;
    private static String tempPath = "/user/rihai/logdata/tmp";
    private static String mainPath = "/user/rihai/logdata/";
    //private static String tempPath = "hdfs://master:9000/user/rihai/logdata/tmp";
    //private static String mainPath = "hdfs://master:9000/user/rihai/logdata/";

    public static void main(String[] args) throws InterruptedException, IOException {
        buildUsrIds();
        buildPageList();
        buildLog();
        System.out.println("Running");
        Scanner sc = new Scanner(System.in);
        String input = sc.nextLine();
        System.out.println("Finished");
    }

    /**
     * Generate a log file every two minutes and push it to HDFS.
     */
    private static void buildLog() throws InterruptedException, IOException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        SimpleDateFormat format2 = new SimpleDateFormat("yyyyMMddHHmmss");
        for (int i = 0; i < 100; i++) {
            StringBuilder sb = new StringBuilder();
            // build 100 log lines: userId,pageId,score,timestamp
            for (int j = 0; j < 100; j++) {
                if (j > 0) {
                    sb.append("\n");
                }
                int pIndex = getRandom(pageCount);
                int uIndex = getRandom(usrCount);
                int score = getRandom(scoreMax) + 1;
                String datestr = format.format(Calendar.getInstance().getTime());
                sb.append(String.format("%s,%s,%s,%s", usrIdList.get(uIndex), pageList.get(pIndex), score, datestr));
            }
            String fileName = String.format("log_%s.txt", format2.format(Calendar.getInstance().getTime()));
            // send to hdfs
            System.out.println("Writing log file");
            sendToHdfs(fileName, sb.toString());
            // sleep
            Thread.sleep(1000 * 60 * 2);
        }
    }

    /**
     * Write the file into the temporary directory, then rename it into the final
     * directory so that it appears there atomically.
     */
    private static void sendToHdfs(String fileName, String content) throws IOException {

        HdfsUtil hdfsUtil = new HdfsUtil();

        if (!hdfsUtil.exists(tempPath)) {
            // mkdirs also creates the parent (final) directory
            boolean created = hdfsUtil.createDirectory(tempPath);
            if (created) {
                System.out.println(tempPath + " created!");
            } else {
                System.out.println(tempPath + " creation failed!");
                return;
            }
        }

        String tempFileName = tempPath + "/" + fileName;
        String newFileName = mainPath + "/" + fileName;

        hdfsUtil.createFile(tempFileName, content);
        System.out.println(String.format("Wrote %s", tempFileName));

        boolean result = hdfsUtil.renameFile(tempFileName, newFileName);
        System.out.println(String.format("Move to %s %s", newFileName, result ? "succeeded" : "failed"));
    }

    /**
     * Randomly generate page ids, e.g. P93002432
     */
    private static void buildPageList() {
        Random random = new Random();
        pageList = new ArrayList<String>();
        for (int i = 0; i < pageCount; i++) {
            int temp = random.nextInt(100000000);
            pageList.add(String.format("P%08d", temp));
        }
    }

    /**
     * Randomly generate user ids, e.g. U00234999
     */
    private static void buildUsrIds() {
        Random random = new Random();
        usrIdList = new ArrayList<String>();
        for (int i = 0; i < usrCount; i++) {
            int temp = random.nextInt(100000000);
            usrIdList.add(String.format("U%08d", temp));
        }
    }

    /**
     * Random number in [0, max).
     */
    private static int getRandom(int max) {
        Random random = new Random();
        return random.nextInt(max);
    }
}
package com.rihai.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * HDFS utility class
 * Created by rihaizhang on 9/6/2016.
 */
public class HdfsUtil {
    private Configuration conf = new Configuration();

    public HdfsUtil() {
    }

    public HdfsUtil(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Check whether a file or directory exists.
     */
    public boolean exists(String path) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            return fs.exists(new Path(path));
        }
    }

    /**
     * Create a directory (including missing parents).
     */
    public boolean createDirectory(String dirPath) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            return fs.mkdirs(new Path(dirPath));
        }
    }

    /**
     * Create a file from raw bytes.
     */
    public void createFile(String filePath, byte[] bytes) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            try (FSDataOutputStream output = fs.create(new Path(filePath))) {
                output.write(bytes);
            }
        }
    }

    /**
     * Create a file from a string (UTF-8).
     */
    public void createFile(String filePath, String contents) throws IOException {
        createFile(filePath, contents.getBytes("UTF-8"));
    }

    /**
     * Append raw bytes to a file.
     */
    public void appendFile(String filePath, byte[] bytes) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            try (FSDataOutputStream output = fs.append(new Path(filePath))) {
                output.write(bytes);
            }
        }
    }

    /**
     * Append a string (UTF-8) to a file.
     */
    public void appendFile(String filePath, String contents) throws IOException {
        appendFile(filePath, contents.getBytes("UTF-8"));
    }

    /**
     * Delete a file or directory.
     */
    public boolean deleteFile(String filePath, boolean recursive) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            return fs.delete(new Path(filePath), recursive);
        }
    }

    /**
     * Rename (move) a file.
     */
    public boolean renameFile(String sourcePath, String targetPath) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            return fs.rename(new Path(sourcePath), new Path(targetPath));
        }
    }

    /**
     * Read a whole file as a UTF-8 string.
     */
    public String readFile(String filePath) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            FSDataInputStream input = fs.open(new Path(filePath));
            ByteArrayOutputStream output = new ByteArrayOutputStream(input.available());
            // copyBytes closes both streams when it finishes
            IOUtils.copyBytes(input, output, conf);
            return output.toString("UTF-8");
        }
    }
}
Run the CreateLogging program:
Then reconfigure the run configuration of the PageLoggingStreaming program, pointing the checkpoint directory and data-source directory at the HDFS paths.
Run results: