资源简介

记得自己要引入环境 (1)利用SparkStreaming从文件目录读入日志信息,日志内容包含: ”日志级别、函数名、日志内容“ 三个字段,字段之间以空格拆分。请看数据源的文件。 (2)对读入都日志信息流进行指定筛选出日志级别为error或warn的,并输出到外部MySQL中。 需要用到的函数 (1)输入采用textFileStream()算子 (2)输出采用foreachRDD()算子 (3)将RDD转为DataFrame (4)DataFrame注册为临时表,使用SQL过滤 (5)将过滤后的数据保存到MySQL

资源截图

代码片段和文件信息

from pyspark.shell import sc
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.streaming import StreamingContext
spark = SparkSession.builder.appName(“Streaming“).getOrCreate()
sc=spark.sparkContext
#两个参数:1、sc参数 2、采样时间间隔(秒)
ssc =StreamingContext(sc1)
#在ubuntu环境下数据源路劲
ds1 =ssc.textFileStream(“/home/zhuang/138/input/test“)
#把所有数据划分为[[][]]格式
ds3 = ds1.map(lambda line:line.split(“\t“))
def func(rdd):
    if not rdd.isEmpty():
        #记得转码很重要
        url = “jdbc:mysql://ip地址:3306/pyspark?user=root&password=zhuang&characterEncoding=UTF-8“
        #构建表结构
        schema = StructType([StructField(“日志级别“ StringType() True) StructField(“函数名“ StringType() True)
                             StructField(“日志内容“ StringType() True)])
        #对[[][][]]数据转换成[[[][][]][][][]]因为todf数据是数据格式传值
        rdd.map(lambda x:tuple(x)).toDF(schema).registerTempTable(“test_person1“)
        df1 = spark.sql(“select * from test_person1 where ‘日志级别‘!=‘[info]‘“)
        # df2 = spark
        df1.show()
        #写入mysql
        df1.write.jdbc(mode=“overwrite“url=urltable=“test_person1“ properties={“driver“:‘com.mysql.jdbc.Driver‘})
        df1.show()
ds3.pprint()
ds3.foreachRDD(func)
# print(ds4.foreachRDD(func))
ssc.start();ssc.awaitTermination()

 属性            大小     日期    时间   名称
----------- ---------  ---------- -----  ----
     文件         501  2019-05-04 11:03  20180103.log
     文件         501  2019-05-04 11:03  20180104.log
     文件     1007502  2019-03-12 14:38  mysql-connector-java-5.1.47.jar
     文件        1514  2019-05-30 08:58  test02.py

评论

共有 条评论