dak ブログ

python、rubyなどのプログラミング、MySQL、サーバーの設定などの備忘録。レゴの写真も。

unittest での Flask のテスト

2022-07-29 21:21:13 | python
unittest で Flask の request を含むプログラムを unittest でテストする方法のメモ。
プログラムの構成は以下の通りです。
・main.py: Flask のアプリケーション
・app_controller.py: リクエスト処理の本体
・test_app_controller.py: app_controller.py のテストプログラム

test_app_controller.py が unittest を用いた app_controller.py のテストプログラムとなっています。

■main.py
Flask アプリのメインで、リクエストは AppController が処理します。
from flask import Flask, request
from app_controller import AppController

app = Flask(__name__)
gctrl = AppController()

@app.route('/')
def api_sample():
    res_obj, status = gctrl.process_request()
    return res_obj, status

if __name__ == '__main__':
    app.run('0.0.0.0', port=8080)


■app_controller.py
リクエスト処理の本体。
リクエストパラメータの str1 と str2 を連結した文字列をレスポンスのオブジェクトに入れて返却します。
from flask import request

class AppController:
    def create_response(self, str1, str2):
        resp_obj = {
            'message': f"{str1}_{str2}",
        }
        return resp_obj

    def process_request(self):
        str1 = request.args.get('str1', '')
        str2 = request.args.get('str2', '')
        resp_obj = self.create_response(str1, str2)
        status = 200
        return resp_obj, status


■Flask アプリを起動
$ python main.py
...
 * Running on http://127.0.0.1:8080
 * Running on http://192.168.8.129:8080 (Press CTRL+C to quit)


■curl でレスポンスを確認
$ curl 'http://localhost:8080/?str1=abc&str2=def'
{"message":"abc_def"}


■unittest のプログラム(test_app_controller.py)
process_request() は request を利用しているため、app = Flask() で Flask を起動し、
app.test_request_context() でテスト用のリクエストの設定を行います。
import unittest
from flask import Flask, request
from app_controller import AppController

class TestAppController(unittest.TestCase):
    def test_create_response(self):
	ctrl = AppController()
        resp_obj = ctrl.create_response('abc', 'def')
        self.assertEqual(resp_obj['message'], 'abc_def')

    def test_process_request(self):
        with app.test_request_context('/?str1=abc&str2=def') as req:
            ctrl = AppController()
            resp_obj, status = ctrl.process_request()
            self.assertEqual(resp_obj['message'], 'abc_def')
            self.assertEqual(status, 200)

if __name__ == '__main__':
    app = Flask(__name__)
    unittest.main()


■unittest の実行
$ python test_app_controller.py
..
----------------------------------------------------------------------
Ran 2 tests in 0.006s

OK



<img src="data:..."> での画像指定

2022-06-18 23:35:37 | python
<img src="data:..."> で画像を指定する方法のメモ。
python で <img src="data:..."> で画像を指定するプログラムを作成してみました。

■サーバ
import sys
import requests
import io
import base64
from PIL import Image
from flask import Flask, jsonify

app = Flask(__name__)

def image_base64(bytes):
    inst = io.BytesIO(bytes)
    img = Image.open(inst).convert('RGB')
    inst.close()

    outst = io.BytesIO()
    img.save(outst, format='PNG')
    img_data = outst.getvalue()
    outst.close()

    img_base64 = base64.b64encode(img_data).decode('utf-8')

    return img_base64

@app.route('/')
def view_html():
    img_url = 'https://www.xgoo.jp/top2018/public/img/img_socialmark.png'
    res = requests.get(img_url)

    img_base64 = image_base64(res.content)

    html = f'''
<html>
<head>
<meta http-equv="Content-Type" content="text/html; charset=utf-8">
</head>
<body>
<img src="data:image/png;base64,{img_base64}">
</body>
</html>
'''
    return html


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000)


■html
上記のサーバに "http://localhost:8000/" でアクセスした際のレスポンスは以下のようになります。
<html>
<head>
<meta http-equv=&quote;Content-Type&quote; content=&quote;text/html; charset=utf-8&quote;>
</head>
<body>
<img src="data:image/png;base64,...">
</body>
</html>


