ひしだまの変更履歴

ひしだまHPの更新履歴。
主にTRPGリプレイの元ネタ集、プログラミング技術メモと自作ソフト、好きなゲームや音楽です。

HadoopでWordCount(Scala)

2011-07-24 23:59:41 | PG(Scala)

いろんな人が書いているけれども、HadoopのMapReduceを使ってWordCount。をScalaで書いてみた

素のMapReduceだからどうしても面倒な部分はあるんだけど、Scalaの機能を使って多少は便利に書ける。
で、そういった簡単な便利機能はSHadoopだのScalaHadoopだのといったライブラリーで実現されている。
…どちらも更新日が古くて、あまり更新されていないみたいだけど(苦笑)

まぁそれでも自分で書いてみると面白いネタが出てくるもので。
型が同一であることをチェックしてみたくてGeneralized Type Constraintsを初めて使ってみたし。(これでめぼしい記号はだいたい網羅したかなぁ?w)

コメント
  • X
  • Facebookでシェアする
  • はてなブックマークに追加する
  • LINEでシェアする

mesos Spark

2011-07-23 03:22:36 | PG(Scala)

Scalaで分散処理が書けるライブラリー(だと思う)Sparkでプログラムを書いてみた。
サンプルを見ると、普通のScalaのコレクション操作と同じ感じで書けるのが素晴らしい。
(まだバージョン0.3なので、足りない関数は多いけど。特にsumとsortは欲しいところ。あとキーを指定した結合)

お題は、Cascadingでも書いた偏差値の算出。これって何段かの処理が必要になるから、題材には良い感じなんだよね~。
無理矢理感のあるコーディングをしてしまったけれど、それでもかなり簡単に書けた。
ただ、Windows上の単独環境では動作したけど、実際に分散環境で動かしたわけではないので、これで正しいのかは分からない^^;(…分散環境欲しいなぁ)

しかし、とりあえず出来上がったソースのサイズを見ても、プログラミングにかかった時間(1日もかかってない。Sparkの勉強開始から数えても3~4日)を考えても、CascadingよりSparkの方が楽なのは確か。これは期待大だ(笑)

なお、SparkではHadoop(というかHDFS)のファイルも扱えるみたいだが、実際に分散させるにはMesosを入れないとダメなようで、MesosはUNIXにしか対応してなくてC++でコンパイルする必要があるので、試せない><(…UNIX環境欲しいなぁ)

ところで、MesosとかSparkって、それで検索しても関係ないものがいっぱいひっかかるので、大変(苦笑)
そう考えると、ほんとHadoopは秀逸な命名だよ!

コメント
  • X
  • Facebookでシェアする
  • はてなブックマークに追加する
  • LINEでシェアする

Scalaの関数(Function)とか

2011-07-18 23:15:53 | PG(Scala)

久しぶりにScalaについてウェブページを書きまくった感じw

関数に関しては、今まで見聞きした内容をかなり詰め込んだ。
関数リテラルや関数の型は他のページに書いていたものと変わらないけれど、関数オブジェクトの話とかカリー化とかFunction・PartialFunctionトレイトとか。
まぁ、例によって合っているかどうか自信ないけど…。

他に、「単純にJavaより便利だよー」というネタを書こうと思って、Dateの話とかソート方法の話を書いてみた。
デバッグログ出力もその一環のつもりだったんだけど、あまりまとまりがつかなかった…。

コメント
  • X
  • Facebookでシェアする
  • はてなブックマークに追加する
  • LINEでシェアする

wget curl

2011-07-17 20:05:51 | PG(UNIX)

久しぶりにUNIXコマンド一覧にコマンド追加。前回の更新が2009-05-10だから、本当にすごく久しぶりだ^^;

一番簡単な使い方としては、どちらもURLを指定してそのファイルを取ってくるもの。
HTTP以外に何に対応しているかとかで違いがある模様。

コメント
  • X
  • Facebookでシェアする
  • はてなブックマークに追加する
  • LINEでシェアする

第1回分散処理ワークショップの感想

2011-07-10 11:05:21 | PG(NoSQL)

「第1回分散処理ワークショップ in Tokyo」に参加できましたので、その感想など。
(長らく補欠3番目だったので今回は無理かなーと思ってたんだけど、ぎりぎりでキャンセルが出たので入れた)

Togetter: 「第1回分散処理ワークショップ in Tokyo」 #dist_study
資料の場所はokuyamaooさんの「第1回分散処理ワークショップ in Tokyo」を開催しましたに全部出ています。

各DBMS(分散KVS)の紹介を1つ当たり15分。の予定が、Q&Aが入ったりして盛り上がり、けっこう伸び伸びに(笑)

