SwoPP2010で発表してきました

8/3 – 8/5の3日間、並列/分散/協調処理に関するサマー・ワークショップ(SWoPP 2010)のために金沢へ来ています。
初日に自分の研究を発表してきました。
発表スライドはこちら

内容はスライドを見ていただくとして、あった質問は、

Q: 実際には耐故障性のため複製が使われるが、今回の評価では複製なしの評価で、単にローカルディスクの性能を図っているだけに見えるが、複製は考慮していないのか?

A: もちろん考慮する必要はあります。Gfarmも複製の作成が可能です。しかし、今回複製ありの評価を出していない理由は、HDFSで複製を作成する時は、複製を作成するノード全てにデータを転送し終わってからcloseするが、Gfarmの場合は一つめを書いて、closeしてから非同期でレプリカを作成する。
Gfarmの場合、ジョブが終了してもレプリカの作成が完了していないことになる。
この状況で比較するのはファアではなく、何かしら同じ条件での比較が必要だが、まだそれを考えられていない。
今回は両者の基本的な性能を見るために、あえて複製なしでの評価をとって詳しくみたかった。

Q: ソートの性能においてGfarmが遅かったりするが、アプリケーションの性質による性能への影響の関係はわかったいるのか?
(正確な質問を思い出せないが、このような趣旨だったかと)

A: ソートでは Mapタスク側でGfarmがHDFSより遅い。つまり読み込みで遅いことになるが、単純な並列読み込み性能はほぼ同等だったので、アプリケーションになると負けてしまう理由はわかってない。プラグイン側の改良で同等までいけるかもしれない。
Reduceではデータの書き出し処理がメインになるが、この部分ではGfarmの方が速くて、これは並列書き込み性能がGfarmの方が速いという結果と合っているので妥当だと考えられる。

Gfarmのインストール

centos のマシンに0から最新版のGfarmをインストールしたメモ.

まずは postgres を入れる
XML拡張属性を利用する前には8.3以降が必要.postgres の前にlibxml2を入れる必要がある

$ yum install libxml2 libxml2-devel
$ wget ftp://ftp2.jp.postgresql.org/pub/postgresql/source/v8.4.4/postgresql-8.4.4.tar.gz
$ tar zxfp postgresql-8.4.4.tar.gz
$ cd postgresql-8.4.4
$ ./configure -prefix=/usr/local/pgsql-8.4.4 --with-libxml --without-readline --without-zlib
$ make
$ make install

次にGfarmをインストール

$ wget http://sourceforge.net/projects/gfarm/files/gfarm_v2/2.4.0/gfarm-2.4.0.tar.gz/download
$ tar zxfp gfarm-2.4.0.tar.gz
$ cd gfarm-2.4.0
$ yum install openssl-devel openldap-devel
$ ./configure --prefix=/usr/local/gfarm-2.4.0 --enable-xmlattr --with-postgresql=/usr/local/pgsql-8.4.4
$ make
$ make install

次から色々と設定をしていきます.gfarmとpostgresのパスを通しておきます.
.bashrcに追記して source .bashrc しておく

export GFARM_HOME=/usr/local/gfarm-2.4.0
export PGSQL_HOME=/usr/local/pgsql-8.4.4
export PATH=$PATH:/usr/java/default/bin:$GFARM_HOME/bin:$PGSQL_HOME/bin

次にgfarm-configやろうとしたけど,postgres ユーザを作っておく必要があるみたい
ここを参考して

useradd postgres
passwd postgres
su - postgres

んで postgres の方の.bashrc に

export PGSQL_HOME=/usr/local/pgsql-8.4.4
export PATH=$PATH:$PGSQL_HOME/bin
export MANPATH=$MANPATH:$PGSQL_HOME/man
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$PGSQL_HOME/lib
export PGDATA=$PGSQL_HOME/data

postgres を初期化しておく

$ exit
$ mkdir /usr/local/pgsql-8.4.4/data
$ chown postgres:postgres /usr/local/gfsql-8.4.4/data
$ su - postgres
$ initdb --encoding=UTF8 --no-locale

そして postgres のユーザ名を指定してconfig-gfarm

$ config-gfarm -u postgres

次にスレーブサーバの設定。通常メタデータサーバとスレーブはわけるけど、今回は同じノードでインストール。
認証の設定で、_gfarmfsユーザーを作成し、そのホームディレクトリに認証鍵をおく。
以下の例では鍵の期限は約1年(31,536,000秒)に設定。

$ useradd -c "Gfarm gfsd" _gfarmfs
$ su _gfarmfs
$ gfkey -f -p 31536000

次にデータを実際に保存するディレクトリを指定してconfig-gfsd

$config-gfsd /mnt/gfarm
created /mnt/gfarm
created /etc/init.d/gfsd
config-gfsd success
Please ask admin_user to register your host by the following command:
/usr/local/gfarm-2.4.0/bin/gfhost -c -a x86_64-fedora8-linux -p 600 -n 2 ip-10-202-45-139.ec2.internal
After that, start gfsd by the following command as a root:

スレーブの登録をするように言われるので、登録してから起動

$ /usr/local/gfarm-2.4.0/bin/gfhost -c -a x86_64-fedora8-linux -p 600 -n 2 ip-10-202-45-139.ec2.internal
$ service gfsd start

そしてgfdfで確認すると約400GB利用可能になってます。

$ gfdf
     1K-blocks          Used         Avail Capacity       Files
     411437600        203160     411234440     0%             2

