I created this just to publish data I
collect to BAM, so this do not adhere to good programming practices.
package org.wso2.carbon.usage.agent.util;
import java.net.MalformedURLException;
import java.net.SocketException;
import java.net.UnknownHostException;
import javax.security.sasl.AuthenticationException;
import org.wso2.carbon.eventbridge.agent.thrift.Agent;
import org.wso2.carbon.eventbridge.agent.thrift.DataPublisher;
import org.wso2.carbon.eventbridge.agent.thrift.conf.AgentConfiguration;
import org.wso2.carbon.eventbridge.agent.thrift.exception.AgentException;
import org.wso2.carbon.eventbridge.commons.Event;
import org.wso2.carbon.eventbridge.commons.exception.DifferentStreamDefinitionAlreadyDefinedException;
import org.wso2.carbon.eventbridge.commons.exception.MalformedStreamDefinitionException;
import org.wso2.carbon.eventbridge.commons.exception.NoStreamDefinitionExistException;
import org.wso2.carbon.eventbridge.commons.exception.StreamDefinitionException;
import org.wso2.carbon.eventbridge.commons.exception.TransportException;
public class PublishUtil2 {
public static final String STREAM_NAME1 = "org.wso2.db6.kpiii";
public static final String VERSION1 = "1.0.6";
private static String streamId1;
private static DataPublisher dataPublisher = null;
public static void publish(long exceededBytes, long databasesize, String tenentdomain) throws AgentException, MalformedStreamDefinitionException,
StreamDefinitionException, DifferentStreamDefinitionAlreadyDefinedException, MalformedURLException,
AuthenticationException, NoStreamDefinitionExistException,
org.wso2.carbon.eventbridge.commons.exception.AuthenticationException, TransportException, SocketException, UnknownHostException{
System.out.println("Starting BAM KPI Agent");
AgentConfiguration agentConfiguration = new AgentConfiguration();
String currentDir = System.getProperty("user.dir");
System.setProperty("javax.net.ssl.trustStore", currentDir + "/repository/resources/security/client-truststore.jks");
System.setProperty("javax.net.ssl.trustStorePassword", "wso2carbon");
Agent agent = new Agent(agentConfiguration);
dataPublisher = null;
try {
dataPublisher = new DataPublisher("tcp://10.100.3.80:7613", "admin", "admin", agent);
} catch (Throwable e){
e.printStackTrace();
}
streamId1 = null;
try {
streamId1 = dataPublisher.findEventStream(STREAM_NAME1, VERSION1);
System.out.println("Stream already defined");
} catch (NoStreamDefinitionExistException e) {
streamId1 = dataPublisher.defineEventStream("{" +
" 'name':'" + STREAM_NAME1 + "'," +
" 'version':'" + VERSION1 + "'," +
" 'nickName': 'DSSUsage'," +
" 'description': 'Exceeded DB Use'," +
" 'metaData':[" +
" {'name':'clientType','type':'STRING'}" +
" ]," +
" 'payloadData':[" +
" {'name':'exceededBytes','type':'LONG'}," +
" {'name':'databasesize','type':'LONG'}," +
" {'name':'tenentdomain','type':'STRING'}" +
" ]" +
"}");
}
//Publish event for a valid stream
if (!streamId1.isEmpty()) {
System.out.println("Stream ID: " + streamId1);
publishEvents(tenentdomain, exceededBytes, databasesize);
// for (int i = 0; i < 1; i++) {
// publishEvents("malinga");
// System.out.println("Events published : " + (i + 1) * 2);
// }
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// }
dataPublisher.stop();
}
}
public static void publishEvents(String name, long exceededBytes, long databasesize) throws AgentException {
System.out.println(name);
publishEvents(dataPublisher, streamId1, name, exceededBytes, databasesize);
}
public static void publishEvents(DataPublisher dataPublisher, String streamId, String name, long exceededBytes, long databasesize) throws AgentException {
Event eventOne = new Event(streamId, System.currentTimeMillis(), new Object[]{"external"}, null,
new Object[]{exceededBytes, databasesize, 3600.0, name});
dataPublisher.publish(eventOne);
}
}
Problems I had to face.
I tried to change the streamId1, but
it was not possible. It gave a error in BAM side. Then I got to know
that schema is saved under the STREAM_NAME1, and if I want to change
it, I have to do that with a change in STREAM_NAME1 too.
There was no way to check the the
published data as it would show some rubbish, in the data viewer in
BAM. I got a nice client for one of my mentors that can get the data
from Cassandra cluster. This is written by 'Shariq Muhammed', he is a
software engineer at WSO2. This is also written just to read the data
and he haven't thought much about adhering to good programming
practices. You can have it from below link
First I couldn't sent long, It took me
a while to figure that out. It was because the number I send was
taken as a int. I had to add the 'L' to end of it to get it working.