全般的な感想としては、やはりDBMS(分散KVS)を実際に作っている人はすごい(笑)
方式を考えて実装してみて試行錯誤しているだろうから、他のDBMSについても質問内容がけっこう深くなる。
自分はHBaseCassandraVoldemortはちょっと触ったけど他は名前くらいしか知らなくて「ドキュメントベースってなんじゃろ?」くらいのレベルなので、あんな質問は思い付きもしないなぁ(苦笑)

しかしドキュメントベースでも結局keyとvalueでデータ(valueは自由)を格納しているので、ColumnFamilyタイプのDBMSがkeyの種類を拡張しているのと似たようなものなんだろうと思った。
そして、どうやってデータを格納していて、どのように分散させたり整合性を担保したりするかといった実装方法や考え方の違いが色々あるので、DBMSが色々あるんだな、と。(…書いてて思ったけど、なんて当たり前な(爆))
そこから逆に、実装方法に応じてデータの持ち方にも特徴(制限)が出てきて、分類名(ドキュメントベースとかカラム指向とか)が名付けられているんだろうなぁ。つまりDBMSの特徴が詳しく分からないと、なんでそういう分類になっているのかはきっと理解できないんだろうなー。

あと、MapReduce対応(Hadoopやそれ以外)も意外と意識されてるようだ。

あとあと、どれも対応OSはほとんどUNIX系ばっかり。Windowsマシンしか持っていないのでは何も試せなくなってきている(汗)
いいかげん、新しいマシンを買うか。VMwareでLinuxが使えるスペックのやつ(爆)

以下、個別のDBMSの話のメモ・感想。基本的にはスライドを見れば分かるので、その補足的な。


okuyamaooさんのokuyama

okuyamaは、データの持ち方はシンプルな分散KVSかな?

Tagは、データの種類に応じたタグ(ラベル)付けをして、データをグルーピングするものっぽい。そして、検索に使える感じ。

SerializeMapに関して面白い事を言っていたな。Javaのデータをバイナリーのまま保持し、インスタンス化しないんだって。必要に応じてインスタンス化(デシリアライズ)するってことかな?

あと、JavaScriptをDataNodeで実行できるとのこと。(エージェント指向みたいなもの?
Hadoopでもプログラムの方をデータのある場所に転送して実行するという考え方だし。
しかもJavaScriptだとコンパイルしないから、テキストで書いてそのまま転送できるから便利、とかあるんだろうか)

パーティション機能は、RDBMSがDB(ユーザー)毎に使用領域を制限して互いにアクセスできないようにするようなものか。アプリケーション毎にパーティションを分けるイメージだそうだ。
(テスト環境で1つのクラスターを開発者が共有して別々に使う場合にも、こういう機能は欲しいよね)

あとは、Valueに対する検索機能あり、と。(Tagでグループ化しておくとかインデックスを使れるとか)


doryokujinさんのMongoDB

MongoDBの構成はMySQLに似ているのだそうだ。
database-collection-document-field(key-value)
collectionがRDBMSのテーブルに相当する感じか。
valueはJSON形式で、どの項目(型)に対しても検索・更新がかけられる。
paddingであらかじめ予備領域を確保しておくこともでき、後からその領域にデータを追加することで、データ追加しても物理的な再配置をしなくて済むらしい。

CAP定理のC(一貫性)重視。

Replica Set: データの複製を作っておくが、writeは必ずprimaryのみ。→一貫性を保つ為
readはオプションでsecondaryからも可能(読んだデータが古いこともあるかもしれないが、スケールする)

自動フェールオーバー: primaryが死んだら、secondaryの中からどれか1つがprimaryになる。このとき、他のsecondaryに有って新primaryに無いデータは、削除される。→一貫性が保たれる
分断されて過半数になれない方はprimaryが作られない。→一貫性が保たれる

Sharding: GFSをモデルにしている。document内のkey(複数可)をShard keyにすることができ、それを元に分割する。
Shardingを構成するサーバーはmongos・config・dataとあり、configがShardに関する情報を持っていて、mongosは情報を持たずにルーティングするのみ。クライアントはmongosにアクセスする。mongosはconfigサーバーから情報を取得する。それぞれ冗長化が可能なので、SPoFが存在しない。(今までの経験では、configサーバーに負荷がかかったことは無いそう)

(Hadoopでなく独自の)MapReduceを実行可能。
JavaScriptベースで、1コア/Shard、Mapper数の上限はShard数などの制約あり。Shuffle機能は無く、どれか1つのShardでReduceを行う。