けっこうステップが多いのですが、最近はdebian のパッケージにも入ったようです。
http://packages.debian.org/ja/source/sid/gfarm

あとはマウントして利用するには gfarm2fs をインストールする必要がありますが、それは次回以降ということで

分割可能なLZO圧縮をhadoopで使う

Twitterでは基本的にファイルはLZO圧縮しているようで,

などのメリットがあると言っています.これは使わない手はないということで試してみました.

clouderaのこのブログ記事を参考にして進めます.
code.google.com/p/hadoop-gpl-compressionもありますが,Twitterが公開している分割可能なのを使います.
http://github.com/kevinweil/hadoop-lzo

今回の環境はclouderaのamiをベースにしました.
cloudera-ec2-hadoop-images/cloudera-hadoop-fedora-20090623-x86_64 ami-2359bf4
CDH3で,hadopoのバージョンはhadoop-0.20.2+228
まずlzoはGPLライセンスでhadoopに含めることが出来ないので自分でとってくる必要があります.
yumでlzoのライブラリと,コマンドで圧縮解凍できるようにlzopも入れておきます.

$ yum install -y lzo-devel lzop

次にhadoop-lzoをとってきて,ビルド用にantも入れておく

$ git clone git://github.com/kevinweil/hadoop-lzo.git
$ yum install -y ant

ここを参考にしてビルド
パスの設定とlzoライブラリがインストールされてるか確認

$ export JAVA_HOME=/usr/java/default
$ export CFLAGS=-m64
$ export CXXFLAGS=-m64
$ ls -l /usr/lib*/liblzo2*
lrwxrwxrwx 1 root root     16 Jul  8 05:23 /usr/lib/liblzo2.so -> liblzo2.so.2.0.0
lrwxrwxrwx 1 root root     16 Jul  8 05:23 /usr/lib/liblzo2.so.2 -> liblzo2.so.2.0.0
-rwxr-xr-x 1 root root 130876 Aug 22  2007 /usr/lib/liblzo2.so.2.0.0
lrwxrwxrwx 1 root root     16 Jul  8 05:23 /usr/lib64/liblzo2.so -> liblzo2.so.2.0.0
lrwxrwxrwx 1 root root     16 Jul  8 05:23 /usr/lib64/liblzo2.so.2 -> liblzo2.so.2.0.0
-rwxr-xr-x 1 root root 124504 Aug 22  2007 /usr/lib64/liblzo2.so.2.0.0

そしてビルド

$ cd hadoop-lzo
$ ant compile-native tar
/usr/bin/build-classpath: error: Could not find xml-commons-apis Java extension for this JVM
/usr/bin/build-classpath: error: Some specified jars were not found
Buildfile: build.xml
...
    [javac] 12 warnings

compile-native:
    [mkdir] Created dir: /root/work/hadoop-lzo/build/native/Linux-amd64-64/lib
    [mkdir] Created dir: /root/work/hadoop-lzo/build/native/Linux-amd64-64/src/com/hadoop/compression/lzo

BUILD FAILED
/root/work/hadoop-lzo/build.xml:235: Problem: failed to create task or type javah
Cause: the class org.apache.tools.ant.taskdefs.optional.Javah was not found.
        This looks like one of Ant's optional components.
Action: Check that the appropriate optional JAR exists in
        -/usr/share/ant/lib
        -/root/.ant/lib
        -a directory added on the command line with the -lib argument

Do not panic, this is a common problem.
The commonest cause is a missing JAR.

This is not a bug; it is a configuration problem

おやっ、何かがおかしいです。
しばらく経っても解決しそうにないから自分で解決しないと.
antのJNI用のオプションタスクがないらしい.

$ yum install ant-nodeps
$ ant compile-native tar

これで成功した.yumで入れたantは1.70だけど,1.8以降がいいって話もどこかにあったので,ソースから入れた方が良いのかも.
そして jar ファイルをclasspathの通っている所へ, nativeライブラリを java.libray.path の通ってるところへ置く.
JAVA_LIBRARY_PATHを設定しろって話もあるけど,今回の環境では$HADOOP_HOME/lib/native/Linux-amd64-64 以下におけばnativeライブラリの読み込みは問題なかった.

$ tar -cBf - -C build/hadoop-lzo-0.4.4/lib/native . | tar -xBvf - -C /usr/lib/hadoop/lib/native
$ cp build/hadoop-lzo-0.4.4.jar /usr/lib/hadoop/lib/

core-site.xmlに

<property>
 <name>io.compression.codecs</name>
 <value>org.apache.hadoop.io.compress.GzipCodec,org.apache.hadoop.io.compress.DefaultCodec,com.hadoop.compression.lzo.LzoCodec,com.hadoop.compression.lzo.LzopCodec,org.apache.ha\
doop.io.compress.BZip2Codec</value>
</property>
<property>
 <name>io.compression.codec.lzo.class</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

LZOはもともとは分割不可なので,分割を可能にするためにはインデックスを付ける必要がある.

普通に1プロセスでインデックスするなら

$ hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-0.4.4.jar com.hadoop.compression.lzo.LzoIndexer big_file.lzo

hadoopのジョブとしてやるなら

$ hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-0.4.4.jar com.hadoop.compression.lzo.DistributedLzoIndexer big_file.lzo

実行すると big_file.lzo.index っていのが生成されて,LzoTextInputFormat で読み込んだ時にこのインデックスファイルを見て分割してくれるらしい.
実際に試してみる.teragenで1Gのファイルを作って圧縮してHDFSにおいて,それにインデックスをつけようとしてみる.

