1月23日、「ビッグデータ管理入門」の後半を、NIIで聞いてきた
その内容(=表題のことをする)をメモメモのつづき
今回は、Cassndraにデータをいれて、
そのデータを取ってきて、
Wekaの標準フォーマットARFFに落とすところのソースコード。
■お題
今、2つのCSVファイルがある。
●1つはsnps.csvで、これは、
1,ba,aa,ba,ab,ba,aa,ab,bb,ba,aa,aa,ab,・・・・
のように、IDのあとに、aa,ab,ba,bbの組み合わせが100個入っている。
(1つのaa,とか(=以下SNPと呼ぶ)は、ある箇所の遺伝情報を表現している
と思ってください)
ID+SNP1、SNP2・・・・、SNP100
で1人分。これが、10000件ある(=1万人データ)
●もう一つは、diseses.csvで、これは
1,-,-,+
2,+,-,-
3,-,-,-
:
:
のように、IDの後に3つの病気にかかったか、どうかが書かれている。
現在は、この2つしか情報がないが、今後情報は増えていく見込みである。
ただし、同じ10000人すべての情報がそろうか分からないし、
どのような情報がくるかもわからない。
●そこで、柔軟なDB構造が取れるCassandraで、保存したい。
DataKeyspaceというキースペースの中に、(RDBのDBに相当)
SNPというカラムファミリー(RDBのテーブルに相当)に格納する。
ローキー(主キーに相当)は、ID(1~10000)とし、
その中に、SNPは、SNP1、SNP2・・・SNP100をカラム名、
aa,ab,ba,bbのうちのとり得る値をカラム値とする。
さらに、それに追加して、病気についてDIS1~DIS3のカラムも追加する。
●その中から、データをとりたし、Wekaで解析したい。
今回は、SNP1,SNP2,SNP3を使い、DIS1になるかどうかを、
決定木で会席するための、Weka標準ファイルARFFを作成する。
ちなみに、ファイルの中身はこんな感じ。
@relation Sample
@attribute id numeric
@attribute SNP1 {aa,ab,ba,bb}
@attribute SNP2 {aa,ab,ba,bb}
@attribute SNP3 {aa,ab,ba,bb}
@attribute DIS {+,-}
@data
1,ba,aa,ba,-
2,bb,ab,bb,-
3,bb,bb,ab,-
4,ab,ab,ba,-
5,bb,ba,ab,-
:
(以下省略)
書き出し処理は、以下のソースのfindAllSample1だけで行っている。
■ソースコード
その内容(=表題のことをする)をメモメモのつづき
今回は、Cassndraにデータをいれて、
そのデータを取ってきて、
Wekaの標準フォーマットARFFに落とすところのソースコード。
■お題
今、2つのCSVファイルがある。
●1つはsnps.csvで、これは、
1,ba,aa,ba,ab,ba,aa,ab,bb,ba,aa,aa,ab,・・・・
のように、IDのあとに、aa,ab,ba,bbの組み合わせが100個入っている。
(1つのaa,とか(=以下SNPと呼ぶ)は、ある箇所の遺伝情報を表現している
と思ってください)
ID+SNP1、SNP2・・・・、SNP100
で1人分。これが、10000件ある(=1万人データ)
●もう一つは、diseses.csvで、これは
1,-,-,+
2,+,-,-
3,-,-,-
:
:
のように、IDの後に3つの病気にかかったか、どうかが書かれている。
現在は、この2つしか情報がないが、今後情報は増えていく見込みである。
ただし、同じ10000人すべての情報がそろうか分からないし、
どのような情報がくるかもわからない。
●そこで、柔軟なDB構造が取れるCassandraで、保存したい。
DataKeyspaceというキースペースの中に、(RDBのDBに相当)
SNPというカラムファミリー(RDBのテーブルに相当)に格納する。
ローキー(主キーに相当)は、ID(1~10000)とし、
その中に、SNPは、SNP1、SNP2・・・SNP100をカラム名、
aa,ab,ba,bbのうちのとり得る値をカラム値とする。
さらに、それに追加して、病気についてDIS1~DIS3のカラムも追加する。
●その中から、データをとりたし、Wekaで解析したい。
今回は、SNP1,SNP2,SNP3を使い、DIS1になるかどうかを、
決定木で会席するための、Weka標準ファイルARFFを作成する。
ちなみに、ファイルの中身はこんな感じ。
@relation Sample
@attribute id numeric
@attribute SNP1 {aa,ab,ba,bb}
@attribute SNP2 {aa,ab,ba,bb}
@attribute SNP3 {aa,ab,ba,bb}
@attribute DIS {+,-}
@data
1,ba,aa,ba,-
2,bb,ab,bb,-
3,bb,bb,ab,-
4,ab,ab,ba,-
5,bb,ba,ab,-
:
(以下省略)
書き出し処理は、以下のソースのfindAllSample1だけで行っている。
■ソースコード
import java.io.BufferedReader; import java.io.File; import java.io.FileReader; import java.io.FileWriter; import java.io.IOException; import java.util.Iterator; import weka.core.Attribute; import weka.core.FastVector; import weka.core.Instance; import weka.core.Instances; import weka.core.converters.ArffSaver; import me.prettyprint.cassandra.serializers.IntegerSerializer; import me.prettyprint.cassandra.serializers.StringSerializer; import me.prettyprint.cassandra.service.ColumnSliceIterator; import me.prettyprint.cassandra.service.KeyIterator; import me.prettyprint.cassandra.service.ThriftKsDef; import me.prettyprint.cassandra.service.template.ColumnFamilyResult; import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate; import me.prettyprint.cassandra.service.template.ColumnFamilyUpdater; import me.prettyprint.cassandra.service.template.ThriftColumnFamilyTemplate; import me.prettyprint.hector.api.Cluster; import me.prettyprint.hector.api.Keyspace; import me.prettyprint.hector.api.beans.HColumn; import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition; import me.prettyprint.hector.api.ddl.ComparatorType; import me.prettyprint.hector.api.ddl.KeyspaceDefinition; import me.prettyprint.hector.api.factory.HFactory; import me.prettyprint.hector.api.query.SliceQuery; /** * Hello world! * */ public class Sample { public static String CLUSTER_NAME = "Test Cluster"; public static String DB_SERVER = "127.0.0.1"; public static String KEYSPACE_NAME = "DataKeyspace"; public static String COLUMN_FAMILY_NAME = "SNP"; static final String SNAPS_FILE_PATH= "snps.csv"; static final String DISESES_FILE_PATH= "diseses.csv"; static final String FILE_PATH= "sample.arff"; private Cluster cluster; private KeyspaceDefinition keyspaceDef; private Keyspace keyspace; private ColumnFamilyTemplate<Integer, String> template; /* * 主処理:処理内容 */ public static void main(String[] args) { Sample app = new Sample(); app.prepare(); // データをCassandraに app.insertSample(); // SNPデータ app.updateSample(); // 病気になったかデータ追加 // Cassandra→ARFF app.findAllSample1(); } /* * 初期化:クラスター取得 */ public Sample() { cluster = HFactory.getOrCreateCluster(CLUSTER_NAME, DB_SERVER + ":9160"); } /* * Cassandraアクセス準備:キースペース、カラムファミリの準備 */ private void prepare() { keyspaceDef = cluster.describeKeyspace(KEYSPACE_NAME); if (keyspaceDef == null) { KeyspaceDefinition newKeyspace = HFactory.createKeyspaceDefinition(KEYSPACE_NAME, ThriftKsDef.DEF_STRATEGY_CLASS, 1, null); cluster.addKeyspace(newKeyspace, true); ColumnFamilyDefinition cfDef = HFactory.createColumnFamilyDefinition(KEYSPACE_NAME, COLUMN_FAMILY_NAME, ComparatorType.BYTESTYPE); cluster.addColumnFamily(cfDef); } keyspace = HFactory.createKeyspace(KEYSPACE_NAME, cluster); template = new ThriftColumnFamilyTemplate<Integer, String>(keyspace, COLUMN_FAMILY_NAME, IntegerSerializer.get(), StringSerializer.get()); } /* * キースペース(DB相当)の削除 */ private void dropKeyspace() { cluster.dropKeyspace(KEYSPACE_NAME); } /* * SNPのCSVふぁいるをCassandraへ(SNP1からSNP100まで、10000件ある) */ private void insertSample() { System.out.println("----- insertSample -----"); try { FileReader fr = new FileReader(SNAPS_FILE_PATH); BufferedReader br = new BufferedReader(fr); String buf = null; while((buf = br.readLine())!= null) { String cell[] = buf.split(","); int i = Integer.parseInt(cell[0]); ColumnFamilyUpdater<Integer, String> updater = template.createUpdater(i); System.out.println("key"+i+":"); for(int j = 1 ; j < cell.length;j++) { System.out.println("\tSNP"+j+":"+cell[j]); updater.setString("SNP"+j, cell[j]); } template.update(updater); } br.close(); fr.close(); } catch(Exception e) { e.printStackTrace(); } } /* * DIS(病気になったか)のCSVふぁいるをCassandraへ(SNPに追加) */ private void updateSample() { System.out.println("----- updateSample -----"); try { FileReader fr = new FileReader(DISESES_FILE_PATH); BufferedReader br = new BufferedReader(fr); String buf = null; while((buf = br.readLine())!= null) { String cell[] = buf.split(","); int i = Integer.parseInt(cell[0]); ColumnFamilyUpdater<Integer, String> updater = template.createUpdater(i); System.out.println("key"+i+":"); for(int j = 1 ; j < cell.length;j++) { System.out.println("\tdis"+j+":"+cell[j]); updater.setString("DIS"+j, cell[j]); } template.update(updater); } br.close(); fr.close(); } catch(Exception e) { e.printStackTrace(); } } /* * CassandraのSNP1,2,3とDIS1を、IDつけてARFFファイルへ */ private void findAllSample1() { System.out.println("----- findAllSample1 -----"); FastVector attributes = new FastVector(); // IDの追加 attributes.addElement(new Attribute("id")); // SNP1~3の追加:とり得る値はどれもaa,ab,ba,bbの4種類 FastVector SNPValues = new FastVector(); SNPValues.addElement("aa"); SNPValues.addElement("ab"); SNPValues.addElement("ba"); SNPValues.addElement("bb"); attributes.addElement(new Attribute("SNP1", SNPValues)); attributes.addElement(new Attribute("SNP2", SNPValues)); attributes.addElement(new Attribute("SNP3", SNPValues)); // DIS追加:とり得る値は+、-の2種類 FastVector DISValues = new FastVector(); DISValues.addElement("+"); DISValues.addElement("-"); attributes.addElement(new Attribute("DIS", DISValues)); // Cassandraからデータを取り出し Instances data = new Instances("Sample", attributes, 0); for (int i = 1; i <= 10000; ++i) { int id = i; ColumnFamilyResult<Integer, String> res = template.queryColumns(id); String value = "" + res.getKey() + " : " + "SNP1=" + res.getString("SNP1") + " " + "dis1=" + res.getString("DIS1") + " " + "DIS2=" + res.getString("DIS2"); System.out.println(value); double[] values = new double[data.numAttributes()]; values[0] = i; values[1] = data.attribute(1).indexOfValue(res.getString("SNP1")); values[2] = data.attribute(2).indexOfValue(res.getString("SNP2")); values[3] = data.attribute(3).indexOfValue(res.getString("SNP3")); values[4] = data.attribute(4).indexOfValue(res.getString("DIS1")); data.add(new Instance(1.0, values)); } // ARFF書き出し try { ArffSaver arffSaver = new ArffSaver(); arffSaver.setInstances(data); arffSaver.setFile(new File(FILE_PATH)); arffSaver.writeBatch(); } catch (IOException e) { e.printStackTrace(); } } } |