クーの自由研究

マスターのかえるのクーは、弟子達の召喚術により新たな依り代を得てⅡ世として復活しました。

pythonでmmapを使ってプロセス間通信をしてみる

「キュー」に替わる方法を確認

七夕が楽しみな今日この頃。皆様、お元気でしょうか?

かえるのクーの助手の「井戸中 聖」です。

「キュー」での通信を期待していましたが、予想外にmultiprocessingでの別画面起動が(技術不足で)できなかったので、別の方法で通信を考えます。

余談ですが、キン・ザ・ザ星域では

「クー」:キュー以外の意味を表す言葉。

「キュー」:公言可能な罵倒の言葉。

です。久しくこのブログに貼られてないので、貼ります。旧ソ連ではスターウオーズを知らなくても「キンザザ」なら全国民が知っているほど、有名なSF映画だったそうです。数すくないBlueRayDiskで持っている映画の一つです。

数多くあるこのサイトページのモチーフ(ぱくりネタ)のひとつであり、メインテーマでもあります。

 

さて「キュー」がつかえない状況なので、他を探します。

mmapを使ってみよう

メモリ上のファイルを共有して使えるしくみがあるので、それで通信?します。直接メモリ経由の通信なので、TCP/IPよりは早いと思われます。(共有メモリ:Shared memoryとは別)

先駆者の方に教えていただきます。

特徴は以下な感じだと思います。(第一印象なので正確でないかも。。。)

・FIFOではなく、メモリ上へ実ファイルがマッピングされる。

・マッピングされた領域へは通常のfileのi/oのようにアクセスできる。

・マッピングされた領域へは通常のbyte配列のようにアクセスできる。当然のように配列スライスによる入出力もできる。

元のファイルサイズ以上の領域をアクセスしようとすると例外が発生する。

・元のファイルはmmapとしてオープンすると、通常のファイルの排他がかかる。

たとえばmm.write(value)のようにも、mm[5:10]のようにスライスして使用することも使える。

・mmapの更新を行うと実ファイルも更新される。(想像ですが、一定時間やmmapの更新で実ファイルへ遅延書き込みしていると思います。読むのは実ファイルではなく、メモリ上なので、即時性は問題ないと思われます。)

お詫びと訂正:英語を読み間違えておりました。&動作させて非常に高速だったので勘違いしました。「メモリが主体」ではなく、「ファイルが主体」のしくみのようです。なので、「共有メモリ領域」アクセスするのではなく、相手プロセスからすると「メモリのコピー」をアクセスするので、変更の「遅延」はそれなりに発生します。

参考:プロセス間共有メモリ - Advanced HSP

・以上より、ファイルの使用前と使用後ではファイルサイズはまったくかわらない。(ファイルサイズはメモリのアロケートサイズに相当する。ファイルサイズがゼロだとメモリ領域を全く使えないことになるので、オープン時に例外となる)

非常に特殊な感じです。慣れが必要だと思います。

通信?のデザイン

では通信を行うためのフォーマットをデザインしてみます。

f:id:AssistantOfKoo:20200708223202p:plain
実験なのできわめて簡略化します。

・最初の2バイトはデータの種類とする。ただし16進数では0x4040以上の値を使用する。(改行コードなどの制御コードエリアはさける)

・次の3バイト以降は可変長の情報のシリアル変換値を使う。

・シリアル変換はオブジェクトpickle.dumpsでバイト変換したあと、さらにuuencode(base64encode)で文字列ストリーム化する。

・データの最後は改行とする。=1行出力とする(上記シリアル変換では改行コードがデータ値に含まれることはない)

・読み込み側では1行読み込みとする。

・読み込みも書き込みも常に先頭から行う(FIFO的に一切使わない)=常にデータは1つだけ保存(まぁ、テスト用なので)

・行の最初の2バイトでデータの種類を識別する。

・3バイトめ以降の情報をオブジェクトに復元する。(uudecode(base64decode)でバイト列に戻し、さらにpickle.loadsでオブジェクトに戻す。)

