ウィリアムのいたずらの、まちあるき、たべあるき

ウィリアムのいたずらが、街歩き、食べ物、音楽等の個人的見解を主に書くブログです(たま~にコンピューター関係も)

Cassandraにデータをいれ、ARFFで書き出し、Wekaで分析する-その4 ARFF

2015-01-29 10:44:53 | AI・BigData
1月23日、「ビッグデータ管理入門」の後半を、NIIで聞いてきた
その内容(=表題のことをする)をメモメモのつづき

今回は、Cassndraにデータをいれて、
そのデータを取ってきて、
Wekaの標準フォーマットARFFに落とすところのソースコード。

■お題

今、2つのCSVファイルがある。

●1つはsnps.csvで、これは、

 1,ba,aa,ba,ab,ba,aa,ab,bb,ba,aa,aa,ab,・・・・

のように、IDのあとに、aa,ab,ba,bbの組み合わせが100個入っている。
(1つのaa,とか(=以下SNPと呼ぶ)は、ある箇所の遺伝情報を表現している
 と思ってください)

ID+SNP1、SNP2・・・・、SNP100
で1人分。これが、10000件ある(=1万人データ)

●もう一つは、diseses.csvで、これは

  1,-,-,+
  2,+,-,-
  3,-,-,-
   :
   :

のように、IDの後に3つの病気にかかったか、どうかが書かれている。

現在は、この2つしか情報がないが、今後情報は増えていく見込みである。
ただし、同じ10000人すべての情報がそろうか分からないし、
どのような情報がくるかもわからない。




●そこで、柔軟なDB構造が取れるCassandraで、保存したい。
DataKeyspaceというキースペースの中に、(RDBのDBに相当)
SNPというカラムファミリー(RDBのテーブルに相当)に格納する。

ローキー(主キーに相当)は、ID(1~10000)とし、
その中に、SNPは、SNP1、SNP2・・・SNP100をカラム名、
aa,ab,ba,bbのうちのとり得る値をカラム値とする。

さらに、それに追加して、病気についてDIS1~DIS3のカラムも追加する。




●その中から、データをとりたし、Wekaで解析したい。
 今回は、SNP1,SNP2,SNP3を使い、DIS1になるかどうかを、
 決定木で会席するための、Weka標準ファイルARFFを作成する。

 ちなみに、ファイルの中身はこんな感じ。

@relation Sample
@attribute id numeric
@attribute SNP1 {aa,ab,ba,bb}
@attribute SNP2 {aa,ab,ba,bb}
@attribute SNP3 {aa,ab,ba,bb}
@attribute DIS {+,-}

@data
1,ba,aa,ba,-
2,bb,ab,bb,-
3,bb,bb,ab,-
4,ab,ab,ba,-
5,bb,ba,ab,-
   :
  (以下省略)

書き出し処理は、以下のソースのfindAllSample1だけで行っている。




■ソースコード

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Iterator;

import weka.core.Attribute;
import weka.core.FastVector;
import weka.core.Instance;
import weka.core.Instances;
import weka.core.converters.ArffSaver;
import me.prettyprint.cassandra.serializers.IntegerSerializer;
import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.cassandra.service.ColumnSliceIterator;
import me.prettyprint.cassandra.service.KeyIterator;
import me.prettyprint.cassandra.service.ThriftKsDef;
import me.prettyprint.cassandra.service.template.ColumnFamilyResult;
import me.prettyprint.cassandra.service.template.ColumnFamilyTemplate;
import me.prettyprint.cassandra.service.template.ColumnFamilyUpdater;
import me.prettyprint.cassandra.service.template.ThriftColumnFamilyTemplate;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.beans.HColumn;
import me.prettyprint.hector.api.ddl.ColumnFamilyDefinition;
import me.prettyprint.hector.api.ddl.ComparatorType;
import me.prettyprint.hector.api.ddl.KeyspaceDefinition;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.query.SliceQuery;


/**
 * Hello world!
 *
 */
public class Sample {
    public static String CLUSTER_NAME = "Test Cluster";
    public static String DB_SERVER = "127.0.0.1";
    public static String KEYSPACE_NAME = "DataKeyspace";
    public static String COLUMN_FAMILY_NAME = "SNP";
    
    static final String SNAPS_FILE_PATH= "snps.csv";
    static final String DISESES_FILE_PATH= "diseses.csv";
    static final String FILE_PATH= "sample.arff";
    

    private Cluster cluster;
    private KeyspaceDefinition keyspaceDef;
    private Keyspace keyspace;
    private ColumnFamilyTemplate<Integer, String> template;

    /*
     *	主処理:処理内容
     */
    public static void main(String[] args) {
        Sample app = new Sample();

        app.prepare();

		//	データをCassandraに
        app.insertSample();		//	SNPデータ
        app.updateSample();		//	病気になったかデータ追加

		//	Cassandra→ARFF
        app.findAllSample1();

    }
    
    /*
     *	初期化:クラスター取得
     */
    public Sample() {
        cluster = HFactory.getOrCreateCluster(CLUSTER_NAME, DB_SERVER + ":9160");
    }

