Collecting and analyzing log data via Flume and Hive
August 15th, 2010 by aphadke
Exponential growth, one of the few problems every organization loves, is usually alleviated by scaling out using clustered computing (Hadoop), CDNs, EC2 and a myriad of other solutions. While a lot of cycles are spent making sure each scaled-out machine has the requisite libraries, latest code deployments, matching configs, and the whole nine yards, very little time is spent collecting the log files and data from these machines and analyzing them.
A few reasons why log collection usually sits at the tail end of priorities:
- Nagios alerts usually do a good job of monitoring for critical situations. The scripts make sure the app is always online by grep’ing for “ERROR”, “WARN” and other magic terms in logs, but what about errors that occur often but don’t bring down the app completely?
- Web analytics give us all the information we need. Yes, at a macroscopic level, but it’s really hard for analytics software to provide fine granularity, such as how many hits we received from a given country for a given page in a given time period.
- Ganglia graphs help us find out which machines are under heavy load. Absolutely, but figuring out what triggered the load in the first place is not always easy.
Chukwa, Scribe and Flume are headed in the right direction, but the final piece of the puzzle, analyzing the data, remained unsolved until a few weeks back, when we at Mozilla started integrating Flume with Hive.
Merge everything – Image courtesy Wikipedia.org
Flume is open-source distributed log collection software that can be installed on multiple machines to monitor log files, with the data slurped to a single HDFS location. Out of the box, it solved only part of our problem, collecting the data; we still needed a way to query it and thereby make intelligent decisions based on the results.
The team’s first foray was to add a gzip patch that compressed the log data before transferring the files to HDFS. Once the data was transferred, we needed a way to query it. Our current production Hadoop cluster consists of a modest 20 machines and has excellent monitoring via Nagios and Ganglia, but the question of what we might be missing always lingered in our heads. We drew up a list of basic things to take care of while integrating Flume with Hive:
- How do we handle fail-overs when the Hive metastore service, a possible single point of failure for Hive, goes down?
- How do we query data by day and hour?
- Can separate tables be used for different log locations?
- Can we split a single log line into its respective columns?
1. Handling fail-overs: We are currently running the Hive metastore in remote mode using MySQL. More information on metastore setup can be found at http://wiki.apache.org/hadoop/Hive/AdminManual/MetastoreAdmin. Flume node-agents reliably handle fail-overs by maintaining checksums at regular intervals and making sure data isn’t inserted twice. We extended the same principle by adding marker points: a file containing the HQL query and the location of the data is written to HDFS after every successful Flume roll-over. Before writing any log data to HDFS, Flume agents look at a common location for pending Hive writes, attempt to move that data into Hive, and delete the marker file only if the move succeeds. When two or more Flume agents attempt to move the same files into a Hive partition, one of them will encounter an innocuous HDFS “file not found” error and proceed as usual.
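The marker-point idea can be sketched roughly as follows. This is a minimal illustration and not the code from the patch: a local directory stands in for the common HDFS location, and the class name `MarkerFiles`, the method names, and the `hiveLoad` callback (a stand-in for actually running the HQL against Hive) are all hypothetical.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.util.function.Predicate;

/** Hypothetical sketch of marker points; a local directory stands in for HDFS. */
final class MarkerFiles {

    /** After a successful roll-over, record the pending Hive load as a marker file. */
    static Path writeMarker(Path markerDir, String name, String hql) throws IOException {
        Files.createDirectories(markerDir);
        return Files.write(markerDir.resolve(name), hql.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Replays every pending marker before new log data is written.
     * hiveLoad is an assumed stand-in for executing the HQL; it returns true
     * only when the load succeeded. Returns how many markers were completed.
     */
    static int replayPending(Path markerDir, Predicate<String> hiveLoad) throws IOException {
        int completed = 0;
        try (DirectoryStream<Path> markers = Files.newDirectoryStream(markerDir)) {
            for (Path marker : markers) {
                try {
                    String hql = new String(Files.readAllBytes(marker), StandardCharsets.UTF_8);
                    if (hiveLoad.test(hql)) {
                        Files.delete(marker); // delete only after a successful load
                        completed++;
                    }
                } catch (NoSuchFileException e) {
                    // Another agent already handled this marker: harmless, carry on.
                }
            }
        }
        return completed;
    }
}
```

A failed load leaves the marker in place, so the next agent that checks the common location retries it; the `NoSuchFileException` branch mirrors the innocuous "file not found" case when two agents race for the same marker.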
2. Appending to sub-partitions: Flume supports roll-over, where data is written to disk every ‘x’ milliseconds. This is particularly useful as data becomes available inside HDFS at regular intervals and can be queried at hour or minute granularity. While all the data could be written to a single partition, partitioning data inside Hive is a huge performance benefit, as a query only sifts through a specific range rather than the whole data set. We achieved this by having two partitions for a table, by date and by hour. The equivalent Hive query is built something like:
String hql = "LOAD DATA INPATH '" + dstPath + "' INTO TABLE " + hiveTableName + " PARTITION (ds='" + dateFormatDay.format(cal.getTime()) +
"', ts='" + dateFormatHourMinute.format(cal.getTime()) + "')";
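To make the partition values concrete, here is a small self-contained sketch of how such a statement could be assembled. The date patterns (`yyyy-MM-dd` and `HH-mm`), the example path, and the class name `HiveLoadStatement` are assumptions for illustration; the post does not show the actual formats used by the patch.

```java
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Locale;
import java.util.TimeZone;

/** Hypothetical helper that builds the LOAD DATA statement for one roll-over. */
final class HiveLoadStatement {

    static String build(String dstPath, String hiveTableName, Calendar cal) {
        // Assumed patterns: one partition value per day, one per hour and minute.
        SimpleDateFormat dateFormatDay = new SimpleDateFormat("yyyy-MM-dd", Locale.ROOT);
        SimpleDateFormat dateFormatHourMinute = new SimpleDateFormat("HH-mm", Locale.ROOT);
        // Format in the calendar's own time zone so the result is deterministic.
        dateFormatDay.setTimeZone(cal.getTimeZone());
        dateFormatHourMinute.setTimeZone(cal.getTimeZone());
        return "LOAD DATA INPATH '" + dstPath + "' INTO TABLE " + hiveTableName
                + " PARTITION (ds='" + dateFormatDay.format(cal.getTime())
                + "', ts='" + dateFormatHourMinute.format(cal.getTime()) + "')";
    }

    public static void main(String[] args) {
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        cal.clear();
        cal.set(2010, Calendar.AUGUST, 15, 12, 36, 0);
        // Prints the statement with ds='2010-08-15' and ts='12-36'.
        System.out.println(build("/flume/hadoop/log-0001", "cluster_logs", cal));
    }
}
```

Because each roll-over lands in its own (ds, ts) partition, a query restricted to one day or one hour only has to read the files under that partition.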
3. Using separate tables for different log locations: We wanted to use separate tables for Hadoop and HBase log locations. Our initial approach was to add a config setting in flume-site.xml, but halfway down that road we realized that config is the wrong place: the setting would need to exist on each node-agent, and mapping different folders to tables would be a logistical nightmare.
A new sink named hiveCollectorSink(hdfs_path, prefix, table_name) was added to the existing family (http://archive.cloudera.com/cdh/3/flume/UserGuide.html#_output_bucketing). This allowed us to add Hive tables on the fly for each log folder location, thereby giving a separate placeholder for Hadoop and HBase logs.
4. Splitting a single log line into its respective columns (a.k.a. regex): Log4J is a standard logging convention used by quite a few applications, including Hadoop and HBase. A sample line looks something like this:
2010-08-15 12:36:59,850 INFO org.apache.hadoop.hdfs.server.datanode.DataBlockScanner: Verification succeeded for blk_-1857716372571578738_336272
Given the above structure, we decided to split the line into 5 columns:
date, time, message_type, class_name and message. The table definition is given below:
CREATE TABLE cluster_logs (
PARTITIONED BY (ds STRING, ts STRING, hn STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
STORED AS TEXTFILE;
NOTE: The “hn” (hostname) partition was added so we could query the data by individual hostname, letting us see which host has the biggest chunk of ERROR and WARN messages.
The above framework has allowed us to reliably collect logs from our entire cluster in a single location and then query the data through a SQL-like interface.
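The five-column split described above can be tried out with a plain regular expression. The pattern below is an assumption written for the sample line, not the expression used in the actual table definition, and the class name `Log4jLineSplitter` is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: split one Log4J line into the five columns named above. */
final class Log4jLineSplitter {

    // Assumed pattern: date, time, message_type, class_name (up to the colon), message.
    private static final Pattern LINE = Pattern.compile(
            "^(\\S+)\\s+(\\S+)\\s+(\\S+)\\s+([^:\\s]+):\\s+(.*)$");

    /** Returns {date, time, message_type, class_name, message}, or null if the line does not match. */
    static String[] split(String line) {
        Matcher m = LINE.matcher(line);
        if (!m.matches()) {
            return null;
        }
        String[] cols = new String[5];
        for (int i = 0; i < cols.length; i++) {
            cols[i] = m.group(i + 1); // capture groups are 1-based
        }
        return cols;
    }
}
```

Applied to the sample line, this yields the date 2010-08-15, the time 12:36:59,850, the message type INFO, the DataBlockScanner class name, and the remaining text as the message. Lines that do not follow the Log4J layout, such as stack-trace continuation lines, return null in this sketch.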
- Flume + Hive patch is still a work in progress and will be committed to the trunk in a couple of weeks.
- Socorro (Mozilla’s crash reporting system) 1.9 and above will be using processors in a distributed mode, and we plan to insert the processors’ log data into Hive, thereby helping us better understand throughput, average time to process each crash, and other metrics. Watch this space for more related posts.
The developers of Flume + Hive usually hang out on IRC (irc.freenode.net) in the following channels: #flume, #hive, #hbase
Feel free to post questions, thoughts, or suggestions below and I will reply to them.
-Anurag Phadke (email: first letter of my firstname followed by last name at – the – rate mozilla dot com)
4 Responses to “Collecting and analyzing log data via Flume and Hive”
on 20 Dec 2010 at 11:16 am dave sinclair
Has the Flume Hive Sink patch been committed?
on 21 Dec 2010 at 10:13 am anurag
The approach mentioned in the blog results in a race condition, thereby causing a java.io.IOException: Filesystem closed
Reiterating the flow of events from the blog post: write a file to disk, close it, and then move it to the respective Hive partition. The .close() method, which returns void, is immediately followed by an HQL move query. Some HDFS file operations aren’t atomic; see https://issues.apache.org/jira/browse/HDFS-925.
A workaround involves not moving the file and instead creating a new Hive partition that points to the respective folder. We are testing the patch in our stage environment and will release it as soon as it’s stable.
on 21 Dec 2010 at 2:42 pm dave sinclair
thanks for the info anurag. I have written the AMQP plugin (https://github.com/stampy88/flume-amqp-plugin) and am anxiously waiting this one!