首页 > 代码库 > 开发系列:01、使用Java和Maven开发Spark应用
开发系列:01、使用Java和Maven开发Spark应用
1、POM.xml
<!--
  Maven build for the Spark example application.
  Dependency versions are centralised in <properties>; dependencies below refer to
  those properties instead of repeating literal version numbers.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>org.hansight.spark</groupId>
  <artifactId>examples</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>examples</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <elasticsearch.version>1.2.1</elasticsearch.version>
    <jdk.version>1.7</jdk.version>
    <logback.version>1.1.2</logback.version>
    <slf4j.version>1.7.7</slf4j.version>
    <junit.version>4.11</junit.version>
    <jcl.over.slf4j.version>1.7.7</jcl.over.slf4j.version>
    <metrics.version>3.0.2</metrics.version>
    <avro.version>1.7.7</avro.version>
    <jna.version>4.1.0</jna.version>
    <spark.version>1.0.2</spark.version>
    <!-- NOTE(review): this looks like a vendor (HDP) build number; it may require a
         vendor repository that is not declared in this POM - confirm. -->
    <hadoop.version>2.4.0.2.1.4.0-632</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <!-- Previously hard-coded to 3.8.1 while junit.version (4.11) was left unused. -->
      <version>${junit.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.4.2</version>
    </dependency>
    <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>14.0.1</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>${spark.version}</version>
      <exclusions>
        <exclusion>
          <groupId>javax.servlet</groupId>
          <artifactId>servlet-api</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch</artifactId>
      <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
      <exclusions>
        <exclusion>
          <groupId>jdk.tools</groupId>
          <artifactId>jdk.tools</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-mapreduce-client-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>org.elasticsearch</groupId>
      <artifactId>elasticsearch-hadoop</artifactId>
      <version>2.0.1</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
          <source>${jdk.version}</source>
          <target>${jdk.version}</target>
        </configuration>
      </plugin>
      <!-- No execution is bound to a lifecycle phase here, so the jar-with-dependencies
           archive is only produced when the assembly goal is invoked explicitly. -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.4</version>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
2、样例代码
1 package com.hansight.spark.utils; 2 3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.JavaSparkContext; 5 6 public class SparkUtils { 7 8 public static JavaSparkContext get(String name) { 9 SparkConf conf = new SparkConf();10 // conf.setMaster("local[1]");11 // conf.setMaster("spark://hdp125:7077");12 conf.setAppName(name);13 return new JavaSparkContext(conf);14 }15 }
1 package com.hansight.spark.streaming; 2 3 import java.util.Iterator; 4 5 import org.apache.spark.api.java.JavaRDD; 6 import org.apache.spark.api.java.JavaSparkContext; 7 import org.apache.spark.api.java.function.Function; 8 import org.apache.spark.api.java.function.VoidFunction; 9 10 import com.hansight.spark.utils.SparkUtils;11 12 public class HttpParser {13 @SuppressWarnings({ "unchecked", "serial" })14 public static void main(String[] args) {15 if (args.length == 0) {16 System.out.println("Usage: <file path>");17 return;18 }19 System.setProperty("hadoop.home.dir", "E:/tools/hadoop-2.4.1");20 JavaSparkContext sc = SparkUtils.get("HttpLog");21 String path = args[0];22 JavaRDD<String> rdd = sc23 .textFile(path);24 JavaRDD<HttpLog> parsed = rdd.map(new Function<String, HttpLog>() {25 public HttpLog call(String line) throws Exception {26 return HttpLog.parser(line);27 }28 });29 System.out.println(parsed.count());30 parsed.foreachPartition(new VoidFunction<Iterator<HttpLog>>() {31 @Override32 public void call(Iterator<HttpLog> t) throws Exception {33 HttpLog.save(t);34 }35 });36 }37 }
1 package com.hansight.spark.streaming; 2 3 import java.lang.reflect.Field; 4 import java.util.HashMap; 5 import java.util.Iterator; 6 import java.util.Map; 7 8 import org.elasticsearch.action.bulk.BulkRequestBuilder; 9 import org.elasticsearch.action.bulk.BulkResponse; 10 import org.elasticsearch.client.Client; 11 12 import com.hansight.spark.utils.EsUtils; 13 14 public class HttpLog { 15 private String rawlog; 16 // VARCHAR2(8 BYTE) 记录类型,表示此记录为HTTP浏览业务记录(取值为’HTTP’) 17 private String RECORD_TYPE; 18 // TIMESTAMP 开始时间,格式为:YYYY-MM-DD HH24:MI:SS.xxxxxxxxx 19 private String CAPTURETIME; 20 // VARCHAR2(16 BYTE) 手机号码(从创建PDP上下文消息中获取) 21 private String MSISDN; 22 // IMSI VARCHAR2(18 BYTE) 国际移动用户识别码(从创建PDP上下文消息中获取) 23 private String IMSI; 24 // IMEI(SV) VARCHAR2(20 BYTE) IMEI(SV)号(从创建PDP上下文消息中获取) 25 private String IMEI; 26 // VARCHAR2(32 BYTE) APN 27 private String APN; 28 // UEIP VARCHAR2(20 BYTE) 终端的IP 29 private String UEIP; 30 // SPIP VARCHAR2(20 BYTE) SP的IP 31 private String SPIP; 32 // UEPORT NUMBER(12) 终端端口 33 private int UEPORT; 34 // SPPORT NUMBER(12) SP端端口 35 private int SPPORT; 36 // USERAGENT VARCHAR2(64 BYTE) User Agent信息 37 private String USERAGENT; 38 // URL VARCHAR2(256 BYTE) URL,该字段的错误率应不超过万分之一 39 private String URL; 40 // HOST VARCHAR2(64 BYTE) HOST信息 41 private String HOST; 42 // CONTENTLEN NUMBER(12) 内容大小 43 private String CONTENTLEN; 44 // CONTENTTYPE VARCHAR2(64 BYTE) 内容类型 45 private String CONTENTTYPE; 46 // BSKIP NUMBER(12) 是否是链接访问,0=否,1=是 47 private boolean BSKIP; 48 // REFERER VARCHAR2(256 BYTE) 链接源信息 49 private String REFERER; 50 // HTTPSTATUS NUMBER(12) 状态码,请参照附录HTTPSTATUS表 51 private long HTTPSTATUS; 52 // RESPDELAY NUMBER(12) 响应时延,单位毫秒 53 private long RESPDELAY; 54 // HTTPACTION NUMBER(12) HTTP操作类型(5: Post, 6:Get) 55 private String HTTPACTION; 56 // DURATION NUMBER(12) 持续时长 57 private long DURATION; 58 // FLOW NUMBER(12) 总流量 59 private long FLOW; 60 // UPFLOW NUMBER(12) 上行流量 61 private long UPFLOW; 62 // DOWNFLOW NUMBER(12) 下行流量 63 private 
long DOWNFLOW; 64 // SGSNIP VARCHAR2(20 BYTE) SGSN 用户面 IP 65 private String SGSNIP; 66 // GGSNIP VARCHAR2(20 BYTE) GGSN 用户面 IP 67 private String GGSNIP; 68 // LAC NUMBER(12) LAC信息 69 private long LAC; 70 // CI NUMBER(12) CI/SAC信息 71 private String CI; 72 // RATTYPE NUMBER(12) RAT Type,1=2G,2=3G 73 private String RATTYPE; 74 // STOPTIME TIMESTAMP 结束时间,格式为:YYYY-MM-DD HH24:MI:SS.xxxxxxxxx 75 private String STOPTIME; 76 // PBIP VARCHAR2(20 BYTE) 采集解析设备IP地址 77 private String PBIP; 78 // PBID NUMBER(12) 采集解析设备编号 79 private long PBID; 80 81 public String getRawlog() { 82 return rawlog; 83 } 84 85 public void setRawlog(String rawlog) { 86 this.rawlog = rawlog; 87 } 88 89 public String getRECORD_TYPE() { 90 return RECORD_TYPE; 91 } 92 93 public void setRECORD_TYPE(String rECORD_TYPE) { 94 RECORD_TYPE = rECORD_TYPE; 95 } 96 97 public String getCAPTURETIME() { 98 return CAPTURETIME; 99 }100 101 public void setCAPTURETIME(String cAPTURETIME) {102 CAPTURETIME = cAPTURETIME;103 }104 105 public String getMSISDN() {106 return MSISDN;107 }108 109 public void setMSISDN(String mSISDN) {110 MSISDN = mSISDN;111 }112 113 public String getIMSI() {114 return IMSI;115 }116 117 public void setIMSI(String iMSI) {118 IMSI = iMSI;119 }120 121 public String getIMEI() {122 return IMEI;123 }124 125 public void setIMEI(String iMEI) {126 IMEI = iMEI;127 }128 129 public String getAPN() {130 return APN;131 }132 133 public void setAPN(String aPN) {134 APN = aPN;135 }136 137 public String getUEIP() {138 return UEIP;139 }140 141 public void setUEIP(String uEIP) {142 UEIP = uEIP;143 }144 145 public String getSPIP() {146 return SPIP;147 }148 149 public void setSPIP(String sPIP) {150 SPIP = sPIP;151 }152 153 public int getUEPORT() {154 return UEPORT;155 }156 157 public void setUEPORT(int uEPORT) {158 UEPORT = uEPORT;159 }160 161 public int getSPPORT() {162 return SPPORT;163 }164 165 public void setSPPORT(int sPPORT) {166 SPPORT = sPPORT;167 }168 169 public String getUSERAGENT() {170 return USERAGENT;171 
}172 173 public void setUSERAGENT(String uSERAGENT) {174 USERAGENT = uSERAGENT;175 }176 177 public String getURL() {178 return URL;179 }180 181 public void setURL(String uRL) {182 URL = uRL;183 }184 185 public String getHOST() {186 return HOST;187 }188 189 public void setHOST(String hOST) {190 HOST = hOST;191 }192 193 public String getCONTENTLEN() {194 return CONTENTLEN;195 }196 197 public void setCONTENTLEN(String cONTENTLEN) {198 CONTENTLEN = cONTENTLEN;199 }200 201 public String getCONTENTTYPE() {202 return CONTENTTYPE;203 }204 205 public void setCONTENTTYPE(String cONTENTTYPE) {206 CONTENTTYPE = cONTENTTYPE;207 }208 209 public boolean isBSKIP() {210 return BSKIP;211 }212 213 public void setBSKIP(boolean bSKIP) {214 BSKIP = bSKIP;215 }216 217 public String getREFERER() {218 return REFERER;219 }220 221 public void setREFERER(String rEFERER) {222 REFERER = rEFERER;223 }224 225 public long getHTTPSTATUS() {226 return HTTPSTATUS;227 }228 229 public void setHTTPSTATUS(long hTTPSTATUS) {230 HTTPSTATUS = hTTPSTATUS;231 }232 233 public long getRESPDELAY() {234 return RESPDELAY;235 }236 237 public void setRESPDELAY(long rESPDELAY) {238 RESPDELAY = rESPDELAY;239 }240 241 public String getHTTPACTION() {242 return HTTPACTION;243 }244 245 public void setHTTPACTION(String hTTPACTION) {246 HTTPACTION = hTTPACTION;247 }248 249 public long getDURATION() {250 return DURATION;251 }252 253 public void setDURATION(long dURATION) {254 DURATION = dURATION;255 }256 257 public long getFLOW() {258 return FLOW;259 }260 261 public void setFLOW(long fLOW) {262 FLOW = fLOW;263 }264 265 public long getUPFLOW() {266 return UPFLOW;267 }268 269 public void setUPFLOW(long uPFLOW) {270 UPFLOW = uPFLOW;271 }272 273 public long getDOWNFLOW() {274 return DOWNFLOW;275 }276 277 public void setDOWNFLOW(long dOWNFLOW) {278 DOWNFLOW = dOWNFLOW;279 }280 281 public String getSGSNIP() {282 return SGSNIP;283 }284 285 public void setSGSNIP(String sGSNIP) {286 SGSNIP = sGSNIP;287 }288 289 public String 
getGGSNIP() {290 return GGSNIP;291 }292 293 public void setGGSNIP(String gGSNIP) {294 GGSNIP = gGSNIP;295 }296 297 public long getLAC() {298 return LAC;299 }300 301 public void setLAC(long lAC) {302 LAC = lAC;303 }304 305 public String getCI() {306 return CI;307 }308 309 public void setCI(String cI) {310 CI = cI;311 }312 313 public String getRATTYPE() {314 return RATTYPE;315 }316 317 public void setRATTYPE(String rATTYPE) {318 RATTYPE = rATTYPE;319 }320 321 public String getSTOPTIME() {322 return STOPTIME;323 }324 325 public void setSTOPTIME(String sTOPTIME) {326 STOPTIME = sTOPTIME;327 }328 329 public String getPBIP() {330 return PBIP;331 }332 333 public void setPBIP(String pBIP) {334 PBIP = pBIP;335 }336 337 public long getPBID() {338 return PBID;339 }340 341 public void setPBID(long pBID) {342 PBID = pBID;343 }344 345 public static HttpLog parser(String line) {346 if (line == null) {347 return null;348 }349 String[] arr = line.split("‘,‘");350 HttpLog log = new HttpLog();351 log.setRawlog(line);352 if (arr.length != 31) {353 return log;354 }355 String start = arr[0];356 if (start != null) {357 start = arr[0].substring(1);358 }359 log.setRECORD_TYPE(start);360 log.setCAPTURETIME(arr[1]);361 log.setMSISDN(arr[2]);362 log.setIMSI(arr[3]);363 log.setIMEI(arr[4]);364 log.setAPN(arr[5]);365 log.setUEIP(arr[6]);366 log.setSPIP(arr[7]);367 log.setUEPORT(Integer.parseInt(arr[8]));368 log.setSPPORT(Integer.parseInt(arr[9]));369 log.setUSERAGENT(arr[10]);370 log.setURL(arr[11]);371 log.setHOST(arr[12]);372 log.setCONTENTLEN(arr[13]);373 log.setCONTENTTYPE(arr[14]);374 log.setBSKIP("1".equals(arr[15]));375 log.setREFERER(arr[16]);376 log.setHTTPSTATUS(Long.parseLong(arr[17]));377 log.setRESPDELAY(Long.parseLong(arr[18]));378 String action = arr[19];379 if ("5".equals(action)) {380 action = "POST";381 } else if ("6".equals(action)) {382 action = "GET";383 }384 log.setHTTPACTION(action);385 386 log.setDURATION(Long.parseLong(arr[20]));387 
log.setFLOW(Long.parseLong(arr[21]));388 log.setUPFLOW(Long.parseLong(arr[22]));389 log.setDOWNFLOW(Long.parseLong(arr[23]));390 log.setSGSNIP(arr[24]);391 log.setGGSNIP(arr[25]);392 log.setLAC(Long.parseLong(arr[26]));393 log.setCI(arr[27]);394 String ratType = arr[28];395 if ("1".equals(ratType)) {396 ratType = "2G";397 } else if ("2".equals(ratType)) {398 ratType = "3G";399 }400 log.setRATTYPE(ratType);401 log.setSTOPTIME(arr[29]);402 log.setPBIP(arr[30]);403 String stop = arr[31];404 if (stop != null) {405 stop = stop.substring(0, stop.length() - 1);406 }407 log.setPBID(Long.parseLong(stop));408 return log;409 }410 411 public static void save(Iterator<HttpLog> t) {412 try {413 Client client = EsUtils.getEsClient();414 BulkRequestBuilder bulk = client.prepareBulk();415 int index = 0;416 while (t.hasNext()) {417 HttpLog log = t.next();418 index++;419 bulk.add(client.prepareIndex("logs_nuoxi", "http").setSource(420 log.toJSON()));421 if (index >= 500) {422 BulkResponse bulkResponse = bulk.execute().actionGet();423 if (bulkResponse.hasFailures()) {424 // 处理错误425 System.out.println(bulkResponse.buildFailureMessage());426 }427 index = 0;428 }429 }430 if (index != 0) {431 BulkResponse bulkResponse = bulk.execute().actionGet();432 if (bulkResponse.hasFailures()) {433 // 处理错误434 System.out.println(bulkResponse.buildFailureMessage());435 }436 }437 } catch (Exception e) {438 e.printStackTrace();439 throw e;440 }441 // client.close();442 }443 444 private Map<String, Object> toJSON() {445 Field[] fields = this.getClass().getDeclaredFields();446 Map<String, Object> map = new HashMap<>();447 for (Field field : fields) {448 field.setAccessible(true);449 try {450 map.put(field.getName().toLowerCase(), field.get(this));451 } catch (IllegalArgumentException | IllegalAccessException e) {452 e.printStackTrace();453 }454 }455 return map;456 }457 }
开发系列:01、使用Java和Maven开发Spark应用
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。若内容有误或涉及侵权,可通过"投诉/举报"进行投诉,工作人员会在5个工作日内联系你;一经查实,本站将立刻删除涉嫌侵权内容。