$ hadoop jar /usr/lib/hadoop/hadoop-0.20.2+228-examples.jar teragen 10000000 1G
...
$ hadoop fs -copyToLocal 1G/part-00000 .
$ lzop part-00000
$ hadoop fs -copyFromLocal part-00000.lzo 1G.lzo
$ hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-0.4.4.jar com.hadoop.compression.lzo.LzoIndexer 1G.lzo
10/07/09 09:06:57 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library
10/07/09 09:06:57 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 5c25e0073d3dae9ace4bd9eba72e4dc43650c646]
10/07/09 09:06:57 INFO lzo.LzoIndexer: LZO Indexing directory lzo...
10/07/09 09:06:57 INFO lzo.LzoIndexer:   [INDEX] LZO Indexing file hdfs://localhost:9000/user/hadoop/1G.lzo, size 0.18 GB...
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: 2
        at com.hadoop.compression.lzo.LzopInputStream.readInt(LzopInputStream.java:92)
        at com.hadoop.compression.lzo.LzopInputStream.readHeaderItem(LzopInputStream.java:103)
        at com.hadoop.compression.lzo.LzopInputStream.readHeader(LzopInputStream.java:189)
        at com.hadoop.compression.lzo.LzopInputStream.<init>(LzopInputStream.java:55)
        at com.hadoop.compression.lzo.LzopCodec.createInputStream(LzopCodec.java:70)
        at com.hadoop.compression.lzo.LzoIndex.createIndex(LzoIndex.java:224)
        at com.hadoop.compression.lzo.LzoIndexer.indexSingleFile(LzoIndexer.java:117)
        at com.hadoop.compression.lzo.LzoIndexer.indexInternal(LzoIndexer.java:98)
        at com.hadoop.compression.lzo.LzoIndexer.indexInternal(LzoIndexer.java:86)
        at com.hadoop.compression.lzo.LzoIndexer.index(LzoIndexer.java:52)
        at com.hadoop.compression.lzo.LzoIndexer.main(LzoIndexer.java:137)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
        at java.lang.reflect.Method.invoke(Method.java:597)
        at org.apache.hadoop.util.RunJar.main(RunJar.java:186)

おやっ、何かがおかしいです。
Successfully loaded & initialized native-lzo library なのに,,,
結局原因はまだわかっていませんが,mapの出力や結果の出力に使うのには問題なく使えた.

mapの中間出力にLZOを使うなら,

<property>
  <name>mapred.compress.map.output</name>
  <value>true</value>
</property>
<property>
  <name>mapred.map.output.compression.codec</name>
  <value>com.hadoop.compression.lzo.LzoCodec</value>
</property>

結果の出力に使うにはプログラム内で,

FileOutputFormat.setOutputCompressorClass(job,com.hadoop.compression.lzo.LzopCodec.class);
FileOutputFormat.setCompressOutput(job, true);

mapred-site.xml にmapred.output.compress:ture と mapred.output.compression.codec:com.hadoop.compression.lzo.LzoCodec を書けば出来るのかなと思ったけど,なぜか無視された.

ということで,一応hadoop-lzo使えましたが,インデックス作成ができず.
どなたか,インデックス作成出来た方教えてください...

– 追記[2010/07/12] –
ファイル名が3文字以下の時にこのエラーが発生することがわかりました.それ以外の時は発生しません.
lzopで圧縮する前のファイル名が3文字以下の時です.
実は上記の手順はエラーを発生した時と少し違っていたようで,上記の手順ではエラーはでません.

$ hadoop jar /usr/lib/hadoop/hadoop-0.20.2+228-examples.jar teragen 10000000 1G
...
$ hadoop fs -copyToLocal 1G/part-00000 1G
$ lzop 1G
$ hadoop fs -copyFromLocal 1G.lzo .
$ hadoop jar /usr/lib/hadoop/lib/hadoop-lzo-0.4.4.jar com.hadoop.compression.lzo.LzoIndexer 1G.lzo

このような手順だと発生してしまいます.
これはバグっぽいので issues へ報告しました. http://github.com/kevinweil/hadoop-lzo/issues/issue/3

Gfarm workshopへ行ってきました

7 月 2 日(金)にGfarm workshopへ行ってきました.
http://datafarm.apgrid.org/event/gfarm10/

Gfarmはオープンソースで開発が進められている分散ファイルシステムで,NFS の代替として汎用的に使えるファイルシステムです.
詳しくはこちら

アーキテクチャ的にはマスタ・スレーブで,スレーブのローカルディスクを使って,マスタ管理して単一の名前空間を提供します.
その点はHDFSに似ています.HDFSをファイルストレージとして使いたいっていう人は検討してみると良いかと思います.
コスト的にはHDFSと同じようにノードを追加していくだけで容量を増やせるので.
HDFSをファイルストレージとして使う場合は fuse-dfs とか使うんだろうけど,これはバグがあってファイルが壊れたり
性能的には良くないっていうことで cloudera の人も勧めてないっていう話を聞くので...

それで,gfarm workshopですが,僕は hadoop-gfarm の紹介をしてきました.
HadoopではファイルシステムとしてHDFSの代わりにS3やらKFSやら使えますが,Gfarmを使えるようにするプラグインです.
まだcloudera のディストリビューションでのインストレーションは対応してないのですが,
apacheのHadoopならどのバージョンでも動くはずです(0.20.2, 0.20,1, 0.18.3, 0.19.1 は試しました)
http://www.slideshare.net/shun0102/hadoop-gfarm
興味のある方,質問のある方はご連絡ください!

