Calling WSO2 Streaming Integrator through WSO2 Enterprise Integrator

Umesha Guruge
Dec 22, 2021


In the last blog, we talked about how we can invoke a service in the WSO2 Enterprise Integrator using the WSO2 Streaming Integrator.

In this blog, we are going to do the reverse: we will call a service (an HTTP endpoint) in the WSO2 Streaming Integrator (WSO2 SI) using the WSO2 Enterprise Integrator (WSO2 EI).

Additionally, we will learn how to send POST requests to the WSO2 SI periodically.

First of all, let's create a really simple Siddhi file with an HTTP source that listens at an endpoint named TestEP. When an event arrives at TestEP, the query selects the sweet name and inserts it into a stream with a log-type sink, which prints a log record in the WSO2 SI server's carbon log. The @map annotation converts the incoming message (JSON in our case) into Siddhi events.

The complete Siddhi file is as below:

@App:name('TestSiddhiApp')

@source(type='http', receiver.url='http://localhost:5005/TestEP', @map(type='json'))
define stream SweetStream (name string);

@sink(type='log', prefix='Sweet Totals:')
define stream TotalStream (name string);

@info(name='Query1')
from SweetStream
select name
insert into TotalStream;
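Before wiring up the EI side, it can help to check the Siddhi app on its own by posting an event straight to TestEP. The following is a minimal Python sketch, not part of the original setup. It assumes the Siddhi app is deployed and SI is listening on http://localhost:5005/TestEP, and the helper names (build_event, build_request, send_event) are made up for illustration. The payload uses the same {"event": {"name": ...}} envelope that the proxy service sends later in this post.

```python
import json
import urllib.request

# Assumption: the Siddhi app above is deployed and SI listens on this URL.
TEST_EP_URL = "http://localhost:5005/TestEP"


def build_event(name: str) -> bytes:
    """Wrap the sweet name in the {"event": {...}} envelope used in this post."""
    return json.dumps({"event": {"name": name}}).encode("utf-8")


def build_request(name: str, url: str = TEST_EP_URL) -> urllib.request.Request:
    """Build (but do not send) a JSON POST request for TestEP."""
    return urllib.request.Request(
        url,
        data=build_event(name),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_event(name: str, url: str = TEST_EP_URL) -> int:
    """Send the event and return the HTTP status code (needs SI running)."""
    with urllib.request.urlopen(build_request(name, url), timeout=5) as resp:
        return resp.status
```

With SI running, calling send_event("Cake") should make a record with the 'Sweet Totals:' prefix show up in the carbon log.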

Now we are going to create the proxy service in the WSO2 EI which will send POST requests to our http://localhost:5005/TestEP endpoint.

Here we have added the sweet-specific data inside the payload factory mediator, and we use the call mediator to send POST requests to our TestEP endpoint. You can refer to the official documentation [1], [2], and [3] for more information on creating proxy services and using the call and payload factory mediators.

The complete proxy service configuration is as below:

<?xml version="1.0" encoding="UTF-8"?>
<proxy xmlns="http://ws.apache.org/ns/synapse"
       name="test2proxy"
       transports="http https"
       startOnLoad="true">
   <description/>
   <target>
      <inSequence>
         <payloadFactory media-type="json">
            <format>{"event": {"name": "Cake"}}</format>
            <args/>
         </payloadFactory>
         <call>
            <endpoint name="HTTPEndpoint">
               <http method="POST" uri-template="http://localhost:5005/TestEP"/>
            </endpoint>
         </call>
      </inSequence>
   </target>
</proxy>

Now when we invoke the proxy service test2proxy, the TestEP endpoint will receive the event carrying the sweet name Cake.
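One way to invoke the proxy by hand is a plain HTTP POST to its service URL. The sketch below is only an illustration under stated assumptions: EI runs locally and serves proxy services over HTTP on port 8280 under /services/ (the usual default, so adjust it if your setup differs), and the helper names are hypothetical. The request body is just a placeholder, since the payload factory in the proxy replaces it before the call to TestEP.

```python
import urllib.request

# Assumptions: EI runs on this host and exposes proxy services over HTTP
# on port 8280 under /services/ (change these to match your installation).
EI_HOST = "localhost"
EI_HTTP_PORT = 8280


def proxy_url(proxy_name: str, host: str = EI_HOST, port: int = EI_HTTP_PORT) -> str:
    """Build the HTTP URL of a proxy service from its name."""
    return f"http://{host}:{port}/services/{proxy_name}"


def build_proxy_request(proxy_name: str) -> urllib.request.Request:
    """Build (but do not send) a POST request that triggers the proxy."""
    return urllib.request.Request(
        proxy_url(proxy_name),
        data=b"{}",  # placeholder body; the payload factory overwrites it
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def invoke_proxy(proxy_name: str) -> int:
    """Send the request and return the HTTP status code (needs EI running)."""
    with urllib.request.urlopen(build_proxy_request(proxy_name), timeout=5) as resp:
        return resp.status
```

With both servers running, invoke_proxy("test2proxy") triggers one POST from EI to TestEP.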

Now suppose we need to send requests to the TestEP endpoint periodically from the WSO2 EI. For that, we can create a task scheduler that will inject a message to our proxy service test2proxy.

Here the trigger interval is 5 seconds and the count is 5, so the message is injected 5 times with 5 seconds between two consecutive injections. You can refer to the official documentation [4] for more information on scheduled tasks.

The complete task configuration is as below:

<?xml version="1.0" encoding="UTF-8"?>
<task xmlns="http://ws.apache.org/ns/synapse"
      name="Mytask"
      class="org.apache.synapse.startup.tasks.MessageInjector"
      group="synapse.simple.quartz">
   <trigger count="5" interval="5"/>
   <property xmlns:task="http://www.wso2.org/products/wso2commons/tasks"
             name="proxyName"
             value="test2proxy"/>
   <property xmlns:task="http://www.wso2.org/products/wso2commons/tasks" name="message">
      <abc xmlns="">hello</abc>
   </property>
   <property xmlns:task="http://www.wso2.org/products/wso2commons/tasks"
             name="injectTo"
             value="proxy"/>
</task>
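To make the trigger settings concrete, here is a tiny Python sketch of the schedule they describe. It simply takes the description literally (count injections, interval seconds apart) and lists each injection's offset from the first one. The function name is made up for illustration, and exactly when the first injection fires after the task is deployed is not covered here.

```python
from typing import List


def injection_offsets(count: int, interval_seconds: int) -> List[int]:
    """Offsets (in seconds) of each injection, measured from the first one."""
    return [i * interval_seconds for i in range(count)]


# Values taken from <trigger count="5" interval="5"/> in the task above.
offsets = injection_offsets(5, 5)
print(offsets)  # [0, 5, 10, 15, 20]
```

So with these settings, the last message goes out 20 seconds after the first one.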

And that's how we accomplish our requirement using a basic Siddhi file, a proxy service, and a task.

Now you can add complex logic and customize this according to your requirements.

Till we meet again …. :)

[1] https://docs.wso2.com/display/EI660/Creating+a+Proxy+Service

[2] https://docs.wso2.com/display/EI660/Call+Mediator

[3] https://docs.wso2.com/display/EI660/PayloadFactory+Mediator

[4] https://docs.wso2.com/display/EI660/Scheduling+ESB+Tasks
