python での Apache Beam によるデータ処理のサンプルプログラム。
■プログラム1
リスト内の文字列の出現頻度をカウントするプログラムです。
Create([...]) で文字列のリストを生成します。
Map(lambda str: (str, 1)) で、各文字列を出現頻度1回として、データを生成します。
CombinePerKey(sum) では、タプルの先頭要素(=文字列)をキーとして、同じキーの出現頻度を合計します。
Map(print) で各文字列毎に集計結果を出力します。
■実行結果1
■プログラム2
プログラム1 と同様に文字列の出現数をカウントしますが、入出力がファイルになっています。
ファイルからの読み込みには io.ReadFromText() を使用します。
ファイル名に * を含めることができ、複数のファイルを処理対象にすることができます。
ファイルへの出力は io.WriteToText() を使用します。
出力ファイル名には -mmmmm-of-nnnnn の形式で全nnnnnファイルの通し番号が付与されます。
■入力ファイル
data/input_1.txt:
data/input_2.txt:
■出力ファイル
data/output.txt-00000-of-00001
■プログラム1
import apache_beam as beam with beam.Pipeline() as p: (p | beam.Create(['abc', 'def', 'ghi', 'abc', 'def', 'abc']) | beam.Map(lambda str: (str, 1)) | beam.CombinePerKey(sum) | beam.Map(print) )
リスト内の文字列の出現頻度をカウントするプログラムです。
Create([...]) で文字列のリストを生成します。
Map(lambda str: (str, 1)) で、各文字列を出現頻度1回として、データを生成します。
CombinePerKey(sum) では、タプルの先頭要素(=文字列)をキーとして、同じキーの出現頻度を合計します。
Map(print) で各文字列毎に集計結果を出力します。
■実行結果1
('abc', 3) ('def', 2) ('ghi', 1)
■プログラム2
import apache_beam as beam import re input = 'data/input_*.txt' output = 'data/output.txt' with beam.Pipeline() as p: (p | beam.io.ReadFromText(input) | beam.FlatMap(lambda line: re.findall(r'[a-zA-Z0-9]+', line)) | beam.Map(lambda str: (str, 1)) | beam.CombinePerKey(sum) | beam.io.WriteToText(output) )
プログラム1 と同様に文字列の出現数をカウントしますが、入出力がファイルになっています。
ファイルからの読み込みには io.ReadFromText() を使用します。
ファイル名に * を含めることができ、複数のファイルを処理対象にすることができます。
ファイルへの出力は io.WriteToText() を使用します。
出力ファイル名には -mmmmm-of-nnnnn の形式で全nnnnnファイルの通し番号が付与されます。
■入力ファイル
data/input_1.txt:
w1 w1 w2 w1 w2 w3 w1 w2 w3 w4 w1 w2 w3 w4 w5
data/input_2.txt:
w11 w11 w12 w11 w12 w13 w11 w12 w13 w14 w11 w12 w13 w14 w15
■出力ファイル
data/output.txt-00000-of-00001
('w1', 5) ('w2', 4) ('w3', 3) ('w4', 2) ('w5', 1) ('w11', 5) ('w12', 4) ('w13', 3) ('w14', 2) ('w15', 1)