gridmix2を試したみた

yahoo!が公開しているHadoopのベンチマークツールgridmixを試してみた。
gridmixというのは要するに色々なタイプのジョブを色々なデータセットに対して実行させて、実際の複数人によって使用され、色々なジョブが実行されるhadoopクラスタの負荷をエミュレートするツールである。
単一のhadoopジョブを実行するだけでは分からないようなことも分かる。特にスケジューラーの比較などには良さそう。
今はgridmix3が最新版で、 git://git.apache.org/hadoop-mapreduce.git/src/contrib/gridmix/ にて公開されているが、まずはhadoop-0.20.2にも含まれているgridmix2を試した。含まれているといってもビルドしてやる必要がある

ビルド
antをインストールする必要があります。
今回使用したantのversionは1.8.0(それがいいのか調べてませんが、入ってたのを使用した)

% cd ${HADOOP_HOME}/src/benchmarks/gridmix2
% ant
...
BUILD SUCCESSFUL

ビルドが成功すると${HADOOP_HOME}/src/benchmarks/gridmix2/build 以下にgridmix.jarが作成されます

設定
gridmix用のディレクトリを作って、作成したgridmix.jarと起動スクリプト、設定ファイルなどをコピーします。

% mkdir ${HADOOP_HOME}/gridmix
% cp generateGridmix2data.sh gridmix-env-2 build/gridmix.jar gridmix_config.xml rungridmix_2 ${HADOOP_HOME}/gridmix/
% cd ${HADOOP_HOME}
% ls gridmix
generateGridmix2data.sh  gridmix-env-2  gridmix.jar  gridmix_config.xml  rungridmix_2

gridmix-env-2
export USE_REAL_DATASET=TURE はコメントアウトしておく。

export HADOOP_INSTALL_HOME=${HOME}/hadoop
export HADOOP_VERSION=hadoop-0.20.2
export HADOOP_HOME=${HADOOP_INSTALL_HOME}/${HADOOP_VERSION}
export HADOOP_CONF_DIR=${HADOOP_HOME}/conf
# export USE_REAL_DATASET=TURE
export APP_JAR=${HADOOP_HOME}/${HADOOP_VERSION}-test.jar
export EXAMPLE_JAR=${HADOOP_HOME}/${HADOOP_VERSION}-examples.jar
export STREAMING_JAR=${HADOOP_HOME}/contrib/streaming/${HADOOP_VERSION}-streaming.jar

generateGridmix2data.sh
生成するデータサイズ、ファイル数を設定する。圧縮データサイズ:非圧縮データサイズ は 4:1 くらいが適当らしい。
使用したクラスタが同時に起動できる最大タスク数が60なので生成データのファイル数も60にしておいた。

# 500GB
COMPRESSED_DATA_BYTES=536870912000
# 125GB
UNCOMPRESSED_DATA_BYTES=134217728000
# Number of partitions for output data
NUM_MAPS=60

データ生成
まずデータ生成スクリプトを実行した。実際には3つのrandom-text-writerプログラムが実行され、3種類の入力用データが生成される

% ./generateGridmix2data.sh
...
% hadoop fs -du /gridmix/data
Found 3 items
328554850538  hdfs://tsukuba000:9000/gridmix/data/MonsterQueryBlockCompressed
134459001063  hdfs://tsukuba000:9000/gridmix/data/SortUncompressed
158806557578  hdfs://tsukuba000:9000/gridmix/data/WebSimulationBlockCompressed

実行可能なジョブの種類
次にgridmix_config.xml にて色々とジョブの設定があるけれど、それぞれのジョブがどんなものかわかってないとどう設定すれば良いかわからないので調べた。

name data type description
Combiner Text combinerを使用しているwordcont
GridmixJavaSorter Text javaでソート
GridmixStreamingSorter Text streamingでソート
GridmixWebdatascan SequenceFile 設定した割合でデータを出力する
GridmixMonsterQuery SequenceFile 3段階のジョブで、pigなどのworkloadをまねている。こちらも入力データに対する出力データサイズを設定できるよう
GridmixWebdataSort SequenceFile sequencefileのソート

それぞれのジョブに対して入力データのボリュームをがsmall, medium, largeの3通りある
設定ファイルを見ればわかりますが、デフォルトではsmallは3個分(全体に対する割合は3 / NUM_MAPS となる)、mediumは全体の30%、largeがデータセット全体を入力とする

<property>
  <name>streamSort.smallJobs.inputFiles</name>
  <value>${VARINFLTEXT}/{part-00000,part-00001,part-00002}</value>
</property>
<property>
  <name>streamSort.mediumJobs.inputFiles</name>
  <value>${VARINFLTEXT}/{part-000*0,part-000*1,part-000*2}</value>
  <description></description>
</property>
<property>
  <name>streamSort.largeJobs.inputFiles</name>
  <value>${VARINFLTEXT}</value>
  <description></description>
</property>

実行するジョブ数とそれぞれのジョブのmap,reduceタスク数
それぞれのジョブに対して上記のinputFilesの設定に加えて、以下の4つの項目があるが、一部よくわからなかった。

<property>
  <name>combiner.mediumJobs.numOfJobs</name>
  <value>2</value>
  <description>実行するジョブ数</description>
