首页 > 代码库 > shape into blocks--source code in python based on pySpark

shape into blocks--source code in python based on pySpark

这是微博深度和广度预测的原始代码,写了大约半个月,第一个版本不是这样的,但是这个版本包含所有需要的功能。

模块化的程度也更高。找工作前一直想用python完美解决这个问题,后来发现自己的方法和硬件都有很大的局限。

算是我的第一次正儿八经的尝试在分布式计算的框架下,计算海量的数据。

意识到很多问题,影响我面试时候很多的代码风格。

def get_basic_info():
    win_path = "E:/spark/weibo_predict/"
    linux_path = "/home/jason/spark/weibo_predict/"
    path = linux_path

    train_path = path + train/
    test_path = path + test/
    code_path = path + source_code/
    
    print(\n训练准备文件保存路径px:%s % train_path)
    print(\n测试准备文件保存路径py:%s % test_path)
    print(\n代码准备文件保存路径pz:%s % code_path)

    train_weibo_raw_path = path + "train_weibo_raw.txt"
    train_weibo_repost_path = path + "train_weibo_repost_back.txt" 

    test_weibo_raw_path = path + "test_weibo_raw.txt"
    test_weibo_repost_path = path + "test_weibo_repost.txt"

    user_relations_path = path + "user_relations_back.txt"         
                                                                                                                                                                                                                                                                                                                                            
    print("\n训练原始微博地址p1:%s" % train_weibo_raw_path)
    print("训练转发微博地址p2:%s" % train_weibo_repost_path)
    print("\n测试原始微博地址p3:%s" % test_weibo_raw_path)
    print("测试转发微博地址p4:%s" % test_weibo_repost_path)
    print("\n    用户关系地址p5:%s" % user_relations_path)
    return train_path,test_path,code_path,train_weibo_raw_path,train_weibo_repost_path,test_weibo_raw_path,test_weibo_repost_path,user_relations_path

 

#传递  训练(原始微博,转发微博) 或者 测试(原始微博,转发微博)  
#返回化简后的对应关系repost_id_line_time_reduce
#返回微博id对应的用户idwid_uid_rdd
from pyspark import SparkContext
def get_prime_rdd(train_or_test,sc, p1,p2,p3,p4):
    if train_or_test == train:
        inside_path_a = p1
        inside_path_b = p2
    elif train_or_test == test:
        inside_path_a = p3
        inside_path_b = p4
    else:
        print("only input train or test")
        return 0,0
      
    sc = sc
    train_weibo_raw_data = sc.textFile(inside_path_a)
    train_weibo_raw_data_count = train_weibo_raw_data.count()
    train_weibo_raw_data_rdd = train_weibo_raw_data.map(lambda x: x.split("\001"))
    w_id=train_weibo_raw_data_rdd.map(lambda x:x[0])
    u_id=train_weibo_raw_data_rdd.map(lambda x:x[1])
    wid_uid_rdd = w_id.zip(u_id)
    
    train_weibo_repost_data = sc.textFile(inside_path_b)
    train_weibo_repost_data_count = train_weibo_repost_data.count()
    train_weibo_repost_data_rdd = train_weibo_repost_data.map(lambda x: x.split("\001"))
    repost_id = train_weibo_repost_data_rdd.map(lambda x: x[0])
    repost_line_time = train_weibo_repost_data_rdd.map(lambda x: x[1:-1])
    repost_id_line_time = repost_id.zip(repost_line_time)
    repost_id_line_time_reduce = repost_id_line_time.groupByKey().mapValues(list)
    
    repost_id_line_time_reduce = repost_id_line_time_reduce.subtractByKey(repost_id_line_time_reduce.subtractByKey(wid_uid_rdd))
    wid_uid_rdd = wid_uid_rdd.subtractByKey(wid_uid_rdd.subtractByKey(repost_id_line_time_reduce))
        
    return repost_id_line_time_reduce,wid_uid_rdd 

 

