
Thursday, November 15, 2012

Hive & Me Part 2

Continued from Hive & Me Part 1.....

After creating the required MySQL and Hive tables I moved on to the logic of the script. I have to sum all the bandwidth-In and bandwidth-Out entries separately: sum(bandwidth-In) - sum(bandwidth-Out) gives the current value, and sum(bandwidth-Out) gives the history value. Doing that over all the data every hour is extremely costly, so it is better to sum only the entries from the last hour and calculate the new current and history values from the previous ones. I got to know that we keep the time of the last run of the script in a MySQL table and write it into the Hive configuration using a Java class (a sketch of how such a value can be supplied to Hive appears at the end of this post). I used that value to sum only the entries from the last hour. However, it is not possible to add this last-hour summary to the previous current and history values in the same query, so I insert the last-hour summary under a new ID and then sum the final and last-hour rows in the table.

INSERT INTO TABLE REGISTRY_USAGE_HOURLY_ANALYTICS 
SELECT concat(TID, "LastHour"), TID, HISTORY_USAGE, TOTAL_IN-HISTORY_USAGE FROM
(SELECT TENANT_ID AS TID,
        sum(PAYLOAD_VALUE) AS HISTORY_USAGE
FROM USAGE_STATS_TABLE
WHERE USAGE_STATS_TABLE.PAYLOAD_TYPE = 'ContentBandwidth-Out' AND Timestmp > ${hiveconf:last_hourly_ts}
GROUP BY SERVER_NAME, PAYLOAD_TYPE, TENANT_ID) table1
JOIN
(SELECT TENANT_ID AS TID2,
        sum(PAYLOAD_VALUE) AS TOTAL_IN
FROM USAGE_STATS_TABLE
WHERE USAGE_STATS_TABLE.PAYLOAD_TYPE = 'ContentBandwidth-In' AND Timestmp > ${hiveconf:last_hourly_ts}
GROUP BY SERVER_NAME, PAYLOAD_TYPE, TENANT_ID) table2
ON(table2.TID2 = table1.TID);


The above script gets the summary of the usage for the last hour and inserts it into the table. The query below adds that last-hour summary to the final values of the previous hour and produces the final values for the current hour.

INSERT INTO TABLE REGISTRY_USAGE_HOURLY_ANALYTICS 
SELECT concat(TENANT_ID, "Final"),
        TENANT_ID,
        sum(HISTORY_USAGE) as HISTORY_USAGE,
        sum(CURRENT_USAGE) as CURRENT_USAGE
FROM REGISTRY_USAGE_HOURLY_ANALYTICS
GROUP BY TENANT_ID;

This results in a MySQL table where each tenant has two rows, 'Final' and 'LastHour'. The Final row gives the current usage (the size of all the data the tenant currently has in the registry) and the history (the size of all the data the tenant has deleted up to now), accurate to the last hour.
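The ${hiveconf:last_hourly_ts} variable used above has to be set before the script runs. As mentioned, we keep the time of the last run in a MySQL table and hand it to Hive from a Java class. In BAM the script is actually scheduled inside the server, so the stand-alone sketch below only illustrates the idea; the table name SCRIPT_RUN_TIMES, its columns, and the script file name are hypothetical, not the actual class we use.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class LastRunTimestampLoader {

    // Hypothetical MySQL table that keeps the timestamp of the last script run.
    private static final String LAST_RUN_QUERY =
            "SELECT LAST_RUN_TS FROM SCRIPT_RUN_TIMES WHERE SCRIPT_NAME = 'registry_usage_hourly'";

    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        long lastRunTs = 0L;
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/WSO2USAGE_DB", "root", "root");
        try {
            Statement stmt = conn.createStatement();
            ResultSet rs = stmt.executeQuery(LAST_RUN_QUERY);
            if (rs.next()) {
                lastRunTs = rs.getLong("LAST_RUN_TS");
            }
        } finally {
            conn.close();
        }

        // Pass the value to the Hive script so it can be read as ${hiveconf:last_hourly_ts}.
        ProcessBuilder pb = new ProcessBuilder(
                "hive", "--hiveconf", "last_hourly_ts=" + lastRunTs,
                "-f", "registry_usage_hourly.hql");
        pb.inheritIO();
        int exitCode = pb.start().waitFor();
        System.out.println("Hive script exited with code " + exitCode);
    }
}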

Hive & Me Part 1

I started a new project to summarize registry bandwidth data (which refers to the space used in the registry). As you might know, BAM can summarize data stored in Cassandra keyspaces using Hive scripts. It was not easy to work with, given the lack of Hive examples.

What I have to do
There was a table in Cassandra that contains registry usage data. When a user adds something to his registry an entry is recorded as "registryBandwidth-In", and when he deletes something an entry is recorded as "registryBandwidth-Out". I have to summarize those records so that we have access to the current usage (the size of all the data the user currently has in his directory) and the history (the size of all the data the user has deleted up to now). For example, if a tenant adds 10 MB and 5 MB and then deletes 3 MB, the current usage is 12 MB and the history is 3 MB. This information should be available for each tenant, accurate to the last hour.

Implementation Plan
If I can write the current and history values into a MySQL table, where each tenant has a separate row, that is good enough. First I thought of having a Hive table with the current and history values and a MySQL table mapped to it.

The code below uses the JDBC storage handler for Hive; more information on how to use it can be found in Kasun's blog: http://kasunweranga.blogspot.com/2012/06/jdbc-storage-handler-for-hive.html

CREATE EXTERNAL TABLE IF NOT EXISTS REGISTRY_USAGE_HOURLY_ANALYTICS ( 
        ID STRING,
        TENANT_ID STRING,      
        HISTORY_USAGE BIGINT,
        CURRENT_USAGE BIGINT)
        STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' TBLPROPERTIES (
        "mapred.jdbc.driver.class" = "com.mysql.jdbc.Driver",
        "mapred.jdbc.url" = "jdbc:mysql://localhost:3306/WSO2USAGE_DB",
        "mapred.jdbc.username" = "root",
        "mapred.jdbc.password" = "root",
        "hive.jdbc.update.on.duplicate" = "true",
        "hive.jdbc.primary.key.fields" = "ID",
        "hive.jdbc.table.create.query" = "CREATE TABLE REGISTRY_USAGE_HOURLY_ANALYTICS (
        ID VARCHAR(50),
        TENANT_ID VARCHAR(50),
        HISTORY_USAGE BIGINT,
        CURRENT_USAGE  BIGINT)"
);

This creates two tables, one a Hive table and the other a MySQL table, both named "REGISTRY_USAGE_HOURLY_ANALYTICS". Whatever we write to the Hive table is written through to the MySQL table. In the next code block I create another mapping to the same MySQL table; using this temporary Hive table I can query the MySQL table.

CREATE EXTERNAL TABLE IF NOT EXISTS REGISTRY_USAGE_HOURLY_ANALYTICS_TEMP (
        ID STRING,
        TENANT_ID STRING,      
        HISTORY_USAGE BIGINT,
        CURRENT_USAGE BIGINT)
        STORED BY 'org.wso2.carbon.hadoop.hive.jdbc.storage.JDBCStorageHandler' TBLPROPERTIES (
        "mapred.jdbc.driver.class" = "com.mysql.jdbc.Driver",
        "mapred.jdbc.url" = "jdbc:mysql://localhost:3306/WSO2USAGE_DB",
        "mapred.jdbc.username" = "root",
        "mapred.jdbc.password" = "root",
        "hive.jdbc.primary.key.fields" = "TENANT_ID",
        "mapred.jdbc.input.table.name" = "REGISTRY_USAGE_HOURLY_ANALYTICS"
);
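As a quick sanity check (not part of the original scripts), rows written through the Hive table can also be read straight from MySQL with plain JDBC, using the same connection details given in the TBLPROPERTIES above:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class UsageTableCheck {
    public static void main(String[] args) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/WSO2USAGE_DB", "root", "root");
        try {
            Statement stmt = conn.createStatement();
            // Same MySQL table that the JDBC storage handler creates and writes to.
            ResultSet rs = stmt.executeQuery(
                    "SELECT ID, TENANT_ID, HISTORY_USAGE, CURRENT_USAGE FROM REGISTRY_USAGE_HOURLY_ANALYTICS");
            while (rs.next()) {
                System.out.println(rs.getString("ID") + " | " + rs.getString("TENANT_ID") + " | "
                        + rs.getLong("HISTORY_USAGE") + " | " + rs.getLong("CURRENT_USAGE"));
            }
        } finally {
            conn.close();
        }
    }
}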

Continued in Part 2...

Friday, November 2, 2012

End of the quiet October, on to a hopeful November

It was a really quiet October; if you look at the blog, nothing was written. Actually it was a busy October, which is why I didn't have time to write articles for the blog. I worked with BAM and Hive summarization scripts for BAM, so I am thinking about writing on "Hive and summarization scripts for BAM". The next project I worked on was improving the basic filter functionality of the GREG basic filter and adding filtering by LC (life-cycle) to it.

What you should expect in the upcoming month
About Hive
About BAM summarization scripts
About Bandwidth usage data summarization
About Greg Basic Filter improvement
About Greg LC filtering feature.

Looking forward to a lot of articles this November! Hopefully :)

Monday, July 9, 2012

Publishing to BAM


I created this just to publish the data I collect to BAM, so it does not adhere to good programming practices.

package org.wso2.carbon.usage.agent.util;

import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.UnknownHostException;

import javax.security.sasl.AuthenticationException;

import org.wso2.carbon.eventbridge.agent.thrift.Agent;
import org.wso2.carbon.eventbridge.agent.thrift.DataPublisher;
import org.wso2.carbon.eventbridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.eventbridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.eventbridge.commons.Event;
import org.wso2.carbon.eventbridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.eventbridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.eventbridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.eventbridge.commons.exception.StreamDefinitionException;
import org.wso2.carbon.eventbridge.commons.exception.TransportException;

public class PublishUtil2 {
    public static final String STREAM_NAME1 = "org.wso2.db6.kpiii";
    public static final String VERSION1 = "1.0.6";
    private static String streamId1;
    private static DataPublisher dataPublisher = null;
    
    

    public static void publish(long exceededBytes, long databasesize, String tenentdomain) throws AgentException, MalformedStreamDefinitionException,
    StreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException, MalformedURLException,
    AuthenticationException, NoStreamDefinitionExistException,
    org.wso2.carbon.eventbridge.commons.exception.AuthenticationException, TransportException, SocketException, UnknownHostException{
        
     System.out.println("Starting BAM KPI Agent");
        AgentConfiguration agentConfiguration = new AgentConfiguration();
        String currentDir = System.getProperty("user.dir");
        System.setProperty("javax.net.ssl.trustStore", currentDir + "/repository/resources/security/client-truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
        Agent agent = new Agent(agentConfiguration);
        
        dataPublisher = null;
        try {
            dataPublisher = new DataPublisher("tcp://10.100.3.80:7613", "admin", "admin", agent);
        } catch (Throwable e) {
            e.printStackTrace();
        }

        streamId1 = null;


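        // Reuse the stream if BAM already has a definition for this name and version;
        // otherwise define it with the JSON schema below.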
        try {
            streamId1 = dataPublisher.findEventStream(STREAM_NAME1, VERSION1);
            System.out.println("Stream already defined");

        } catch (NoStreamDefinitionExistException e) {
            streamId1 = dataPublisher.defineEventStream("{" +
                    "  'name':'" + STREAM_NAME1 + "'," +
                    "  'version':'" + VERSION1 + "'," +
                    "  'nickName': 'DSSUsage'," +
                    "  'description': 'Exceeded DB Use'," +
                    "  'metaData':[" +
                    "          {'name':'clientType','type':'STRING'}" +
                    "  ]," +
                    "  'payloadData':[" +
                    "          {'name':'exceededBytes','type':'LONG'}," +
                    "          {'name':'databasesize','type':'LONG'}," +
                    "          {'name':'tenentdomain','type':'STRING'}" +
                    "  ]" +
                    "}");
        }
        //Publish event for a valid stream
        if (!streamId1.isEmpty()) {
            System.out.println("Stream ID: " + streamId1);
            publishEvents(tenentdomain, exceededBytes, databasesize);

//            for (int i = 0; i < 1; i++) {
//                publishEvents("malinga");
//                System.out.println("Events published : " + (i + 1) * 2);
//            }
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//            }

            dataPublisher.stop();
        }
    }
    
    public static void publishEvents(String name, long exceededBytes, long databasesize) throws AgentException {
        System.out.println(name);
        publishEvents(dataPublisher, streamId1, name, exceededBytes, databasesize);
    }

    
    public static void publishEvents(DataPublisher dataPublisher, String streamId, String name, long exceededBytes, long databasesize) throws AgentException {
        // The payload must match the payloadData fields defined for the stream:
        // exceededBytes (LONG), databasesize (LONG), tenentdomain (STRING).
        Event eventOne = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
                new Object[]{exceededBytes, databasesize, name});
        dataPublisher.publish(eventOne);
    }

}
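For completeness, a caller might use the utility like this; the values and the tenant domain are hypothetical, and the actual caller inside the usage agent is not shown in this post:

public class PublishUtil2Demo {
    public static void main(String[] args) throws Exception {
        // Hypothetical figures: 5 MB over the quota, 25 MB of total database usage.
        long exceededBytes = 5L * 1024 * 1024;
        long databaseSize = 25L * 1024 * 1024;
        org.wso2.carbon.usage.agent.util.PublishUtil2.publish(exceededBytes, databaseSize, "example.tenant.com");
    }
}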

Problems I had to face.

I tried to change streamId1, but it was not possible; it gave an error on the BAM side. Then I got to know that the schema is saved under STREAM_NAME1, so if I want to change the schema I have to change STREAM_NAME1 as well.

There was no way to check the published data, as the data viewer in BAM would show gibberish. I got a nice client from one of my mentors that can read the data from the Cassandra cluster. It was written by Shariq Muhammed, a software engineer at WSO2. It was also written just to read the data, so he hasn't worried much about good programming practices. You can get it from the link below.


At first I couldn't send a long value, and it took me a while to figure out why: the number I sent was being taken as an int. I had to add an 'L' to the end of the literal to get it working.
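To make the point concrete (this snippet is only an illustration, not from the original code), the payload fields are defined as LONG in the stream, so the values placed in the Object[] have to autobox to java.lang.Long; a bare integer literal autoboxes to Integer and does not match a LONG field:

public class LongPayloadExample {
    public static void main(String[] args) {
        // 1024 autoboxes to Integer, which does not match a LONG payload field.
        Object[] wrongPayload = new Object[]{1024, 2048, "example.tenant.com"};
        // The 'L' suffix makes the literals java.lang.Long, matching the stream definition.
        Object[] rightPayload = new Object[]{1024L, 2048L, "example.tenant.com"};
        System.out.println(wrongPayload[0].getClass()); // class java.lang.Integer
        System.out.println(rightPayload[0].getClass()); // class java.lang.Long
    }
}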

Thursday, June 7, 2012

Followed the BAM samples.


Continued what I did the previous day. I downloaded the BAM2 Alpha 2 binary distribution and documentation from the BAM home page (http://wso2.com/products/business-activity-monitor). There I followed the example project given in the samples (KPI Sample Guide: wso2bam-2.0.0-ALPHA2-docs/kpi-sample.html). It was interesting to see raw data visualized in various types of charts. The sample includes an agent that pumps events to the BAM server.

Problems that I encountered:

How to run the agent (jar) that we get after building with ant: we don't have to run it; the ant script runs it automatically after the build.

Some steps were impossible to complete: I had the Alpha 1 version with me and it was not in sync with the tutorial. I had to download Alpha 2.

Wednesday, June 6, 2012

Collecting and summarizing the captured data


As I had come to some kind of an understanding of how to capture data and what data to capture, I started to think about how we can manage and summarize that data. For that reason I started reading about WSO2 Business Activity Monitor. This tool is already used to summarize the bandwidth data currently collected in the AS. You can find more information about BAM 2 at the following link (http://wso2.com/products/business-activity-monitor/).

If you are interested in BAM, you can watch a webinar on YouTube (http://www.youtube.com/watch?feature=player_embedded&v=toIeQNG_Y8E) or follow the documentation found at the following link (http://wso2.org/project/bam/1.3.2/docs/samples.html).

[Image: from wso2 bam2 docs]

Business-activity-monitoring

This is a 3-step process:
  1. Capturing
  2. Analysis
  3. Presenting

When working with BAM, you only have to worry about capturing the data and sending it to the BAM server. When going through BAM you may come across these two words: Hadoop and Cassandra. In BAM, data storage is based on Cassandra, whereas the analyzer framework is based on Hadoop. If you don't know what they are, read the definitions below.

[Image: from wso2 bam2 docs]

Hadoop: Hadoop is open source software that enables distributed parallel processing of huge amounts of data across inexpensive, commodity servers. (http://www.cloudera.com/what-is-hadoop/)

Cassandra: Apache Cassandra is an open source distributed database management system. It is an Apache Software Foundation top-level project designed to handle very large amounts of data spread out across many commodity servers while providing a highly available service with no single point of failure. (http://en.wikipedia.org/wiki/Apache_Cassandra)