</property>
<property>
  <name>combiner.mediumJobs.numOfReduces</name>
  <value>10</value>
  <description>reduceタスクの数</description>
</property>
<property>
  <name>combiner.mediumJobs.numOfMapoutputCompressed</name>
  <value>8</value>
  <description>?</description>
</property>
<property>
  <name>combiner.mediumJobs.numOfOutputCompressed</name>
  <value>0</value>
  <description>?</description>
</property>

ジョブの実行
今回は6種類のジョブをsmall 5, medium 3, large 1回に設定して実行した

./rungridmix_2
Jobs in waiting state: 18
Jobs in ready state: 0
Jobs in running state: 54
Jobs in success state: 0
Jobs in failed state: 0
...

Jobs in waiting state: 18 はMonsterQueryは3段階あるので、9個 x 2段階分が後に控えているということ

結果
yahooとかでは結果をグラフ化していたが、そういうツールが含まれてはいない様子。。。
下のような出力をジョブごとに出してくれるが、それをどう見れば良いのかけっこう難しい

JobId   job_201006140032_0184
JobName GridmixStreamingSorter.small
job_201006140032_0184.GridmixStreamingSorter.small.FileSystemCounters.FILE_BYTES_READ   198424586
job_201006140032_0184.GridmixStreamingSorter.small.FileSystemCounters.FILE_BYTES_WRITTEN        397650591
job_201006140032_0184.GridmixStreamingSorter.small.mapEndTime   1276447958730
job_201006140032_0184.GridmixStreamingSorter.small.mapExecutionTimeStats.avg    10528
job_201006140032_0184.GridmixStreamingSorter.small.mapExecutionTimeStats.max    12079
job_201006140032_0184.GridmixStreamingSorter.small.mapExecutionTimeStats.medium 12028
job_201006140032_0184.GridmixStreamingSorter.small.mapExecutionTimeStats.min    6008
job_201006140032_0184.GridmixStreamingSorter.small.mapExecutionTimeStats.numOfItems     12
job_201006140032_0184.GridmixStreamingSorter.small.mapStartTime 1276447861969
job_201006140032_0184.GridmixStreamingSorter.small.numOfMapTasks        12
job_201006140032_0184.GridmixStreamingSorter.small.numOfReduceTasks     15
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.JobInProgress$Counter.DATA_LOCAL_MAPS       1
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.JobInProgress$Counter.RACK_LOCAL_MAPS       11
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.JobInProgress$Counter.TOTAL_LAUNCHED_MAPS   12
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.JobInProgress$Counter.TOTAL_LAUNCHED_REDUCES        18
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.COMBINE_INPUT_RECORDS  0
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.COMBINE_OUTPUT_RECORDS 0
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.MAP_INPUT_BYTES        806756221
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.MAP_INPUT_RECORDS      723817
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.MAP_OUTPUT_BYTES       808069849
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.MAP_OUTPUT_RECORDS     723817
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.REDUCE_INPUT_GROUPS    640805
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.REDUCE_INPUT_RECORDS   723817
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.REDUCE_OUTPUT_RECORDS  723817
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.REDUCE_SHUFFLE_BYTES   183261658
job_201006140032_0184.GridmixStreamingSorter.small.org.apache.hadoop.mapred.Task$Counter.SPILLED_RECORDS        1447634
job_201006140032_0184.GridmixStreamingSorter.small.reduceEndTime        1276448402086
job_201006140032_0184.GridmixStreamingSorter.small.reduceExecutionTimeStats.avg 21383
job_201006140032_0184.GridmixStreamingSorter.small.reduceExecutionTimeStats.max 24076
job_201006140032_0184.GridmixStreamingSorter.small.reduceExecutionTimeStats.medium      21134
job_201006140032_0184.GridmixStreamingSorter.small.reduceExecutionTimeStats.min 21074
job_201006140032_0184.GridmixStreamingSorter.small.reduceExecutionTimeStats.numOfItems  15
job_201006140032_0184.GridmixStreamingSorter.small.reduceStartTime      1276447861969
job_201006140032_0184.jobStatus successful

わかったことや感想など
今までは全てノードからログをNFSに書いていたけど、このように細かいジョブを大量に発生させる時は負荷が大きくなるのでやめた方が良い。
単一の単純なベンチマークジョブだけだとdiskがボトルネックとなってCPU利用率がそれほど上がらないが、今回実行したworkloadではCPU, disk, network共に高い利用率だった。
jobtracerのスケジューラーで性能が大きく変わりそうで、スケジューラーの性能比較には持って来い。
gridmix2は単純に指定したジョブを全て実行してしまうけど、gridmix3は指定した間隔でジョブを実行させたり出来るので試してみたい。

個人的には、このgridmixを分散ファイルシステムの性能比較に使いたいわけだけど、パラメータが多くて純粋にファイルシステムの性能比較をしたいという用途には向かなそう。
また、自分自身はアプリケーションがないので、どのようなワークロードを発生すれば良いかわからない。yahoo!のデータを参考にするしかできない。
あとは、結果をグラフで表示したりするツールを作らないと分析が難しい。

Hadoop-ec2環境の構築[ClouderaのAMIをベースにHadoop環境を構築]

自分で設定したカスタムのAMIを作っておけば、hadoop-ec2スクリプトで簡単に計算ノードを追加できます。
まずはClouderaのAMIをベースにしてHadoop環境を構築してカスタムのAMIとして保存しようと思います。

