Showing posts with label hive. Show all posts
Showing posts with label hive. Show all posts

Friday, November 30, 2012

Hive Summarization script for billing needs

This contains links for the posts related to the Hive Summarization script for billing needs 
Hive & Me Part 1
Hive & Me Part 2

Thursday, November 15, 2012

Hive & Me Part 2

Continued from Hive & Me Part 1.....

After creating the required MySQL and Hive tables I moved in to the logic part of the script. I have to get the sum of all the bandwidth-In and bandwidth-Out entries separately. Then sum (bandwidth-In)-sum (bandwidth-Out) will give the current value and sum (bandwidth-Out) will give the history value. But doing it hourly is extremely costly. But if we can sum the entries from the last hour and calculate the current and history values based on the early current and history values, it will be better. I got to know we are keeping the time of the last run of the script in a MySQL table, and we write it to the hive configuration file using a Java class. I used that value to get the sum of the entries in the last hour. But it is not possible to add this last hour summarization to the previous current, and history values in the same query. So I add the summarization of the last hour with new id and sum the final and last hour rows in the table.

INSERT INTO TABLE REGISTRY_USAGE_HOURLY_ANALYTICS 
SELECT concat(TID, "LastHour"), TID, HISTORY_USAGE, CURRUNT-HISTORY_USAGE FROM
(SELECT TENANT_ID AS TID,
        sum(PAYLOAD_VALUE) AS HISTORY_USAGE
FROM USAGE_STATS_TABLE
WHERE USAGE_STATS_TABLE.PAYLOAD_TYPE = 'ContentBandwidth-Out' AND Timestmp > ${hiveconf:last_hourly_ts}
GROUP BY SERVER_NAME, PAYLOAD_TYPE, TENANT_ID) table1
JOIN
(SELECT TENANT_ID AS TID2,
        sum(PAYLOAD_VALUE) AS CURRUNT
FROM USAGE_STATS_TABLE
WHERE USAGE_STATS_TABLE.PAYLOAD_TYPE = 'ContentBandwidth-In' AND Timestmp > ${hiveconf:last_hourly_ts}
GROUP BY SERVER_NAME, PAYLOAD_TYPE, TENANT_ID) table2
ON(table2.TID2 = table1.TID);


 Above script get the summery of the usage in the last hour and inset it in to the table. Below query add the last our summary to final(in the last hour) and create the final value for the current hour.

INSERT INTO TABLE REGISTRY_USAGE_HOURLY_ANALYTICS 
SELECT concat(TENANT_ID, "Final"),
        TENANT_ID,
        sum(HISTORY_USAGE) as HISTORY_USAGE,
        sum(CURRENT_USAGE) as CURRENT_USAGE
FROM REGISTRY_USAGE_HOURLY_ANALYTICS
GROUP BY TENANT_ID

This query results in a MySQL table where each tenant has two rows as 'final' and 'last hour'. Final row gives the current (size of all the data that user currently have in his directory) and history (size of all the data that user has deleted up to now). This information should be available to for each tenant correct to the last hour.

Hive & Me Part 1

Started with the new project to summarize registry bandwidth data (refers to the space used in the registry). As you might know we can have BAM to summarize data in Cassandra key spaces using hive scripts. It was not easy to work with lack of examples under hive.

What I have to do
There was a table in Cassandra that contains registry usage data. When a user adds or remove something from his registry a entry is marked as “registryBandwidth-In” (when we adds something) or “registryBandwidth-Out”(when he deletes something). I have to summarize those recodes in such a way that we have access to the current (size of all the data that user currently have in his directory) and history (size of all the data that user has deleted up to now). This information should be available to for each tenant correct to the last hour.

Implementation Plan
If I can write the current and history values in to a MySQL table, where each tenant will have a separate row, it is good enough. First I thought of having a table in hive with current and history values and a MySQL table mapped to it.

Below code uses the JDBC Storage Handler for Hive and more information on how to use it can be found in Kasun's blog: http://kasunweranga.blogspot.com/2012/06/jdbc-storage-handler-for-hive.html

