クーの自由研究

マスターのかえるのクーは、弟子達の召喚術により新たな依り代を得てⅡ世として復活しました。

pythonでのプロセス間通信の速度比較実験

「みちくさ」は自由の特権

すや~。たいへんよくできました!レム睡眠をむさぼる、かえるのクーの助手の「井戸中 聖」です。

f:id:AssistantOfKoo:20201107220846j:plain f:id:AssistantOfKoo:20201107220855p:plain

実験プログラム用のリアルタイムグラフ機能を充実させようとしていますが、そもそも今使用しているプロセス間通信方法(mmap)でよいのか!?と疑問に思ってしまいました。

自分で作成期日を設定しておらず「遅れたら遅れたなりでよい」「(スキルがなくて)完成しないこともある」という「ゆるさ」でやっております。

気になったら、さっそくやってみましょう。

動機、息切れ、めまい

独立したプロセス間の通信は、このブログサイトで扱う「気になる」案件(ISSUE)です。

気になったので、「python mmap」 で検索すると、当サイトのいい加減な記事が1ページ目の上位に出てきます。これは流石に申し訳ありません。(クー様へ:いい加減=ほどよい塩梅の意味です)情報を補完しようと思います。

mmapはメモリ共有に近いイメージを持っていたのですが、動作としてはメモリのレプリケーション(遠隔複製)のようです。そこでプロセス間通信の実効速度比較をしてみます。

mmapの実験のときのようにオブジェクトをシリアライズして通信してみます。

可能(なはず)の通信(もしくは情報通知)

・mmap(メモリマップドファイル)

・標準入出力の接続

・名前付きPIPE

・TCP/IP (socket)

・MPI (メッセージパッシングインターフェース)

・キュー接続(前回スキル不足でできなかったので、すこしだけ悪あがき。)

・ファイル渡し(RAM DISK)

※共有メモリ(shared memory)は今回対象外です。(完全に別プログラムで別プロセス間で情報共有を行うため)

レギュレーション:

・決まった大きさの配列ファイルを使用:今回は実験で使っているMNISTデータセットを使用

・オブジェクトアーカイブ(バイナリシリアライズ)にはpikleを使用 (最近はjoblibの方をよく使うが単純アーカイブだとpikleのほうが速いイメージがある)

・文字化シリアライズには base64を使用する。

・通信を開始してから、別プロセスに渡して、そのプロセスでオブジェクト復元をする。さらに再アーカイブ・シリアライズして、元のプロセスに送り返す。

・送信時間、別プロセス処理時間(復元~シリアライズ)、受信時間をそれぞれ測定

(シリアライズ/BASE64すると約300MB弱のサイズになります)

・それぞれのプロセスはできるだけ既に起動済、安定状態で計測(しくみ上タイミングがとれない場合は、こだわらない)

・各プロセスは同一マシン上で動作する。

Ready! GO! (or Let It GO!)

以下、長くて読めないと思いますので、最後の総評のみ御覧ください。(時間計測コードでソースも長いですが、実質はとても短いコードです。それぞれ、マスター側、スレーブ側のソフトがあります。)コーディングが「やっつけ」なのはご容赦ください。

まずは共通のメソッドをはっておきます。

KooStioWatch.py : ストップウオッチ計測用(世の中にはもっとよいストップウオッチがたくさんあります)

import time
import sys

class StopWatch(object):
def __init__(self, output=sys.stdout):
self.dict = {}
self.output = output

def start(self, msg):
start_time = time.process_time()
print(msg + ' start:', end='', file=self.output)
self.dict[msg] = start_time

def stop(self, msg):
print(msg + " STOP in %.5f sec" % ((time.process_time() - self.dict[msg])), file=self.output)

def lap(self, msg):
print(msg + " LAP in %.5f sec" % ((time.process_time() - self.dict[msg])), end='', file=self.output)

KooSimpleSync.py:UDP通信による開始同期(UDPなので、失敗よくあり)

import socket
import time

REQ_PORT = 50001
ACK_PORT = 50002

def req_message(msg, port=REQ_PORT):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
for send_count in range(3):
s.sendto(msg.encode(), ('127.0.0.1', REQ_PORT))
time.sleep(0.0001) # そんな厳密な測定でないので(^^;)
s.close()

def wait_message(msg, port=REQ_PORT):
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as r:
r.bind(('127.0.0.1', port))
while True:
data, addr = r.recvfrom(1024)
if data.decode() == msg:
time.sleep(0.0001) #そんな厳密な測定でないので(^^;)
break
r.close()

 mmap(メモリマップドファイル)の場合

MMAP_M01.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import subprocess
from sklearn.datasets import fetch_openml
import numpy as np
import pickle
import joblib
import os
import base64
import mmap
from KooStopWatch import StopWatch
import KooSimpleSync as ss

# 通信用データ識別
CONST_TYPE_READY = b'\x40\x40'
CONST_TYPE_DONE = b'\x40\x41'

proc = subprocess.Popen(['python', './MMAP_S01.py'] )

sw = StopWatch()
print('---Start mmap / process comm ---')
sw.start('<load MNIST dataset>')
if os.path.exists('mnist.jb'):
mnist = joblib.load("mnist.jb")
else:
mnist = fetch_openml('mnist_784', version=1, )
joblib.dump(mnist, "mnist.jb", compress=3)
sw.stop('<load MNIST dataset>')