Mongo-Hadoopというプラグインがあり、Chunk単位でMapperが立ち上げられる。
Pig対応も進んでいる(が、Hiveの話は出ていないらしい)。
FlumeのsinkにMongoDBを指定することも可能。


tatsuya6502さんのHibari

Hibariは、ジェミナイ・モバイル・テクノロジーズ(GEMINI)のOSS。2006年から開発している。
(なんで日本じゃないのに雲雀(ひばり)なんだろうな(笑)→ジェミナイ東京で開発していて、OSS化した時にこの名前になったようです

ウェブメールに使われているだけあって、強い一貫性・耐久性があり、readに強い。
耐久性:最初にWALに書き、fsyncしてからクライアントに成功を通知する(データがメールだと、ロストは許容できない)
一貫性:古いデータが見えない(メールだと、書いたものが見えないと困る)
ロック:タイムスタンプを用いたCAS(楽観的ロック)
マイクロ・トランザクション:制約はあるものの、複数のキー/バリューにまたがるトランザクション可

データモデルは、RDBMSのテーブルに近いものが「チェイン」(→厳密には違う模様。コメント欄参照)。その中にキー・バリューを持つ。
キーは辞書順でソートされる。
バリューはあまり大きなものは持たせられない。数kB~16MBくらい。
他にタイムスタンプ・有効期限・フラグを持つ。フラグはokuyamaのTagのようなものらしい。検索条件(フィルター)に使える。
バリュー以外はRAM常駐。

データはコンシステント・ハッシングによる分散。キー全体でなく、キーの先頭部分だけでハッシュ値を算出できるのが特徴的。
例えばキーの先頭をユーザーIDにしておけば、同じユーザーは同じチェインに入ることになる。

データの一貫性保証(複製の作成?)はチェイン・レプリケーションで行う。
同じデータを3つのサーバーに複製するとき、Head・Middle・Tailという順番を付ける。
書き込みは必ずHeadから行い、Tailまで伝播したら成功とする。
読み込みは常にTailから行う。その為、読み込みが多い使い方だとTailに負荷が集中する。
そこで、サーバーへの負荷を分散するという観点から、異なるチェインのTailは別サーバーに置く。(1つのデータに対する読み込みが集中しても分散できないが、複数データなら負荷分散できるということだろう)
1つのサーバーがダウンしたら、各チェインではHead・Middle・Tailのいずれかが消失することになる。Headが無くなると書き込めなくなるし、Tailが無くなると読み込みも書き込みも出来なくなる(書き込みはTailまで成功しないとダメだから)。そこで、「生き残ったMiddle」がHeadやTailに変わることで、チェインが稼動するようにする。

チェイン・マイグレーション: チェイン追加時は、各チェインの一部ずつ(最小限のデータ)が新チェインへ割り振られる。

HibariはErlang(アーラン。並行処理に優れている言語)で書かれており、GCがJavaより優秀なんだそうだ。(JavaのGCはたまに止まっているように見えることがあり、死活監視に引っかかって障害になる?)


yukimさんのCassandra

最近のCassandraの動向を追っていなかったので、その話が出たのがありがたい^^
0.7で動的なスキーマ変更とセカンダリーインデックスが使えるようになった。
0.8でカウンター(分散環境での値のインクリメント・デクリメント)が使えるようになり、CQLも入った。
0.9をとばして1.0が10月に出る予定。

CQLはCassandraで使えるSQLっぽいもの。
CREATE TABLEならぬCREATE KEYSPACEとか、SELECT文とか。(さすがにbegin transactionやcommitは無いだろうがw)
CQLの利点は、Cassandraで使われているThrift APIの変更をあまり受けなくなること。もしかするとJDBCからも使えるようになるかもしれない?

CassandraはHadoop MapReduceやAWSもサポートしている。

また、DataStaxのBriskでは、HDFSの代わりにCassandraでデータを保存するなんて事をやっている。Hiveにも対応しているらしい。

Cassandraは(一貫性レベルにロケーションを意識した分散(データの複製の配置)があるので)、バッチ用とオンライン用の複製を作るのが簡単に出来るかもしれない。
HBaseクラスター(オンライン)上でMapReduce(バッチ)を実行すると負荷が高いから別クラスターにすべきかもという話があり、その場合はデータコピーをどうするのかが問題になるけれども、Cassandraなら分散配置までやってくれるので楽かも)


ueshinさんのHBase

ueshinさんの自宅HBaseクラスターの写真は、いつも受けが良い(笑)
Twitterのストリーミングをやっていて、LZOで圧縮していても500リージョン、1台当たり100リージョンまで達したそうだ。