mecab-python3 のインストール

2022-06-14 22:08:50 | python
CentOS8 で mecab-python3 をインストールする方法のメモ

■mecab のインストール
sudo yum install mecab mecab-ipadic


■mecab-python3 のインストール
現時点で最新の 1.0.5 だとインストールに失敗するため、1.0.4 をインストールします。
pip install mecab-python3==1.0.4


■mecabrc
sudo ln -s /etc/mecabrc /usr/local/etc/mecabrc


gunicorn + flask でマルチプロセス

2022-06-10 21:51:17 | python
gunicorn + flask でマルチプロセスのウェブアプリケーションを作成する方法のメモ。

■インストール
pip install gunicorn
pip install flask


■プログラム構成
test.py: ウェブアプリケーション本体
settings.py: gunicorn の設定

■test.py
3秒スリープして、サーバのプロセスIDを含む文字列を返却します。
import os
import time
from flask import Flask

app = Flask(__name__)

@app.route('/test')
def view_test():
    time.sleep(3)
    pid = os.getpid()
    msg = f'''hello world ({pid})\n'''
    return msg


■settings.py
以下は 4 ワーカー(プロセス)でアプリケーションを実行する例です。
bind = '127.0.0.1:8000'
proc_name = 'test'
workers = 4


■ウェブアプリケーション実行
gunicorn test:app -c settings.py

実行すると、以下のように複数のプロセスIDで worker が
[4942] [INFO] Booting worker with pid: 4942
[4943] [INFO] Booting worker with pid: 4943
[4944] [INFO] Booting worker with pid: 4944
[4945] [INFO] Booting worker with pid: 4945


■サーバにアクセス
curl コマンドを & をつけて4回実行してみます。
curl 'http://localhost:8000/test'&
curl 'http://localhost:8000/test'&
curl 'http://localhost:8000/test'&
curl 'http://localhost:8000/test'&

以下のように複数のプロセスからレスポンスがあったことが確認できます。
hello world (4944)
hello world (4943)
hello world (4942)
hello world (4945)


flask でのパラメータ取得

2022-06-03 23:41:00 | python
flask でのパラメータ取得方法のメモ。

GET、POST でのパラメータの取得と、ファイルの受信方法をまとめてみました。

■GET
プログラム
import sys
from flask import Flask, request

app = Flask(__name__)

@app.route('/', methods=["GET"])
def view_get():
    var1 = request.args.get('var1', '')
    return f'''var1={var1}\n'''

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)


リクエスト
curl -X GET 'http://localhost:8080/?var1=foo'


レスポンス
var1=foo


■POST
プログラム
import sys
from flask import Flask, request

app = Flask(__name__)

@app.route('/', methods=["GET", "POST"])
def view_get():
    if request.method == "GET":
        val1 = request.args.get('var1', '')
        val2 = request.args.get('var2', '')
    elif request.method == "POST":
        val1 = request.form.get('var1', '')
        val2 = request.form.get('var2', '')

    return f'''var1={val1}\nvar2={val2}\n'''

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)


リクエスト
curl 'http://localhost:8080/' \
     -X POST \
     -F 'var1=foo' \
     -F 'var2=bar'


レスポンス
var1=foo
var2=bar


■ファイルの受信
プログラム
import sys
from flask import Flask, request

app = Flask(__name__)

@app.route('/', methods=["POST"])
def view_get():
    val1 = request.form.get('var1', '')
    val2 = request.form.get('var2', '')
    file1 = request.files.get('file1')

    bytes = file1.stream.read()
    str = bytes.decode('utf-8')

    return f'''var1={val1}\nvar2={val2}\nfile1={str}\n'''

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8080)


リクエスト
curl 'http://localhost:8080/' \
     -X POST \
     -F 'var1=foo' \
     -F 'var2=bar' \
     -F 'file1=@file.txt'


レスポンス
var1=foo
var2=bar
file1=abc


python で gzip ファイルを読み込む方法