Clouderaが提供しているAMIの一覧からfedoraの64bitを選びました。
cloudera-ec2-hadoop-images/cloudera-hadoop-fedora-20090623-x86_64 ami-2359bf4a

以下はインスタンスを1台起動してrootでログインしての作業です。

  ________                               __
 /\  _____\  /\                         /\ \
 \ \ \____/ /\ \     ____   __  __     _\_\ \     ____   __  ____ _____
  \ \ \     \ \ \   / __ \ /\ \/\ \   / ___  \   / __ \ /\ \/ __// __  \
   \ \ \     \ \ \ /\ \/\ \\ \ \ \ \ /\ \ /\  \ /\ \_\_\\ \  /_//\ \/\  \
    \ \ \_____\ \ \\ \ \_\ \\ \ \_\ \\ \ \__\  \\ \ \____\ \ \  \ \ \_\  \
     \ \______\\ \_\\ \____/ \ \____/ \ \______/ \ \_____\\ \_\  \ \___/\_\
      \/______/ \/_/ \/___/   \/___/   \/_____/   \/_____/ \/_/   \/__/\/_/

            Fedora 8 x86_64
            Cloudera's Distribution for Hadoop
            build 20090623 on 2009-06-23
$ wget http://archive.cloudera.com/redhat/cdh/cloudera-cdh3.repo
$ cp cloudera-cdh3.repo /etc/yum.repos.d/
$ yum update yum
$ yum install hadoop-0.20 -y
$ yum install hadoop-pig -y
$ yum install hadoop-hive -y

これでhadoopとpig、hiveがさくっとインストールできました。
次に設定に入ります。clouderaではalternativesを使って設定を管理するのを推奨しているので従ってみた。
alternativesを使うと、クラスタの設定の切り替えが容易にできる

$ cp -r /etc/hadoop-0.20/conf.empty /etc/hadoop-0.20/conf.my_cluster
$ alternatives --install /etc/hadoop-0.20/conf hadoop-0.20-conf /etc/hadoop-0.20/conf.my_cluster 50
$ alternatives --display hadoop-0.20-conf
hadoop-0.20-conf - status is auto.
 link currently points to /etc/hadoop-0.20/conf.my_cluster
