DynamoDB + S3 + EMRでコホート分析(cohort analysis)をする(3)

前回からかなり間が空いてしまったけど、今回で完結予定。

前回はHiveの話を中心に、S3に置いたファイルをHiveでどう扱うかなどについて書いた。また、第1回では全体の流れを書いたので、どんなことをやるかは詳しくはそちらを参照。

今回は、DynamoDBに書き込んでいるデータを定期的にS3にエクスポートしたり、MySQLからエクスポートしたデータに対して、EMR上のHiveからクエリーを実行して結果を取得してみる。

DynamoDBからS3にエクスポート

DynamoDBは、事前に設定した読み書きのスループット以上のアクセスは出来ないし、同じデータに対して違うクエリーを複数実行する場合とかに無駄なので、過去データを定期的にS3にエクスポートする事にした。

テーブル定義

まずはDynamoDBのテーブルをHiveに認識させるために、以下のようなテーブルを作成する。

CREATE EXTERNAL TABLE dynamo_log_users (
user_id string,
action string,
device string,
time string,
createtime bigint
)
STORED BY
'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' 
TBLPROPERTIES (
"dynamodb.table.name" = "log_users",
"dynamodb.column.mapping" = 
"action:action,time:time,user_id:user_id,createtime:createtime,device:device");

次に、上のテーブル(実体はDynamoDB)からエクスポートするデータを格納するテーブル(実体はS3上のファイル)を作成。下の例では、createtimeフィールドの値によって、yearとmonthでパーティション分割するような設定にしているが、データ量等、その他の制約に応じて、1日単位でのパーティション作成でも良いかも。

CREATE EXTERNAL TABLE log_users (
user_id string,
action string,
device string,
time string,
createtime bigint
)
PARTITIONED BY (year string, month string)
ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\t'
   LINES TERMINATED BY '\n'
LOCATION 's3://bucket/hive/log_users/';

Dynamic Partitions

Hive上にテーブルが出来ちゃえば、後は通常通りINSERT OVERWRITE TABLEでデータをコピーするだけ、なんだけど、その前に以下のような設定をしておく必要がある。詳しくはWikiのDynamic Partitionsの項を参照。

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

エクスポート

その後、通常通りINSERT OVERWRITE TABLEでDynamoDBのテーブル(dynamo_log_users)からS3のテーブル(log_users)にデータをコピーするため、以下のようなクエリーを実行。

INSERT OVERWRITE TABLE log_users
PARTITION(year, month)
SELECT *,
       from_unixtime(createtime, 'yyyy') year,
       from_unixtime(createtime, 'MM') month
 FROM dynamo_log_users
WHERE from_unixtime(createtime, 'yyyyMM') >= '201311'
  AND from_unixtime(createtime, 'yyyyMM') <  '201312'
DISTRIBUTE BY year, month;

MySQLのテーブルをエクスポート

MySQLに格納されているユーザー情報も定期的にS3にエクスポートする。CSVかTSVでエクスポートするのが良いと思われる。ちょっと調べた感じ、一番楽なのは以下の方法でTSVにする方法。

echo "SELECT * FROM users WHERE ..." | 
mysql -h host -u user -ppass dbname --skip-column-names > file

あとは出来たファイルをs3cmd等でS3にアップロードする(詳細は省略)。

結果用のテーブル作成

以下のように結果用テーブルを作成。

CREATE EXTERNAL TABLE results_weekly
(cohort_start_date string, value float)
PARTITIONED BY (result_date string)
ROW FORMAT DELIMITED
   FIELDS TERMINATED BY '\t'
   LINES TERMINATED BY '\n'
LOCATION 's3://bucket/hive/results_weekly/';

カラムは以下の3つ。

  • コホート名(日付):例→2013/11/4週
  • データ集計対象:例→2013/12/9週
  • データ:例→40.5

集計クエリー実行

ここまででようやく準備が整ったので、揃ったデータに対してHiveからクエリーを実行する。

例えばアクティブユーザー数を測りたい場合、該当期間中のlog_usersにuser_idが含まれればアクティブとすると、以下のようなクエリーとなる。

INSERT OVERWRITE TABLE results_weekly
PARTITION(result_date)
SELECT ucm.cohort, COUNT(DISTINCT(l.user_id)), dcm.cohort_week
  FROM user_cohort_map ucm
  JOIN log_users l ON ucm.user_id = l.user_id
  JOIN date_cohort_map dcm ON dcm.date = from_unixtime(l.createtime, 'yyyy-MM-dd')
WHERE dcm.cohort_week = '2013-12-09' -- 集計日
GROUP BY ucm.cohort, dcm.cohort_week ;

実際の業務に合わせて、テーブル構造やクエリーとかを変えてみて下さい。

まとめ

分かってしまえば、AWSでコホート分析をするのはそんなに大変ではない。Elastic MapReduce, S3, DynamoDBなどの使えるサービスが最初から揃っているというのが大きい。

と言ってもデータが増えてくると色々苦労がと思うので、テーブル設計などはしっかり考えて実行する事をオススメ。

コメントを残す

メールアドレスが公開されることはありません。 が付いている欄は必須項目です