2022-05-26 21:29:37 | python
python で gzip されたテキストファイルを読み込む方法のメモ。

■gzip.open() のパラメータでテキスト('rt')、エンコーディングを指定
import gzip

with gzip.open('test.txt.gz', 'rt', 'utf-8') as inst:
    for line in inst:
        print(line, end='')

■各行をバイト列として読み込み
import gzip

with gzip.open('test.txt.gz') as inst:
    for line in inst:
        line = line.decode('utf-8')
        print(line, end='')

■stdin から読み込み
import sys
import gzip

with gzip.open(sys.stdin.buffer, 'rt', 'utf-8') as inst:
    for line in inst:
        print(line, end='')


python でのマルチプロセスでのデータ処理

2022-05-16 21:00:15 | python
python でマルチプロセスでデータを処理するプログラムです。
mp = MultiProcessUtil(num_proc, func) で func を実行する num_proc の子プロセスを生成し、mp.start() で子プロセスを実行します。

親プロセスから子プロセスへは、mp.childs[i].p2c で送信します。
子プロセスの関数 func は 2 つの引数 (cinfo, args) をとり、cinfo.p2c で親プロセスから送信された情報を読み込みます。

親プロセスが mp.childs[i].p2c を close し、子プロセス側でデータの読み込みが完了すると、子プロセス側の読み込みループが終了します。

親プロセスは mp.join() を実行すると、子プロセスが終了するまで待ちます。
■ライブラリ
import os

class ChildInfo:
    def __init__(self):
        self.id = -1
        self.pid = None
        self.func = None
        self.args = None
        self.status = None
        self.p2c = None
        self.c2p = None
        self.p2c_r = None
        self.p2c_w = None
        self.c2p_r = None
        self.c2p_w = None

class MultiProcessUtil:
    def __init__(self, num_childs, func, args=None):
        self.num_childs = num_childs
        self.childs = [None] * num_childs
        self.func = func
        self.args = args  # len(args) == num_childs
        if self.args is None:
            self.args = [None] * self.num_childs

        for i in range(0, self.num_childs):
            cinfo = ChildInfo()
            cinfo.id = i
            cinfo.func = self.func
            cinfo.args = self.args[i]
            cinfo.p2c_r, cinfo.p2c_w = os.pipe()
            cinfo.c2p_r, cinfo.c2p_w = os.pipe()
            self.childs[i] = cinfo

    def start(self):
        for i in range(0, self.num_childs):
            cinfo = self.childs[i]

            pid = os.fork()
            if pid == 0:
                # child
                # 不要な fd を close
                for j in range(0, i):
                    os.close(self.childs[j].p2c_w)
                    os.close(self.childs[j].c2p_r)

                os.close(cinfo.p2c_w)
                cinfo.p2c_w = None
                os.close(cinfo.c2p_r)
                cinfo.c2p_r = None
                cinfo.p2c = os.fdopen(cinfo.p2c_r, 'r')
                cinfo.c2p = os.fdopen(cinfo.c2p_w, 'w')
                try:
                    res = cinfo.func(cinfo, cinfo.args)
                except Exception as e:
                    res = 1
                exit(res)
            else:
                # parent
                # 不要な fd を close
                cinfo.pid = pid
                os.close(cinfo.p2c_r)
                cinfo.p2c_r = None
                os.close(cinfo.c2p_w)
                cinfo.c2p_w = None
                cinfo.p2c = os.fdopen(cinfo.p2c_w, 'w')
                cinfo.c2p = os.fdopen(cinfo.c2p_r, 'r')

        return self.childs

    def join(self):
        for child in self.childs:
            pid, status = os.wait()
            child.status = status

■プログラム
import sys
import os
import re
from multi_process_util import MultiProcessUtil

def proc1(cinfo, args=None):
    print('run: %d' % (os.getpid()))

    for line in cinfo.p2c:
        line = re.sub('[\r\n]+$', '', line)
        print('[%d]: [%s]' % (os.getpid(), line))

    return 0

