
Spark Streaming Example

A Spark Streaming development example

This article is divided into the following parts:

  • Setting up the Spark development environment
  • Creating a Spark project
  • Writing a streaming code example
  • Debugging

Environment setup:

Spark's native language is Scala. I am using spark-1.4.1-bin-hadoop2.6, which, according to the official documentation, uses Scala 2.10, so I use scala-2.10.1.

Download the scala-2.10.1 package and simply unpack it.

Set the SCALA_HOME environment variable to the unpacked directory.


Add %SCALA_HOME%\bin to PATH.

Creating the project:

The IDE I use is IntelliJ IDEA; to get Scala support, install the Scala plugin first.

After the plugin is installed, create a new project and choose Scala.


Select the path of the installed Scala SDK.


 

Add Maven support.


Then create a new Maven module.



Edit the POM files.

Parent POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.rihai.spark</groupId>
    <artifactId>spark-streaming</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>hdfs-streaming</module>
    </modules>
    <properties>
        <scala.version>2.10.1</scala.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.4.1</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

Child POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spark-streaming</artifactId>
        <groupId>com.rihai.spark</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>hdfs-streaming</artifactId>
</project>

POM notes:

Compiling Scala with Maven requires a dedicated plugin, maven-scala-plugin, which can compile both Java and Scala sources.

Project structure:


Then create a scala folder under the main folder, and mark it as a Sources folder via File -> Project Structure.


The Spark project is now set up.

Streaming example:

 
Put simply, Spark Streaming processes data from a source in batches and then writes the results to an external sink. Its biggest advantage is that it can achieve near-real-time processing with second-level latency.
Streaming sources include file systems (HDFS), Kafka, Flume, and so on; outputs include file systems (HDFS), databases, and others.
The official documentation has two diagrams that illustrate this directly: one showing the overall streaming data flow, and one showing how the input stream is divided into batches for processing.
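Below is a minimal sketch of that batch-processing pattern (not the example developed in this article; the master URL, input path, and batch interval here are only illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MinimalStreaming {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("minimal-streaming").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(20))       // each batch covers 20 s of new data
    val lines = ssc.textFileStream("/tmp/streaming-input")  // new files in this directory form the next batch
    lines.count().print()                                   // one count per batch
    ssc.start()                                             // start receiving and processing
    ssc.awaitTermination()
  }
}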

 

In this example the file system is used as the data source, and user page-access logs are processed in near real time.
Assume the log format is:
user ID, page ID, a value (unused for now), access time
U86942038,P68658056,1,2016-09-27 15:17:01:137
U25452627,P27395813,5,2016-09-27 15:19:43:901
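As a quick standalone sketch (not part of the streaming job itself, field order as assumed above), each line can be split on commas and turned into a (pageId, userId) pair, which is the key-value shape the job works with:

object ParseLineDemo {
  // Assumed field order: userId, pageId, value, access time
  def parse(line: String): (String, String) = {
    val fields = line.split(",")
    (fields(1), fields(0)) // key = pageId, value = userId
  }

  def main(args: Array[String]): Unit = {
    // prints (P68658056,U86942038)
    println(parse("U86942038,P68658056,1,2016-09-27 15:17:01:137"))
  }
}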

Log files keep arriving in the source directory, and the streaming program periodically processes the newly added data.
The processing requirements are as follows (a sketch of the corresponding DStream operations follows the list):

  • Every 20 s, compute the user-page access data for that batch
  • Using a window: every 40 s, compute the user-page access data for the preceding 40 s
  • Using updateStateByKey: compute the user-page access data accumulated over all batches
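Here is a simplified sketch (illustrative paths and master URL; the full code below instead uses groupByKey with mutable collections) of how these three requirements map onto DStream operations with a 20 s batch interval:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object RequirementSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("requirement-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(20)) // 20 s batches
    ssc.checkpoint("/tmp/sketch-checkpoint")          // required by updateStateByKey

    // (pageId, userId) pairs from the monitored directory
    val pageUser: DStream[(String, String)] =
      ssc.textFileStream("/tmp/streaming-input").map { line =>
        val f = line.split(",")
        (f(1), f(0))
      }

    // 1. Per batch: every 20 s, the pairs seen in that batch.
    pageUser.print()

    // 2. Window: every 40 s, the pairs seen in the preceding 40 s (two batches).
    pageUser.window(Seconds(40), Seconds(40)).print()

    // 3. All batches: keep a growing set of users per page.
    val update: (Seq[String], Option[Set[String]]) => Option[Set[String]] =
      (newUsers, state) => Some(state.getOrElse(Set.empty[String]) ++ newUsers)
    pageUser.updateStateByKey(update).print()

    ssc.start()
    ssc.awaitTermination()
  }
}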

 

Create a Scala class for the streaming job:



Streaming code:

package com.rihai.spark.hdfs

import org.apache.spark._
import org.apache.spark.streaming._

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
  * Created by rihaizhang on 2016/9/26.
  */
object PageLoggingStreaming {

  def createContext(appName: String, timeUnit: Duration, checkpointPath: String, dataPath: String): StreamingContext = {

    println("Creating new context !")
    val conf = new SparkConf().setAppName(appName)
    val ssc = new StreamingContext(conf, timeUnit)
    ssc.checkpoint(checkpointPath)

    // e.g.
    // U86942038,P68658056,1,2016-09-27 15:17:01:137
    val lines = ssc.textFileStream(dataPath)

    // (pageId, userId) pairs
    val page_user = lines.map(line => {
      val strs = line.split(",")
      (strs(1), strs(0))
    })

    // per-batch page-user pairs (users deduplicated per page within one batch)
    val p_u = page_user.groupByKey().flatMap(s => {
      val set = new mutable.HashSet[String]()
      for (user <- s._2) {
        set.+=(user)
      }
      val listBuffer = new ListBuffer[(String, String)]
      for (elem <- set) {
        listBuffer.+=((s._1, elem))
      }
      listBuffer.toTraversable
    })
    //p_u.persist(StorageLevel.MEMORY_ONLY)

    // windowed page-user pairs: every 2 batches (40 s), over the last 2 batches
    val wp_u = p_u.window(timeUnit * 2, timeUnit * 2).groupByKey().flatMap(s => {
      val set = new scala.collection.mutable.HashSet[String]()
      for (user <- s._2) {
        set.+=(user)
      }
      val listBuffer = new ListBuffer[(String, String)]
      for (elem <- set) {
        listBuffer.+=((s._1, elem))
      }
      listBuffer.toTraversable
    })

    // merge the users of the new batch into the accumulated set for each page
    val updateFun = (newValues: Seq[Iterable[String]], prevValues: Option[Iterable[String]]) => {
      val set = new scala.collection.mutable.HashSet[String]()
      for (user <- prevValues.getOrElse(Iterable[String]())) {
        set.+=(user)
      }
      for (value <- newValues) {
        for (user <- value) {
          set.+=(user)
        }
      }
      Some(set.toIterable)
    }

    // cumulative page-user pairs across all batches (updateStateByKey)
    val sp_u = p_u.groupByKey().updateStateByKey[Iterable[String]](updateFun).flatMap(s => {
      val listBuffer = new ListBuffer[(String, String)]
      for (elem <- s._2) {
        listBuffer.+=((s._1, elem))
      }
      listBuffer.toTraversable
    })

    // checkpoint the stateful stream every 8 batches
    sp_u.checkpoint(timeUnit * 8)

    // print the first results of each stream
    p_u.print()
    wp_u.print()
    sp_u.print()

    ssc
  }

  def main(args: Array[String]): Unit = {

    if (args.length < 2) {
      System.err.println("Usage: PageLoggingStreaming <checkpointPath> <dataPath>")
      System.exit(1)
    }

    val time_unit = Seconds(20)
    val checkpointPath = args(0)
    val dataPath = args(1)

    // recover the context from the checkpoint if one exists, otherwise create a new one
    val ssc = StreamingContext.getOrCreate(checkpointPath, () => createContext("page.logging.streaming", time_unit, checkpointPath, dataPath))
    //val ssc = createContext("page.logging.streaming", time_unit, checkpointPath, dataPath)
    ssc.start()

    // run for 10 batch intervals, then stop gracefully
    for (i <- 1 to 10) {
      println("loop-" + i)
      Thread.sleep(1000 * 20)
    }

    ssc.stop(true, true)
    //ssc.awaitTermination()

    System.exit(0)
  }

}

Debugging:

First, debug with a local directory.

Edit the run configuration, setting the checkpoint directory and the data source directory.
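For example, with purely illustrative local paths, the two program arguments could be D:\spark\checkpoint and D:\spark\logdata: args(0) is the checkpoint directory and args(1) is the monitored data source directory.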


Run the program, then manually add log files to the data source directory.


Spark Streaming automatically detects the newly added files and reads them.
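Note that textFileStream only picks up files that appear in the monitored directory as new, complete files, so when adding a file by hand (or from a script) it is safest to write it elsewhere first and then move it into the directory. A small sketch with illustrative local paths:

import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths, StandardCopyOption}

object AddLogFile {
  def main(args: Array[String]): Unit = {
    val content = "U86942038,P68658056,1,2016-09-27 15:17:01:137\n"
    val tmp = Paths.get("/tmp/log_20161011183500.txt")                 // outside the watched directory
    val dst = Paths.get("/tmp/streaming-input/log_20161011183500.txt") // inside the watched directory
    Files.write(tmp, content.getBytes(StandardCharsets.UTF_8))
    Files.move(tmp, dst, StandardCopyOption.ATOMIC_MOVE)               // file appears fully written
  }
}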

Run output:

Creating new context !
loop-1
16/10/11 18:35:40 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
-------------------------------------------
Time: 1476182140000 ms
-------------------------------------------
-------------------------------------------
Time: 1476182140000 ms
-------------------------------------------
loop-2
-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
-------------------------------------------
Time: 1476182160000 ms
-------------------------------------------
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
loop-3
-------------------------------------------
Time: 1476182180000 ms
-------------------------------------------
(P68658056,U86142038)
(P27395813,U21453697)
(P27395813,U26941232)
(P27395814,U12142025)
-------------------------------------------
Time: 1476182180000 ms
-------------------------------------------
(P68658056,U86142038)
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
(P27395813,U26941232)
(P27395814,U12142025)
loop-4
-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------
-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------
(P68658056,U86142038)
(P27395813,U21453697)
(P27395813,U26941232)
(P27395814,U12142025)
-------------------------------------------
Time: 1476182200000 ms
-------------------------------------------
(P68658056,U86142038)
(P68658056,U86942038)
(P27395813,U21453697)
(P27395813,U12142025)
(P27395813,U26712632)
(P27395813,U26941232)
(P27395814,U12142025)

Local-directory debugging works; next, debug against an HDFS directory:

First, write a log-generator program that periodically writes log files to a temporary HDFS directory and then moves them into the final directory. The files go to the temporary directory first so that they appear in the final (monitored) directory atomically. For Hadoop development, refer to the separate post "hadoop 开发&调试" (Hadoop development & debugging).

Temporary directory: "/user/rihai/logdata/tmp"; final directory: "/user/rihai/logdata/".

Code:

package com.rihai.hadoop.hdfs;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

/**
 * Created by rihaizhang on 9/7/2016.
 */
public class CreateLogging {

    private static List<String> usrIdList;
    private static List<String> pageList;
    private static int usrCount = 100;
    private static int pageCount = 1000;
    private static int scoreMax = 5;
    private static String tempPath = "/user/rihai/logdata/tmp";
    private static String mainPath = "/user/rihai/logdata/";
    //private static String tempPath = "hdfs://master:9000/user/rihai/logdata/tmp";
    //private static String mainPath = "hdfs://master:9000/user/rihai/logdata/";

    public static void main(String[] args) throws InterruptedException, IOException {
        buildUsrIds();
        bulidPageList();
        bulidLog();
        System.out.println("Running");
        Scanner sc = new Scanner(System.in);
        String input = sc.nextLine();
        System.out.println("Finished");
    }

    /**
     * Generate log files
     *
     * @throws InterruptedException
     * @throws IOException
     */
    private static void bulidLog() throws InterruptedException, IOException {
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SSS");
        SimpleDateFormat format2 = new SimpleDateFormat("yyyyMMddHHmmss");
        for (int i = 0; i < 100; i++) {
            StringBuilder sb = new StringBuilder();
            // build 100 log lines per file
            for (int j = 0; j < 100; j++) {
                if (j > 0) {
                    sb.append("\n");
                }
                int pIndex = GetRandom(pageCount);
                int uIndex = GetRandom(usrCount);
                int score = GetRandom(scoreMax) + 1;
                String datestr = format.format(Calendar.getInstance().getTime());
                sb.append(String.format("%s,%s,%s,%s", usrIdList.get(uIndex), pageList.get(pIndex), score, datestr));
            }
            String fileName = String.format("log_%s.txt", format2.format(Calendar.getInstance().getTime()));
            // send to hdfs
            System.out.println("About to write");
            SendToHdfs(fileName, sb.toString());
            // sleep 2 minutes between files
            Thread.sleep(1000 * 60 * 2);
        }
    }

    /**
     * Send to HDFS: write to the temp directory first, then move to the final directory
     *
     * @param fileName
     * @param content
     * @throws IOException
     */
    private static void SendToHdfs(String fileName, String content) throws IOException {
        HdfsUtil hdfsUtil = new HdfsUtil();

        if (!hdfsUtil.exists(tempPath)) {
            boolean result = hdfsUtil.createDirectory(tempPath);
            if (result) {
                System.out.println(tempPath + " created!");
            } else {
                System.out.println(tempPath + " creation failed!");
                return;
            }
        }

        String tempFileName = tempPath + "/" + fileName;
        String newFileName = mainPath + "/" + fileName;

        hdfsUtil.createFile(tempFileName, content);
        System.out.println(String.format("Wrote %s", tempFileName));

        boolean result = hdfsUtil.renameFile(tempFileName, newFileName);
        System.out.println(String.format("Move to %s %s", newFileName, result ? "succeeded" : "failed"));
    }

    /**
     * Randomly generate page IDs
     */
    private static void bulidPageList() {
        Random random = new Random();
        /**
         * e.g.
         * P93002432
         */
        pageList = new ArrayList<String>();
        for (int i = 0; i < pageCount; i++) {
            int temp = random.nextInt(100000000);
            pageList.add(String.format("P%08d", temp));
        }
    }

    /**
     * Randomly generate user IDs
     */
    private static void buildUsrIds() {
        Random random = new Random();
        /**
         *  e.g.
         *  U00234999
         */
        usrIdList = new ArrayList<String>();
        for (int i = 0; i < usrCount; i++) {
            int temp = random.nextInt(100000000);
            usrIdList.add(String.format("U%08d", temp));
        }
    }

    /**
     * Get a random number in [0, max)
     *
     * @param max
     * @return
     */
    private static int GetRandom(int max) {
        Random random = new Random();
        int temp = random.nextInt(max);
        return temp;
    }

}
package com.rihai.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

/**
 * HDFS utility class
 * Created by rihaizhang on 9/6/2016.
 */
public class HdfsUtil {
    private Configuration conf = new Configuration();

    public HdfsUtil() {
    }

    public HdfsUtil(Configuration conf) {
        this.conf = conf;
    }

    /**
     * Check whether a file or directory exists
     *
     * @param path
     * @return
     * @throws IOException
     */
    public boolean exists(String path) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            return fs.exists(new Path(path));
        }
    }

    /**
     * Create a directory
     *
     * @param dirPath
     * @return
     * @throws IOException
     */
    public boolean createDirectory(String dirPath) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            boolean result = fs.mkdirs(new Path(dirPath));
            return result;
        }
    }

    /**
     * Create a file
     *
     * @param filePath
     * @param bytes
     * @throws IOException
     */
    public void createFile(String filePath, byte[] bytes) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            try (FSDataOutputStream output = fs.create(new Path(filePath))) {
                output.write(bytes);
            }
        }
    }

    /**
     * Create a file
     *
     * @param filePath
     * @param contents
     * @throws IOException
     */
    public void createFile(String filePath, String contents) throws IOException {
        createFile(filePath, contents.getBytes("UTF-8"));
    }

    /**
     * Append to a file
     *
     * @param filePath
     * @param bytes
     * @throws IOException
     */
    public void appendFile(String filePath, byte[] bytes) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            try (FSDataOutputStream output = fs.append(new Path(filePath))) {
                output.write(bytes);
            }
        }
    }

    /**
     * Append to a file
     *
     * @param filePath
     * @param contents
     * @throws IOException
     */
    public void appendFile(String filePath, String contents) throws IOException {
        appendFile(filePath, contents.getBytes("UTF-8"));
    }

    /**
     * Delete a file or directory
     *
     * @param filePath
     * @param recursive
     * @return
     * @throws IOException
     */
    public boolean deleteFile(String filePath, boolean recursive) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            boolean result = fs.delete(new Path(filePath), recursive);
            return result;
        }
    }

    /**
     * Rename (move) a file
     *
     * @param sourcePath
     * @param targetPath
     * @return
     * @throws IOException
     */
    public boolean renameFile(String sourcePath, String targetPath) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            boolean result = fs.rename(new Path(sourcePath), new Path(targetPath));
            return result;
        }
    }

    /**
     * Read a file
     *
     * @param filePath
     * @return
     * @throws IOException
     */
    public String readFile(String filePath) throws IOException {
        try (FileSystem fs = FileSystem.get(conf)) {
            FSDataInputStream input = fs.open(new Path(filePath));
            //byte[] buffer = new byte[input.available()];
            //input.readFully(0, buffer);
            ByteArrayOutputStream output = new ByteArrayOutputStream(input.available());

            IOUtils.copyBytes(input, output, conf);
            String fileContent = output.toString("UTF-8");
            return fileContent;
        }
    }

}

Run the CreateLogging program.


 

Reconfigure the run configuration of the PageLoggingStreaming program, pointing the checkpoint directory and the data source directory at HDFS.
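For example, using the cluster address from the commented-out lines in CreateLogging (the checkpoint path here is only illustrative), the two program arguments could be hdfs://master:9000/user/rihai/checkpoint and hdfs://master:9000/user/rihai/logdata/.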


Run output:


 