HBaseの特徴は、
高い書き込みスループット(WALはシーケンシャルに書き込み、実データはメモリー上で更新。定期的にフラッシュ)
サーバーを追加すれば自動的にリージョンを再配分(動的なシャーディング)
実データはHDFS上に保存されるので、HDFSの機能で複製、チェックサムで自動修復、MapReduceも出来る。
強い一貫性: ROWに対する操作はアトミック、CAS操作可

リージョンがROWでソートされていてROWの範囲で分かれている事の説明が漏れてしまった為^^;、キー指定でアクセスするときに全件スキャンするのかという質問が。tatsuyaさんから、どのリージョンに存在するのかはテーブルに持っているのですぐ分かるが、その中はスキャンすることになるとの説明。(実際はJavaで言うTreeMapのようなもの(キーでソートされている)なので、二分探索か何かで単なるスキャンよりは速いんじゃないかと思う)

それから、HMasterが死んだらどうなるかという質問。HDFSのNameNodeはSPoFだけど、HMasterは複数立てられるから大丈夫らしい。
yutuki_rさんから「NameNodeが死んでもすぐにはクラスターは停止しない」との補足があったが、これはHMasterの誤りだったらしい。HBaseではデータやWALもHDFSに書き込むので、HDFS内のどの場所に書けばいいかはNameNodeに問い合わせないと分からないから、NameNodeが死ぬと何も出来ないだろうとのこと。

データノード(リージョンサーバー)を追加した場合、5分おきにバランサーが動いているので、新しく追加されたリージョンサーバーにリージョンが移動する(リージョン数で判断)。
なお、リージョンの移動にはデータの移動は伴わない。

(それにしても、HBaseのリージョンとHDFS(物理配置)の関係はいつも混乱の元(苦笑)
自分もいまだに分かってないんだけど(汗)、あるデータzをリージョンZが持っているとき、リージョンサーバーAがZを管理していても、それはAのメモリー内(とローカルディスク?)にzがあり、HDFSではデータノードB・C・D(つまりA以外)がzを保持している可能性もある、という理解でいいのだろうか?
ん? リージョン移動にデータ移動が伴わないということは、Aはz自体も保持していない?
それから、Zを持つリージョンサーバーは1台のみ? それが壊れたら、HMasterが別リージョンサーバーにZを割り当てるのかな? データ自体はHDFSで分散しているから復元できるはずだし。
あと、MapReduceするときはA経由でなくB・C・Dのいずれかで実行されるんだよな?
むー、まだまだ知識不足だ…つーか、単独環境ではこういうのは実験できないしなぁ
この辺り、やはり色々と違っているようです。コメント欄参照


frsyukiさんのMessagePackとFluentとKumofs

■Kumofs
エトラボのバックエンドで使われている。

サーバー構成
Gateway: クライアントはここにアクセスする。
Manager:
Server: データを持つ。実体はTokyo Cabinet?

Managerも2台構成なので、SPoFなし。
Gatewayは1アプリにつき1台という感じ。Gatewayとアプリは一蓮托生という思想らしい(笑)

コンシステントハッシング
高い一貫性を持つ(eventual consistencyではない) CASもサポート
リバランス中でも読み書き可能
split brainしない(ネットワーク分断しても生き残らせない。一貫性重視で、一貫性をゆるくするくらいなら失敗させるという考え)

■MessagePack
JSONよりデータサイズが小さい、速い
Kumofs間でMessagePackを使っていたが、そこからスピンアウト(←言葉は違った気がする^^;)

MessagePack+Hadoop
Columnar Fileでデータサイズが減らせる。
キー+データを複数持つのではなく、キー毎にデータを複数持つという持ち方。データがさらにネストして構造を持っている場合の効率化はまだ。Googleの論文(Dremel?)では出ているらしい。

■ログ収集ツール Fluent(フルーエント)
Facebookのscribeと似ている。

Fluentでログを受け付け(キューイングして)、HDFSに書き込む。その後、マージジョブでColumar Fileに変換するとか。
HDFSが落ちてもFluentがデータを保持しているので再送できる。
ログ→Fluent→HDFS

Fluentで受けたデータをさらに次のFluentで受け付けるモデル(ログ転送・カスケード)も考えている。
ログ→Fluent→Fluent→ストレージ

okachimachiorzさんから、障害時に関するツッコミあり。
壊れたときに別のFluentへ受け渡すような仕組みにすると、ログが重複する可能性があり、そうなるよりはHAで冗長化した方がいいのかも?


いやはや、色んな視点があるもので、勉強になります。
ありがとうございました。

コメント (4)
  • X
  • Facebookでシェアする
  • はてなブックマークに追加する
  • LINEでシェアする