dak ブログ

python、rubyなどのプログラミング、MySQL、サーバーの設定などの備忘録。レゴの写真も。

apache beam によるパイプラインでのデータ加工処理の例

2022-05-09 23:26:48 | python
Apache Beam のパイプラインでデータ加工処理を行ってみました。
以下のプログラムでは、jsonl のデータを読み込み、price を 2 倍にする処理と price に 1000 を加算する処理をパイプラインでつないで実行しています。
■プログラム
import sys
import os
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import DirectOptions

# JSON Lines file that feeds the pipeline (one JSON object per line).
input_file = 'data/test.jsonl'

# Configure a local run. Each view_as() call returns a typed view over the
# same PipelineOptions object, so values set through a view land on `opts`.
opts = PipelineOptions()

standard_opts = opts.view_as(StandardOptions)
standard_opts.runner = 'DirectRunner'

direct_opts = opts.view_as(DirectOptions)
direct_opts.direct_running_mode = 'multi_processing'
direct_opts.direct_num_workers = 4

def price_mul2(jsonl_str):
    """Double the ``price`` field of one JSON Lines record.

    Args:
        jsonl_str: JSON text for a single object that has a ``price`` key.

    Returns:
        The object serialized back to JSON text with ``price`` multiplied
        by 2. Non-ASCII characters are emitted as-is, not ``\\u``-escaped.

    Raises:
        KeyError: If the decoded object has no ``price`` key.
    """
    record = json.loads(jsonl_str)
    record['price'] = record['price'] * 2
    return json.dumps(record, ensure_ascii=False)

def price_add1k(jsonl_str):
    """Add 1000 to the ``price`` field of one JSON Lines record.

    Args:
        jsonl_str: JSON text for a single object that has a ``price`` key.

    Returns:
        The object serialized back to JSON text with ``price`` increased
        by 1000. Non-ASCII characters are emitted as-is, not ``\\u``-escaped.

    Raises:
        KeyError: If the decoded object has no ``price`` key.
    """
    record = json.loads(jsonl_str)
    record['price'] = record['price'] + 1000
    return json.dumps(record, ensure_ascii=False)

def print_obj(jsonl_str):
    """Print one record to stdout and return it unchanged.

    Returning the input lets this function be used as a pass-through
    ``beam.Map`` stage, so the element is still emitted downstream.
    """
    print(jsonl_str)
    return jsonl_str

# Read each line of the input file, double its price, add 1000, then echo
# the resulting record. Per the Beam documentation, exiting the ``with``
# block runs the pipeline and waits for it to finish.
with beam.Pipeline(options=opts) as pipeline:
    raw_lines = pipeline | beam.io.ReadFromText(input_file)
    doubled = raw_lines | beam.Map(price_mul2)
    adjusted = doubled | beam.Map(price_add1k)
    # Last stage only prints; its output PCollection is intentionally unused.
    adjusted | beam.Map(print_obj)

■入力データ
{"id": "id_001", "title": "item 001", "price": 101}
{"id": "id_002", "title": "item 002", "price": 102}
{"id": "id_003", "title": "item 003", "price": 103}
{"id": "id_004", "title": "item 004", "price": 104}
{"id": "id_005", "title": "item 005", "price": 105}
{"id": "id_006", "title": "item 006", "price": 106}
{"id": "id_007", "title": "item 007", "price": 107}
{"id": "id_008", "title": "item 008", "price": 108}
{"id": "id_009", "title": "item 009", "price": 109}
{"id": "id_010", "title": "item 010", "price": 110}
{"id": "id_011", "title": "item 011", "price": 111}
{"id": "id_012", "title": "item 012", "price": 112}
{"id": "id_013", "title": "item 013", "price": 113}
{"id": "id_014", "title": "item 014", "price": 114}
{"id": "id_015", "title": "item 015", "price": 115}
{"id": "id_016", "title": "item 016", "price": 116}
{"id": "id_017", "title": "item 017", "price": 117}
{"id": "id_018", "title": "item 018", "price": 118}
{"id": "id_019", "title": "item 019", "price": 119}
{"id": "id_020", "title": "item 020", "price": 120}

■実行結果
{"id": "id_001", "title": "item 001", "price": 1202}
{"id": "id_002", "title": "item 002", "price": 1204}
{"id": "id_003", "title": "item 003", "price": 1206}
{"id": "id_004", "title": "item 004", "price": 1208}
{"id": "id_005", "title": "item 005", "price": 1210}
{"id": "id_006", "title": "item 006", "price": 1212}
{"id": "id_007", "title": "item 007", "price": 1214}
{"id": "id_008", "title": "item 008", "price": 1216}
{"id": "id_009", "title": "item 009", "price": 1218}
{"id": "id_010", "title": "item 010", "price": 1220}
{"id": "id_011", "title": "item 011", "price": 1222}
{"id": "id_012", "title": "item 012", "price": 1224}
{"id": "id_013", "title": "item 013", "price": 1226}
{"id": "id_014", "title": "item 014", "price": 1228}
{"id": "id_015", "title": "item 015", "price": 1230}
{"id": "id_016", "title": "item 016", "price": 1232}
{"id": "id_017", "title": "item 017", "price": 1234}
{"id": "id_018", "title": "item 018", "price": 1236}
{"id": "id_019", "title": "item 019", "price": 1238}
{"id": "id_020", "title": "item 020", "price": 1240}