    /*
     *	Cassandraアクセス準備:キースペース、カラムファミリの準備
     */
    private void prepare() {
        keyspaceDef = cluster.describeKeyspace(KEYSPACE_NAME);
        if (keyspaceDef == null) {
            KeyspaceDefinition newKeyspace = HFactory.createKeyspaceDefinition(KEYSPACE_NAME, ThriftKsDef.DEF_STRATEGY_CLASS, 1, null);
            cluster.addKeyspace(newKeyspace, true);

            ColumnFamilyDefinition cfDef = HFactory.createColumnFamilyDefinition(KEYSPACE_NAME, COLUMN_FAMILY_NAME, ComparatorType.BYTESTYPE);
            cluster.addColumnFamily(cfDef);
        }
        keyspace = HFactory.createKeyspace(KEYSPACE_NAME, cluster);

        template = new ThriftColumnFamilyTemplate<Integer, String>(keyspace, COLUMN_FAMILY_NAME, IntegerSerializer.get(), StringSerializer.get());
    }

    /*
     *	キースペース(DB相当)の削除
     */
    private void dropKeyspace() {
        cluster.dropKeyspace(KEYSPACE_NAME);
    }

  /*
     *  SNPのCSVふぁいるをCassandraへ(SNP1からSNP100まで、10000件ある)
     */
    private void insertSample() {
        System.out.println("----- insertSample -----");
        try
        {
        	FileReader fr = new FileReader(SNAPS_FILE_PATH);
        	BufferedReader br = new BufferedReader(fr);
        	String buf = null;
        	while((buf = br.readLine())!= null)
        	{
        		String cell[] = buf.split(",");
        		int i = Integer.parseInt(cell[0]);
        		ColumnFamilyUpdater<Integer, String> updater = template.createUpdater(i);
        		System.out.println("key"+i+":");
        		for(int j = 1 ; j < cell.length;j++)
        		{
            			System.out.println("\tSNP"+j+":"+cell[j]);
        			updater.setString("SNP"+j, cell[j]);
        		}
        		template.update(updater);
        	}
        	br.close();
        	fr.close();
        }
       	catch(Exception e)
       	{
       		e.printStackTrace();
       	}
    }

  /*
     *  DIS(病気になったか)のCSVふぁいるをCassandraへ(SNPに追加)
     */
    private void updateSample() {
        System.out.println("----- updateSample -----");
        try
        {
        	FileReader fr = new FileReader(DISESES_FILE_PATH);
        	BufferedReader br = new BufferedReader(fr);
        	String buf = null;
        	while((buf = br.readLine())!= null)
        	{
        		String cell[] = buf.split(",");
        		int i = Integer.parseInt(cell[0]);
        		ColumnFamilyUpdater<Integer, String> updater = template.createUpdater(i);
        		System.out.println("key"+i+":");
        		for(int j = 1 ; j < cell.length;j++)
        		{
            		System.out.println("\tdis"+j+":"+cell[j]);
        			updater.setString("DIS"+j, cell[j]);
        		}
        		template.update(updater);
        	}
        	br.close();
        	fr.close();
        }
       	catch(Exception e)
       	{
       		e.printStackTrace();
       	}
    }

  /*
     *  CassandraのSNP1,2,3とDIS1を、IDつけてARFFファイルへ
     */
    private void findAllSample1() {
        System.out.println("----- findAllSample1 -----");

        FastVector attributes = new FastVector();

		//	IDの追加
        attributes.addElement(new Attribute("id"));

		//	SNP1~3の追加:とり得る値はどれもaa,ab,ba,bbの4種類
        FastVector SNPValues = new FastVector();
        SNPValues.addElement("aa");
        SNPValues.addElement("ab");
        SNPValues.addElement("ba");
        SNPValues.addElement("bb");
        attributes.addElement(new Attribute("SNP1", SNPValues));
        attributes.addElement(new Attribute("SNP2", SNPValues));
        attributes.addElement(new Attribute("SNP3", SNPValues));

		//	DIS追加:とり得る値は+、-の2種類
        FastVector DISValues = new FastVector();
        DISValues.addElement("+");
        DISValues.addElement("-");
        attributes.addElement(new Attribute("DIS", DISValues));

		//	Cassandraからデータを取り出し
        Instances data = new Instances("Sample", attributes, 0);
        for (int i = 1; i <= 10000; ++i) {
            int id = i;
            ColumnFamilyResult<Integer, String> res = template.queryColumns(id);
            String value = "" + res.getKey() + " : " + "SNP1=" + res.getString("SNP1") + " " + "dis1=" + res.getString("DIS1") + " " + "DIS2=" + res.getString("DIS2");
            System.out.println(value);
            double[] values = new double[data.numAttributes()];
            values[0] = i;
            values[1] = data.attribute(1).indexOfValue(res.getString("SNP1"));
            values[2] = data.attribute(2).indexOfValue(res.getString("SNP2"));
            values[3] = data.attribute(3).indexOfValue(res.getString("SNP3"));
            values[4] = data.attribute(4).indexOfValue(res.getString("DIS1"));
            data.add(new Instance(1.0, values));
        }


		//	ARFF書き出し
        try {
            ArffSaver arffSaver = new ArffSaver();
            arffSaver.setInstances(data);
            arffSaver.setFile(new File(FILE_PATH));
            arffSaver.writeBatch();
        } catch (IOException e) {
            e.printStackTrace();
        }
        
    }

}


  • X
  • Facebookでシェアする
  • はてなブックマークに追加する
  • LINEでシェアする