def get_uid_fnum_rdd(sc,p5):
    sc = sc
    user_relations_data = sc.textFile(p5)
    user_relations_data_count = user_relations_data.count()
    user_relations_data_rdd_1 = user_relations_data.map(lambda x: x.split("\t")[0])
    user_relations_data_rdd_2 = user_relations_data.map(lambda x: x.split("\t")[1])
    user_relations_data_rdd_user = user_relations_data_rdd_1
    user_relations_data_rdd_fans = user_relations_data_rdd_2.map(lambda x: x.split("\x01"))
    user_fans = user_relations_data_rdd_user.zip(user_relations_data_rdd_fans)
    fans_nums = user_relations_data_rdd_fans.map(lambda s:len(s))
    uid_fnum_rdd = user_fans.keys().zip(fans_nums)
    return uid_fnum_rdd

 

##版本 2  分时间段计算指定时间段的转发量
def cal_times_j(list,j):
    ct = 0
    for i in range(len(list)):
        #if int(list[i][-1]) >= j*900 and int(list[i][-1]) <= (j+1)*900:
        #这里可以切换求累计的转发量还是区间的转发量
        if int(list[i][-1]) <= (j)*900:
            ct += 1
    return ct
def cal_id_times_j(rdd,j):
    times = rdd.values().map(lambda x: cal_times_j(x,j))
    rdd = rdd.keys().zip(times)
    return rdd 

def generate_times_file(rdd,k,path):
    for j in range(k-1,k+1):
        import csv
        a_path = str(path) + wid_times/wid_times_+str(j)+.csv
        #print(path)
        out_file_train_times_j = open(a_path,w)
        writer = csv.writer(out_file_train_times_j);
        zhuanfa = cal_id_times_j(rdd,j+1)
        for lists in zhuanfa.collect():
            writer.writerow(lists)
        out_file_train_times_j.close()

 

#计算深度
#定义函数,计算出指定阶段的,发生过的转发关系
def cal_during(list,j):
    new_list=[]
    for i in range(len(list)):
        if int(list[i][-1]) <= j*900:
            new_list.append(list[i])
    return new_list

#定义函数,计算一个rdd中,指定阶段,发生过的转发关系
def cal_rdd_during(rdd,j):
    return rdd.map(lambda x: cal_during(x,j))
    
#定义函数,如果一个转发关系的尾部,是另外一个转发关系的头,那么久把这个头的尾部,加到这个转发关系的尾部
def add_deep(list):
    kkk = len(list)
    if kkk<=1:
        return list
    else:
        for i in range(kkk):
            for j in range(kkk):
                if list[i][-1] == list[j][0]:
                    list[i].append(list[j][-1])
    return list

#定义函数返回序列中的数组的最长的值,作为最大的深度
def max_deep(list):
    max=2
    if len(list)==0:
        return 0
    else:
        for i in range(len(list)):
            max = (len(list[i]) if len(list[i])> max else max)
    return max-1

#定义函数,取出其中的两列
def ti_qu(list):
    for i in range(len(list)):
        list[i] = list[i][:-1]
    return list

def cal_cal(all_in_one_rdd, j):
    id_rdd = all_in_one_rdd.keys()                     #获取ID的RDD
    line_time_rdd = all_in_one_rdd.values()            #获取转发关系和转发时间对应的RDD
    line_time_rdd_j = cal_rdd_during(line_time_rdd,j)  #指定时间段,获取这个时间段发生过的转发和时间组成的RDD
    line_rdd_j = line_time_rdd_j.map(lambda x : ti_qu(x))#提取转发关系
    line_rdd_j_extend = line_rdd_j.map(lambda x: add_deep(x))#延长转发关系
    line_rdd_j_extend_maxdeep = line_rdd_j_extend.map(lambda x:max_deep(x))#计算最大深度
    id_deep_rdd_j = id_rdd.zip(line_rdd_j_extend_maxdeep)#组合微博ID与深度
    return id_deep_rdd_j

def generate_deeps_file(rdd,k,path):
    import csv
    for j in range(k-1,k+1):
        b_path = str(path) + wid_deeps/wid_deeps_+str(j)+.csv
        #print(path)
        out_file_train_deeps_j = open(b_path,w)
        writer = csv.writer(out_file_train_deeps_j);
        shendu = cal_cal(rdd,j+1)
        for lists in shendu.collect():
            writer.writerow(lists)
        out_file_train_deeps_j.close()

 

