mongo-hadoopを読んでいます

mongodb/mongo · GitHub
ありていにいえば、文字通り、hadoop mapreduceでmongodbを使うためのプロジェクトです。

mapreduceのmainで

    public static void main( String[] args ) throws Exception{
        final Configuration conf = new Configuration();
        MongoConfigUtil.setInputURI( conf, "mongodb://localhost/test.in" );
        MongoConfigUtil.setOutputURI( conf, "mongodb://localhost/test.out" );

        final Job job = new Job( conf , "word count" );

        job.setInputFormatClass( MongoInputFormat.class );
        job.setOutputFormatClass( MongoOutputFormat.class );
}

こんな封に使います。

単純にこれだけだと、「それ、mahoutでもできるよ」とかorg.apache.mahout.cf.taste.impl.model.mongodb.MongoDBDataModel.javaあたりのひとがゴニョゴニョになりそうですが、mongo-hadoopがすごいのは、chunkやshardの数だけmapperを立ち上げて処理できるところです!すごい!

これを実現しているのが、com.mongo.hadoop.MongoInputFormat.javaでの以下のような実装です。

  1. MapReduceで呼ばれる「getSplits」の中(実際には設定を取得してのパターンが使われます)では、まず、jobで指定されたuriにコネクトします。
  2. その後uriで指定したcollectionの状態をgetStats()で取得
  3. shardされているかどうかを取得
  4. chunk使う設定書いた場合getSplitsChunksの中で、config dbに問い合せてconfig.shardsからshard hosts一覧を取得
  5. 次にconfig.chunksを走査し、chunkのrange情報をもつ、MongoInputSplitでList splitsを埋めて返却

となっています。
実際、試してみましたが、WordCountサンプルは普通に動いていました。素敵ですね。

これ、もうちょっと頑張れば、もうちょっといろいろと使えると思うので、もう少しほっくりかえしてみたいと思います。