def main():
    num_procs = 2
    mp = MultiProcessUtil(num_procs, proc1)
    mp.start()

    line_no = -1
    for line in sys.stdin:
        line_no += 1
        c = line_no % num_procs
        #print('c: %d' % (c))
        #print(mp.childs[c].json())
        mp.childs[c].p2c.write(line)

    for child in mp.childs:
        child.p2c.close()

    mp.join()
    return 0

if __name__ == '__main__':
    res = main()
    exit(res)

■入力ファイル
abc
def
ghi
jkl
mno
pqr
stu
vwx
012
345
678

■実行結果
run: 8223
[8223]: [def]
run: 8222
[8222]: [abc]
[8223]: [jkl]
[8223]: [pqr]
[8223]: [vwx]
[8223]: [345]
[8222]: [ghi]
[8222]: [mno]
[8222]: [stu]
[8222]: [012]
[8222]: [678]


apache beam のマルチプロセスの動作確認

2022-05-10 20:45:23 | python
apache beam でマルチプロセスで実行し、各処理でプロセスIDを確認します。
ここでは以下の形式のデータを処理します。
Confidence NN B-NP
in IN B-PP
the DT B-NP

以下のプログラムでは、ParDo() で WordReader を並列化し、以降の処理をシリアルに実行します。
各処理でプロセスIDを取得し、最後に行番号、プロセスID、データを出力します。

■プログラム
import sys
import os
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import DirectOptions

input_file = '~/nltk_data/corpora/conll2000/train.txt'

class WordReader(beam.DoFn):
    def __init__(self):
        self.line_no = 0

    def process(self, line):
        self.line_no += 1
        items = line.split(' ')
        if len(items) < 3:
            return
        obj = {
            'line_no': self.line_no,
            'form': items[0],
            'pos': items[1],
            'tag': items[2],
            'pids': [os.getpid()],
        }
        yield obj

opts = PipelineOptions()
opts.view_as(StandardOptions).runner = 'DirectRunner'
opts.view_as(DirectOptions).direct_running_mode = 'multi_processing'
opts.view_as(DirectOptions).direct_num_workers = 2

def form_lower(obj):
    obj['form'] = obj['form'].lower()
    obj['pids'].append(os.getpid())
    return obj

def pos_lower(obj):
    obj['pos'] = obj['pos'].lower()
    obj['pids'].append(os.getpid())
    return obj

def tag_lower(obj):
    obj['tag'] = obj['tag'].lower()
    obj['pids'].append(os.getpid())
    return obj

def print_obj(obj):
    print("%d\t%s [%s, %s, %s]" %
          (obj['line_no'],
           obj['pids'],
           obj['form'],
           obj['pos'],
           obj['tag']
           ))
    return obj

with beam.Pipeline(options=opts) as p:
    (p
     | beam.io.ReadFromText(input_file)
     | beam.ParDo(WordReader())
     | beam.Map(form_lower)
     | beam.Map(pos_lower)
     | beam.Map(tag_lower)
     | beam.Map(print_obj)
    )
</pre>
■出力結果
WordReader が並列で実行されるため、行番号がプロセス毎に1から始まっています。
1       [3768, 3768, 3768, 3768] [its, prp$, b-np]
2       [3768, 3768, 3768, 3768] [existing, vbg, i-np]
3       [3768, 3768, 3768, 3768] [authorization, nn, i-np]
...
1       [3769, 3769, 3769, 3769] [confidence, nn, b-np]
2       [3769, 3769, 3769, 3769] [in, in, b-pp]
3       [3769, 3769, 3769, 3769] [the, dt, b-np]
...


apache beam によるパイプラインでのデータ加工処理の例

2022-05-09 23:26:48 | python
apache beam でパイプラインでデータ加工処理を行ってみました。
以下のプログラムでは jsonl のデータを読み込み、price を 2倍する処理と、price に 1000 を加算する処理をパイプラインでつないで実行しています。
■プログラム
import sys
import os
import json
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import DirectOptions

input_file = 'data/test.jsonl'

