Developing a Custom Siddhi Application to push APIM-Analytics Alert Data to an External Message System.

Umesha Guruge
4 min readJun 28, 2022

--

If you have used WSO2 APIM Analytics, then you already know we have several kinds of alerts divided among the Publisher and the Store to get alerted for several types of unexpected behaviours. For example if you are seeing an abnormal response time or abnormal request counts and etc. You can read more about alerts in [1].

What we are trying to do is publish this alert information to an external message system. And for this blog, we are focusing on publisher alerts, once you gain the idea you can do Store alerts as well. I have created a custom siddhi application and I will walk you through the code line by line.

Here we have used the three of the available publisher alerts namely AbnormalBackend Time Alert, Abnormal Response Time Alert and Api Health Monitor Alert. (You can add the rest of the alerts as per your requirement).

First of all, we have three in-memory topics used by the default email alert notification siddhi app(this default application can be found in <APIM-ANALYTICS-HOME>/wso2/worker/deployment/siddhi-files/ directory), which will contain the alert events.

@source(type = "inMemory", topic = "AbnormalBackendTimeAlertStream")
define stream AbnormalBackendTimeAlertStream( apiName string, apiVersion string, apiCreator string, apiCreatorTenantDomain string, apiResourceTemplate string, apiMethod string, backendTime long, thresholdBackendTime long, message string, severity int, alertTimestamp long);
@source(type = 'inMemory' , topic = 'AbnormalResponseTimeAlertStream')
define stream AbnormalResponseTimeAlertStream( apiName string, apiVersion string, apiCreator string, apiCreatorTenantDomain string, apiResourceTemplate string, apiMethod string, responseTime long, thresholdResponseTime long, message string, severity int, alertTimestamp long);
@source(type = 'inMemory', topic = 'ApimApiHealthMonitorAlertStream')
define stream ApiHealthMonitorAlertStream(apiName string, apiVersion string, apiCreator string, apiCreatorTenantDomain string, message string, severity int, alertTimestamp long);

And then we have a JMS sink which will receive the selected attributes as follow. For our testing purposes, we have used the WSO2 Message Broker queue as our message consumer. You can edit the JMS sink with the required parameters. Please find more information regarding configuring a JMS sink in the official siddhi documentation [3].

@sink(type='jms', factory.initial='org.wso2.andes.jndi.PropertiesFileInitialContextFactory', provider.url='../../resources/jndi.properties', connection.factory.type='queue', destination ='mynotiqueue', connection.factory.jndi.name='QueueConnectionFactory', @map(type='json'))
define stream JMSStream (apiName string, alertType string, attribute string);

We will create a queue named mynotiqueue in WSO2 Message Broker using the above code line. And the provider.url is given in the <Analytics-Home>/resources/jndi.properties file we have added.

The content in the jndi.properties file is as below.

connectionfactory.QueueConnectionFactory=amqp://admin:admin@clientID/carbon?brokerlist='tcp://0.0.0.0:5675'
connectionfactory.TopicConnectionFactory=amqp://admin:admin@clientID/carbon?brokerlist='tcp://0.0.0.0:5675'

Now that we have the source and the sink we will add the required values to the JMSStream which is associated with the JMS sink as follow.

@info(name = 'AbnormalResponseTimeAlertStream message')
from AbnormalResponseTimeAlertStream
select apiName,'AbnormalResponseTime' as alertType,convert(thresholdResponseTime, 'string') as attribute
insert into JMSStream;

Please note that the “attribute” field in the above query changes accordingly as for AbnormalResponseTime the attribute is thresholdResponseTime and for AbnormalBackendTime it will be thresholdBackendTime.

Once an alert is generated this flow will be triggered accordingly and the values will be passed to the message system. As you have mentioned “based on the above parameters we will mail the alerts to the respective API owner”, you can map the receiving mail addresses according to your requirement.

Now that we know the functionality of the siddhi application, the following steps can be used to deploy our app.

  • Please download and add the following extension jar files to the <Analytics-Home>/lib directory.
  • siddhi-io-jms-2.0.4.jar [3]
  • siddhi-map-json-5.0.6.jar [4]
  • Save the custom siddhi application as APIM_NOTIFICATIONS. The complete application is as follows.
@App:name(“APIM_NOTIFICATIONS”)
@App:description(‘Send email to all the subscribers of a particular alert’)
@source(type = “inMemory”, topic = “AbnormalBackendTimeAlertStream”)
define stream AbnormalBackendTimeAlertStream( apiName string, apiVersion string, apiCreator string, apiCreatorTenantDomain string, apiResourceTemplate string, apiMethod string, backendTime long, thresholdBackendTime long, message string, severity int, alertTimestamp long);
@source(type = ‘inMemory’ , topic = ‘AbnormalResponseTimeAlertStream’)
define stream AbnormalResponseTimeAlertStream( apiName string, apiVersion string, apiCreator string, apiCreatorTenantDomain string, apiResourceTemplate string, apiMethod string, responseTime long, thresholdResponseTime long, message string, severity int, alertTimestamp long);
@source(type = ‘inMemory’, topic = ‘ApimApiHealthMonitorAlertStream’)
define stream ApiHealthMonitorAlertStream(apiName string, apiVersion string, apiCreator string, apiCreatorTenantDomain string, message string, severity int, alertTimestamp long);
@sink(type=’jms’, factory.initial=’org.wso2.andes.jndi.PropertiesFileInitialContextFactory’, provider.url=’../../resources/jndi.properties’, connection.factory.type=’queue’, destination =’mynotiqueue’, connection.factory.jndi.name=’QueueConnectionFactory’, @map(type=’json’))
define stream JMSStream (apiName string, alertType string, attribute string);
@info(name = ‘AbnormalBackendTimeAlert message’)
from AbnormalBackendTimeAlertStream
select apiName,’AbnormalBackendTime’ as alertType, convert(thresholdBackendTime, ‘string’) as attribute
insert into JMSStream;
@info(name = ‘AbnormalResponseTimeAlertStream message’)
from AbnormalResponseTimeAlertStream
select apiName,’AbnormalResponseTime’ as alertType,convert(thresholdResponseTime, ‘string’) as attribute
insert into JMSStream;
@info(name = ‘ApiHealthMonitorAlertStream message’)
from ApiHealthMonitorAlertStream
select apiName,’ApiHealthMonitor’ as alertType,message as attribute
insert into JMSStream;
  • Navigate to the <wso2am-analytics-home>/wso2/worker/deployment/siddhi-files
    directory and add the siddhi application.
  • The siddhi app will be hot deployed, no need to restart the server. When deployed you will see a log line similar to below.
INFO {org.wso2.carbon.streaming.integrator.core.internal.StreamProcessorService} - Siddhi App APIM_NOTIFICATIONS deployed successfully
  • Now we can trigger any publisher alerts and verify whether the data are sent to the messaging system.

I believe that this will help you to understand how we can push APIM-Analytics Alert Data to an External Message System. If needed you can add Dev portal alerts in the same manner to the application.

Till we meet again .. :)

[1].https://apim.docs.wso2.com/en/3.2.0/learn/analytics/alert-types/

[2]. https://siddhi-io.github.io/siddhi-io-jms/api/2.0.4/#jms-sink

[3].https://siddhi-io.github.io/siddhi-io-jms/

[4]. https://siddhi-io.github.io/siddhi-map-json/

--

--