您當前的位置:首頁 > 文化

多程序加速處理大量資料 Python

作者:由 對豬彈琴 發表于 文化時間:2021-09-17

目前碰到這樣一種情況:

1、需要處理的資料量達幾百萬,而且將來資料量會越來越大

2、特別多的判斷規則,因此處理每一行資料,都需要比較多時間

3、只用一個程序的指令碼來處理,需要消耗幾十分鐘的時間,很沒有效率

大家都知道,Python的多執行緒跟假的差不多,所以改用多程序的方式進行最佳化處理。

下圖結果是分別使用1個、2個、5個、10個程序處理100+萬行資料所用的時間。

為什麼程序數量到了5個、10個的時候,處理時間沒有明顯下降呢?。。。

主要原因是公司的CPU不咋地。

透過對比,1個程序和5個程序處理資料的時間,1分22秒 vs 25秒,還是明顯節約很多時間的。

這裡面有幾個比較大的問題是:

1、似乎multiprocessing。Process,不能和tqdm一起使用。所以我用了multiprocessing。Pool來解決

2、Anaconda的Spyder的IPython Console,在func函數里面使用print或者tqdm,是不會列印到Console的。需要在 Run>>Configurations per file>>execute in an external system terminal才會顯示print或者tdqm。但是這樣的話,我們無法在IPython Console裡面透過Variable explorer檢視變數的。。這就不好了

3、不過Pool。imap()函式,也有一個缺點,就是隻能夠傳入一個列表作為引數項。如果有多個引數的話,需要另外用starmap來處理。。。這樣其實更麻煩了。所以直接把多餘的引數,放到類裡面,def _

init

_()初始化。

4、下面這行程式碼,雖然能夠實現進度條。不過有一定的滯後性。。。必須是完成了一個程序後,才會開始讀進度條。但是有好過沒有。。。至少能看到處理花了多少時間。

# 設定執行緒池數量,並且用tqdm顯示進度條

with Pool(processes=self。n) as p:

r = list(tqdm(p。imap(self。func, range(self。n)), total=self。n))

多程序加速處理大量資料 Python

# 作者:肥豬仔宋同學

import

pandas

as

pd

import

math

from

tqdm

import

tqdm

from

multiprocessing

import

Manager

Pool

class

myMulti

def

__init__

self

df

n

):

self

df

=

df

self

n

=

n

def

run

self

):

# 建立一個管理物件

manager

=

Manager

()

# 建立一個共享資料物件

self

return_list

=

manager

list

()

# 設定執行緒池數量,並且用tqdm顯示進度條

with

Pool

processes

=

self

n

as

p

r

=

list

tqdm

p

imap

self

func

range

self

n

)),

total

=

self

n

))

# 建立一個dataframe容器,用於合併return_list中的dataframes

table

=

pd

DataFrame

()

# 遍歷將dataframes放入table

dfs

=

list

self

return_list

for

d

in

dfs

table

=

pd

concat

([

table

d

])

return

table

def

func

self

i

):

# 計算一共多少行

rows

=

self

df

shape

0

# 組距

interval

=

rows

/

self

n

# 如果是最後一組的話,需要包含最後一行

if

i

<

self

n

-

1

data

=

self

df

math

floor

interval

*

i

):

math

floor

interval

*

i

+

1

))]

else

data

=

self

df

math

floor

interval

*

i

):

math

floor

interval

*

i

+

1

))

+

1

# 將judge函式應用於dataframe的每一行

data

=

data

apply

self

judge

axis

=

1

# 每個程序處理完後,資料返回給共享資料物件return_list

self

return_list

append

data

# 用於處理dataframe的每一行

def

judge

self

row

):

if

‘abc’

in

str

row

‘columnA’

])

lower

():

row

‘columnX’

=

‘abc’

else

row

‘columnX’

=

‘其他’

return

row

if

__name__

==

‘__main__’

# 讀取表格

df

=

pd

read_csv

‘1。csv’

df

‘column_name’

=

‘’

# 設定多少個程序同時處理資料

n

=

10

# 例項化類

m

=

myMulti

df

n

# 執行run函式,得到處理後的資料

table

=

m

run

()

標簽: self  list  df  tqdm  __