# mnist.data : 70,000 784-dimensional vector data
mnist.data = mnist.data.astype(np.float32)
#mnist.data /= 255.0 # 0-1 of the data conversion

sw.start('<open mmap>')
with open("send.txt", "r+b") as s:
mm_send = mmap.mmap(s.fileno(), 0)
with open("recv.txt", "r+b") as r:
mm_recv = mmap.mmap(r.fileno(), 0)
sw.stop('<open mmap>')

mm_recv[0:2] = CONST_TYPE_READY

sw.start('<Total>')
sw.start('<Master dumps>')
bin_data = pickle.dumps(mnist.data) # オブジェクトをシリアル化
enc_value = base64.b64encode(bin_data) #扱いやすいようにBASE64
sw.stop('<Master dumps>')

ss.req_message('$start')
sw.start('<<Master throughput>>')
sw.start('<Master send>')
mm_send.seek(0) # 先頭位置を指定
mm_send.write(CONST_TYPE_READY + enc_value) # mmapへ出力
mm_send[0:2] = CONST_TYPE_DONE #送信完了通知
sw.stop('<Master send>')

sw.start('<Master wait>')
while mm_recv[0:2] != CONST_TYPE_DONE: #送信側完了待ち合わせ
pass
sw.stop('<Master wait>')

sw.start('<Master recv>')
mm_recv.seek(0)
recv_line = mm_recv.readline()
sw.stop('<Master recv>')
sw.stop('<<Master throughput>>')

sw.start('<Master decode>')
recv_data = pickle.loads(base64.b64decode(recv_line[2:])) # オブジェクトを復元
sw.stop('<Master decode>')

mm_send[0:2] = CONST_TYPE_READY
mm_send.flush()
sw.stop('<Total>')

test = recv_line[-6:]
if enc_value == recv_line[2:-2]:
#送信情報と受信情報を比較
#read情報は先頭2バイト制御用使用&末尾2バイトx0d0a(Windows)のため除外して比較
print("Validation OK!")
else:
print("Different! NG!")

MMAP_S01.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pickle
import sys
import base64
import mmap
from KooStopWatch import StopWatch
import KooSimpleSync as ss

# 通信用データ識別
CONST_TYPE_READY = b'\x40\x40'
CONST_TYPE_DONE = b'\x40\x41'

with open("send.txt", "r+b") as s:
mm_send = mmap.mmap(s.fileno(), 0)
with open("recv.txt", "r+b") as r:
mm_recv = mmap.mmap(r.fileno(), 0)
while mm_send[0:2] != CONST_TYPE_DONE:
pass
sw = StopWatch(sys.stderr)

ss.wait_message('$start') #Master Processからの開始指示待ち
sw.start('#1:Slave recv-decode-load#')

sw.start('#2:Slave recv#')
mm_send.seek(0)
recv_line = mm_send.readline() #mmapから情報取得(受信相当)
sw.stop('#2:Slave recv#')

sw.start('#3:Slave loads#')
recv_data = pickle.loads(base64.b64decode(recv_line[2:])) #デコード&復元
sw.stop('#3:Slave loads#')
sw.stop('#1:Slave recv-decode-load#')

sw.start('#4:Slave dumps#')
bin_data = pickle.dumps(recv_data)
enc_value = base64.b64encode(bin_data)
sw.stop('#4:Slave dumps#')

sw.start('#5:Slave send#')
mm_recv.seek(0) # 先頭位置を指定
mm_recv.write(CONST_TYPE_READY + enc_value) # mmapへ出力
mm_recv[0:2] = CONST_TYPE_DONE
sw.stop('#5:Slave send#')

処理結果

---Start mmap / process comm ---
<load MNIST dataset> start:<load MNIST dataset> STOP in 1.37500 sec
<open mmap> start:<open mmap> STOP in 0.00000 sec
<Total> start:<Master dumps> start:<Master dumps> STOP in 0.92188 sec
#1:Slave recv-decode-load# start:#2:Slave recv# start:<<Master throughput>> start:<Master send> start:<Master send> STOP in 0.28125 sec
<Master wait> start:#2:Slave recv# STOP in 0.29688 sec
#3:Slave loads# start:#3:Slave loads# STOP in 1.09375 sec
#1:Slave recv-decode-load# STOP in 1.39062 sec
#4:Slave dumps# start:#4:Slave dumps# STOP in 0.90625 sec
#5:Slave send# start:#5:Slave send# STOP in 0.28125 sec
<Master wait> STOP in 2.29688 sec
<Master recv> start:<Master recv> STOP in 0.29688 sec
<<Master throughput>> STOP in 2.87500 sec
<Master decode> start:<Master decode> STOP in 0.92188 sec
<Total> STOP in 4.75000 sec
Validation OK!

Process finished with exit code 0

充分速いです。今の用途(実験情報通信)には問題ありません。

標準入出力の接続(通常のPIPE)

PIPE_M01.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import subprocess
from sklearn.datasets import fetch_openml
import numpy as np
import pickle
import joblib
import os
import base64
from KooStopWatch import StopWatch
import KooSimpleSync as ss