・アプリではオブジェクトの内容をformatで表示形式にして受信欄に表示する。

効率は悪いですが、お手軽に変換できるはずです。

 実装してみました

上記路線で実装すると以下のようになりました。メイン側とサブ側ほとんど同じなのですが、貼りつけます。なお、controlDialog.uiは前回までのファイルをそのまま使用しています。

TEST_MAIN01.py

#!python3
# -*- coding: utf-8 -*-

from PyQt5 import uic
import sys
from PyQt5.QtWidgets import QApplication
import numpy as np
from subprocess import Popen
import mmap
import pickle
import base64
'''
=== 複数画面を起動して、mmapで相互に情報通信するプログラム(Main) ===
'''
# load ui file
uifile_main = './controlDialog.ui'
form_main, base_main = uic.loadUiType(uifile_main)

''' --- メインダイアログ --- '''
class MainDialog(base_main, form_main):
def __init__(self):
super(base_main, self).__init__()
self.setupUi(self)
self.setWindowTitle('main dialog')
self.lineEdit_title.setText('Main Form')
#イベントの定義をします
self.btn_sub_form.clicked.connect(self.start_sub_process)
self.btn_sendtext.clicked.connect(self.send)
self.btn_sendpos.clicked.connect(self.send)
self.btn_sendarray.clicked.connect(self.send)
self.btn_sendclose.clicked.connect(self.send)
self.btn_recv.clicked.connect(self.recv)

#メモリマップファイルを定義します。あらかじめ所定のバイト数のデータが格納されている必要があります。
#格納されている以上の情報を出力しようとすると、例外が発生します。
with open("send.txt", "r+b") as s:
self.mm_send = mmap.mmap(s.fileno(), 0)
with open("recv.txt", "r+b") as r:
self.mm_recv = mmap.mmap(r.fileno(), 0)

#サブプロセスを起動します。
def start_sub_process(self):
try:
xpos = self.pos().x() + self.rect().width() + 11
ypos = self.pos().y() + 31
dwidth = self.rect().width()
dheight = self.rect().height()
#メイン画面の横にサブ画面を表示するための情報を引数で渡します。
self.child_process = Popen(['python', './TEST_SUB01.py',
"{0}".format(xpos),
"{0}".format(ypos),
"{0}".format(dwidth),
"{0}".format(dheight)])
#printは動作確認用です。
print('trace')
print('PID:{0}'.format(self.child_process.pid))
print('Error:{0}'.format(self.child_process.errors))
except:
print('exception!')

#通信用データ識別
CONST_TYPE_TEXT = b'\x40\x41'
CONST_TYPE_POSITION = b'\x40\x42'
CONST_TYPE_NDARRAY = b'\x40\x43'
CONST_TYPE_CLOSE = b'\x40\x5A'
#送信用:オブジェクトを変換してデータ種別をシリアライズしてデータ種別情報を付加してmmapに書き込みます。
def send(self):
try:
request = self.sender()

if request == self.btn_sendtext:
self.encode_write(self.CONST_TYPE_TEXT, self.lineEdit_sendtext.text())
elif request == self.btn_sendpos:
self.encode_write(self.CONST_TYPE_POSITION, self.create_pos())
elif request == self.btn_sendarray:
self.encode_write(self.CONST_TYPE_NDARRAY, self.create_array())
elif request == self.btn_sendclose:
self.encode_write(self.CONST_TYPE_CLOSE, 'close')
except:
print('send exception!')

#エンコードしてmmapに書き込み
def encode_write(self, enc_type, item):
bin_data = pickle.dumps(item)
uu_encode = base64.b64encode(bin_data)
enc_value = enc_type + uu_encode + '\r\n'.encode() #オブジェクトをシリアル化
self.mm_send.seek(0) #先頭位置を指定
self.mm_send.write(enc_value) #mmapへ出力