def get_wid_fnum_rdd(uid_fnum_rdd,wid_uid_rdd,path):
    #print("用户和粉丝个数的对应关系,取出来一个看看:")
    #print(uid_fnum_rdd.take(3))
    #print(uid_fnum_rdd.count())
    #print("\n训练原始约减微博的id和发送微博的人的id的对应rdd:")
    #print(wid_uid_rdd.take(3))
    #print(wid_uid_rdd.count())
    uid_wid_rdd = wid_uid_rdd.values().zip(wid_uid_rdd.keys())
    uid__wid_fnum = uid_wid_rdd.leftOuterJoin(uid_fnum_rdd)
    wid_fnum_rdd = uid__wid_fnum.values().map(lambda x: x[0]).zip(uid__wid_fnum.values().map(lambda x: x[1])) 
    #print(wid_fnum_rdd.take(2))
    #print(wid_fnum_rdd.count())
    import csv
    c_path = str(path) + wid_fnum_file.csv
    wid_fnum_file = open(c_path,"w")
    writer = csv.writer(wid_fnum_file);
    for lists in wid_fnum_rdd.collect():
        writer.writerow(lists);
    wid_fnum_file.close()
    
    return wid_fnum_rdd

 

#定义函数,将列表数组扁平化
def add_flat(list):
    if list==None:
        return 0
    else: 
        kkk = len(list)
        list0 = list[0]
        for i in range(kkk):
            if i==0:
                pass
            else:
                list0 = list0.append(list[i])
        return list0

#定义函数,计算覆盖用户数目
def clac_cover(list):
    total_cover=0
    for i in range(len(list)):
        total_cover += cover_value(list[i])
    return total_cover
        
#定义函数,计算某个用户的粉丝数:
def cover_value(user):
    ‘‘‘
    try:
        return uid_fnum_dict[user]
    except:
        return 0
    ‘‘‘
    for i in range(len(list_uid_fnum)):
        if user == list_uid_fnum[i][0]:
            return list_uid_fnum[i][1]
    else:
        return 0
def flatmapvalues(x):
    return x

def cal_sum(x):
    sum = 0
    if x==None and len(x)==0:
        return sum
    else:
        for i in range(len(x)):
            if x[i]== None:
                pass
            else:
                sum += int(x[i])
        return sum