proc = subprocess.Popen(['python', './PIPE_S01.py'], stdout=subprocess.PIPE, stdin=subprocess.PIPE);

sw = StopWatch()
print('---Start PIPE / process comm ---')
sw.start('<load MNIST dataset>')
if os.path.exists('mnist.jb'):
mnist = joblib.load("mnist.jb")
else:
mnist = fetch_openml('mnist_784', version=1, )
joblib.dump(mnist, "mnist.jb", compress=3)
sw.stop('<load MNIST dataset>')

# mnist.data : 70,000 784-dimensional vector data
mnist.data = mnist.data.astype(np.float32)

sw.start('<Master dumps>')
bin_data = pickle.dumps(mnist.data) # オブジェクトをシリアル化
enc_value = base64.b64encode(bin_data) #扱いやすいようにBASE64
sw.stop('<Master dumps>')

ss.req_message('$start')
sw.start('<<Master throughput>>')
sw.start('<Master send/recv>')
stdout_value = proc.communicate(enc_value)[0] #標準出力を取得
sw.stop('<Master send/recv>')

sw.start('<Master decode>')
recv_data = pickle.loads(base64.b64decode(stdout_value)) # オブジェクトを復元
sw.stop('<Master decode>')
sw.stop('<<Master throughput>>')

if enc_value == stdout_value:
print("Validation OK!")
else:
print("Different! NG!")

PIPE_S01.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pickle
import sys
import base64
from KooStopWatch import StopWatch
import KooSimpleSync as ss

sw = StopWatch(sys.stderr)
ss.wait_message('$start')
sw.start('#1:Slave recv-decode-load#')
sw.start('#2:Slave recv#')
recv = sys.stdin.readline() #標準入力から読み込み
sw.stop('#2:Slave recv#')
sw.start('#3:Slave decode#')
decode_stream = base64.b64decode(recv) #デコード
sw.stop('#3:Slave decode#')
sw.start('#4:Slave loads#')
recv_data = pickle.loads(decode_stream) #復元
sw.stop('#4:Slave loads#')
sw.stop('#1:Slave recv-decode-load#')

sw.start('#5:Slave dumps#')
bin_data = pickle.dumps(recv_data)
enc_value = base64.b64encode(bin_data)
sw.stop('#5:Slave dumps#')

sw.start('#6:Slave send#')
sys.stdout.write(enc_value.decode())
sw.stop('#6:Slave send#')

処理結果

---Start PIPE / process comm ---
<load MNIST dataset> start:<load MNIST dataset> STOP in 1.39062 sec
<Master dumps> start:<Master dumps> STOP in 0.90625 sec
#1:Slave recv-decode-load# start:#2:Slave recv# start:<<Master throughput>> start:<Master send/recv> start:#2:Slave recv# STOP in 0.67188 sec
#3:Slave decode# start:#3:Slave decode# STOP in 0.79688 sec
#4:Slave loads# start:#4:Slave loads# STOP in 0.26562 sec
#1:Slave recv-decode-load# STOP in 1.73438 sec
#5:Slave dumps# start:#5:Slave dumps# STOP in 0.87500 sec
#6:Slave send# start:#6:Slave send# STOP in 0.40625 sec
<Master send/recv> STOP in 1.48438 sec
<Master decode> start:<Master decode> STOP in 0.78125 sec
<<Master throughput>> STOP in 2.26562 sec
Validation OK!

Process finished with exit code 0

send/recv動作は時間測定の開始同期がとれないので、長くかかっているように見えますが、サブプロセスからの応答はmmapよりも速いです。(通信のスループットとして本当にどれだけ速いのかは正確にわかりませんでした)数値から、どこかが非同期で並列で動作している感じですが、正確にはわかりません。深追いはしません。

名前付きPIPE (Win32機能を使用して確認)

ねむくなったので、おやすみなさい。すや~。

おはようございます!実験をつづけます。

名前付きPIPEはOSの機能に依存します。Windowsでやってみます。

これも同期をとれないので、実用プログラムにするには(継続性や繰り返し、エラー処理など)考慮がいろいろ必要だと思いますが、とりあえず測定には十分な簡単な範囲で行っています。(バッファは必要以上にとっています)

NAMED_PIPE_M01.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import subprocess
import win32pipe, win32file
from sklearn.datasets import fetch_openml
import numpy as np
import pickle
import joblib
import os
import base64
from KooStopWatch import StopWatch
import KooSimpleSync as ss

M2S_pipe = win32pipe.CreateNamedPipe(
r'\\.\pipe\M2S',
win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
1, 65536, 65536,
0,
None)

proc = subprocess.Popen(['python', './NAMED_PIPE_S01.py'])

sw = StopWatch()
print('---Start NAMED PIPE / process comm ---')
sw.start('<load MNIST dataset>')
if os.path.exists('mnist.jb'):
mnist = joblib.load("mnist.jb")
else:
mnist = fetch_openml('mnist_784', version=1, )
joblib.dump(mnist, "mnist.jb", compress=3)
sw.stop('<load MNIST dataset>')

# mnist.data : 70,000 784-dimensional vector data
mnist.data = mnist.data.astype(np.float32)
#mnist.data /= 255.0 # 0-1 of the data conversion

