python でマルチプロセスでデータを処理するプログラムです。
mp = MultiProcessUtil(num_proc, func) で func を実行する num_proc の子プロセスを生成し、mp.start() で子プロセスを実行します。
親プロセスから子プロセスへは、mp.childs[i].p2c で送信します。
子プロセスの関数 func は 2 つの引数 (cinfo, args) をとり、cinfo.p2c で親プロセスから送信された情報を読み込みます。
親プロセスが mp.childs[i].p2c を close し、子プロセス側でデータの読み込みが完了すると、子プロセス側の読み込みループが終了します。
親プロセスは mp.join() を実行すると、子プロセスが終了するまで待ちます。
■ライブラリ
■プログラム
■入力ファイル
■実行結果
mp = MultiProcessUtil(num_proc, func) で func を実行する num_proc の子プロセスを生成し、mp.start() で子プロセスを実行します。
親プロセスから子プロセスへは、mp.childs[i].p2c で送信します。
子プロセスの関数 func は 2 つの引数 (cinfo, args) をとり、cinfo.p2c で親プロセスから送信された情報を読み込みます。
親プロセスが mp.childs[i].p2c を close し、子プロセス側でデータの読み込みが完了すると、子プロセス側の読み込みループが終了します。
親プロセスは mp.join() を実行すると、子プロセスが終了するまで待ちます。
■ライブラリ
import os class ChildInfo: def __init__(self): self.id = -1 self.pid = None self.func = None self.args = None self.status = None self.p2c = None self.c2p = None self.p2c_r = None self.p2c_w = None self.c2p_r = None self.c2p_w = None class MultiProcessUtil: def __init__(self, num_childs, func, args=None): self.num_childs = num_childs self.childs = [None] * num_childs self.func = func self.args = args # len(args) == num_childs if self.args is None: self.args = [None] * self.num_childs for i in range(0, self.num_childs): cinfo = ChildInfo() cinfo.id = i cinfo.func = self.func cinfo.args = self.args[i] cinfo.p2c_r, cinfo.p2c_w = os.pipe() cinfo.c2p_r, cinfo.c2p_w = os.pipe() self.childs[i] = cinfo def start(self): for i in range(0, self.num_childs): cinfo = self.childs[i] pid = os.fork() if pid == 0: # child # 不要な fd を close for j in range(0, i): os.close(self.childs[j].p2c_w) os.close(self.childs[j].c2p_r) os.close(cinfo.p2c_w) cinfo.p2c_w = None os.close(cinfo.c2p_r) cinfo.c2p_r = None cinfo.p2c = os.fdopen(cinfo.p2c_r, 'r') cinfo.c2p = os.fdopen(cinfo.c2p_w, 'w') try: res = cinfo.func(cinfo, cinfo.args) except Exception as e: res = 1 exit(res) else: # parent # 不要な fd を close cinfo.pid = pid os.close(cinfo.p2c_r) cinfo.p2c_r = None os.close(cinfo.c2p_w) cinfo.c2p_w = None cinfo.p2c = os.fdopen(cinfo.p2c_w, 'w') cinfo.c2p = os.fdopen(cinfo.c2p_r, 'r') return self.childs def join(self): for child in self.childs: pid, status = os.wait() child.status = status
■プログラム
import sys import os import re from multi_process_util import MultiProcessUtil def proc1(cinfo, args=None): print('run: %d' % (os.getpid())) for line in cinfo.p2c: line = re.sub('[\r\n]+$', '', line) print('[%d]: [%s]' % (os.getpid(), line)) return 0 def main(): num_procs = 2 mp = MultiProcessUtil(num_procs, proc1) mp.start() line_no = -1 for line in sys.stdin: line_no += 1 c = line_no % num_procs #print('c: %d' % (c)) #print(mp.childs[c].json()) mp.childs[c].p2c.write(line) for child in mp.childs: child.p2c.close() mp.join() return 0 if __name__ == '__main__': res = main() exit(res)
■入力ファイル
abc def ghi jkl mno pqr stu vwx 012 345 678
■実行結果
run: 8223 [8223]: [def] run: 8222 [8222]: [abc] [8223]: [jkl] [8223]: [pqr] [8223]: [vwx] [8223]: [345] [8222]: [ghi] [8222]: [mno] [8222]: [stu] [8222]: [012] [8222]: [678]