def fans_cover_till_j(all_in_one_rdd,j):
    id_rdd = all_in_one_rdd.keys()                     #获取微博ID的RDD
    line_time_rdd = all_in_one_rdd.values()            #获取转发关系和转发时间对应的RDD
    line_time_rdd_j = cal_rdd_during(line_time_rdd,j)  #指定时间段,获取这个时间段发生过的转发和时间组成的RDD
    #print("\n指定时间段,获取这个时间段发生过的转发和时间组成的RDD");print(line_time_rdd_j.first())
    line_rdd_j = line_time_rdd_j.map(lambda x : ti_qu(x))#提取转发关系
    #print("\n提取转发关系");print(line_rdd_j.first())
    
    #line_rdd_j.flatMap(lambda x: re.sub(r‘\D‘," ",x).split())
    #line_rdd_j_flat = line_rdd_j.map(lambda x: add_flat(x))#扁平化转发关系,不行
    import re
    line_rdd_j_flat = line_rdd_j.map(lambda x: re.sub(r\D," ",str(x)).split())#扁平化转发关系
    #print("\n提取扁平化的转发关系");print(line_rdd_j_flat.first())
    
    line_rdd_j_flat_disc = line_rdd_j_flat.map(lambda x:list(set(list(x))))   #扁平化之后约减重复的用户ID
    #print("\n看看去重之后的转发用户");print(line_rdd_j_flat_disc.first())
    
    fans_cover_rdd_j = id_rdd.zip(line_rdd_j_flat_disc)
    #print("\n看看去重之后的微博ID和转发用户");print(fans_cover_rdd_j.first())
    
    fans_cover_rdd_j = fans_cover_rdd_j.flatMapValues(flatmapvalues)
    #print("\n看看去重之后的微博ID和转发用户,一对一flatmap之后");print(fans_cover_rdd_j.first())
    
    fans_cover_rdd_j = fans_cover_rdd_j.values().zip(fans_cover_rdd_j.keys())
    #print("\n翻转id和用户");print(fans_cover_rdd_j.first())
    
    fans_cover_rdd_j = fans_cover_rdd_j.leftOuterJoin(uid_fnum_rdd).values()
    #print("\n得到用户id_(微博ID,粉丝)");print(fans_cover_rdd_j.first())
    #print(fans_cover_rdd_j.count())
    
    fans_cover_rdd_j = fans_cover_rdd_j.map(lambda x: x[0]).zip(fans_cover_rdd_j.map(lambda x:x[1]))
    #print("\n得微博id_粉丝");print(fans_cover_rdd_j.first())
    #print(fans_cover_rdd_j.count())
    
    fans_cover_rdd_j = fans_cover_rdd_j.groupByKey().mapValues(list)
    #print("\n组合,");print(fans_cover_rdd_j.first())
    #print(fans_cover_rdd_j.count())
    
    fans_cover_rdd_j = fans_cover_rdd_j.keys().zip(fans_cover_rdd_j.values().map(lambda x: cal_sum(x)))
    #print("\nmap求和");print(fans_cover_rdd_j.first())
    
    #cover_rdd = line_rdd_j_flat_disc.map(lambda x: clac_cover(x))
    #fans_cover_rdd_j = id_rdd.zip(cover_rdd)#组合微博ID与覆盖数目
    #print(id_deep_rdd_j.first())
    #return line_rdd_j_extend_maxdeep
    temp_key_0 = all_in_one_rdd.keys().zip(all_in_one_rdd.values().map(lambda x: 0))
    
    fans_cover_rdd_j = temp_key_0.leftOuterJoin(fans_cover_rdd_j)
    fans_cover_rdd_j = fans_cover_rdd_j.keys().zip(fans_cover_rdd_j.values().map(lambda x: cal_sum(x)))
    
    
    return fans_cover_rdd_j

def generate_covers_file(rdd,k,path):
    #按理说没问题
    import csv
    for j in range(k-1,k+1):
        c_path = str(path) + wid_covers/wid_covers_+str(j)+.csv
        #print(c_path)
        out_file_train_covers_j = open(c_path,w)
        writer = csv.writer(out_file_train_covers_j)
        covers  = fans_cover_till_j(rdd,j+1)
        for lists in covers.collect():
            writer.writerow(lists)
        out_file_train_covers_j.close()

 

 

px,py,pz,p1,p2,p3,p4,p5 = get_basic_info()
uid_fnum_rdd = get_uid_fnum_rdd(sc,p5)
train_repost_id_line_time_reduce, train_wid_uid_rdd = get_prime_rdd(train,sc,p1,p2,p3,p4)
#wid_fnum_rdd = get_wid_fnum_rdd(uid_fnum_rdd,train_wid_uid_rdd,px)
#generate_times_file(train_repost_id_line_time_reduce,292,px)
#generate_deeps_file(train_repost_id_line_time_reduce,292,px)
#generate_covers_file(train_repost_id_line_time_reduce,292,px)

test_repost_id_line_time_reduce, test_wid_uid_rdd = get_prime_rdd(test,sc,p1,p2,p3,p4)
#test_wid_fnum_rdd = get_wid_fnum_rdd(uid_fnum_rdd,test_wid_uid_rdd,py)
#generate_times_file(test_repost_id_line_time_reduce,16,py)
#generate_deeps_file(test_repost_id_line_time_reduce,16,py)
#generate_covers_file(test_repost_id_line_time_reduce,16,py)

 

from pyspark.mllib.regression import LabeledPoint
import numpy as np 
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.ml.linalg import Vectors
from pyspark.ml.linalg import SparseVector,DenseVector

