您當前的位置:首頁 > 攝影

pyspark使用總結

作者:由 朝天椒 發表于 攝影時間:2021-05-23

Spark使用總結

spark生態架構圖

spark作為在hadoop的基礎上進行效能的最佳化,其生態圖也基本和hadoop相似,而不同之處在於spark的計算準則是基於RDD進行設計DAG的,而hadoop則是透過MR來進行的,下面為spark的生態圖:

pyspark使用總結

Spark Core:包含Spark的基本功能;尤其是定義RDD的API、操作以及這兩者上的動作。其他Spark的庫都是構建在RDD和Spark Core之上的。

Spark SQL:提供透過Apache Hive的SQL變體Hive查詢語言(HiveQL)與Spark進行互動的API。每個資料庫表被當做一個RDD,Spark SQL查詢被轉換為Spark操作。Spark提供的sql形式的對接Hive、JDBC、HBase等各種資料渠道的API,用Java開發人員的思想來講就是面向介面、解耦合,ORMapping、Spring Cloud Stream等都是類似的思想。

Spark Streaming:基於SparkCore實現的可擴充套件、高吞吐、高可靠性的實時資料流處理。支援從Kafka、Flume等資料來源處理後儲存到HDFS、DataBase、Dashboard中。對實時資料流進行處理和控制。Spark Streaming允許程式能夠像普通RDD一樣處理實時資料。

MLlib:一個常用機器學習演算法庫,演算法被實現為對RDD的Spark操作。這個庫包含可擴充套件的學習演算法,比如分類、迴歸等需要對大量資料集進行迭代的操作。

GraphX:控制圖、並行圖操作和計算的一組演算法和工具的集合。GraphX擴充套件了RDD API,包含控制圖、建立子圖、訪問路徑上所有頂點的操作

spark架構圖

由於hadoop在執行的過程中是基於磁碟進行操作的,無論是MapReduce還是YARN都是將資料從磁碟中加載出來,經過DAG,然後重新寫回到磁碟中,計算過程的中間資料又需要寫入到HDFS的臨時檔案,這些都使得Hadoop在大資料運算上表現太“慢”,而Spark則基於記憶體進行DAG操作,具體執行框架圖如下所示:

pyspark使用總結

上圖中,sparkcontext程式向叢集控制中心cluster申請計算和資源,而cluster將設計好的dag配送給各個worker幾點進行分配計算,具體的執行流程如下:

pyspark使用總結

構建Spark Application的執行環境,啟動SparkContext

SparkContext向資源管理器(可以是Standalone,Mesos,Yarn)申請執行Executor資源,並啟動StandaloneExecutorbackend,

Executor向SparkContext申請Task

SparkContext將應用程式分發給Executor

SparkContext構建成DAG圖,將DAG圖分解成Stage、將Taskset傳送給Task Scheduler,最後由Task Scheduler將Task傳送給Executor執行

Task在Executor上執行,執行完釋放所有資源

spark進行DAG/TASK任務排程流程

spark進行任務排程的流程如下所示:

pyspark使用總結

pyspark使用總結

使用者編排的程式碼由一個個的RDD Objects組成,DAGScheduler負責根據RDD的寬依賴拆分DAG為一個個的Stage,而每個Stage包含一組邏輯完全相同的可以併發執行的Task。TaskScheduler負責將Task推送給從ClusterManager那裡獲取到的Worker啟動的Executor。

DAGScheduler的具體流程

DAG負責的是將RDD中的資料依賴劃分為不同可以並行的寬依賴task, 這些不同的task集合統稱為stage,最後將這些stage推送給TaskScheduler進行排程,DAG的具體劃分過程如下所示:

pyspark使用總結

窄依賴經歷的是map、filter等操作沒有進行相關的shuffle,而寬依賴則通常都是join等操作需要進行一定的shuffle意味著需要打散均勻等操作

1 stage是觸發action的時候