opts = PipelineOptions()
opts.view_as(StandardOptions).runner = 'DirectRunner'
opts.view_as(DirectOptions).direct_running_mode = 'multi_processing'
opts.view_as(DirectOptions).direct_num_workers = 4

def price_mul2(jsonl_str):
    obj = json.loads(jsonl_str)
    obj['price'] *= 2
    jsonl_str = json.dumps(obj, ensure_ascii=False)
    return jsonl_str

def price_add1k(jsonl_str):
    obj = json.loads(jsonl_str)
    obj['price'] += 1000
    jsonl_str = json.dumps(obj, ensure_ascii=False)
    return jsonl_str

def print_obj(jsonl_str):
    print(jsonl_str)
    return jsonl_str

with beam.Pipeline(options=opts) as p:
    (p
     | beam.io.ReadFromText(input_file)
     | beam.Map(price_mul2)
     | beam.Map(price_add1k)
     | beam.Map(print_obj)
    )

■入力データ
{"id": "id_001", "title": "item 001", "price": 101}
{"id": "id_002", "title": "item 002", "price": 102}
{"id": "id_003", "title": "item 003", "price": 103}
{"id": "id_004", "title": "item 004", "price": 104}
{"id": "id_005", "title": "item 005", "price": 105}
{"id": "id_006", "title": "item 006", "price": 106}
{"id": "id_007", "title": "item 007", "price": 107}
{"id": "id_008", "title": "item 008", "price": 108}
{"id": "id_009", "title": "item 009", "price": 109}
{"id": "id_010", "title": "item 010", "price": 110}
{"id": "id_011", "title": "item 011", "price": 111}
{"id": "id_012", "title": "item 012", "price": 112}
{"id": "id_013", "title": "item 013", "price": 113}
{"id": "id_014", "title": "item 014", "price": 114}
{"id": "id_015", "title": "item 015", "price": 115}
{"id": "id_016", "title": "item 016", "price": 116}
{"id": "id_017", "title": "item 017", "price": 117}
{"id": "id_018", "title": "item 018", "price": 118}
{"id": "id_019", "title": "item 019", "price": 119}
{"id": "id_020", "title": "item 020", "price": 120}

■実行結果
{"id": "id_001", "title": "item 001", "price": 1202}
{"id": "id_002", "title": "item 002", "price": 1204}
{"id": "id_003", "title": "item 003", "price": 1206}
{"id": "id_004", "title": "item 004", "price": 1208}
{"id": "id_005", "title": "item 005", "price": 1210}
{"id": "id_006", "title": "item 006", "price": 1212}
{"id": "id_007", "title": "item 007", "price": 1214}
{"id": "id_008", "title": "item 008", "price": 1216}
{"id": "id_009", "title": "item 009", "price": 1218}
{"id": "id_010", "title": "item 010", "price": 1220}
{"id": "id_011", "title": "item 011", "price": 1222}
{"id": "id_012", "title": "item 012", "price": 1224}
{"id": "id_013", "title": "item 013", "price": 1226}
{"id": "id_014", "title": "item 014", "price": 1228}
{"id": "id_015", "title": "item 015", "price": 1230}
{"id": "id_016", "title": "item 016", "price": 1232}
{"id": "id_017", "title": "item 017", "price": 1234}
{"id": "id_018", "title": "item 018", "price": 1236}
{"id": "id_019", "title": "item 019", "price": 1238}
{"id": "id_020", "title": "item 020", "price": 1240}


python で XML を SAX でパーズ

2022-05-06 23:33:31 | python
python で XML を SAX でパーズして、JSONL で出力します。
■XML
<?xml version=&quote;1.0&quote;?>
<items>
  <item id=&quote;item1&quote;>
    <title>商品1</title>
    <price>100</price>
  </item>
  <item id=&quote;item2&quote;>
    <title>商品2</title>
    <price>500</price>
  </item>
  <item id=&quote;item3&quote;>
    <title>商品3</title>
    <price>1000</price>
  </item>
</items>

■プログラム
startElement() には開始タグを読み取った際の処理を記述し、
endElement() には終了タグを読み取った際の処理を記述します。
そして、characters() にはタグ内の文字列を読み取った際の処理を記述します。
import sys
import json
import xml.sax
import xml.sax.handler