#获取用户ID和粉丝数的对比
def get_wid_fnum_rdd(path):
    path = path+ wid_fnum_file+.csv
    wid_fnum_rdd = sc.textFile(path)
    wid_fnum_rdd = wid_fnum_rdd.map(lambda x:x.split(","))
    wid_fnum_rdd = wid_fnum_rdd.map(lambda x:x[0]).zip(wid_fnum_rdd.map(lambda x:x[1]))
    wid_fnum_rdd = wid_fnum_rdd.sortByKey()
    return wid_fnum_rdd

def add_keys(rdd1):
    rdd1 = rdd1
    #path = ‘/home/jason/spark/weibo_predict/predicts/times_time_data_‘+str(15)+‘.txt‘
    #rdd1 = sc.textFile(path)
    rdd2 = sc.textFile(/home/jason/spark/weibo_predict/test/wid_times/wid_times_0.csv)
    rdd2 = rdd2.map(lambda x:x.split(,)[0]).zip(rdd2.map(lambda x:x.split(,)[1]))
    rdd2 = rdd2.sortByKey()
    rdd1 = rdd1.zipWithIndex()
    rdd1 = rdd1.values().zip(rdd1.keys())
    rdd2 = rdd2.keys().zipWithIndex()
    rdd2 = rdd2.values().zip(rdd2.keys())
    rdd = rdd2.join(rdd1)
    rdd = rdd.values()
    rdd = rdd.map(lambda x: x[0]).zip(rdd.map(lambda x: x[1]))
    return rdd

#获取其他三个需要的参数
def get_wid_x(j,path,times_or_deeps_or_covers):
    if times_or_deeps_or_covers == times:
        if path == px:
            path = str(path) + wid_times/wid_times_+str(j)+.csv
        elif path ==py:
            if j>=0 and j<15:
                path = str(path) + wid_times/wid_times_+str(j)+.csv
            elif j>=15 and j<=291:
                path = /home/jason/spark/weibo_predict/predicts/times_time_data_+str(j)+.txt
                rdd1 = sc.textFile(path)
                rdd = add_keys(rdd1)
                return rdd
    elif times_or_deeps_or_covers == deeps:
        if path == px:
            path = str(path) + wid_deeps/wid_deeps_+str(j)+.csv
        elif path ==py:
            if j>=0 and j<15:
                path = str(path) + wid_deeps/wid_deeps_+str(j)+.csv
            elif j>=15 and j<=291:
                path = /home/jason/spark/weibo_predict/predicts/deeps_time_data_+str(j)+.txt
                rdd1 = sc.textFile(path)
                rdd = add_keys(rdd1)
                return rdd    
    elif times_or_deeps_or_covers == covers:
        if path == px:
            path = str(path) + wid_covers/wid_covers_+str(j)+.csv
        elif path ==py:
            if j>=0 and j<15:
                path = str(path) + wid_covers/wid_covers_+str(j)+.csv
            elif j>=15 and j<=291:
                path = /home/jason/spark/weibo_predict/predicts/covers_time_data_+str(j)+.txt
                rdd1 = sc.textFile(path)
                rdd = add_keys(rdd1)
                return rdd
    else:
        print(wrong input about times_or_deeps_or_covers)
        return 0
    rdd = sc.textFile(path)
    rdd = rdd.map(lambda x:x.split(","))
    rdd = rdd.map(lambda x:x[0]).zip(rdd.map(lambda x:x[1]))
    rdd = rdd.sortByKey()
    return rdd



#将两个RDDjoin返回一个rdd的函数
def my_join(rdd1,rdd2):
    import re
    rdd = rdd1.join(rdd2).keys().zip(rdd1.join(rdd2).values().map(lambda x:re.sub(r\D,"  ",str(x)).split()))
    return rdd

#根据rdd的元素制作lib_svm格式文件
def lib_svm(x):
    str1 = str(x[0] +  )
    for i in range(len(x)):
        if i == 0:
            pass
        else:
            str1 += str(str(i) + ":" +str(x[i])+  )
    return str1