sw.start('<Master dumps>')
bin_data = pickle.dumps(mnist.data) # オブジェクトをシリアル化
enc_value = base64.b64encode(bin_data) #扱いやすいようにBASE64
sw.stop('<Master dumps>')

try:
sw.start('<Master ConnectNamedPipe>')
win32pipe.ConnectNamedPipe(M2S_pipe, None)
sw.stop('<Master ConnectNamedPipe>')
ss.req_message('$start')
sw.start('<<Master throughput>>')
sw.start('<Master send>')
win32file.WriteFile(M2S_pipe, enc_value)
sw.stop('<Master send>')
finally:
win32file.CloseHandle(M2S_pipe)

S2M_handle = win32file.CreateFile(
r'\\.\pipe\S2M',
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0,
None,
win32file.OPEN_EXISTING,
0,
None)

try:
sw.start('<Master recv>')
res = win32pipe.SetNamedPipeHandleState(S2M_handle, win32pipe.PIPE_READMODE_MESSAGE, None, None)
recv_stream = win32file.ReadFile(S2M_handle, 1024 * 1024 * 1024)[1]
sw.stop('<Master recv>')
sw.stop('<<Master throughput>>')
sw.start('<Master decode>')
recv_data = pickle.loads(base64.b64decode(recv_stream)) # オブジェクトを復元
sw.stop('<Master decode>')
if (enc_value == recv_stream):
print("Validation OK!")
else:
print("Different! NG!")
finally:
win32file.CloseHandle(S2M_handle)

NAMED_PIPE_S01.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-

import win32pipe, win32file
import pickle
import sys
import base64
from KooStopWatch import StopWatch
import KooSimpleSync as ss

M2S_handle = win32file.CreateFile(
r'\\.\pipe\M2S',
win32file.GENERIC_READ | win32file.GENERIC_WRITE,
0,
None,
win32file.OPEN_EXISTING,
0,
None)

S2M_pipe = win32pipe.CreateNamedPipe(
r'\\.\pipe\S2M',
win32pipe.PIPE_ACCESS_DUPLEX,
win32pipe.PIPE_TYPE_MESSAGE | win32pipe.PIPE_READMODE_MESSAGE | win32pipe.PIPE_WAIT,
1, 65536, 65536,
0,
None)

sw = StopWatch(sys.stderr)
try:
ss.wait_message('$start') # Master Processからの開始指示待ち
sw.start('#1:Slave recv-decode-load#')
sw.start('#2:Slave recv#')
res = win32pipe.SetNamedPipeHandleState(M2S_handle, win32pipe.PIPE_READMODE_MESSAGE, None, None)
recv_stream = win32file.ReadFile(M2S_handle, 1024 * 1024 * 1024)[1]
sw.stop('#2:Slave recv#')
finally:
win32file.CloseHandle(M2S_handle)

sw.start('#3:Slave loads#')
recv_data = pickle.loads(base64.b64decode(recv_stream))
sw.stop('#3:Slave loads#')
sw.stop('#1:Slave recv-decode-load#')

sw.start('#4:Slave dumps#')
bin_data = pickle.dumps(recv_data)
enc_value = base64.b64encode(bin_data)
sw.stop('#4:Slave dumps#')

try:
sw.start('#5:Slave ConnectNamedPipe#')
win32pipe.ConnectNamedPipe(S2M_pipe, None)
sw.stop('#5:Slave ConnectNamedPipe#')
sw.start('#6:Slave send#')
win32file.WriteFile(S2M_pipe, enc_value)
sw.stop('#6:Slave send#')
finally:
win32file.CloseHandle(S2M_pipe)
---Start NAMED PIPE / process comm ---
<load MNIST dataset> start:<load MNIST dataset> STOP in 1.32812 sec
<Master dumps> start:<Master dumps> STOP in 0.89062 sec
#1:Slave recv-decode-load# start:#2:Slave recv# start:<Master ConnectNamedPipe> start:<Master ConnectNamedPipe> STOP in 0.00000 sec
<<Master throughput>> start:<Master send> start:<Master send> STOP in 0.06250 sec
<Master recv> start:#2:Slave recv# STOP in 0.28125 sec
#3:Slave loads# start:#3:Slave loads# STOP in 0.89062 sec
#1:Slave recv-decode-load# STOP in 1.17188 sec
#4:Slave dumps# start:#4:Slave dumps# STOP in 0.89062 sec
#5:Slave ConnectNamedPipe# start:#5:Slave ConnectNamedPipe# STOP in 0.00000 sec
#6:Slave send# start:#6:Slave send# STOP in 0.04688 sec
<Master recv> STOP in 0.31250 sec
<<Master throughput>> STOP in 0.37500 sec
<Master decode> start:<Master decode> STOP in 0.76562 sec
Validation OK!

Process finished with exit code 0

 参考文献:PythonとWindowsの名前付きパイプ

DUPLEX(双方向通信)パイプにしてはいますが、送信用、受信用の専用のパイプとして使用しています。(DUPLEXにしていることに意図はありません)また、パイプなので、処理完了する前にスループット表示されます。

TCP/IP (socket):ただし同一マシン