#テスト用のタプル作成
def create_pos(self):
return (self.ds_x.value(), self.ds_y.value())

#テスト用の配列作成
def create_array(self):
return np.ones((self.sp_ycount.value(), self.sp_xcount.value())) * self.ds_val.value()

#受信用
def recv(self):
try:
self.mm_recv.seek(0)
recv_line = self.mm_recv.readline()
recv_command = b'' + recv_line[0:2]
if recv_command == self.CONST_TYPE_TEXT:
strText = pickle.loads(base64.b64decode(recv_line[2:])) #オブジェクトを復元
self.textBrowser_Display.setText('from Sub Text:' + strText)
elif recv_command == self.CONST_TYPE_POSITION:
r_pos = pickle.loads(base64.b64decode(recv_line[2:])) #オブジェクトを復元
self.textBrowser_Display.setText('from Sub Pos:{0}'.format(r_pos))
elif recv_command == self.CONST_TYPE_NDARRAY:
r_array = pickle.loads(base64.b64decode(recv_line[2:])) #オブジェクトを復元
self.textBrowser_Display.setText('from Sub Array:{0}'.format(r_array))
elif recv_command == self.CONST_TYPE_CLOSE:
self.close()
except:
print('recv exception!')


if __name__ == '__main__':
print("start!")
app = QApplication(sys.argv)
ex = MainDialog()
ex.show()
print("end!")
sys.exit(app.exec_())

 TEST_SUB01.py

#!python3
# -*- coding: utf-8 -*-
from PyQt5 import uic
import sys
from PyQt5.QtWidgets import QApplication
import numpy as np
import mmap
import pickle
import base64
'''
=== 複数画面を起動して、mmapで相互に情報通信するプログラム(Sub) ===
'''
# load ui file
uifile_main = './controlDialog.ui'
form_sub, base_sub = uic.loadUiType(uifile_main)

''' --- サブダイアログ --- '''
class SubDialog(base_sub, form_sub):
def __init__(self):
super(base_sub, self).__init__()
self.setupUi(self)
self.setWindowTitle('sub dialog')
self.lineEdit_title.setText('Sub Form')
self.btn_sub_form.setEnabled(False)
self.setGeometry(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3]), int(sys.argv[4]))

#イベントの定義をします
self.btn_sendtext.clicked.connect(self.send)
self.btn_sendpos.clicked.connect(self.send)
self.btn_sendarray.clicked.connect(self.send)
self.btn_sendclose.clicked.connect(self.send)
self.btn_recv.clicked.connect(self.recv)

#メモリマップファイルを定義します。あらかじめ所定のバイト数のデータが格納されている必要があります。
#格納されている以上の情報を出力しようとすると、例外が発生します。
with open("recv.txt", "r+b") as s: #Main側主体に考えるので、Sub側はrecv.txtへ出力
self.mm_send = mmap.mmap(s.fileno(), 0)
with open("send.txt", "r+b") as r: #Main側主体に考えるので、Sub側はsend.txtから入力
self.mm_recv = mmap.mmap(r.fileno(), 0)

#通信用データ識別
CONST_TYPE_TEXT = b'\x40\x41'
CONST_TYPE_POSITION = b'\x40\x42'
CONST_TYPE_NDARRAY = b'\x40\x43'
CONST_TYPE_CLOSE = b'\x40\x5A'

#送信用:オブジェクトを変換してデータ種別をシリアライズしてデータ種別情報を付加してmmapに書き込みます
def send(self):
try:
request = self.sender()

if request == self.btn_sendtext:
self.encode_write(self.CONST_TYPE_TEXT, self.lineEdit_sendtext.text())
elif request == self.btn_sendpos:
self.encode_write(self.CONST_TYPE_POSITION, self.create_pos())
elif request == self.btn_sendarray:
self.encode_write(self.CONST_TYPE_NDARRAY, self.create_array())
elif request == self.btn_sendclose:
self.encode_write(self.CONST_TYPE_CLOSE, 'close')
except:
print('send exception!')