#生成测试或者训练需要的数据
def generate_train_or_test_data(path,j,times_or_deeps):
    if times_or_deeps == times:
        if path == px:
            data_path = str(px) + train_data/times_train_data_+str(j)+.txt
            wid_times_rdd = get_wid_x(j+1,path,times)
        elif path == py:
            data_path = str(py) + test_data/times_test_data_+str(j)+.txt
            wid_times_rdd = get_wid_x(j,path,times)
            #print(wid_times_rdd.count())
        else:
            return 0
        wid_fnum_rdd = get_wid_fnum_rdd(path)
        wid_deeps_rdd = get_wid_x(j,path,deeps)
        wid_covers_rdd = get_wid_x(j,path,covers)
        #wid_covers_rdd = wid_covers_rdd.keys().zip(wid_covers_rdd.values().map(lambda x:float(x)/1000))
        records = my_join(wid_times_rdd,wid_fnum_rdd)
        records = my_join(records,wid_deeps_rdd)
        records = my_join(records,wid_covers_rdd)
        records = records.sortByKey()
        #print(‘看看训练集合中的keys()的顺序-------------------------------------------‘)
        #print(records.keys().take(10))
        records = records.values()
        data = records.map(lambda x:lib_svm(x))
        open_data_path = open(data_path,w)
        for lines in data.collect():
            open_data_path.write(lines)
            open_data_path.write(\n)
    elif times_or_deeps == deeps:
        if path == px:
            data_path = str(px) + train_data/deeps_train_data_+str(j)+.txt
        elif path == py:
            data_path = str(py) + test_data/deeps_test_data_+str(j)+.txt
        else:
            return 0
        wid_fnum_rdd = get_wid_fnum_rdd(path)
        if path == py:
            wid_deeps_rdd = get_wid_x(j,path,deeps)
        else:
            wid_deeps_rdd = get_wid_x(j+1,path,deeps)
        wid_times_rdd = get_wid_x(j,path,times)
        wid_deeps_rdd = get_wid_x(j,path,deeps)
        wid_covers_rdd = get_wid_x(j,path,covers)
        #wid_covers_rdd = wid_covers_rdd.keys().zip(wid_covers_rdd.values().map(lambda x:float(x)/1000))
        records = my_join(wid_deeps_rdd,wid_fnum_rdd)
        records = my_join(records,wid_times_rdd)
        records = my_join(records,wid_covers_rdd)
        records = records.values()
        data = records.map(lambda x:lib_svm(x))
        open_data_path = open(data_path,w)
        for lines in data.collect():
            open_data_path.write(lines)
            open_data_path.write(\n)
        open_data_path.close()
    elif times_or_deeps == covers:
        if path == px:
            data_path = str(px) + train_data/covers_train_data_+str(j)+.txt
        elif path == py:
            data_path = str(py) + test_data/covers_test_data_+str(j)+.txt
        else:
            return 0
        wid_fnum_rdd = get_wid_fnum_rdd(path)
        if path == py:
            wid_covers_rdd = get_wid_x(j,path,covers)
        else:
            wid_covers_rdd = get_wid_x(j+1,path,covers)
        #wid_covers_rdd = wid_covers_rdd.keys().zip(wid_covers_rdd.values().map(lambda x:float(x)/1000))
        wid_times_rdd = get_wid_x(j,path,times)
        wid_deeps_rdd = get_wid_x(j,path,deeps)
        
        records = my_join(wid_covers_rdd,wid_fnum_rdd)
        records = my_join(records,wid_times_rdd)
        records = my_join(records,wid_deeps_rdd)
        records = records.values()
        data = records.map(lambda x:lib_svm(x))
        open_data_path = open(data_path,w)
        for lines in data.collect():
            open_data_path.write(lines)
            open_data_path.write(\n)
        open_data_path.close()
    else:
        return 0



