Apache Beam のパイプラインを使ってデータ加工処理を行ってみました。
以下のプログラムでは jsonl のデータを読み込み、price を 2倍する処理と、price に 1000 を加算する処理をパイプラインでつないで実行しています。
■プログラム
■入力データ
■実行結果
以下のプログラムでは jsonl のデータを読み込み、price を 2倍する処理と、price に 1000 を加算する処理をパイプラインでつないで実行しています。
■プログラム
import sys
import os
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import DirectOptions

# JSON Lines file read by the pipeline (one JSON object per line).
input_file = 'data/test.jsonl'

# Pipeline options: DirectRunner in 'multi_processing' mode with 4 workers.
opts = PipelineOptions()
opts.view_as(StandardOptions).runner = 'DirectRunner'
opts.view_as(DirectOptions).direct_running_mode = 'multi_processing'
opts.view_as(DirectOptions).direct_num_workers = 4


def price_mul2(jsonl_str):
    """Double the 'price' field of one JSON line.

    Args:
        jsonl_str: A single JSON document as text, containing a 'price' key.

    Returns:
        The document re-serialized as JSON text (non-ASCII characters kept
        as-is) with 'price' multiplied by 2.
    """
    record = json.loads(jsonl_str)
    record['price'] = record['price'] * 2
    return json.dumps(record, ensure_ascii=False)


def price_add1k(jsonl_str):
    """Add 1000 to the 'price' field of one JSON line.

    Args:
        jsonl_str: A single JSON document as text, containing a 'price' key.

    Returns:
        The document re-serialized as JSON text (non-ASCII characters kept
        as-is) with 'price' increased by 1000.
    """
    record = json.loads(jsonl_str)
    record['price'] = record['price'] + 1000
    return json.dumps(record, ensure_ascii=False)


def print_obj(jsonl_str):
    """Print the element to stdout and pass it through unchanged."""
    print(jsonl_str)
    return jsonl_str


# Read each line, double the price, add 1000, then print the result.
with beam.Pipeline(options=opts) as pipeline:
    lines = pipeline | beam.io.ReadFromText(input_file)
    doubled = lines | beam.Map(price_mul2)
    adjusted = doubled | beam.Map(price_add1k)
    _ = adjusted | beam.Map(print_obj)
■入力データ
{"id": "id_001", "title": "item 001", "price": 101}
{"id": "id_002", "title": "item 002", "price": 102}
{"id": "id_003", "title": "item 003", "price": 103}
{"id": "id_004", "title": "item 004", "price": 104}
{"id": "id_005", "title": "item 005", "price": 105}
{"id": "id_006", "title": "item 006", "price": 106}
{"id": "id_007", "title": "item 007", "price": 107}
{"id": "id_008", "title": "item 008", "price": 108}
{"id": "id_009", "title": "item 009", "price": 109}
{"id": "id_010", "title": "item 010", "price": 110}
{"id": "id_011", "title": "item 011", "price": 111}
{"id": "id_012", "title": "item 012", "price": 112}
{"id": "id_013", "title": "item 013", "price": 113}
{"id": "id_014", "title": "item 014", "price": 114}
{"id": "id_015", "title": "item 015", "price": 115}
{"id": "id_016", "title": "item 016", "price": 116}
{"id": "id_017", "title": "item 017", "price": 117}
{"id": "id_018", "title": "item 018", "price": 118}
{"id": "id_019", "title": "item 019", "price": 119}
{"id": "id_020", "title": "item 020", "price": 120}
■実行結果
{"id": "id_001", "title": "item 001", "price": 1202}
{"id": "id_002", "title": "item 002", "price": 1204}
{"id": "id_003", "title": "item 003", "price": 1206}
{"id": "id_004", "title": "item 004", "price": 1208}
{"id": "id_005", "title": "item 005", "price": 1210}
{"id": "id_006", "title": "item 006", "price": 1212}
{"id": "id_007", "title": "item 007", "price": 1214}
{"id": "id_008", "title": "item 008", "price": 1216}
{"id": "id_009", "title": "item 009", "price": 1218}
{"id": "id_010", "title": "item 010", "price": 1220}
{"id": "id_011", "title": "item 011", "price": 1222}
{"id": "id_012", "title": "item 012", "price": 1224}
{"id": "id_013", "title": "item 013", "price": 1226}
{"id": "id_014", "title": "item 014", "price": 1228}
{"id": "id_015", "title": "item 015", "price": 1230}
{"id": "id_016", "title": "item 016", "price": 1232}
{"id": "id_017", "title": "item 017", "price": 1234}
{"id": "id_018", "title": "item 018", "price": 1236}
{"id": "id_019", "title": "item 019", "price": 1238}
{"id": "id_020", "title": "item 020", "price": 1240}