/etc/hadoop-0.20/conf.empty - priority 10
/etc/hadoop-0.20/conf.my_cluster - priority 50
Current `best' version is /etc/hadoop-0.20/conf.my_cluster.

あとは普通に /etc/hadoop-0.20/conf.my_cluster 内の設定ファイルを変更していく

[hadoop-env.sh]

export JAVA_HOME=/usr

HDFSは使わずにs3を使います。とりあえずは1台で動かす設定
[core-site.xml]

  <property>
    <name>hadoop.tmp.dir</name>
    <value>/mnt/hadoop/${user.name}</value>
  </property>
  <property>
    <name>fs.default.name</name>
    <value>s3n://mybucket/</value>
  </property>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>accesskeyid</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>secretaccesskey</value>
  </property>
<property>
  <name>mapred.job.tracker</name>
  <value>localhost:9001</value>
</property>

デーモンはhadoopユーザーで起動するのでhadoopで使用するディレクトリはhadoopユーザー所有にしておく

chown hadoop:hadoop /mnt/hadoop

そしてデーモンを起動。HDFS使わないのでjobtrackerとtasktrackerだけでOK

$ service hadoop-0.20-jobtracker start
$ service hadoop-0.20-tasktracker start

立ち上がったのでそのままテスト。

$ hadoop fs -mkdir /input
$ hadoop fs -put /etc/hadoop-0.20/conf/*.xml /input
$ hadoop fs -ls /input
Found 6 items
-rwxrwxrwx   1       3936 2010-06-05 14:52 /input/capacity-scheduler.xml
-rwxrwxrwx   1        622 2010-06-05 14:52 /input/core-site.xml
-rwxrwxrwx   1       3032 2010-06-05 14:52 /input/fair-scheduler.xml
-rwxrwxrwx   1       4190 2010-06-05 14:52 /input/hadoop-policy.xml
-rwxrwxrwx   1        258 2010-06-05 14:52 /input/hdfs-site.xml
-rwxrwxrwx   1        276 2010-06-05 14:52 /input/mapred-site.xml

S3上にファイルをおけました。
次にgrepのサンプルプログラムを動かしてみます。

$ hadoop jar /usr/lib/hadoop-0.20/hadoop-*-examples.jar grep /input /output 'dfs[a-z.]+'
10/06/05 15:00:06 INFO mapred.FileInputFormat: Total input paths to process : 6
10/06/05 15:00:07 INFO mapred.JobClient: Running job: job_201006051449_0001
10/06/05 15:00:08 INFO mapred.JobClient:  map 0% reduce 0%
10/06/05 15:00:23 INFO mapred.JobClient:  map 33% reduce 0%
...
$ hadoop fs -cat /output/part-00000|head
1       dfs.replication
1       dfsadmin

とりあえず1台のEc2インスタンスにhadoopをインストールし、S3上のファイルに対してプログラムを実行する所まで行きました。
次回は設定をちゃんとして、カスタムのAMIを保存する所までいきたいと思います。

[論文紹介]Haceph: Scalable Metadata Management for Hadoop using Ceph

タイトル:
Haceph: Scalable Metadata Management for Hadoop using Ceph
概要:
hadoopのファイルシステムとしてHDFSの代わりにCephを使う。
HDFSの単一のネームサーバはファイル数の限界、単一障害点、線形的な性能向上のボトルネックとなる可能性などの問題がある。
Cephの利点はダイナミックサブツリーパーティショニングを用いることによって分散メタデータサーバーで、一貫性、信頼性、高性能、線形的なスケーラビリティを提供する。
予備評価としてHadoop/HDFSとHadoop/Ceph(HadoppはPOSIX IOインターフェースを使って、Cephはデータの位置情報を提供しない)の比較を行った。
40ノードでワードカウントを行った所、ほぼ同程度の性能が見られた。

感想:
ポスターに性能評価が載っているらしいが、ウェブ上では見つからなかった。データの位置情報なしで同程度の性能とは言ってるけど、測り方でいろいろ変わるだろうから、詳しい環境がわからないとなんとも。
探したところhadoop/cephの実装はHADOOP-6253にあって、データの位置情報を返すgetFileBlockLocations()も定義されている。
もしかするとこの論文では独自で実装していてそれはまだ公開されていないのかも。。。

rails2.3.4からrails2.3.5にアップデート

rails2.3.4から2.3.5にアップデートしようとして少しハマったのでメモ

$ sudo gem install rails -v 2.3.5

でアップデートして、サーバーを起動しようとしたら

$ ruby script/server
/usr/local/lib/ruby/site_ruby/1.8/rubygems.rb:777:in `report_activate_error': RubyGem version error: rack(0.9.1 not ~> 1.0.1) (Gem::LoadError)
	from /usr/local/lib/ruby/site_ruby/1.8/rubygems.rb:211:in `activate'
	from /usr/local/lib/ruby/site_ruby/1.8/rubygems.rb:1056:in `gem'
	from /usr/local/lib/ruby/gems/1.8/gems/actionpack-2.3.5/lib/action_controller.rb:34
	from /usr/local/lib/ruby/site_ruby/1.8/rubygems/custom_require.rb:31:in `gem_original_require'
	from /usr/local/lib/ruby/site_ruby/1.8/rubygems/custom_require.rb:31:in `require'
	from /Users/mikamishunsuke/.gem/ruby/1.8/gems/activesupport-2.3.5/lib/active_support/dependencies.rb:156:in `require'
	from /Users/mikamishunsuke/.gem/ruby/1.8/gems/activesupport-2.3.5/lib/active_support/dependencies.rb:521:in `new_constants_in'
	from /Users/mikamishunsuke/.gem/ruby/1.8/gems/activesupport-2.3.5/lib/active_support/dependencies.rb:156:in `require'
	from /usr/local/lib/ruby/gems/1.8/gems/rails-2.3.5/lib/commands/server.rb:2
	from /usr/local/lib/ruby/site_ruby/1.8/rubygems/custom_require.rb:31:in `gem_original_require'
	from /usr/local/lib/ruby/site_ruby/1.8/rubygems/custom_require.rb:31:in `require'
	from script/server:3

rackのバージョンが古いから駄目なのかと思って

gem update rack

これでも駄目。
それでよく見たら、rack(0.9.1 not ~> 1.0.1) って書いてある。え、rack 1.0.1限定!?

$ gem list |grep rack
rack (1.1.0, 1.0.0, 0.9.1)

確かに入っていたのは1.1.0だったので

sudo gem install rack --version 1.0.1

これで動いた・・・

HadoopのファイルシステムとしてS3を利用する

EC2でHadoopを使う場合、インスタンスを停止するとデータがなくなる(EBSを使うという手もありますが)関係でデータはS3に置く場合が多いと思います。
その場合、S3からEC2上のHDFSに読み込んでから処理をして最後にS3に書き出すということも考えられますが、やはりS3上のファイルを直接Hadoopのジョブ入出力として使うのが効率が良い。(更に言うと、複数のジョブを実行する時の中間のデータを保存するのにはHDFSを使った方が速い)

そもそもHadoopはファイルシステムの部分を抽象化していて、HDFSもその実装の一つに過ぎません。
AmazonS3 – Hadoop Wikiにもありますが、S3を使うための実装には2種類あるので注意。

試しにS3上のファイルを見たい時は

% hadoop fs -ls s3n://ID:SECRET@BUCKET/

とすれば見れるはず。IDはS3のアクセスキーIDで、SECRETはシークレットキー。
ここで注意が必要なのは、シークレットキーが / を含んでいる場合に正しく見れないということです。。。
詳細はHADOOP-3733にありますが、これはHadoopがパスをURIとしてパースするためで、根が深く、まだ修正されていません。
といっても、このようなアクセス方よりも設定ファイルに書いておくのが一般的でしょう。

s3nの場合は、

<property>
  <name>fs.s3n.awsAccessKeyId</name>
  <value>ID</value>
</property>

<property>
  <name>fs.s3n.awsSecretAccessKey</name>
  <value>SECRET</value>
</property>

s3の場合は。

<property>
  <name>fs.s3.awsAccessKeyId</name>
  <value>ID</value>
</property>

<property>
  <name>fs.s3.awsSecretAccessKey</name>
  <value>SECRET</value>
</property>

こうやって書いておくと、

% hadoop fs -ls s3n://BUCKET/

このようにバケット名だけでアクセスできます。
さらにHDFSを使わずにS3をデフォルトで使う場合は

<property>
  <name>fs.default.name</name>
  <value>s3n://BUCKET</value>
</property>

と書いておくと

% hadoop fs -ls /

これだけでS3のファイルを見に行きます。
ただ、fs.default.nameにHDFSを指定しない場合NameNode、DataNodeが起動せず、HDFSを使えないので、併用する場合はawsAccessKeyIdとawsSecretAccessKeyだけ設定してデフォルトはHDFSに設定する必要があります。

性能が気になるところですが、だいたい読み込みでノードあたり5~10MB/secくらい出ていて、(ちゃんと測ったわけではないのであまり突っ込まないでくださいw)20台くらいまでしか試していませんが、台数を増やした時にスケールしてました。

あと、ブロックベースのファイルシステム(s3://)については、おそらくメタデータをS3上に保存しているんだろうけど、どんな形式で保存しているんだろう??
メタデータへのアクセスが遅かったり、今後のHadoopのバージョンアップで形式を変えたりして互換性がなくなったりとかしないだろうか・・・

Hadoop Hack Nightを振り返って

Hadoop Hack Night、大盛況でしたね。有意義な会になったと思います。
技評さんからまとめも出ると思いますが、トークセッションで自分がした質問と答え、さらにそれに対して自分で思ってた感想を載せておこうと思います。
全てQが私でAがowen氏です。

Q:HDFSの利用に関して、ウェブのクローリングデータだけでなく、顧客情報など絶対になくしてはいけないデータもレプリカは作るにしても、HDFSだけにおいてそれ以外のストレージにバックアップをとったりはしていないのか?
A:以前はバックアップもとっていたが、現在はHDFSだけにしかないデータもある。しかし、データセンターが壊れた時にデータを失っては困るので、データセンター間でのバックアップも検討している。
実際に今まで運用してきた中で、ファイルを失ったこともあるが、その原因の大部分はHadoopのレプリカの数を1にしていた。しかしながらHDFSはとても信頼性が高い。
Q:レプリカを2、3にしていた時はデータを失ったことはないのか?
A:実際にはレプリカ2や3でもデータを失ったことがある。しかし、それよりもユーザーのミスによって失ったデータの方がはるかに多い。

感想:やはりレプリカ3でデータを失ったことがあるということで、残念。実はトークセッション以外でも話す機会があって、そこでの話を含めると、別のデータセンターへっていうのはおそらくデータセンターごとに別のHDFS、別のNameNodeによる別の名前空間のHDFSがあり、そこへのバックアップするという話の様です。どちらにしてもHDFS以外のストレージを使っていないようだった。ユーザーのミスって言ってたけど、これってもしかするとユーザーというかオペレーションのミスの事を差してた可能性もあるような。

Q:Hadoopとgoogleの実装ではMapReduceのモデルが違っていて、googleの実装ではreduceでのkeyの変更ができないが、Hadoopではできる。それは意図があってそのようにしたのか?また、それによってアプリケーションが柔軟に作れるようになったりするのか?
A:それは意図したもので、reduceでkeyの変更が可能になっている。完全に違うkeyに変えることもできるが、それだと変更前のkeyでソートされているので、変更するとそのソートが崩れてしまう。アプリケーション側で崩れないように責任を持つことが重要だ。

感想:やっぱりそうなのかという印象。実際には型を変更したり、IDなどから名前に変更するなど一体一対応の変換をするのかなと。それだとmap側で変えればいいって話もありそう。IDでソートして出力は名前に変更するとかはあり得るのかな。

Q:HDDの変わりにSSDを利用することについて何か考えがあるのか?
A:SSDには興味があるが、コストの点でまだ利用できていない。ネームノードの情報をメタデータに保存する用途では良い。なぜかというとそういったデータのアクセスには頻繁にシークが発生する。もう一つの使い方としてはHDDのキャッシュとして使用することも考えられます。

感想:なるほど、ネームノードに使用か。全部置き換えれるのはまだまだ先の話だろうし。

Q:Hadoop1.0になるための条件は?また、それは何時頃か?
A:長い間1.0がどのようなものになるべきか議論があった。我々が欲しかった多くの機能に関しては速いスピードで開発が進んでいる。そのため、0.22が1.0の一歩手前、もしくは0.22自身が1.0に成り得ると考えている。すでに0.20がプロダクト環境で使われており、これが1.0と言えるのかもしれない。

感想:0.20とかだから1.0までまだまだ遠いと考えてるのかと思ったけど、そういうわけではなかった。0.20に関してはまだ足りないと思いますが、0.22にセキュリティも加わって、十分にテストされれば確かに1.0と言えるのかなと。

Q:他の会社との棲み分けについて、yahooではpigがあって、facebookではHiveがある。スケジューラーについてはyahooのcapacity schedulerとfacebookのFair Schedulerがありますが。
A:始めにスケジューラに関しては、すでにプラガブルになっていて異なった実装が可能になっている。そして、yahooとfacebookでは別々に開発しているが、良い点などを話しあって、良い点については両方の実装に組み入れている。ユーザーには混乱があるかもしれないが、自分にあった方を選択することができる。PigとHiveに関しても似たことが言えて、お互いの良い点を合わせて一つにして行くようなことも考えている。

感想:スケジューラについては環境によって良いスケジューラーは変わってくるのかなと。ただHiveとPigに関してはYahoo Japanの吉田さんに、「Yahoo JapanではHiveとPig両方使われているという話がありましたが、その使い分けはどうなっていますか?」と聞いたところ、エンジニアの好みの問題だとおっしゃってました。根本的な違いがあるわけではないのかなと。

全体を通しての感想
今回のイベントではHadoopの開発者にしか聞けないような質問を多くできて有意義なイベントでした。
またこのようなイベントを通して知識の共有などしていけたらいいと思います。