從後往前劃分

的,所以本圖要從RDD_G開始劃分。

2 RDD_G依賴於RDD_B和RDD_F,隨機決定先判斷哪一個依賴,但是對於結果無影響。

3 RDD_B與RDD_G屬於窄依賴,所以他們屬於同一個stage,RDD_B與老爹RDD_A之間是寬依賴的關係,所以他們不能劃分在一起,所以RDD_A自己是一個stage1

4 RDD_F與RDD_G是屬於寬依賴,他們不能劃分在一起,所以最後一個stage的範圍也就限定了,RDD_B和RDD_G組成了Stage3

5 RDD_F與兩個爹RDD_D、RDD_E之間是窄依賴關係,RDD_D與爹RDD_C之間也是窄依賴關係,所以他們都屬於同一個stage2

6 執行過程中stage1和stage2相互之間沒有前後關係所以可以並行執行,相應的每個stage內部各個partition對應的task也並行執行

7 stage3依賴stage1和stage2執行結果的partition,只有等前兩個stage執行結束後才可以啟動stage3。

8 我們前面有介紹過Spark的Task有兩種:ShuffleMapTask和ResultTask,其中後者在DAG最後一個階段推送給Executor,其餘所有階段推送的都是ShuffleMapTask。在這個案例中stage1和stage2中產生的都是ShuffleMapTask,在stage3中產生的ResultTask。

9 雖然stage的劃分是從後往前計算劃分的,但是依賴邏輯判斷等結束後真正建立stage是從前往後的。也就是說如果從stage的ID作為標識的話,先需要執行的stage的ID要小於後需要執行的ID。就本案例來說,stage1和stage2的ID要小於stage3,至於stage1和stage2的ID誰大誰小是隨機的,是由前面第2步決定的。

pyspark使用總結

pyspark的使用有兩種形式:

rdd、dataframe

, 其中第一種基本就和python的lambda的使用基本一致,第二種就和pandas中的dataframe一致,而且比pandas的dataframe使用更加的靈活,個人感覺比pandas更加好用點,主要就是除錯過程,如果和jupyter搭建在一起來使用就完美了。

資料讀寫的使用總結

spark的輸入資料可以有

rdd進行轉換、hdfs讀取、hive表讀取

, 這個就比較靈活簡單,沒有啥需要注意的,主要就是spark資源和對於的hive表以及hdfs地址需要保持配置一致。 具體的方法如下所示:

1。 從集合建立(序列化)

rdd = sc。parallelize([1,2,3,4], 1)

rdd。collect() # 其中collect方法就是將rdd轉為list可以理解為反序列化

2。 或者自行建立

aa_list = [[1,2], [3,4]]

df = spark。createDataFrame(query_core_list, schema=[‘col1’, ‘col2’])

3。 從hdfs上面讀取,這個時候需要注意的是,這個物件應該是sparkcontext,比如下所示

sc_conf = SparkConf()。setAppName(‘hello’)

spark = SparkSession。builder。config(conf=sc_conf)。enableHiveSupport()。getOrCreate()

sc = spark。sparkContext # 需要用sc去讀取hdfs而不是sql