TCP/IPのコネクションが可能になるまですこし時間がかかるので、スレーブ側プロセスの起動タイミングでなんとなく調整しています。極力手抜きしていますが、速度実験用なのでご容赦ください。

TCPIP_M01.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import subprocess
from sklearn.datasets import fetch_openml
import numpy as np
import pickle
import joblib
import os
import base64
import socket
from KooStopWatch import StopWatch
import KooSimpleSync as ss

#TCP/IP PORT
M2S_PORT = 50011
S2M_PORT = 50012
LOCALHOST = '127.0.0.1'


sw = StopWatch()
print('---Start TCP/IP / process comm ---')
sw.start('<load MNIST dataset>')
if os.path.exists('mnist.jb'):
mnist = joblib.load("mnist.jb")
else:
mnist = fetch_openml('mnist_784', version=1, )
joblib.dump(mnist, "mnist.jb", compress=3)
sw.stop('<load MNIST dataset>')

# mnist.data : 70,000 784-dimensional vector data
mnist.data = mnist.data.astype(np.float32)

sw.start('<Master dumps>')
bin_data = pickle.dumps(mnist.data) # オブジェクトをシリアル化
enc_value = base64.b64encode(bin_data) #扱いやすいようにBASE64
sw.stop('<Master dumps>')
proc = None

sw.start('<<Master throughput>>')
sw.start('<Master send>')
with socket.socket() as s:
try:
s.bind((LOCALHOST, M2S_PORT))
s.listen()
proc = subprocess.Popen(['python', './TCPIP_S01.py'])
c, addr = s.accept()
ss.req_message('$start')
c.send(enc_value)
c.close()
except Exception as e:
print('M send message:' + str(e))
sw.stop('<Master send>')

sw.start('<Master recv>')
with socket.socket() as r:
r.connect((LOCALHOST, S2M_PORT))
try:
recv_stream = r.recv(1024 * 1024 * 1024) #これもCHUNKで回すべきですが、手抜きます。
sw.stop('<Master recv>')
sw.start('<Master decode>')
recv_data = pickle.loads(base64.b64decode(recv_stream)) # オブジェクトを復元
sw.stop('<Master decode>')
sw.stop('<<Master throughput>>')
if (enc_value == recv_stream):
print("Validation OK!")
else:
print("Different! NG!")
except Exception as e:
print('M recv message:' + str(e))

TCPIP_S01.py  

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pickle
import base64
import sys
import socket
from KooStopWatch import StopWatch
import KooSimpleSync as ss

#TCP/IP PORT
M2S_PORT = 50011
S2M_PORT = 50012
LOCALHOST = '127.0.0.1'
CHUNK_SIZE = 8 * 1024

sw = StopWatch(sys.stderr)
sw.start('#Slave recv#')
recv_stream = None
recv_data = None
s = socket.socket()
s.bind((LOCALHOST, S2M_PORT))
s.listen()
with socket.socket() as r:
r.connect((LOCALHOST, M2S_PORT))
try:
ss.wait_message('$start') # Master Processからの開始指示待ち
sw.start('#1:Slave recv-decode-load#')
sw.start('#2:Slave recv#')
recv_stream = r.recv(1024 * 1024 * 1024) # CHUNKで回すべきですが、測定なので(^_^;)
sw.stop('#2:Slave recv#')
sw.start('#3:Slave loads#')
recv_data = pickle.loads(base64.b64decode(recv_stream))
sw.stop('#3:Slave loads#')
except Exception as e:
print('S recv message:' + str(e))

sw.start('#4:Slave dumps#')
bin_data = pickle.dumps(recv_data)
enc_value = base64.b64encode(bin_data)
sw.stop('#4:Slave dumps#')
sw.stop('#1:Slave recv-decode-load#')

try:
sw.start('#5:Slave send#')
c, addr = s.accept()
c.send(enc_value)
c.close()
sw.stop('#5:Slave send#')
except Exception as e:
print('S send message:' + str(e))
---Start TCP/IP / process comm ---
<load MNIST dataset> start:<load MNIST dataset> STOP in 1.35938 sec
<Master dumps> start:<Master dumps> STOP in 0.90625 sec
<<Master throughput>> start:<Master send> start:#Slave recv# start:#1:Slave recv-decode-load# start:#2:Slave recv# start:<Master send> STOP in 0.17188 sec
<Master recv> start:#2:Slave recv# STOP in 0.06250 sec
#3:Slave loads# start:#3:Slave loads# STOP in 0.90625 sec
#4:Slave dumps# start:#4:Slave dumps# STOP in 0.90625 sec
#1:Slave recv-decode-load# STOP in 1.87500 sec
#5:Slave send# start:#5:Slave send# STOP in 0.15625 sec
<Master recv> STOP in 0.07812 sec
<Master decode> start:<Master decode> STOP in 0.75000 sec
<<Master throughput>> STOP in 1.00000 sec
Validation OK!

Process finished with exit code 0

これも、非同期になっている部分が多々ありそうです。

MPI:ただし同一マシン

ケアレスミスに気付けなくて時間がかかりましたが、できました。相変わらず、プロセスが起動するまで時間がかかる原因はわかっていません。MPIなのでマスター、スレーブ共用プログラムにしています。

