DynamoDB + S3 + EMRでコホート分析(cohort analysis)をする(3)
前回からかなり間が空いてしまったけど、今回で完結予定。
前回はHiveの話を中心に、S3に置いたファイルをHiveでどう扱うかなどについて書いた。また、第1回では全体の流れを書いたので、どんなことをやるかは詳しくはそちらを参照。
今回は、DynamoDBに書き込んでいるデータを定期的にS3にエクスポートしたり、MySQLからエクスポートしたデータに対して、EMR上のHiveからクエリーを実行して結果を取得してみる。
DynamoDBからS3にエクスポート
DynamoDBは、事前に設定した読み書きのスループット以上のアクセスは出来ないし、同じデータに対して違うクエリーを複数実行する場合とかに無駄なので、過去データを定期的にS3にエクスポートする事にした。
テーブル定義
まずはDynamoDBのテーブルをHiveに認識させるために、以下のようなテーブルを作成する。
CREATE EXTERNAL TABLE dynamo_log_users ( user_id string, action string, device string, time string, createtime bigint ) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ( "dynamodb.table.name" = "log_users", "dynamodb.column.mapping" = "action:action,time:time,user_id:user_id,createtime:createtime,device:device");
次に、上のテーブル(実体はDynamoDB)からエクスポートするデータを格納するテーブル(実体はS3上のファイル)を作成。下の例では、createtimeフィールドの値によって、yearとmonthでパーティション分割するような設定にしているが、データ量等、その他の制約に応じて、1日単位でのパーティション作成でも良いかも。
CREATE EXTERNAL TABLE log_users ( user_id string, action string, device string, time string, createtime bigint ) PARTITIONED BY (year string, month string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucket/hive/log_users/';
Dynamic Partitions
Hive上にテーブルが出来ちゃえば、後は通常通りINSERT OVERWRITE TABLEでデータをコピーするだけ、なんだけど、その前に以下のような設定をしておく必要がある。詳しくはWikiのDynamic Partitionsの項を参照。
SET hive.exec.dynamic.partition=true; SET hive.exec.dynamic.partition.mode=nonstrict;
エクスポート
その後、通常通りINSERT OVERWRITE TABLEでDynamoDBのテーブル(dynamo_log_users)からS3のテーブル(log_users)にデータをコピーするため、以下のようなクエリーを実行。
INSERT OVERWRITE TABLE log_users PARTITION(year, month) SELECT *, from_unixtime(createtime, 'yyyy') year, from_unixtime(createtime, 'MM') month FROM dynamo_log_users WHERE from_unixtime(createtime, 'yyyyMM') >= '201311' AND from_unixtime(createtime, 'yyyyMM') < '201312' DISTRIBUTE BY year, month;
MySQLのテーブルをエクスポート
MySQLに格納されているユーザー情報も定期的にS3にエクスポートする。CSVかTSVでエクスポートするのが良いと思われる。ちょっと調べた感じ、一番楽なのは以下の方法でTSVにする方法。
echo "SELECT * FROM users WHERE ..." | mysql -h host -u user -ppass dbname --skip-column-names > file
あとは出来たファイルをs3cmd等でS3にアップロードする(詳細は省略)。
結果用のテーブル作成
以下のように結果用テーブルを作成。
CREATE EXTERNAL TABLE results_weekly (cohort_start_date string, value float) PARTITIONED BY (result_date string) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucket/hive/results_weekly/';
カラムは以下の3つ。
- コホート名(日付):例→2013/11/4週
- データ集計対象:例→2013/12/9週
- データ:例→40.5
集計クエリー実行
ここまででようやく準備が整ったので、揃ったデータに対してHiveからクエリーを実行する。
例えばアクティブユーザー数を測りたい場合、該当期間中のlog_usersにuser_idが含まれればアクティブとすると、以下のようなクエリーとなる。
INSERT OVERWRITE TABLE results_weekly PARTITION(result_date) SELECT ucm.cohort, COUNT(DISTINCT(l.user_id)), dcm.cohort_week FROM user_cohort_map ucm JOIN log_users l ON ucm.user_id = l.user_id JOIN date_cohort_map dcm ON dcm.date = from_unixtime(l.createtime, 'yyyy-MM-dd') WHERE dcm.cohort_week = '2013-12-09' -- 集計日 GROUP BY ucm.cohort, dcm.cohort_week ;
実際の業務に合わせて、テーブル構造やクエリーとかを変えてみて下さい。
まとめ
分かってしまえば、AWSでコホート分析をするのはそんなに大変ではない。Elastic MapReduce, S3, DynamoDBなどの使えるサービスが最初から揃っているというのが大きい。
と言ってもデータが増えてくると色々苦労がと思うので、テーブル設計などはしっかり考えて実行する事をオススメ。