多程序加速處理大量資料 Python
目前碰到這樣一種情況:
1、需要處理的資料量達幾百萬,而且將來資料量會越來越大
2、特別多的判斷規則,因此處理每一行資料,都需要比較多時間
3、只用一個程序的指令碼來處理,需要消耗幾十分鐘的時間,很沒有效率
大家都知道,Python的多執行緒跟假的差不多,所以改用多程序的方式進行最佳化處理。
下圖結果是分別使用1個、2個、5個、10個程序處理100+萬行資料所用的時間。
為什麼程序數量到了5個、10個的時候,處理時間沒有明顯下降呢?。。。
主要原因是公司的CPU不咋地。
透過對比,1個程序和5個程序處理資料的時間,1分22秒 vs 25秒,還是明顯節約很多時間的。
這裡面有幾個比較大的問題是:
1、似乎multiprocessing。Process,不能和tqdm一起使用。所以我用了multiprocessing。Pool來解決
2、Anaconda的Spyder的IPython Console,在func函數里面使用print或者tqdm,是不會列印到Console的。需要在 Run>>Configurations per file>>execute in an external system terminal才會顯示print或者tdqm。但是這樣的話,我們無法在IPython Console裡面透過Variable explorer檢視變數的。。這就不好了
3、不過Pool。imap()函式,也有一個缺點,就是隻能夠傳入一個列表作為引數項。如果有多個引數的話,需要另外用starmap來處理。。。這樣其實更麻煩了。所以直接把多餘的引數,放到類裡面,def _
init
_()初始化。
4、下面這行程式碼,雖然能夠實現進度條。不過有一定的滯後性。。。必須是完成了一個程序後,才會開始讀進度條。但是有好過沒有。。。至少能看到處理花了多少時間。
# 設定執行緒池數量,並且用tqdm顯示進度條
with Pool(processes=self。n) as p:
r = list(tqdm(p。imap(self。func, range(self。n)), total=self。n))
# 作者:肥豬仔宋同學
import
pandas
as
pd
import
math
from
tqdm
import
tqdm
from
multiprocessing
import
Manager
,
Pool
class
myMulti
:
def
__init__
(
self
,
df
,
n
):
self
。
df
=
df
self
。
n
=
n
def
run
(
self
):
# 建立一個管理物件
manager
=
Manager
()
# 建立一個共享資料物件
self
。
return_list
=
manager
。
list
()
# 設定執行緒池數量,並且用tqdm顯示進度條
with
Pool
(
processes
=
self
。
n
)
as
p
:
r
=
list
(
tqdm
(
p
。
imap
(
self
。
func
,
range
(
self
。
n
)),
total
=
self
。
n
))
# 建立一個dataframe容器,用於合併return_list中的dataframes
table
=
pd
。
DataFrame
()
# 遍歷將dataframes放入table
dfs
=
list
(
self
。
return_list
)
for
d
in
dfs
:
table
=
pd
。
concat
([
table
,
d
])
return
table
def
func
(
self
,
i
):
# 計算一共多少行
rows
=
self
。
df
。
shape
[
0
]
# 組距
interval
=
rows
/
self
。
n
# 如果是最後一組的話,需要包含最後一行
if
i
<
self
。
n
-
1
:
data
=
self
。
df
[
math
。
floor
(
interval
*
i
):
math
。
floor
(
interval
*
(
i
+
1
))]
else
:
data
=
self
。
df
[
math
。
floor
(
interval
*
i
):
math
。
floor
(
interval
*
(
i
+
1
))
+
1
]
# 將judge函式應用於dataframe的每一行
data
=
data
。
apply
(
self
。
judge
,
axis
=
1
)
# 每個程序處理完後,資料返回給共享資料物件return_list
self
。
return_list
。
append
(
data
)
# 用於處理dataframe的每一行
def
judge
(
self
,
row
):
if
‘abc’
in
str
(
row
[
‘columnA’
])
。
lower
():
row
[
‘columnX’
]
=
‘abc’
else
:
row
[
‘columnX’
]
=
‘其他’
return
row
if
__name__
==
‘__main__’
:
# 讀取表格
df
=
pd
。
read_csv
(
‘1。csv’
)
df
[
‘column_name’
]
=
‘’
# 設定多少個程序同時處理資料
n
=
10
# 例項化類
m
=
myMulti
(
df
,
n
)
# 執行run函式,得到處理後的資料
table
=
m
。
run
()
上一篇:西班牙語詞彙趣味記憶法