MPI_01.py 

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from mpi4py import MPI
from sklearn.datasets import fetch_openml
import numpy as np
import pickle
import joblib
import os
import base64
from KooStopWatch import StopWatch
import KooSimpleSync as ss

comm = MPI.COMM_WORLD #COMM_WORLDは全体
rank = comm.Get_rank() #ランク(何番目のプロセスか。プロセスID

sw = StopWatch()
if rank == 0:
# Master
print('---Start MPI / process comm Master ---')
sw.start('<load MNIST dataset>')
if os.path.exists('mnist.jb'):
mnist = joblib.load("mnist.jb")
else:
mnist = fetch_openml('mnist_784', version=1, )
joblib.dump(mnist, "mnist.jb", compress=3)
sw.stop('<load MNIST dataset>')

# mnist.data : 70,000 784-dimensional vector data
mnist.data = mnist.data.astype(np.float32)

sw.start('<Master dumps>')
bin_data = pickle.dumps(mnist.data) # オブジェクトをシリアル化
enc_value = base64.b64encode(bin_data) #扱いやすいようにBASE64
sw.stop('<Master dumps>')

ss.req_message('$start')
sw.start('<<Master throughput>>')
sw.start('<Master send>')
#今回サブノードは1つだけなのでブロードキャストしません。相手は Rank#1ノードだけです。
comm.send(enc_value, dest=1, tag=1)
sw.stop('<Master send>')

sw.start('<Master recv>')
#今回サブノードは1つだけなのでまわしません。相手は Rank#1ノードだけです。
recv_stream = comm.recv(source=1, tag=2)
sw.stop('<Master recv>')
sw.start('<Master decode>')
recv_data = pickle.loads(base64.b64decode(recv_stream)) # オブジェクトを復元
sw.stop('<Master decode>')
sw.stop('<<Master throughput>>')
if (enc_value == recv_stream):
print("Validation OK!")
else:
print("Different! NG!")
elif rank == 1:
# Slave
print('---Start MPI / process comm Slave ---')
sw = StopWatch()
ss.wait_message('$start') # Master Processからの開始指示待ち :MPIでこれをいれるのは違和感がありますが、他と同じ条件にするためです。
sw.start('#1:Slave recv-decode-load#')
sw.start('#2:Slave recv#')
recv = comm.recv(source=0, tag=1)
sw.stop('#2:Slave recv#')
sw.start('#3:Slave loads#')
recv_data = pickle.loads(base64.b64decode(recv))
sw.stop('#3:Slave loads#')
sw.stop('#1:Slave recv-decode-load#')
sw.start('#4:Slave dumps#')
bin_data = pickle.dumps(recv_data)
enc_value = base64.b64encode(bin_data)
sw.stop('#4:Slave dumps#')

sw.start('#5:Slave send#')
comm.send(enc_value, dest=0, tag=2)
sw.stop('#5:Slave send#')

MPI.Finalize()
C:\Users\Mariel\PycharmProjects\ManualEncoder>mpiexec -n 2 -host localhost python MPI_01.py---Start MPI / process comm Slave ---
#1:Slave recv-decode-load# start:#2:Slave recv# start:#2:Slave recv# STOP in 0.60938 sec
#3:Slave loads# start:#3:Slave loads# STOP in 0.78125 sec
#1:Slave recv-decode-load# STOP in 1.39062 sec
#4:Slave dumps# start:#4:Slave dumps# STOP in 0.89062 sec
#5:Slave send# start:#5:Slave send# STOP in 0.23438 sec
---Start MPI / process comm Master ---
<load MNIST dataset> start:<load MNIST dataset> STOP in 1.32812 sec
<Master dumps> start:<Master dumps> STOP in 0.87500 sec
<<Master throughput>> start:<Master send> start:<Master send> STOP in 0.25000 sec
<Master recv> start:<Master recv> STOP in 2.57812 sec
<Master decode> start:<Master decode> STOP in 0.76562 sec
<<Master throughput>> STOP in 3.59375 sec
Validation OK!

MPIなので、各ノード間のメッセージは時系列に表示されません。自ノードメッセージのみ時系列に表示されます。(ノード動作完了時に該当ノードの標準出力内容がまとめて、起動コマンド発行側に表示されます。)

キューの接続

これは、前回断念しています。キューの「ハンドリング」を相手プロセスに通知する方法が見つからなかったためです。ネットを調べて、ヒットがなければ今回も断念します。同一プログラムでmultiprocessingにてキューを渡せばよいのですが、わたくしの要件は全く別のプログラム間(起動pythonも別)にてデータを渡したいのです。(これより前のプログラムではsubprocessで別pythonを起動しています。)

がんばって探しています。。。。わからないので、欲しいのは「コレジャナイ」プログラムを貼ります。(同一pythonからのプロセス起動なので)

QUEUE_01.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import multiprocessing as mp

from sklearn.datasets import fetch_openml
import numpy as np
import pickle
import joblib
import os
import sys
import base64
from KooStopWatch import StopWatch
import KooSimpleSync as ss

def master_process(q, p):
sw = StopWatch()
print('---Start QUEUE / process comm Master ---')
sw.start('<load MNIST dataset>')
if os.path.exists('mnist.jb'):
mnist = joblib.load("mnist.jb")
else:
mnist = fetch_openml('mnist_784', version=1, )
joblib.dump(mnist, "mnist.jb", compress=3)
sw.stop('<load MNIST dataset>')