class XmlToJsonHandler(xml.sax.handler.ContentHandler):
    def __init__(self):
        self.tags = []
        self.item = None

    def create_item(self):
        item = {
            'id': '',
            'title': '',
            'price': '',
        }
        return item

    def create_tag(self, name, attrs):
        tag = {
            'name': name,
            'attrs': attrs,
        }
        return tag

    def startElement(self, name, attrs):
        tag = self.create_tag(name, attrs)
        self.tags.append(tag)

        if name == 'item':
            self.item = self.create_item()
            self.item['id'] = attrs.get('id')

    def endElement(self, name):
        self.tags.pop()
        if name == 'item':
            print(json.dumps(self.item, ensure_ascii=False))

    def characters(self, text):
        name = self.tags[-1]['name']
        if name == 'title':
            self.item[name] = text
        elif name == 'price':
            self.item[name] = int(text)

def main():
    parser = xml.sax.make_parser()
    parser.setContentHandler(XmlToJsonHandler())
    parser.parse(sys.stdin)
    return 0

if __name__== '__main__':
    res = main()
    exit(res)

■実行結果
上記の XML は以下の JSONL に変換されます。
{"id": "item1", "title": "商品1", "price": 100}
{"id": "item2", "title": "商品2", "price": 500}
{"id": "item3", "title": "商品3", "price": 1000}

apache beam でマルチプロセスで実行

2022-05-02 21:39:22 | python
apache beam でマルチプロセスで実行

apache beam で python でマルチプロセスで実行する方法のメモ。
簡易にマルチプロセスを実現するため、ここでは DirectRunner を使用します。
Pipeline のオプションに以下を指定することで、マルチプロセスでプログラムを実行することができます。
 runner = 'DirectRunner'
 direct_running_mode = 'multi_processing'
 direct_num_workers = {ワーカー数}

■入力データ
今回読み込み対象としているファイルは以下のようなデータで、
単語の出現をカウントします。
Confidence NN B-NP
in IN B-PP
the DT B-NP
pound NN I-NP
is VBZ B-VP

■プログラム
import sys
import os
import re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import DirectOptions

input_file = '~/nltk_data/corpora/conll2000/train.txt'
output_file = 'data/output.txt'

class ParseLine(beam.DoFn):
    def __init__(self):
        self.re_chomp = re.compile('[\r\n]+$')

    def process(self, line):
        line = self.re_chomp.sub('', line)
        items = line.split(' ')
        if len(items) < 3:
            return

        kw = items[0].lower()
        yield (kw, 1)

opts = PipelineOptions()
opts.view_as(StandardOptions).runner = 'DirectRunner'
opts.view_as(DirectOptions).direct_running_mode = 'multi_processing'
opts.view_as(DirectOptions).direct_num_workers = 2

with beam.Pipeline(options=opts) as p:
    (p
     | beam.io.ReadFromText(input_file)
     | beam.ParDo(ParseLine())
     | beam.CombinePerKey(sum)
     | beam.io.WriteToText(output_file)
    )
</pre>
■実行結果
('existing', 13)
('to', 5080)
('as', 972)
...


python で Apache Beam を使ってみた

2022-05-01 16:54:00 | python
python での Apache Beam によるデータ処理のサンプルプログラム。

■プログラム1
import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | beam.Create(['abc', 'def', 'ghi', 'abc', 'def', 'abc'])
     | beam.Map(lambda str: (str, 1))
     | beam.CombinePerKey(sum)
     | beam.Map(print)
    )

リスト内の文字列の出現頻度をカウントするプログラムです。
Create([...]) で文字列のリストを生成します。
Map(lambda str: (str, 1)) で、各文字列を出現頻度1回として、データを生成します。
CombinePerKey(sum) では、タプルの先頭要素(=文字列)をキーとして、同じキーの出現頻度を合計します。
Map(print) で各文字列毎に集計結果を出力します。