core_rdd = sc。textFile(‘afs://hdfs地址:埠號/路徑/*/*’)

news = core_rdd。map(lambda line: line。split(‘\t’))。map(lambda p: Row(query = p[0], core_query = p[1]))

# 將rdd變成dataframe格式資料

df_core = spark。createDataFrame(news)

4。 從hive表中讀取資料

sc_conf = SparkConf()。setAppName(‘test’)

spark = SparkSession。builder。config(conf=sc_conf)。enableHiveSupport()。getOrCreate()

sc = spark。sparkContext

sql_query = “”“select a。id, a。query from 表地址。表名 a \

where a。event_day = {} and

(a。a = ‘a’ or

(a。b = ‘b’ and c in (‘s’,‘s/index’,‘shop/index’,‘shopindex’,‘c’,‘c/index’))) and

a。d != ‘d’ and

a。e not like ‘%有限公司%’ and

a。f not like ‘%http%’ and

a。h != ”x“ and

a。g != ”x“ and

length(a。a) < 25

group by a。d, a。f

”“”。format(day)

df_res = spark。sql(sql_query)

rdd的基本使用總結

spark的執行基本由兩部分組成:Transformnation(轉換)和action,其中第一部分這類方法僅僅是定義邏輯,並不會立即執行,即lazy特性。目的是將一個RDD轉為新的RDD。action不會產生新的RDD,而是直接執行,得到我們想要的結果

常用rdd的transfromation

map:

這個類似於python的map函式對rdd的列進行一些轉換操作, 返回為一個rdd物件,如果需要其值,則進行collect操作即可

reduce:

是針對RDD對應的列表中的元素,遞迴地選擇第一個和第二個元素進行操作,操作的結果作為一個元素用來替換這兩個元素,注意,reduce返回的是一個Python可以識別的物件,非RDD物件。

reduceByKey:

對輸入函式按照k進行合併

filter:

按條件過濾rdd的值

distinct:

去重操作

join:

對每個序列化的kv根據k進行兩個或多個rdd合併,類似於sql中的表連線

union()和intersection():

第一個為合併兩個rdd且不進行去重,第二個為兩個rdd的交集

rdd = sc。parallelize([1,2,3,4,5])

print(rdd。reduce(lambda a, b : a + b)) # output 15

rdd_2 = sc。parallelize([[1,10], [2,20], [3,30], [1,1], [3,4]])

rdd_2。reduceByKey(lambda x, y: x + y) # [1,11], [2,20], [3,34]

# rdd一般用的比較多的就是map和reduce函式,一般用來存在一些結果,比如

df_user_query。rdd。map(lambda x: json。dumps((x[0], x[1])))

常用的action操作

collect(): 以陣列的形式,返回資料集中所有的元素 count(): 返回資料集中元素的個數 take(n): 返回資料集的前N個元素 takeOrdered(n): 升序排列,取出前N個元素 takeOrdered(n, lambda x: -x): 降序排列,取出前N個元素 first(): 返回資料集的第一個元素 min(): 取出最小值 max(): 取出最大值 stdev(): 計算標準差 sum(): 求和 mean(): 平均值 countByKey(): 統計各個key值對應的資料的條數 lookup(key): 根據傳入的key值來查詢對應的Value值 foreach(func): 對集合中每個元素應用func

sparksql類的dataframe使用

一般目前對於pyspark進行操作,基本都是處理一些log資料,這個時候一般透過將資料轉換為dataframe進行操作會比較常用,對於演算法工程師來說,目前我在的公司基本資料以及被進行ETL操作了,基本是結構化的資料,不需要我們透過scala進行資料轉換了,因此基本都是對dataframe的資料進行資料清洗、特徵提取。

sparksql類初始化總結

對於pyspark來講,常用的就是sparkcontext類,一般都是先啟動一個這樣的程式才可以進行相應的操作,而SparkSession可以在不建立SparkConf,SparkContext或SQLContext的情況下建立SparkSession(它們封裝在SparkSession中), 這個sparksession就是用來調取dataframe相關操作的類,具體的操作如下:

# 一般在進行spark-submit的。py檔案在初始化的時候,需要建立spark物件,具體如下

from pyspark import SparkConf

from pyspark import SparkContext

from pyspark。sql import SparkSession

from pyspark。sql import SQLContext

from pyspark。sql import Row

from pyspark。sql。types import StructType

from pyspark。sql。types import StructField

from pyspark。sql。types import StringType

from pyspark。sql import functions as fn

from pyspark。sql import types as T

sc_conf = SparkConf()。setAppName(‘b2b_fraud_data’)

spark = SparkSession。builder。config(conf=sc_conf)。enableHiveSupport()。getOrCreate()

而對於上述的兩個class來說,經常對建立相關的臨時表,createOrReplaceTempView:建立臨時檢視,此檢視的生命週期與用於建立此資料集的[SparkSession]相關聯。 createGlobalTempView:建立全域性臨時檢視,此時圖的生命週期與Spark Application繫結。如果想要刪除的話就得使用spark。catalog。dropGlobalTempView(“tempViewName”)

sparksq使用方法論:增刪查改、行列操作、排序去重、分組統計、表連線、自定義

而且spark的寫法有個好處就是可以一個操作後接著對之前的結果進行在操作(選擇等)

常規操作

test = spark。createDataFrame([[‘a’, 0,2], [‘b’, 0, 0], [‘c’, 4, 4],

[‘c’, 0,2], [‘c’, 1,0]], schema=[‘a’, ‘b’, ‘c’])

test。show()

1。 型別

test。dtypes

2。列名

test。columns

3。 統計shape大小

test。count()

4。 輸出shcamal

test。printSchema()

這裡需要注意的是,pandas中一些列的統計函式series會自動,而spark都整合在了function函式中

增刪查改

from pyspark。sql import functions as fn

1。 查詢: 根據需要選擇列按條件進行過濾

test = spark。createDataFrame([[‘a’, 0,2], [‘b’, 0, 0], [‘c’, 4, 4],

[‘c’, 0,2], [‘c’, 1,0]], schema=[‘a’, ‘b’, ‘c’])

test。show()

# 按條件進行過濾可以選擇filter也可以選擇where

test。select(‘a’)。filter(test[‘a’] != 2)。show()

test。filter((test[‘a’] != 0) | (test[‘c’] != 0))。show()

# 多次filter條件,按區間選擇值,對列按照sql語句進行選擇

test。filter(test。b。betwen(1,4))。filter(“a like ‘a%’”)。show()

# 這裡需要注意的事,如果透過withcolumn建立列了新的列需要透過function。col函式來指定列

test。withColumn(‘aa’,fn。size(test[‘a’]))。filter((fn。col(‘aa’) > 1))

# 也可以透過選擇條件賦值給新的列在進行過濾

test。withColumn(“aa”, when((test[‘a’] == 0)

&& (test[‘b’] == 0, 0)。otherwise(1))。filter(fn。col(‘aa’) == 1)

# 或者可以建立一個臨時表,在臨時表上使用sql

test。createOrReplaceTempView(“test_df”)

spark。sql(“select distinct(a) from test_df”)。show()

2。 修改/增加新列:withcolumn, 比如對string透過正則表示式進行過濾

test = test。withcolumn(‘aa’, fn。regexp_replace(‘query’, “[^\u4e00-\u9fa5\u0030-\u0039\u0041-\u005a\u0061-\u007a]”, “”))

3。 刪除一列: drop

test = test。drop(‘a’)

# 也可以直接選擇自己需要的一些列

test = test。select([‘a’, ‘b’])

行列操作

from pyspark。sql import functions as func

1。 修改列名,有好幾種方法,selectExpr, withcolumnrenamed,

test = test。selectExpr(‘a as a_new’)

test = test。withColumnRenamed(‘a’, ‘a_new’)

2。 處理列缺失值

test。dropna()。show()

test。na。drop()。show()

test。where(~test[‘a’]。isNull())。show()

# 用均值填充缺失值

mean_salary = test。select(func。mean(‘b’))。collect()[0][0]

test = test。na。fill({‘a’: mean_salary})

3。 計算離群值操作

mean = test。select(func。mean(‘b’))。collect()[0][0]

std = test。select(func。stddev(‘b’))。collect()[0][0]

# 用均值的兩倍標準差替代離群值

no_outlier = test。select(‘b’,

func。when(test。b。between(mean-2*std, mean+2*std), test。b)

。otherwise(mean)

。alias(“no_outlier”)

排序去重

1。 排序

test。sort(‘b’,ascending=False)。show()

# 多欄位排序

test。sort(‘b’, ‘c’, ascending=False)。show()

# 混合排序

test。sort(test。b。desc(), test。c。asc())。show()

2。 去重

# 只要某一列有重複值,則去重

test。dropDuplicates(subset=[‘a’])。show()

分組統計

# 這個基本也是在做特徵工程中用的比較多的操作

# 統計每個分組下影響因子,這個基本和pandas的一樣,所以沒啥新的東西,而且spark相對比較好用一些,可以有各個function函式進行支援

user_factor = test。select([‘a’, ‘b’])。drop_duplicates()。

groupby(‘a’)。agg(

(1 / fn。log(1 + fn。countDistinct(‘b’)))。alias(‘user_factor’))

對於這個有啥難搞的就去查詢function下面的函式就ok了,如果業務比較複雜就直接udf,比如下面的

from pyspark。sql。types import StructType

from pyspark。sql import functions as fn

from pyspark。sql import types as T

def add_com(a):

res = []

a = sorted(a)

if len(a) < 2:

return

for i in range(len(a)-1):

for j in range(i+1, len(a)):

res。append([a[i], a[j]])

return res

# 這裡定義的時候需要注意一下udf函式的型別,方便後續的操作,比如這裡需要將後續按a_new列對a進行展開

pair_func_udf = fn。udf(add_com, T。ArrayType(T。ArrayType(T。StringType())))

df_rdd1 = test。select(‘a’, pair_func_udf(‘a’)。alias(‘a_new’))

# 按a_new對展開

df_rdd2 = df_rdd1。select(‘a’, fn。explode(df_rdd1[‘a_new’])。alias(‘a_new’))

表連線

# 這個基本和sql以及pandas一致,只是函式有點不同

df1 = df1。join(df2, df1[‘core_1’] == df2[‘core_query’],

how=‘left’)。drop(‘core_query’)。withColumnRenamed(‘core_factor’, ‘core_1_factor’)

# 兩個結構相同的表合併

test1 = spark。createDataFrame([[‘a’, 0,2], [‘b’, 0, 0], [‘c’, 4, 4],

[‘c’, 0,2], [‘c’, 1,0]], schema=[‘a’, ‘b’, ‘c’])

test2 = test1

test3 = test2。union(test2)

自定義

# 有時候我們需要對列進行比較複雜的操作,這個時候function下的函式基本不能滿足我們的需求,這個時候我們可以透過udf來進行操作

from pyspark。sql。types import DoubleType

import pandas as pd

import numpy as np

pdf = pd。DataFrame(np。random。rand(1000, 3))

pdf = pdf。rename(columns={0: “one”, 1: “two”, 2: “three”})

pdf[“id”] = np。random。randint(0, 50, size=len(pdf))

sdf = spark。createDataFrame(pdf)

def plus_one(a):

return a + 1

plus_one_udf = fn。udf(plus_one, returnType=DoubleType())

df = df。withColumn(“one_processed”, plus_one_udf(sdf[“one”]))

sdf。show()

有了上述的環境之後,就可以對大量的資料進行資料分析了,但是最好還是將pyspark和jupyter連線起來一起使用會比較友好,具體的設定如下, 寫一個env。sh穩健即可,內容如下:

export PYSPARK_DRIVER_PYTHON=jupyter

export PYSPARK_DRIVER_PYTHON_OPTS=‘notebook’

export SPARK_HOME=/home/spark

export PATH=$PATH:$SPARK_HOME/bin

總的來說pyspark的dataframe用起來比pandas的會爽一點,如果要問一個為什麼,就是處理寫的程式碼在快速處理上億的資料啊,哈哈!

如果使用過程中遇到什麼比較難處理的資料處理,可以直接查詢相關的api文件即可,具體為

spark_api

標簽: test  RDD  Spark  PySpark  SQL