# mnist.data : 70,000 784-dimensional vector data
mnist.data = mnist.data.astype(np.float32)

sw.start('<Master dumps>')
bin_data = pickle.dumps(mnist.data) # オブジェクトをシリアル化
enc_value = base64.b64encode(bin_data) #扱いやすいようにBASE64
sw.stop('<Master dumps>')

ss.req_message('$start')
sw.start('<<Master throughput>>')
sw.start('<Master send>')
q.put(enc_value) #キューへ出力
sw.stop('<Master send>')

sw.start('<Master recv>')
recv_stream = p.get() #キューから取得
sw.stop('<Master recv>')
sw.start('<Master decode>')
recv_data = pickle.loads(base64.b64decode(recv_stream)) # オブジェクトを復元
sw.stop('<Master decode>')
sw.stop('<<Master throughput>>')
if enc_value == recv_stream:
print("Validation OK!")
else:
print("Different! NG!")

def slave_process(q, p):
# Slave
print('---Start QUEUE / process comm Slave ---')
sw = StopWatch(sys.stderr)
ss.wait_message('$start') # Master Processからの開始指示待ち
sw.start('#1:Slave recv-decode-load#')
sw.start('#2:Slave recv#')
recv = q.get() #キューから取得
sw.stop('#2:Slave recv#')
sw.start('#3:Slave loads#')
recv_data = pickle.loads(base64.b64decode(recv))
sw.stop('#3:Slave loads#')
sw.stop('#1:Slave recv-decode-load#')
sw.start('#4:Slave dumps#')
bin_data = pickle.dumps(recv_data)
enc_value = base64.b64encode(bin_data)
sw.stop('#4:Slave dumps#')

sw.start('#5:Slave send#')
p.put(enc_value) #キューへ出力
sw.stop('#5:Slave send#')

if __name__ == '__main__':
ctx = mp.get_context('spawn') #Windowsですが、おまじないで書いてます。
q = mp.Queue()
p = mp.Queue()
master= mp.Process(target=master_process, args=(q,p)) #qが出力、pが入力
slave = mp.Process(target=slave_process, args=(q,p)) #qが入力、pが出力
master.start()
slave.start()
master.join()
slave.join()
---Start QUEUE / process comm Master ---
<load MNIST dataset> start:---Start QUEUE / process comm Slave ---
<load MNIST dataset> STOP in 1.32812 sec
<Master dumps> start:<Master dumps> STOP in 0.89062 sec
<<Master throughput>> start:<Master send> start:<Master send> STOP in 0.00000 sec#1:Slave recv-decode-load# start:#2:Slave recv# start:
<Master recv> start:#2:Slave recv# STOP in 0.40625 sec
#3:Slave loads# start:#3:Slave loads# STOP in 0.73438 sec
#1:Slave recv-decode-load# STOP in 1.14062 sec
#4:Slave dumps# start:#4:Slave dumps# STOP in 0.87500 sec
#5:Slave send# start:#5:Slave send# STOP in 0.00000 sec
<Master recv> STOP in 0.48438 sec
<Master decode> start:<Master decode> STOP in 0.75000 sec
<<Master throughput>> STOP in 1.51562 sec
Validation OK!

Process finished with exit code 0

番外編:ファイル渡し(RAM DISK)

今のマシンにRAM DISKないんだった~。(作業中のマシンは安定指向なので方針に従い余計なソフトは極力控えます)

SSDで代用します。タイミングのハンドリングは、ファイルの存在、非存在にします。

あいも変わらず、ベタベタで酷いプログラムですみません。

FILE_M01.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import subprocess
from sklearn.datasets import fetch_openml
import numpy as np
import pickle
import joblib
import os
import base64
import pathlib
from KooStopWatch import StopWatch
import KooSimpleSync as ss #ファイルでやってみたので、つかいませんでした

#FILE
M2S_FILE = './M2S_FILE.DATA'
S2M_FILE = './S2M_FILE.DATA'
MASTER_DONE = './MASTER_DONE'
SLAVE_DONE = './SLAVE_DONE'

def remove_file(finalame):
if os.path.isfile(finalame):
os.remove(finalame)

sw = StopWatch()
print('---Start FILE / process comm ---')
sw.start('<Master prepare>')
remove_file(M2S_FILE)
remove_file(S2M_FILE)
remove_file(MASTER_DONE)
remove_file(SLAVE_DONE)
sw.stop('<Master prepare>')
proc = subprocess.Popen(['python', './FILE_S01.py'])
sw.start('<load MNIST dataset>')
if os.path.exists('mnist.jb'):
mnist = joblib.load("mnist.jb")
else:
mnist = fetch_openml('mnist_784', version=1, )
joblib.dump(mnist, "mnist.jb", compress=3)
sw.stop('<load MNIST dataset>')

# mnist.data : 70,000 784-dimensional vector data
mnist.data = mnist.data.astype(np.float32)

sw.start('<Master dumps>')
bin_data = pickle.dumps(mnist.data) # オブジェクトをシリアル化
enc_value = base64.b64encode(bin_data) #扱いやすいようにBASE64
sw.stop('<Master dumps>')

