dak ブログ

python、rubyなどのプログラミング、MySQL、サーバーの設定などの備忘録。レゴの写真も。

apache beam のマルチプロセスの動作確認

2022-05-10 20:45:23 | python
apache beam でマルチプロセスで実行し、各処理でプロセスIDを確認します。
ここでは以下の形式のデータを処理します。
Confidence NN B-NP
in IN B-PP
the DT B-NP

以下のプログラムでは、ParDo() で WordReader を並列化し、以降の処理をシリアルに実行します。
各処理でプロセスIDを取得し、最後に行番号、プロセスID、データを出力します。

■プログラム
import sys
import os
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import DirectOptions

input_file = '~/nltk_data/corpora/conll2000/train.txt'

class WordReader(beam.DoFn):
    def __init__(self):
        self.line_no = 0

    def process(self, line):
        self.line_no += 1
        items = line.split(' ')
        if len(items) < 3:
            return
        obj = {
            'line_no': self.line_no,
            'form': items[0],
            'pos': items[1],
            'tag': items[2],
            'pids': [os.getpid()],
        }
        yield obj

opts = PipelineOptions()
opts.view_as(StandardOptions).runner = 'DirectRunner'
opts.view_as(DirectOptions).direct_running_mode = 'multi_processing'
opts.view_as(DirectOptions).direct_num_workers = 2

def form_lower(obj):
    obj['form'] = obj['form'].lower()
    obj['pids'].append(os.getpid())
    return obj

def pos_lower(obj):
    obj['pos'] = obj['pos'].lower()
    obj['pids'].append(os.getpid())
    return obj

def tag_lower(obj):
    obj['tag'] = obj['tag'].lower()
    obj['pids'].append(os.getpid())
    return obj

def print_obj(obj):
    print("%d\t%s [%s, %s, %s]" %
          (obj['line_no'],
           obj['pids'],
           obj['form'],
           obj['pos'],
           obj['tag']
           ))
    return obj

with beam.Pipeline(options=opts) as p:
    (p
     | beam.io.ReadFromText(input_file)
     | beam.ParDo(WordReader())
     | beam.Map(form_lower)
     | beam.Map(pos_lower)
     | beam.Map(tag_lower)
     | beam.Map(print_obj)
    )
</pre>
■出力結果
WordReader が並列で実行されるため、行番号がプロセス毎に1から始まっています。
1       [3768, 3768, 3768, 3768] [its, prp$, b-np]
2       [3768, 3768, 3768, 3768] [existing, vbg, i-np]
3       [3768, 3768, 3768, 3768] [authorization, nn, i-np]
...
1       [3769, 3769, 3769, 3769] [confidence, nn, b-np]
2       [3769, 3769, 3769, 3769] [in, in, b-pp]
3       [3769, 3769, 3769, 3769] [the, dt, b-np]
...