■実行結果1
('abc', 3)
('def', 2)
('ghi', 1)

■プログラム2
import apache_beam as beam
import re

input = 'data/input_*.txt'
output = 'data/output.txt'

with beam.Pipeline() as p:
    (p
     | beam.io.ReadFromText(input)
     | beam.FlatMap(lambda line: re.findall(r'[a-zA-Z0-9]+', line))
     | beam.Map(lambda str: (str, 1))
     | beam.CombinePerKey(sum)
     | beam.io.WriteToText(output)
    )

プログラム1 と同様に文字列の出現数をカウントしますが、入出力がファイルになっています。
ファイルからの読み込みには io.ReadFromText() を使用します。
ファイル名に * を含めることができ、複数のファイルを処理対象にすることができます。
ファイルへの出力は io.WriteToText() を使用します。
出力ファイル名には -mmmmm-of-nnnnn の形式で全nnnnnファイルの通し番号が付与されます。

■入力ファイル
data/input_1.txt:
w1
w1 w2
w1 w2 w3
w1 w2 w3 w4
w1 w2 w3 w4 w5

data/input_2.txt:
w11
w11 w12
w11 w12 w13
w11 w12 w13 w14
w11 w12 w13 w14 w15

■出力ファイル
data/output.txt-00000-of-00001
('w1', 5)
('w2', 4)
('w3', 3)
('w4', 2)
('w5', 1)
('w11', 5)
('w12', 4)
('w13', 3)
('w14', 2)
('w15', 1)


python での正規表現によるひらがな、カタカナ、漢字の判定

2022-04-01 14:25:15 | python
python での正規表現によるひらがな、カタカナ、漢字の判定方法のメモ。

ひらがな、カタカナはコードポイントの範囲指定でチェックすることができます。
 ひらがな:u+3040 - u+309F
 カタカナ:u+30A0 - u+30FF

また、regex では Script=Hiragana/Katakana/Han を指定することで
ひらがな、カタカナ、漢字の判定を行うことができます。
import sys
import re
import regex

# ひらがな u+3040 - u+309F
str = 'あいうえお'
res = re.match('^[\u3040-\u309F]+$', str)
print(res)
-->
<re.Match object; span=(0, 5), match='あいうえお'>

res = regex.match('^\p{Script=Hiragana}+$', str)
print(res)
-->
<regex.Match object; span=(0, 5), match='あいうえお'>

# カタカナ u+30A0 - u+30FF
str = 'アイウエオ'
res = re.match('^[\u30A0-\u30FF]+$', str)
print(res)
-->
<re.Match object; span=(0, 5), match='アイウエオ'>

res = regex.match('^\p{Script=Katakana}+$', str)
print(res)
-->
<regex.Match object; span=(0, 5), match='アイウエオ'>

# 漢字
str = '漢字'
res = regex.match('^\p{Script=Han}+$', str)
print(res)
-->
<regex.Match object; span=(0, 2), match='漢字'>

# ひらがな+カタカナ
str = 'ひらがなカタカナ'
res = re.match('^[\u3040-\u309F\u30A0-\u30FF]+$', str)
print(res)
-->
<re.Match object; span=(0, 8), match='ひらがなカタカナ'>

# ひらがな+カタカナ+漢字
str = 'ひらがなカタカナ漢字'
res = regex.match('^(?:\p{Script=Hiragana}|\p{Script=Katakana}|\p{Script=Han})+$', str)
print(res)
-->
<regex.Match object; span=(0, 10), match='ひらがなカタカナ漢字'>


lxml で xpath での子孫の検索

2022-04-01 13:48:43 | python
lxml で、あるノードの子孫のノードのみを検索する方法のメモ。

あるノードで node.xpath("//タグ") で検索すると、node が root ノードでなくても
全ノードが検索対象となってしまいます。
子孫のみを検索対象とするには、node.xpath(".//タグ") で検索します。

以下、実行例です。
import sys
import lxml.html