sw.start('<<Master throughput>>')
sw.start('<Master send>')
with open(M2S_FILE, 'w') as s:
s.write(enc_value.decode()) #ファイル出力
st = pathlib.Path(MASTER_DONE)
st.touch()
sw.stop('<Master send>')
while (os.path.isfile(SLAVE_DONE) == False):
pass
sw.start('<Master recv>')
with open(S2M_FILE, 'r') as r:
recv_stream = r.read() #ファイル読み込み
sw.stop('<Master recv>')
sw.start('<Master decode>')
recv_data = pickle.loads(base64.b64decode(recv_stream)) # オブジェクトを復元
sw.stop('<Master decode>')
sw.stop('<<Master throughput>>')
if (enc_value== recv_stream.encode()):
print("Validation OK!")
else:
print("Different! NG!")

FILE_S01.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import pickle
import sys
import os
import base64
import pathlib
from KooStopWatch import StopWatch
import KooSimpleSync as ss #ファイルでやってみたので、つかいませんでした

#FILE
M2S_FILE = './M2S_FILE.DATA'
S2M_FILE = './S2M_FILE.DATA'
MASTER_DONE = './MASTER_DONE'
SLAVE_DONE = './SLAVE_DONE'

sw = StopWatch(sys.stderr)
while (os.path.isfile(MASTER_DONE) == False):
pass
sw.start('#1:Slave recv-decode-load#')
sw.start('#2:Slave recv#')
with open(M2S_FILE, 'r') as r:
recv_stream = r.read() #ファイル読み込み
sw.stop('#2:Slave recv#')
sw.start('#3:Slave loads#')
recv_data = pickle.loads(base64.b64decode(recv_stream))
sw.stop('#3:Slave loads#')
sw.stop('#1:Slave recv-decode-load#')
sw.start('#4:Slave dumps#')
bin_data = pickle.dumps(recv_data)
enc_value = base64.b64encode(bin_data)
sw.stop('#4:Slave dumps#')
sw.start('#5:Slave send#')
with open(S2M_FILE, 'w') as s:
s.write(enc_value.decode()) #ファイル出力
st = pathlib.Path(SLAVE_DONE)
st.touch()
sw.stop('#5:Slave send#')

いちおう高速NVMe SSDです。(環境が簡単に変えられないので、RAM DISK計測は保留します)

---Start FILE / process comm ---
<Master prepare> start:<Master prepare> STOP in 0.04688 sec
<load MNIST dataset> start:<load MNIST dataset> STOP in 1.37500 sec
<Master dumps> start:<Master dumps> STOP in 0.92188 sec
<<Master throughput>> start:<Master send> start:<Master send> STOP in 1.03125 sec
#1:Slave recv-decode-load# start:#2:Slave recv# start:#2:Slave recv# STOP in 1.09375 sec
#3:Slave loads# start:#3:Slave loads# STOP in 1.07812 sec
#1:Slave recv-decode-load# STOP in 2.17188 sec
#4:Slave dumps# start:#4:Slave dumps# STOP in 0.90625 sec
#5:Slave send# start:#5:Slave send# STOP in 1.06250 sec
<Master recv> start:<Master recv> STOP in 1.09375 sec
<Master decode> start:<Master decode> STOP in 0.89062 sec
<<Master throughput>> STOP in 7.20312 sec
Validation OK!

Process finished with exit code 0
総評:

 非同期処理との処理速度比較がうまくできなかったので、最小限のタイミング同期をとって計測しました。

比較対象はマスター送信開始から、スレーブ側でのオブジェクト送信復元までの時間です。(結果の赤色行部分です。)マスター側の「みかけ上のスループット」も参考までに記載します。(パイプなどの非同期は処理が完了していなくても比較直前まで到達するので、あくまで参考値です)

順位# 処理の種類

スレーブ側での
復元処理まで(sec)

マスター側の
見かけ上
スループット(sec)

※備考
1🏆 名前付きパイプ 1.17188 0.37500 .
2🥈 MMAP 1.39062 2.29688 .
2🥈 MPI 1.39062 3.59375 .
4 標準入出力の接続 1.73438 2.26562 .
5 TCP/IP 1.87500 1.00000 .
6 ファイル渡し(SSD) 2.17188 7.20312 .
番外 キュー接続 1.14062 1.51562 わたくしの要件を満たす動作条件でないため、順位判定から除外。

TCP/IPの順位が意外すぎですが、通常は必要ない箇所で他と条件をあわせるために同期をとっているのが原因かもしれません。名前付きパイプはWin32版機能をつかっているので、同一機器でのパイプのため、メモリでストリームを渡しているの「かも」しれません。

上記より、より「リアルタイム性」を求めるのであれば、名前付きパイプのほうがよいですが、(わたくしの場合は)現行のMMAPで十分であることがわかりました。

(ちなみに渡しているデータはBase64エンコード後で292,693,552バイト(300MB弱)です)*1

慣れてくるとMMAPでの情報渡しは本当に「楽」です。

おすすめです。

*1:通信でストリームをBase64する必要はないものがほとんどなのですが、処理によってはBase64のほうがずっと組みやすいのです。今回の比較では条件をあわせるため、すべてBase64にして通信しています。