dak ブログ

python、rubyなどのプログラミング、MySQL、サーバーの設定などの備忘録。レゴの写真も。

python で Apache Beam を使ってみた

2022-05-01 16:54:00 | python
python での Apache Beam によるデータ処理のサンプルプログラム。

■プログラム1
import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create(['abc', 'def', 'ghi', 'abc', 'def', 'abc'])
     | beam.Map(lambda str: (str, 1))
     | beam.CombinePerKey(sum)
     | beam.Map(print)
    )

リスト内の文字列の出現頻度をカウントするプログラムです。
Create([...]) で文字列のリストを生成します。
Map(lambda str: (str, 1)) で、各文字列を出現頻度1回として、データを生成します。
CombinePerKey(sum) では、タプルの先頭要素(=文字列)をキーとして、同じキーの出現頻度を合計します。
Map(print) で各文字列毎に集計結果を出力します。

■実行結果1
('abc', 3)
('def', 2)
('ghi', 1)

■プログラム2
import apache_beam as beam
import re

input = 'data/input_*.txt'
output = 'data/output.txt'

with beam.Pipeline() as p:
    (p
     | beam.io.ReadFromText(input)
     | beam.FlatMap(lambda line: re.findall(r'[a-zA-Z0-9]+', line))
     | beam.Map(lambda str: (str, 1))
     | beam.CombinePerKey(sum)
     | beam.io.WriteToText(output)
    )

プログラム1 と同様に文字列の出現数をカウントしますが、入出力がファイルになっています。
ファイルからの読み込みには io.ReadFromText() を使用します。
ファイル名に * を含めることができ、複数のファイルを処理対象にすることができます。
ファイルへの出力は io.WriteToText() を使用します。
出力ファイル名には -mmmmm-of-nnnnn の形式で全nnnnnファイルの通し番号が付与されます。

■入力ファイル
data/input_1.txt:
w1
w1 w2
w1 w2 w3
w1 w2 w3 w4
w1 w2 w3 w4 w5

data/input_2.txt:
w11
w11 w12
w11 w12 w13
w11 w12 w13 w14
w11 w12 w13 w14 w15

■出力ファイル
data/output.txt-00000-of-00001
('w1', 5)
('w2', 4)
('w3', 3)
('w4', 2)
('w5', 1)
('w11', 5)
('w12', 4)
('w13', 3)
('w14', 2)
('w15', 1)