#生成指定时段的预测结果
def generate_test_predict(j,times_or_deeps):
    if times_or_deeps == times:
        from pyspark.mllib.tree import RandomForest, RandomForestModel
        from pyspark.mllib.util import MLUtils
        tr_path = /home/jason/spark/weibo_predict/train/train_data/+times_train_data_+str(j)+.txt
        te_path = /home/jason/spark/weibo_predict/test/test_data/+times_test_data_+str(j)+.txt
        train_data = MLUtils.loadLibSVMFile(sc,tr_path)
        test_data = MLUtils.loadLibSVMFile(sc,te_path)
        model = RandomForest.trainRegressor(train_data, categoricalFeaturesInfo={},
                                            numTrees=3, featureSubsetStrategy="auto",
                                            impurity=variance, maxDepth=4, maxBins=32,seed=42)
        predictions = model.predict(test_data.map(lambda x: x.features))
        pre_path = /home/jason/spark/weibo_predict/predicts/+times_time_data_+str(j+1)+.txt
        times_predict = open(pre_path,w)
        for lines in predictions.collect():
            times_predict.write(str(int(lines)))
            times_predict.write(\n)
        times_predict.close()
    elif times_or_deeps == deeps:
        from pyspark.mllib.tree import RandomForest, RandomForestModel
        from pyspark.mllib.util import MLUtils
        tr_path = /home/jason/spark/weibo_predict/train/train_data/+deeps_train_data_+str(j)+.txt
        te_path = /home/jason/spark/weibo_predict/test/test_data/+deeps_test_data_+str(j)+.txt
        train_data = MLUtils.loadLibSVMFile(sc,tr_path)
        test_data = MLUtils.loadLibSVMFile(sc,te_path)
        model = RandomForest.trainRegressor(train_data, categoricalFeaturesInfo={},
                                            numTrees=3, featureSubsetStrategy="auto",
                                            impurity=variance, maxDepth=4, maxBins=32,seed=42)
        predictions = model.predict(test_data.map(lambda x: x.features))
        pre_path = /home/jason/spark/weibo_predict/predicts/+deeps_time_data_+str(j+1)+.txt
        times_predict = open(pre_path,w)
        for lines in predictions.collect():
            times_predict.write(str(int(lines)))
            times_predict.write(\n)
        times_predict.close()
    elif times_or_deeps == covers:
        from pyspark.mllib.tree import RandomForest, RandomForestModel
        from pyspark.mllib.util import MLUtils
        tr_path = /home/jason/spark/weibo_predict/train/train_data/+covers_train_data_+str(j)+.txt
        te_path = /home/jason/spark/weibo_predict/test/test_data/+covers_test_data_+str(j)+.txt
        train_data = MLUtils.loadLibSVMFile(sc,tr_path)
        test_data = MLUtils.loadLibSVMFile(sc,te_path)
        model = RandomForest.trainRegressor(train_data, categoricalFeaturesInfo={},
                                            numTrees=3, featureSubsetStrategy="auto",
                                            impurity=variance, maxDepth=4, maxBins=32,seed=42)
        predictions = model.predict(test_data.map(lambda x: x.features))
        pre_path = /home/jason/spark/weibo_predict/predicts/+covers_time_data_+str(j+1)+.txt
        times_predict = open(pre_path,w)
        for lines in predictions.collect():
            times_predict.write(str(int(lines)))
            times_predict.write(\n)
        times_predict.close()
        
    
    
def generate_test_data_beyond15(j):
    path = /home/jason/spark/weibo_predict/predicts/+time_data_+str(j)+.txt
    rdd2 = sc.textFile(path)
    rdd1 = get_wid_fnum_rdd(py).keys()
    rdd = rdd1.zip(rdd2)
    return rdd
    

 

 

def add_keys(rdd1):
    rdd1 = rdd1
    #path = ‘/home/jason/spark/weibo_predict/predicts/times_time_data_‘+str(15)+‘.txt‘
    #rdd1 = sc.textFile(path)
    rdd2 = sc.textFile(/home/jason/spark/weibo_predict/test/wid_times/wid_times_0.csv)
    rdd2 = rdd2.map(lambda x:x.split(,)[0]).zip(rdd2.map(lambda x:x.split(,)[1]))
    rdd2 = rdd2.sortByKey()
    rdd1 = rdd1.zipWithIndex()
    rdd1 = rdd1.values().zip(rdd1.keys())
    rdd2 = rdd2.keys().zipWithIndex()
    rdd2 = rdd2.values().zip(rdd2.keys())
    rdd = rdd2.join(rdd1)
    rdd = rdd.values()
    rdd = rdd.map(lambda x: x[0]).zip(rdd.map(lambda x: x[1]))
    return rdd

 

 

