
Spark Streaming (Python): read Kafka data and write the results to a single specified local file

 

#!/usr/bin/env python3
# -*- coding: UTF-8 -*-

# filename readFromKafkaStreamingGetLocation.py

import IP  # third-party IP geolocation library (e.g. 17MonIP); IP.find() returns a tab-separated location string
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import datetime


class KafkaMessageParse:

    def extractFromKafka(self,kafkainfo):
        if type(kafkainfo) is tuple and len(kafkainfo) == 2:
            return kafkainfo[1]

    def lineFromLines(self,lines):
        if lines is not None and len(lines) > 0:
            return lines.strip().split("\n")

    def messageFromLine(self,line):
        if line is not None and "message" in line.keys():
            return line.get("message")

    def ip2location(self,ip):
        result = []
        # default to empty strings so the fields are defined even if the lookup fails
        country = ""
        province = ""
        city = ""
        ipinfo = IP.find(ip.strip())
        try:
            location = ipinfo.split("\t")
            if len(location) == 3:
                country = location[0]
                province = location[1]
                city = location[2]
            elif len(location) == 2:
                country = location[0]
                province = location[1]
            else:
                pass
        except Exception:
            pass
        result.append(ip)
        result.append(country)
        result.append(province)
        result.append(city)
        return result

    def vlistfromkv(self, strori, sep1, sep2):
        # split "key<sep2>value" fields separated by sep1 and return the values in order
        resultlist = []
        fields = strori.split(sep1)
        for field in fields:
            kv = field.split(sep2)
            resultlist.append(kv[1])
        return resultlist


    def extractFromMessage(self, message):
        result = ""
        if message is not None and len(message) > 1:
            if len(message.split("\x01")) == 8:
                resultlist = self.vlistfromkv(message, "\x01", "\x02")
                source = resultlist.pop()
                ip = resultlist.pop()
                # replace the raw ip field with ip, country, province, city
                resultlist.extend(self.ip2location(ip))
                resultlist.append(source)
                result = "\x01".join(resultlist)
        return result


def tpprint(val, num=10000):
    """
    Print the first num elements of each RDD generated in this DStream.
    @param num: the number of elements from the first will be printed.
    """
    def takeAndPrint(time, rdd):
        taken = rdd.take(num + 1)
        print("########################")
        print("Time: %s" % time)
        print("########################")
        DATEFORMAT = '%Y%m%d'
        today = datetime.datetime.now().strftime(DATEFORMAT)
        # append each batch to a single date-stamped local file
        with open("/data/speech/speech." + today, "a") as myfile:
            for record in taken[:num]:
                print(record)
                myfile.write(str(record) + "\n")
        if len(taken) > num:
            print("...")
        print("")

    val.foreachRDD(takeAndPrint)


if __name__ == "__main__":
    zkQuorum = "datacollect-1:2181,datacollect-2:2181,datacollect-3:2181"
    topic = {"speech-1": 1, "speech-2": 1, "speech-3": 1, "speech-4": 1, "speech-5": 1}
    groupid = "rokid-speech-get-location"
    master = "local[*]"
    appName = "SparkStreamingRokid"
    timecell = 5  # batch interval in seconds

    sc = SparkContext(master=master, appName=appName)
    ssc = StreamingContext(sc, timecell)
    # ssc.checkpoint("checkpoint_"+time.strftime("%Y-%m-%d", time.localtime(time.time())))

    # createStream yields a DStream of (key, message) tuples from Kafka
    kvs = KafkaUtils.createStream(ssc, zkQuorum, groupid, topic)
    kmp = KafkaMessageParse()
    lines = kvs.map(lambda x: kmp.extractFromKafka(x))          # keep only the message value
    lines1 = lines.flatMap(lambda x: kmp.lineFromLines(x))      # split multi-line payloads into lines
    valuedict = lines1.map(lambda x: eval(x))                   # each line is a dict literal
    message = valuedict.map(lambda x: kmp.messageFromLine(x))   # extract the "message" field
    rdd2 = message.map(lambda x: kmp.extractFromMessage(x))     # parse fields and append IP location

    # rdd2.pprint()

    tpprint(rdd2)
    # rdd2.fileprint(filepath="result.txt")

    # rdd2.foreachRDD().saveAsTextFiles("/home/admin/agent/spark/result.txt")

    # sc.parallelize(rdd2.cache()).saveAsTextFile("/home/admin/agent/spark/result", "txt")

    # rdd2.repartition(1).saveAsTextFiles("/home/admin/agent/spark/result.txt")

    ssc.start()
    ssc.awaitTermination()

 

The key point is rewriting the DStream pprint() function (the tpprint() above) so that, instead of only printing each batch, it also appends the records to a single local file.
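For reference, the same idea can be written as a small standalone helper. This is a minimal sketch, assuming dstream is a pyspark.streaming DStream; the function name append_dstream_to_file, the example path, and the default num are illustrative and not part of the original script. foreachRDD runs the callback on the driver, so ordinary Python file I/O can append every batch to one local file, avoiding the one-directory-per-batch output of saveAsTextFiles().

def append_dstream_to_file(dstream, path, num=10000):
    def take_and_append(time, rdd):
        records = rdd.take(num)            # pull at most num records back to the driver
        with open(path, "a") as f:         # append mode keeps a single growing file
            for record in records:
                f.write(str(record) + "\n")
    dstream.foreachRDD(take_and_append)

# usage (hypothetical): append_dstream_to_file(rdd2, "/data/speech/speech.log")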

 

Reference: https://stackoverflow.com/questions/37864526/append-spark-dstream-to-a-single-file-in-python

 
