apache beam でマルチプロセスで実行し、各処理でプロセスIDを確認します。
ここでは以下の形式のデータを処理します。
以下のプログラムでは、ParDo() で WordReader を並列化し、以降の処理をシリアルに実行します。
各処理でプロセスIDを取得し、最後に行番号、プロセスID、データを出力します。
■プログラム
ここでは以下の形式のデータを処理します。
Confidence NN B-NP in IN B-PP the DT B-NP
以下のプログラムでは、ParDo() で WordReader を並列化し、以降の処理をシリアルに実行します。
各処理でプロセスIDを取得し、最後に行番号、プロセスID、データを出力します。
■プログラム
import sys import os import json import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import StandardOptions from apache_beam.options.pipeline_options import DirectOptions input_file = '~/nltk_data/corpora/conll2000/train.txt' class WordReader(beam.DoFn): def __init__(self): self.line_no = 0 def process(self, line): self.line_no += 1 items = line.split(' ') if len(items) < 3: return obj = { 'line_no': self.line_no, 'form': items[0], 'pos': items[1], 'tag': items[2], 'pids': [os.getpid()], } yield obj opts = PipelineOptions() opts.view_as(StandardOptions).runner = 'DirectRunner' opts.view_as(DirectOptions).direct_running_mode = 'multi_processing' opts.view_as(DirectOptions).direct_num_workers = 2 def form_lower(obj): obj['form'] = obj['form'].lower() obj['pids'].append(os.getpid()) return obj def pos_lower(obj): obj['pos'] = obj['pos'].lower() obj['pids'].append(os.getpid()) return obj def tag_lower(obj): obj['tag'] = obj['tag'].lower() obj['pids'].append(os.getpid()) return obj def print_obj(obj): print("%d\t%s [%s, %s, %s]" % (obj['line_no'], obj['pids'], obj['form'], obj['pos'], obj['tag'] )) return obj with beam.Pipeline(options=opts) as p: (p | beam.io.ReadFromText(input_file) | beam.ParDo(WordReader()) | beam.Map(form_lower) | beam.Map(pos_lower) | beam.Map(tag_lower) | beam.Map(print_obj) ) </pre> ■出力結果 WordReader が並列で実行されるため、行番号がプロセス毎に1から始まっています。1 [3768, 3768, 3768, 3768] [its, prp$, b-np] 2 [3768, 3768, 3768, 3768] [existing, vbg, i-np] 3 [3768, 3768, 3768, 3768] [authorization, nn, i-np] ... 1 [3769, 3769, 3769, 3769] [confidence, nn, b-np] 2 [3769, 3769, 3769, 3769] [in, in, b-pp] 3 [3769, 3769, 3769, 3769] [the, dt, b-np] ...