首页 > 代码库 > 开发系列:01、使用Java和Maven开发Spark应用

开发系列:01、使用Java和Maven开发Spark应用

1、POM.xml

  <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>

      <!-- Build descriptor for the Spark example application (jar packaging). -->
      <groupId>org.hansight.spark</groupId>
      <artifactId>examples</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>

      <name>examples</name>
      <url>http://maven.apache.org</url>

      <!-- Versions are declared once here and referenced from the dependency
           and plugin sections below, so they cannot drift apart. -->
      <properties>
          <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
          <elasticsearch.version>1.2.1</elasticsearch.version>
          <jdk.version>1.7</jdk.version>
          <logback.version>1.1.2</logback.version>
          <slf4j.version>1.7.7</slf4j.version>
          <junit.version>4.11</junit.version>
          <jcl.over.slf4j.version>1.7.7</jcl.over.slf4j.version>
          <metrics.version>3.0.2</metrics.version>
          <avro.version>1.7.7</avro.version>
          <jna.version>4.1.0</jna.version>
          <spark.version>1.0.2</spark.version>
          <!-- NOTE(review): this looks like a vendor (HDP) build number rather than an
               Apache release; presumably it is resolved from a vendor repository
               configured outside this POM (e.g. settings.xml) - confirm. -->
          <hadoop.version>2.4.0.2.1.4.0-632</hadoop.version>
      </properties>

      <dependencies>
          <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
              <!-- Was hard-coded to 3.8.1 although junit.version declares 4.11. -->
              <version>${junit.version}</version>
              <scope>test</scope>
          </dependency>
          <dependency>
              <groupId>com.fasterxml.jackson.core</groupId>
              <artifactId>jackson-core</artifactId>
              <version>2.4.2</version>
          </dependency>
          <dependency>
              <groupId>com.google.guava</groupId>
              <artifactId>guava</artifactId>
              <version>14.0.1</version>
              <scope>provided</scope>
          </dependency>
          <dependency>
              <groupId>org.apache.spark</groupId>
              <artifactId>spark-streaming-kafka_2.10</artifactId>
              <version>${spark.version}</version>
              <exclusions>
                  <exclusion>
                      <groupId>javax.servlet</groupId>
                      <artifactId>servlet-api</artifactId>
                  </exclusion>
                  <!-- hadoop-client is excluded here; the Hadoop artifacts are
                       declared explicitly further down with hadoop.version. -->
                  <exclusion>
                      <groupId>org.apache.hadoop</groupId>
                      <artifactId>hadoop-client</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>
          <dependency>
              <groupId>org.elasticsearch</groupId>
              <artifactId>elasticsearch</artifactId>
              <version>${elasticsearch.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-hdfs</artifactId>
              <version>${hadoop.version}</version>
          </dependency>
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-common</artifactId>
              <version>${hadoop.version}</version>
              <exclusions>
                  <exclusion>
                      <groupId>jdk.tools</groupId>
                      <artifactId>jdk.tools</artifactId>
                  </exclusion>
              </exclusions>
          </dependency>
          <dependency>
              <groupId>org.apache.hadoop</groupId>
              <artifactId>hadoop-mapreduce-client-common</artifactId>
              <version>${hadoop.version}</version>
          </dependency>
          <dependency>
              <groupId>org.elasticsearch</groupId>
              <artifactId>elasticsearch-hadoop</artifactId>
              <version>2.0.1</version>
          </dependency>
      </dependencies>

      <build>
          <plugins>
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-compiler-plugin</artifactId>
                  <version>3.1</version>
                  <configuration>
                      <source>${jdk.version}</source>
                      <target>${jdk.version}</target>
                  </configuration>
              </plugin>
              <!-- Builds a single jar that bundles the dependencies. No execution is
                   bound to a lifecycle phase, so the assembly goal has to be invoked
                   explicitly (e.g. "mvn package assembly:single"). -->
              <plugin>
                  <groupId>org.apache.maven.plugins</groupId>
                  <artifactId>maven-assembly-plugin</artifactId>
                  <version>2.4</version>
                  <configuration>
                      <descriptorRefs>
                          <descriptorRef>jar-with-dependencies</descriptorRef>
                      </descriptorRefs>
                  </configuration>
              </plugin>
          </plugins>
      </build>
  </project>

 

 

2、样例代码

 1 package com.hansight.spark.utils; 2  3 import org.apache.spark.SparkConf; 4 import org.apache.spark.api.java.JavaSparkContext; 5  6 public class SparkUtils { 7  8     public static JavaSparkContext get(String name) { 9         SparkConf conf = new SparkConf();10         // conf.setMaster("local[1]");11         // conf.setMaster("spark://hdp125:7077");12         conf.setAppName(name);13         return new JavaSparkContext(conf);14     }15 }

 

 package com.hansight.spark.streaming;

 import java.util.Iterator;

 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.api.java.function.VoidFunction;

 import com.hansight.spark.utils.SparkUtils;

 /**
  * Command-line job that reads a text file of HTTP log lines, parses every line
  * into an {@link HttpLog}, prints the number of records and hands each
  * partition to {@link HttpLog#save(Iterator)}.
  */
 public class HttpParser {

     /**
      * Entry point.
      *
      * @param args {@code args[0]} is the path of the input file; when no argument
      *             is given a usage line is printed and the method returns
      */
     @SuppressWarnings({ "unchecked", "serial" })
     public static void main(String[] args) {
         if (args.length == 0) {
             System.out.println("Usage: <file path>");
             return;
         }
         // Developer-machine default. Only applied when the property has not been
         // supplied already (e.g. with -Dhadoop.home.dir=...), so an explicit
         // setting is no longer overwritten.
         // NOTE(review): presumably needed for running Hadoop code on Windows - confirm.
         if (System.getProperty("hadoop.home.dir") == null) {
             System.setProperty("hadoop.home.dir", "E:/tools/hadoop-2.4.1");
         }

         JavaSparkContext sc = SparkUtils.get("HttpLog");
         try {
             String path = args[0];
             JavaRDD<String> rdd = sc.textFile(path);
             JavaRDD<HttpLog> parsed = rdd.map(new Function<String, HttpLog>() {
                 @Override
                 public HttpLog call(String line) throws Exception {
                     return HttpLog.parser(line);
                 }
             });
             System.out.println(parsed.count());
             parsed.foreachPartition(new VoidFunction<Iterator<HttpLog>>() {
                 @Override
                 public void call(Iterator<HttpLog> t) throws Exception {
                     HttpLog.save(t);
                 }
             });
         } finally {
             // Stop the context even when the job fails; it was never stopped before.
             sc.stop();
         }
     }
 }
  package com.hansight.spark.streaming;

  import java.lang.reflect.Field;
  import java.lang.reflect.Modifier;
  import java.util.HashMap;
  import java.util.Iterator;
  import java.util.Locale;
  import java.util.Map;

  import org.elasticsearch.action.bulk.BulkRequestBuilder;
  import org.elasticsearch.action.bulk.BulkResponse;
  import org.elasticsearch.client.Client;

  import com.hansight.spark.utils.EsUtils;

  /**
   * One HTTP browsing record.
   *
   * <p>{@link #parser(String)} builds a record from a raw text line and
   * {@link #save(Iterator)} bulk-indexes records into Elasticsearch. Field names
   * are kept in upper case because {@link #toJSON()} derives the document keys
   * from them (lower-cased).
   */
  public class HttpLog {
      /** Separator between two fields of a raw line. */
      private static final String FIELD_DELIMITER = "','";
      /** Number of fields a complete raw line consists of (array indexes 0..31). */
      private static final int FIELD_COUNT = 32;
      /** Number of index requests collected before a bulk request is sent. */
      private static final int BULK_SIZE = 500;

      // The unmodified input line.
      private String rawlog;
      // VARCHAR2(8 BYTE) record type; marks the record as an HTTP browsing record (value 'HTTP')
      private String RECORD_TYPE;
      // TIMESTAMP start time, format: YYYY-MM-DD HH24:MI:SS.xxxxxxxxx
      private String CAPTURETIME;
      // VARCHAR2(16 BYTE) mobile phone number (taken from the Create PDP Context message)
      private String MSISDN;
      // IMSI VARCHAR2(18 BYTE) international mobile subscriber identity (taken from the Create PDP Context message)
      private String IMSI;
      // IMEI(SV) VARCHAR2(20 BYTE) IMEI(SV) number (taken from the Create PDP Context message)
      private String IMEI;
      // VARCHAR2(32 BYTE) APN
      private String APN;
      // UEIP VARCHAR2(20 BYTE) IP of the terminal
      private String UEIP;
      // SPIP VARCHAR2(20 BYTE) IP of the SP
      private String SPIP;
      // UEPORT NUMBER(12) terminal port
      private int UEPORT;
      // SPPORT NUMBER(12) SP-side port
      private int SPPORT;
      // USERAGENT VARCHAR2(64 BYTE) User-Agent information
      private String USERAGENT;
      // URL VARCHAR2(256 BYTE) URL; the error rate of this field should not exceed 1 in 10,000
      private String URL;
      // HOST VARCHAR2(64 BYTE) Host information
      private String HOST;
      // CONTENTLEN NUMBER(12) content size
      private String CONTENTLEN;
      // CONTENTTYPE VARCHAR2(64 BYTE) content type
      private String CONTENTTYPE;
      // BSKIP NUMBER(12) whether the page was reached through a link, 0=no, 1=yes
      private boolean BSKIP;
      // REFERER VARCHAR2(256 BYTE) link source information
      private String REFERER;
      // HTTPSTATUS NUMBER(12) status code, see the HTTPSTATUS table in the appendix
      private long HTTPSTATUS;
      // RESPDELAY NUMBER(12) response delay in milliseconds
      private long RESPDELAY;
      // HTTPACTION NUMBER(12) HTTP operation type (5: Post, 6: Get)
      private String HTTPACTION;
      // DURATION NUMBER(12) duration
      private long DURATION;
      // FLOW NUMBER(12) total traffic
      private long FLOW;
      // UPFLOW NUMBER(12) uplink traffic
      private long UPFLOW;
      // DOWNFLOW NUMBER(12) downlink traffic
      private long DOWNFLOW;
      // SGSNIP VARCHAR2(20 BYTE) SGSN user-plane IP
      private String SGSNIP;
      // GGSNIP VARCHAR2(20 BYTE) GGSN user-plane IP
      private String GGSNIP;
      // LAC NUMBER(12) LAC information
      private long LAC;
      // CI NUMBER(12) CI/SAC information
      private String CI;
      // RATTYPE NUMBER(12) RAT type, 1=2G, 2=3G
      private String RATTYPE;
      // STOPTIME TIMESTAMP end time, format: YYYY-MM-DD HH24:MI:SS.xxxxxxxxx
      private String STOPTIME;
      // PBIP VARCHAR2(20 BYTE) IP address of the capture/parsing device
      private String PBIP;
      // PBID NUMBER(12) number of the capture/parsing device
      private long PBID;

      public String getRawlog() {
          return rawlog;
      }

      public void setRawlog(String rawlog) {
          this.rawlog = rawlog;
      }

      public String getRECORD_TYPE() {
          return RECORD_TYPE;
      }

      public void setRECORD_TYPE(String recordType) {
          this.RECORD_TYPE = recordType;
      }

      public String getCAPTURETIME() {
          return CAPTURETIME;
      }

      public void setCAPTURETIME(String captureTime) {
          this.CAPTURETIME = captureTime;
      }

      public String getMSISDN() {
          return MSISDN;
      }

      public void setMSISDN(String msisdn) {
          this.MSISDN = msisdn;
      }

      public String getIMSI() {
          return IMSI;
      }

      public void setIMSI(String imsi) {
          this.IMSI = imsi;
      }

      public String getIMEI() {
          return IMEI;
      }

      public void setIMEI(String imei) {
          this.IMEI = imei;
      }

      public String getAPN() {
          return APN;
      }

      public void setAPN(String apn) {
          this.APN = apn;
      }

      public String getUEIP() {
          return UEIP;
      }

      public void setUEIP(String ueIp) {
          this.UEIP = ueIp;
      }

      public String getSPIP() {
          return SPIP;
      }

      public void setSPIP(String spIp) {
          this.SPIP = spIp;
      }

      public int getUEPORT() {
          return UEPORT;
      }

      public void setUEPORT(int uePort) {
          this.UEPORT = uePort;
      }

      public int getSPPORT() {
          return SPPORT;
      }

      public void setSPPORT(int spPort) {
          this.SPPORT = spPort;
      }

      public String getUSERAGENT() {
          return USERAGENT;
      }

      public void setUSERAGENT(String userAgent) {
          this.USERAGENT = userAgent;
      }

      public String getURL() {
          return URL;
      }

      public void setURL(String url) {
          this.URL = url;
      }

      public String getHOST() {
          return HOST;
      }

      public void setHOST(String host) {
          this.HOST = host;
      }

      public String getCONTENTLEN() {
          return CONTENTLEN;
      }

      public void setCONTENTLEN(String contentLen) {
          this.CONTENTLEN = contentLen;
      }

      public String getCONTENTTYPE() {
          return CONTENTTYPE;
      }

      public void setCONTENTTYPE(String contentType) {
          this.CONTENTTYPE = contentType;
      }

      public boolean isBSKIP() {
          return BSKIP;
      }

      public void setBSKIP(boolean bSkip) {
          this.BSKIP = bSkip;
      }

      public String getREFERER() {
          return REFERER;
      }

      public void setREFERER(String referer) {
          this.REFERER = referer;
      }

      public long getHTTPSTATUS() {
          return HTTPSTATUS;
      }

      public void setHTTPSTATUS(long httpStatus) {
          this.HTTPSTATUS = httpStatus;
      }

      public long getRESPDELAY() {
          return RESPDELAY;
      }

      public void setRESPDELAY(long respDelay) {
          this.RESPDELAY = respDelay;
      }

      public String getHTTPACTION() {
          return HTTPACTION;
      }

      public void setHTTPACTION(String httpAction) {
          this.HTTPACTION = httpAction;
      }

      public long getDURATION() {
          return DURATION;
      }

      public void setDURATION(long duration) {
          this.DURATION = duration;
      }

      public long getFLOW() {
          return FLOW;
      }

      public void setFLOW(long flow) {
          this.FLOW = flow;
      }

      public long getUPFLOW() {
          return UPFLOW;
      }

      public void setUPFLOW(long upFlow) {
          this.UPFLOW = upFlow;
      }

      public long getDOWNFLOW() {
          return DOWNFLOW;
      }

      public void setDOWNFLOW(long downFlow) {
          this.DOWNFLOW = downFlow;
      }

      public String getSGSNIP() {
          return SGSNIP;
      }

      public void setSGSNIP(String sgsnIp) {
          this.SGSNIP = sgsnIp;
      }

      public String getGGSNIP() {
          return GGSNIP;
      }

      public void setGGSNIP(String ggsnIp) {
          this.GGSNIP = ggsnIp;
      }

      public long getLAC() {
          return LAC;
      }

      public void setLAC(long lac) {
          this.LAC = lac;
      }

      public String getCI() {
          return CI;
      }

      public void setCI(String ci) {
          this.CI = ci;
      }

      public String getRATTYPE() {
          return RATTYPE;
      }

      public void setRATTYPE(String ratType) {
          this.RATTYPE = ratType;
      }

      public String getSTOPTIME() {
          return STOPTIME;
      }

      public void setSTOPTIME(String stopTime) {
          this.STOPTIME = stopTime;
      }

      public String getPBIP() {
          return PBIP;
      }

      public void setPBIP(String pbIp) {
          this.PBIP = pbIp;
      }

      public long getPBID() {
          return PBID;
      }

      public void setPBID(long pbId) {
          this.PBID = pbId;
      }

      /**
       * Parses one raw line into a record.
       *
       * <p>The line is split on {@code ','}; the first character of the first
       * field and the last character of the last field are dropped. A line that
       * does not split into exactly 32 fields, or whose numeric fields cannot be
       * parsed, yields a record that carries only the raw line.
       *
       * <p>Fixes over the earlier version: the field-count guard compared against
       * 31 although index 31 is read, so complete lines were never parsed and
       * 31-field lines threw; the delimiter was written with typographic quotes.
       *
       * @param line raw input line, may be {@code null}
       * @return {@code null} when {@code line} is {@code null}, otherwise a record
       *         whose {@code rawlog} is always set
       */
      public static HttpLog parser(String line) {
          if (line == null) {
              return null;
          }
          String[] arr = line.split(FIELD_DELIMITER);
          if (arr.length == FIELD_COUNT) {
              try {
                  return fromFields(line, arr);
              } catch (NumberFormatException e) {
                  // A malformed numeric field is treated like a malformed line:
                  // fall through and keep only the raw text.
              }
          }
          HttpLog rawOnly = new HttpLog();
          rawOnly.setRawlog(line);
          return rawOnly;
      }

      /** Fills a new record from the 32 split fields; throws NumberFormatException on bad numbers. */
      private static HttpLog fromFields(String line, String[] arr) {
          HttpLog log = new HttpLog();
          log.setRawlog(line);
          log.setRECORD_TYPE(dropFirstChar(arr[0]));
          log.setCAPTURETIME(arr[1]);
          log.setMSISDN(arr[2]);
          log.setIMSI(arr[3]);
          log.setIMEI(arr[4]);
          log.setAPN(arr[5]);
          log.setUEIP(arr[6]);
          log.setSPIP(arr[7]);
          log.setUEPORT(Integer.parseInt(arr[8]));
          log.setSPPORT(Integer.parseInt(arr[9]));
          log.setUSERAGENT(arr[10]);
          log.setURL(arr[11]);
          log.setHOST(arr[12]);
          log.setCONTENTLEN(arr[13]);
          log.setCONTENTTYPE(arr[14]);
          log.setBSKIP("1".equals(arr[15]));
          log.setREFERER(arr[16]);
          log.setHTTPSTATUS(Long.parseLong(arr[17]));
          log.setRESPDELAY(Long.parseLong(arr[18]));

          // Numeric action codes are replaced by their names; other values pass through.
          String action = arr[19];
          if ("5".equals(action)) {
              action = "POST";
          } else if ("6".equals(action)) {
              action = "GET";
          }
          log.setHTTPACTION(action);

          log.setDURATION(Long.parseLong(arr[20]));
          log.setFLOW(Long.parseLong(arr[21]));
          log.setUPFLOW(Long.parseLong(arr[22]));
          log.setDOWNFLOW(Long.parseLong(arr[23]));
          log.setSGSNIP(arr[24]);
          log.setGGSNIP(arr[25]);
          log.setLAC(Long.parseLong(arr[26]));
          log.setCI(arr[27]);

          // Same idea for the RAT type codes.
          String ratType = arr[28];
          if ("1".equals(ratType)) {
              ratType = "2G";
          } else if ("2".equals(ratType)) {
              ratType = "3G";
          }
          log.setRATTYPE(ratType);

          log.setSTOPTIME(arr[29]);
          log.setPBIP(arr[30]);
          log.setPBID(Long.parseLong(dropLastChar(arr[31])));
          return log;
      }

      /** Returns {@code s} without its first character; an empty string is returned unchanged. */
      private static String dropFirstChar(String s) {
          return s.isEmpty() ? s : s.substring(1);
      }

      /** Returns {@code s} without its last character; an empty string is returned unchanged. */
      private static String dropLastChar(String s) {
          return s.isEmpty() ? s : s.substring(0, s.length() - 1);
      }

      /**
       * Indexes all records of the iterator into index {@code logs_nuoxi}, type
       * {@code http}, sending one bulk request per 500 records plus one for the
       * remainder. Bulk failures are printed to standard output; exceptions
       * propagate to the caller.
       *
       * <p>A new bulk builder is started after every flush. Previously the same
       * builder was reused, so already-sent requests were sent again with each
       * later batch.
       *
       * <p>The client obtained from {@code EsUtils} is not closed here.
       * NOTE(review): presumably it is shared - confirm before adding a close.
       *
       * @param t records to store; {@code null} elements are skipped
       */
      public static void save(Iterator<HttpLog> t) {
          if (t == null) {
              return;
          }
          Client client = EsUtils.getEsClient();
          BulkRequestBuilder bulk = client.prepareBulk();
          int pending = 0;
          while (t.hasNext()) {
              HttpLog log = t.next();
              if (log == null) {
                  continue;
              }
              bulk.add(client.prepareIndex("logs_nuoxi", "http").setSource(
                      log.toJSON()));
              pending++;
              if (pending >= BULK_SIZE) {
                  flush(bulk);
                  bulk = client.prepareBulk();
                  pending = 0;
              }
          }
          if (pending != 0) {
              flush(bulk);
          }
      }

      /** Executes one bulk request and prints the failure message if any item failed. */
      private static void flush(BulkRequestBuilder bulk) {
          BulkResponse bulkResponse = bulk.execute().actionGet();
          if (bulkResponse.hasFailures()) {
              // Handle errors: currently they are only reported.
              System.out.println(bulkResponse.buildFailureMessage());
          }
      }

      /**
       * Builds the document source: one entry per instance field, keyed by the
       * lower-cased field name. Static and synthetic fields are left out, and the
       * lower-casing uses {@link Locale#ROOT} so keys do not depend on the default
       * locale.
       */
      private Map<String, Object> toJSON() {
          Map<String, Object> map = new HashMap<>();
          for (Field field : this.getClass().getDeclaredFields()) {
              if (Modifier.isStatic(field.getModifiers()) || field.isSynthetic()) {
                  continue;
              }
              field.setAccessible(true);
              try {
                  map.put(field.getName().toLowerCase(Locale.ROOT), field.get(this));
              } catch (IllegalArgumentException | IllegalAccessException e) {
                  // Best effort as before: report and leave the field out.
                  e.printStackTrace();
              }
          }
          return map;
      }
  }

 

开发系列:01、使用Java和Maven开发Spark应用