Monday, July 9, 2012

Publishing to BAM


I created this just to publish the data I collect to BAM, so it does not adhere to good programming practices.

package org.wso2.carbon.usage.agent.util;

import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.UnknownHostException;

import javax.security.sasl.AuthenticationException;

import org.wso2.carbon.eventbridge.agent.thrift.Agent;
import org.wso2.carbon.eventbridge.agent.thrift.DataPublisher;
import org.wso2.carbon.eventbridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.eventbridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.eventbridge.commons.Event;
import org.wso2.carbon.eventbridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.eventbridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.eventbridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.eventbridge.commons.exception.StreamDefinitionException;
import org.wso2.carbon.eventbridge.commons.exception.TransportException;

public class PublishUtil2 {
    public static final String STREAM_NAME1 = "org.wso2.db6.kpiii";
    public static final String VERSION1 = "1.0.6";
    private static String streamId1;
    private static DataPublisher dataPublisher = null;
    
    

    public static void publish(long exceededBytes, long databasesize, String tenentdomain) throws AgentException, MalformedStreamDefinitionException,
    StreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException, MalformedURLException,
    AuthenticationException, NoStreamDefinitionExistException,
    org.wso2.carbon.eventbridge.commons.exception.AuthenticationException, TransportException, SocketException, UnknownHostException{
        
        System.out.println("Starting BAM KPI Agent");
        AgentConfiguration agentConfiguration = new AgentConfiguration();
        String currentDir = System.getProperty("user.dir");
        System.setProperty("javax.net.ssl.trustStore", currentDir + "/repository/resources/security/client-truststore.jks");
        System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
        Agent agent = new Agent(agentConfiguration);
        
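        // Connect to the BAM Thrift receiver. Without a publisher nothing below can work,
        // so give up here instead of hitting a NullPointerException later.
        dataPublisher = null;
        try {
            dataPublisher = new DataPublisher("tcp://10.100.3.80:7613", "admin", "admin", agent);
        } catch (Throwable e) {
            e.printStackTrace();
            return;
        }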

        streamId1 = null;


        try {
            streamId1 = dataPublisher.findEventStream(STREAM_NAME1, VERSION1);
            System.out.println("Stream already defined");

        } catch (NoStreamDefinitionExistException e) {
            streamId1 = dataPublisher.defineEventStream("{" +
                    "  'name':'" + STREAM_NAME1 + "'," +
                    "  'version':'" + VERSION1 + "'," +
                    "  'nickName': 'DSSUsage'," +
                    "  'description': 'Exceeded DB Use'," +
                    "  'metaData':[" +
                    "          {'name':'clientType','type':'STRING'}" +
                    "  ]," +
                    "  'payloadData':[" +
                    "          {'name':'exceededBytes','type':'LONG'}," +
                    "          {'name':'databasesize','type':'LONG'}," +
                    "          {'name':'tenentdomain','type':'STRING'}" +
                    "  ]" +
                    "}");
        }
        //Publish event for a valid stream
        if (streamId1 != null && !streamId1.isEmpty()) {
            System.out.println("Stream ID: " + streamId1);
            publishEvents(tenentdomain, exceededBytes, databasesize);

//            for (int i = 0; i < 1; i++) {
//                publishEvents("malinga");
//                System.out.println("Events published : " + (i + 1) * 2);
//            }
//            try {
//                Thread.sleep(3000);
//            } catch (InterruptedException e) {
//            }

            dataPublisher.stop();
        }
    }
    
    public static void publishEvents(String name, long exceededBytes, long databasesize) throws AgentException {
        System.out.println(name);
        publishEvents(dataPublisher, streamId1, name, exceededBytes, databasesize);
    }

    
    public static void publishEvents(DataPublisher dataPublisher, String streamId, String name, long exceededBytes, long databasesize) throws AgentException {
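        // Payload values must match the stream's payloadData fields, in order:
        // exceededBytes (LONG), databasesize (LONG), tenentdomain (STRING).
        Event eventOne = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
                new Object[]{exceededBytes, databasesize, name});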
        dataPublisher.publish(eventOne);

    }

}

Problems I had to face.

I tried to change streamId1, but that was not possible; it gave an error on the BAM side. I then learned that the schema is saved under STREAM_NAME1, so if I want to change the schema I have to change STREAM_NAME1 as well.
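
Here is a minimal sketch of that workaround. The class, the `definition` helper and the new stream name `org.wso2.db6.kpiii2` are made up for illustration; they are not part of the BAM API. The sketch only builds the definition string, in the same single-quoted style as the class above, to show that a changed schema goes out under a changed name.

```java
public class StreamDefinitionSketch {

    // Hypothetical helper: assembles a stream definition string from its parts.
    public static String definition(String name, String version, String payloadFields) {
        return "{" +
                "  'name':'" + name + "'," +
                "  'version':'" + version + "'," +
                "  'payloadData':[" + payloadFields + "]" +
                "}";
    }

    public static void main(String[] args) {
        String oldFields = "{'name':'exceededBytes','type':'LONG'}";
        // The schema gains a field...
        String newFields = oldFields + ",{'name':'databasesize','type':'LONG'}";

        // ...so the definition is sent under a new stream name (assumed name).
        System.out.println(definition("org.wso2.db6.kpiii", "1.0.6", oldFields));
        System.out.println(definition("org.wso2.db6.kpiii2", "1.0.6", newFields));
    }
}
```

The resulting string would be passed to `defineEventStream` exactly as in the class above.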

There was no way to check the published data, because the data viewer in BAM showed only rubbish. I got a nice client from one of my mentors that can read the data from the Cassandra cluster. It was written by Shariq Muhammed, a software engineer at WSO2. It was also written just to read the data, so he did not think much about adhering to good programming practices. You can get it from the link below.


At first I couldn't send a long value, and it took me a while to figure out why. The number I was sending was being treated as an int. I had to add an 'L' to the end of it to get it working.
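
A small plain-Java illustration of the difference, with no BAM classes involved. The class name and the values are made up. A bare number placed in an `Object[]` is boxed as an `Integer`, while the same number with the `L` suffix is boxed as a `Long`, which is presumably what a field declared as LONG in the stream expects.

```java
public class LongLiteralDemo {

    // Returns the runtime type of a boxed value, e.g. "Integer" or "Long".
    public static String typeName(Object value) {
        return value.getClass().getSimpleName();
    }

    public static void main(String[] args) {
        Object[] withoutSuffix = new Object[]{1024};   // int literal, boxed to Integer
        Object[] withSuffix = new Object[]{1024L};     // long literal, boxed to Long

        System.out.println(typeName(withoutSuffix[0])); // Integer
        System.out.println(typeName(withSuffix[0]));    // Long

        // A value outside the int range does not even compile without the suffix:
        // long tooBig = 3000000000;   // error: integer number too large
        long bigEnough = 3000000000L;
        System.out.println(bigEnough);
    }
}
```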