htmlstr = """
<html>
<body>
  <div id="d1">
    <div id="d2-1">
      <div id="d3-1">
        <p id="p4-1">p4-1 text</p>
        <p id="p4-2">p4-2 text</p>
      </div>
      <div id="d3-2"></div>
    </div>
    <div id="d2-2">
      <p id="p3-3">p3-3 text</p>
      <p id="p3-4">p3-4 text</p>
    </div>
  </div>
</body>
</html>
"""

dom = lxml.html.fromstring(htmlstr)

■id 指定でノードを検索
node = dom.xpath("//div[@id='d3-1']")[0]
print(node.tag)
print(node.attrib)
-->
div
{'id': 'd3-1'}

■//p で検索すると、子孫以外のノードも含まれます
ps = node.xpath("//p")
print(len(ps))
for p in ps:
    print(p.attrib)
-->
4
{'id': 'p4-1'}
{'id': 'p4-2'}
{'id': 'p3-3'}
{'id': 'p3-4'}

■.//p で検索すると、子孫のノードのみになります
ps = node.xpath(".//p")
print(len(ps))
for p in ps:
    print(p.attrib)
    print(p.text)
-->
2
{'id': 'p4-1'}
p4-1 text
{'id': 'p4-2'}
p4-2 text


Typescript で chevrotain による構文解析

2022-03-23 00:24:55 | python
Typescript で chevrotain による構文解析を試してみました。
受理する構文は以下の2つです。
{数字} + {数字}
{数字} - {数字}

chevrotain で文字列を解析して、上記の計算結果を返します。
■文法
calc ⇒ expr
expr ⇒ val Plus val
val Minus val
val ⇒ [0-9]+
Plus ⇒ +
Minus ⇒ -

■プログラム
import { CstParser, Lexer, createToken, Rule } from 'chevrotain'

// lexer
const Num = createToken({ name: "Num", pattern: /[0-9]+/ });
const Plus = createToken({ name: "Plus", pattern: /[+]/ });
const Minus = createToken({ name: "Minus", pattern: /[-]/ });

const allTokens = [
  Num,
  Plus,
  Minus,
];

const calcLexer = new Lexer(allTokens);

// parser
class CalcParser extends CstParser {
  public value_stack: any[] = [];

  constructor() {
    super(allTokens);
    this.performSelfAnalysis();
  }

  public calc = this.RULE("calc", () => {
    this.SUBRULE(this.expr);
    if (! this.RECORDING_PHASE) {
      const obj = this.value_stack.pop();
      this.value_stack.push(obj);
    }
  });

  public expr = this.RULE("expr", () => {
    const item1: any = this.SUBRULE(this.val);
    let val1: any;
    if (! this.RECORDING_PHASE) {
      const obj1: any = this.value_stack.pop();
      val1 = obj1.value;
    }

    let val2: any;
    const item: any = this.OR([
      { ALT: () => {
        this.CONSUME1(Plus);
        this.SUBRULE1(this.val);
        if (! this.RECORDING_PHASE) {
          const obj2 = this.value_stack.pop();
          val2 = obj2.value;
          this.value_stack.push({ value: val1 + val2 });
        }
      }},
      { ALT: () => {
        this.CONSUME2(Minus);
        this.SUBRULE2(this.val);
        if (! this.RECORDING_PHASE) {
          const obj2 = this.value_stack.pop();
          let val2 = obj2.value;
          this.value_stack.push({ value: val1 - val2 });
        }
      }},
    ]);
  });

  private val = this.RULE("val", () => {
    const item: any = this.CONSUME(Num);
    if (! this.RECORDING_PHASE) {
      this.value_stack.push({ value: parseInt(item.image) });
    }
  });
}

const parser = new CalcParser();
const productions: Record<string, Rule> = parser.getGAstProductions();

const texts = [
  '1+2',
  '7-3',
];

for (let text of texts) {
  console.log(text);

  let lex_result = calcLexer.tokenize(text);
  parser.input = lex_result.tokens;
  parser.calc();
  const obj = parser.value_stack.pop();
  console.log(JSON.stringify(obj, null, 2));
}

■実行結果
1+2
{
  "value": 3
}
7-3
{
  "value": 4
}