CREATE EXTERNAL TABLE IF NOT EXISTS REGISTRY_USAGE_HOURLY_ANALYTICS ( 
        ID STRING,
        TENANT_ID STRING,      
        HISTORY_USAGE BIGINT,
        CURRENT_USAGE BIGINT)
        STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' TBLPROPERTIES (
        "mapred.jdbc.driver.class" = "com.mysql.jdbc.Driver",
        "mapred.jdbc.url" = "jdbc:mysql://localhost:3306/WSO2USAGE_DB",
        "mapred.jdbc.username" = "root",
        "mapred.jdbc.password" = "root",
        "hive.jdbc.update.on.duplicate" = "true",
        "hive.jdbc.primary.key.fields" = "ID",
        "hive.jdbc.table.create.query" = "CREATE TABLE REGISTRY_USAGE_HOURLY_ANALYTICS (
        ID VARCHAR(50),
        TENANT_ID VARCHAR(50),
        HISTORY_USAGE BIGINT,
        CURRENT_USAGE  BIGINT)"
);

This will create a  2 tables, One is a Hive table and other is a mySQL table. Both will have the name "REGISTRY_USAGE_HOURLY_ANALYTICS" What ever we write the to the hive table will be written to the MySQL table. In the next code block I create a mapping to the MySQL table. Using this temporary hive table I can query the MySQL table.

CREATE EXTERNAL TABLE IF NOT EXISTS REGISTRY_USAGE_HOURLY_ANALYTICS_TEMP (
        ID STRING,
        TENANT_ID STRING,      
        HISTORY_USAGE BIGINT,
        CURRENT_USAGE BIGINT)
        STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' TBLPROPERTIES (
        "mapred.jdbc.driver.class" = "com.mysql.jdbc.Driver",
        "mapred.jdbc.url" = "jdbc:mysql://localhost:3306/WSO2USAGE_DB",
        "mapred.jdbc.username" = "root",
        "mapred.jdbc.password" = "root",
        "hive.jdbc.primary.key.fields" = "TENANT_ID",
        "mapred.jdbc.input.table.name" = "REGISTRY_USAGE_HOURLY_ANALYTICS"
);

Continued to the part 2....

Thursday, September 20, 2012

Using Hive Scripts to Analyze and Summarize BAM data


As I have completed up to publishing usage data, now I need to analyze and summarize those data. This can be simply done by a hive script and scheduling it within BAM. In the main menu of BAM you will find a manage menu. In manage menu, there is a menu item analyze. Under analyze menu item you get two more sub menus, one to list existing scripts and one to add new scripts.

Now go to 'add' sub menu there(Main>Manage>Analytics>Add). Here you get the chance to write your script and schedule it.

Bellow is a simple script written by Shariq Muhammed, SE @ WSO2. I used this script to summarize data in one of my tables created while pumping data in to BAM. I have removed some parts init as It won't be relevant to you.


CREATE EXTERNAL TABLE IF NOT EXISTS UsageStatsTable (id STRING,
        payload_ServerName STRING,
        payload_TenantID STRING,
        payload_Data STRING,
        payload_Value BIGINT,
        timestamp BIGINT) 
        STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' WITH SERDEPROPERTIES ( 
        //sort Properties
);

CREATE EXTERNAL TABLE IF NOT EXISTS UsageStatsHourFact (id String, 
        hour_fact STRING,
        payload_ServerName STRING, 
        payload_TenantID STRING,        
        payload_Data STRING,
        payload_Value BIGINT) 
        STORED BY 'org.apache.hadoop.hive.cassandra.CassandraStorageHandler' WITH SERDEPROPERTIES ( 
        //sort properties
);

select #some columns from my table#

insert into table #some other table#
like above you can select and group the pumped data and insert summery data into a new table. If you don't know hive syntax, it is similar to SQL and you can have a great tutorial @ the following link, https://cwiki.apache.org/Hive/tutorial.html.