for i in range(15):
    generate_train_or_test_data(px,i,times)
    generate_train_or_test_data(py,i,times)
    generate_test_predict(i,times)
    generate_train_or_test_data(px,i,deeps)
    generate_train_or_test_data(py,i,deeps)
    generate_test_predict(i,deeps)
    generate_train_or_test_data(px,i,covers)
    generate_train_or_test_data(py,i,covers)
    generate_test_predict(i,covers)
for i in range(15,292):
    print(i)
    generate_train_or_test_data(px,i,times)
    generate_train_or_test_data(py,i,times)
    generate_test_predict(i,times)
    generate_train_or_test_data(px,i,deeps)
    generate_train_or_test_data(py,i,deeps)
    generate_test_predict(i,deeps)
    generate_train_or_test_data(px,i,covers)
    generate_train_or_test_data(py,i,covers)
    generate_test_predict(i,covers)

 

generate_train_or_test_data(px,291,times)
generate_train_or_test_data(py,291,times)
generate_test_predict(291,times)
generate_train_or_test_data(px,291,deeps)
generate_train_or_test_data(py,291,deeps)
generate_test_predict(291,deeps)
generate_train_or_test_data(px,291,covers)
generate_train_or_test_data(py,291,covers)
generate_test_predict(291,covers)

 

#组团搞出来最后的文件

rdd1 = sc.textFile(/home/jason/spark/weibo_predict/predicts/times_time_data_+str(1)+.txt)
rdd1 = add_keys(rdd1)
for j in range(4,292):
    j = j+1
    if j==1:
        pass
    else:
        rdd2 = sc.textFile(/home/jason/spark/weibo_predict/predicts/times_time_data_+str(j)+.txt)
        rdd2 = add_keys(rdd2)
        rdd1 = my_join(rdd1,rdd2)
    
for j in range(4,292):
    j=j+1
    rdd3 = sc.textFile(/home/jason/spark/weibo_predict/predicts/deeps_time_data_+str(j)+.txt)
    rdd3 = add_keys(rdd3)
    rdd1 = my_join(rdd1,rdd3)
    
    
def add_head(x):
    str1 = testWeibo
    str1 = str1+str(x)
    return str1

import re  
rdd1 = rdd1.map(lambda x: re.sub(r\D," ",str(x)).split())
rdd1 = rdd1.sortBy(lambda x: int(x[0]))
rdd1 = rdd1.map(lambda x:x[0]).zip(rdd1.map(lambda x:x[1:]))
rdd1_key = rdd1.keys().map(lambda x:add_head(x))
rdd1 = rdd1_key.zip(rdd1.values())
rdd1 = rdd1.map(lambda x: re.sub(r\D," ",str(x)).split())

import csv
path = /home/jason/spark/weibo_predict/
end_path = str(path) + end_of_end.csv
end_f = open(end_path,w)
writer = csv.writer(end_f)
for lists in rdd1.collect():
    writer.writerow(lists)
end_f.close()

 

a=,
s1 = [scaleT+str((i+1)*15) for i in range(4,292)]
s1 = a.join(s1)
s2 = [depthT+str((i+1)*15) for i in range(4,292)]
s2 = a.join(s2)
s3 = WeiboID (Time Unit: Minutes)+a+s1+s2
#print(s3)
end_path_2 = /home/jason/spark/weibo_predict/end_of_end.csv
end_path_1 = /home/jason/spark/weibo_predict/end_of_end_.csv
rdd = sc.textFile(end_path_2)
rdd = rdd.map(lambda x:add_head(x))
end_ff = open(end_path_1,w)
end_ff.write(s3)
end_ff.write(\n)
for lists in rdd.collect():
    end_ff.write(lists)
    end_ff.write(\n)
end_ff.close()

 

shape into blocks--source code in python based on pySpark