#エンコードしてmmapに書き込み
def encode_write(self, enc_type, item):
bin_data = pickle.dumps(item)
uu_encode = base64.b64encode(bin_data)
enc_value = enc_type + uu_encode + '\r\n'.encode() #オブジェクトをシリアル化
self.mm_send.seek(0) #先頭位置を指定
self.mm_send.write(enc_value) #mmapへ出力

#テスト用のタプル作成
def create_pos(self):
return (self.ds_x.value(), self.ds_y.value())

#テスト用の配列作成
def create_array(self):
return np.ones((self.sp_ycount.value(), self.sp_xcount.value())) * self.ds_val.value()

#受信用:受信データをオブジェクトに復元して、受信欄に表示する
def recv(self):
try:
self.mm_recv.seek(0)
recv_line = self.mm_recv.readline()
recv_command = b'' + recv_line[0:2]
if recv_command == self.CONST_TYPE_TEXT:
strText = pickle.loads(base64.b64decode(recv_line[2:])) #オブジェクトを復元
self.textBrowser_Display.setText('from Main Text:' + strText)
elif recv_command == self.CONST_TYPE_POSITION:
r_pos = pickle.loads(base64.b64decode(recv_line[2:])) #オブジェクトを復元
self.textBrowser_Display.setText('from Main Pos:{0}'.format(r_pos))
elif recv_command == self.CONST_TYPE_NDARRAY:
r_array = pickle.loads(base64.b64decode(recv_line[2:])) #オブジェクトを復元
self.textBrowser_Display.setText('from Main Array:{0}'.format(r_array))
elif recv_command == self.CONST_TYPE_CLOSE:
self.close()
except:
print('recv exception!')

if __name__ == '__main__':
print("start!")
print(sys.argv)
app = QApplication(sys.argv)
ex = SubDialog()
ex.show()
print("end!")
sys.exit(app.exec_())

 

 実行例

f:id:AssistantOfKoo:20200706232654p:plain

 Main側を起動すると左側の画面が立ち上がり、subFormボタンをクリックすると右側の画面が立ち上がります。サブ画面はいくつでも立ち上がります。

目標をセンターに入れてスイッチデータを入力してクリックするとmmapに書き込まれます。受信画面側のReceiveボタンをクリックすると、入力された情報が表示されます。

mmapはメモリ(ファイル)を介して多対多の通信もできそうなところがよいですね。

これで、マルチプロセスにして、他プロセスに情報を通知するスキルを会得しました。

次は本題にもどって、グラフを表示する画面での実装をしてみるのが順当ですが、もう少し脇道にそれて、プロセス間通信をもう少し勉強するかもしれません。

(なにかにつづく)

 今日の格言

「継続は力なり」

わたくしはSF好きですが、残念なことに読書力・読解力・想像力が貧弱です。「世界の中心で愛を叫んだけものハーラン・エリスン(ヒューゴー賞・ネビュラ賞を両受賞)がイメージできるまで20年かかりました。

 酉島 伝法の「皆勤の徒」(第34回日本SF大賞受賞)は1年かかってもまったく読み進められません。1行読むごとに辞書を引いている状態で、少し時間があくと最初から読むはめになります。でも、これほど絶賛されるSFを他に知らないす。(「新世界より」(第29回日本SF大賞受賞作品)でさえ賛否があるくらい、世間は厳しいです。

「皆勤の徒」はSFを「読み始めたころ」あまりにわからないで、1ページを何時間もかけて読んでいたころの感覚があり新鮮です。

だいたいのことは、諦めなければなんとかなると思っています。

(2020/11/15追記)

MMAPと他のプロセス通信の速度比較をしてみました。

 (追記:重要

わたくしの実験ではmmapは1対多の通信ができてしまいましたが、mmapは1対1の通信とのことです(python公式より)

ネットワーク通信とプロセス間通信 — Python 3.9.0 ドキュメント