Omgo's Blog

January 6, 2010

WS-Addressing with CXF for Async calls

Filed under: WebServices, WS-*, WS-Addressing, XML — aswin @ 9:55 pm

In my previous post I had an example of using WS-Addressing with CXF to specify a return address along with a web service call, to which the service would deliver its response. That example was not truly asynchronous (though the description sounded so): the client had to wait until the service had actually replied to the specified “ReplyTo” address. But what if you need the operation to be truly asynchronous, so that the client just calls the web service method as a one-way operation and does not wait for the reply at all?

It turns out that, for now, we have to take a few things into our own hands to do that (until a JSR standardizes the pieces), and this post has the results of such an effort. I must warn you against using this code in any meaningful application: it is just toy code to help me understand the inner workings of WSA and nothing more. As a matter of fact, all the posts and code in this blog are just that. This example builds on the sample presented in my earlier post. To the service defined there I added a new web service method named “sayHelloOneWay” and made it a one-way operation as follows:

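// Added to the HelloService interface (requires import javax.jws.Oneway).
// A one-way operation returns nothing, so @WebResult does not apply here.
@Oneway
public void sayHelloOneWay(String user);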

The implementation of the HelloService changed to the following

import java.util.Date;
import javax.annotation.Resource;
import javax.jws.WebService;
import javax.xml.ws.Endpoint;
import javax.xml.ws.WebServiceContext;
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.soap.Addressing;

import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.EndpointReferenceType;

@WebService
@Addressing
@org.apache.cxf.interceptor.InInterceptors(interceptors = {"org.apache.cxf.interceptor.LoggingInInterceptor"})
public class HelloServiceImpl implements HelloService {

    CallbackThread callbacker;

    public HelloServiceImpl() {
        callbacker = new CallbackThread();
        callbacker.start();
    }

    @Resource
    WebServiceContext webServiceContext;

    public String sayHello(String user) {
        System.out.println("Invoking sayHello in " + getClass());
        return "Hello " + user + " @ " + new Date();
    }

    public void sayHelloOneWay(String user) {

        MessageContext messageContext = webServiceContext.getMessageContext();

        AddressingProperties addressProp = (AddressingProperties) messageContext
                .get(org.apache.cxf.ws.addressing.JAXWSAConstants.SERVER_ADDRESSING_PROPERTIES_INBOUND);

        EndpointReferenceType eprType = addressProp.getReplyTo();
        String message = "Hello " + user + " @ " + new Date();
        callbacker.scheduleCallback(message, eprType);
    }

    public static void main(String[] args) {
        Endpoint.publish("http://localhost:9091/hello", new HelloServiceImpl());
    }
}

The interesting part is the sayHelloOneWay method. It gets the AddressingProperties object from the message context of the service; this object holds all the WS-Addressing header information received from the client, including the “ReplyTo”, the Action URI, the Message ID and so on. For our example scenario we are interested in the “ReplyTo”, which is an EndpointReference to the callback service sent by the client. CXF represents an endpoint reference internally as an org.apache.cxf.ws.addressing.EndpointReferenceType object (JAXB capable). We can convert this object to the JAX-WS specification type javax.xml.ws.wsaddressing.W3CEndpointReference using the utility methods in the CXF packages.

In fact, the JAX-WS spec mandates that the endpoint references returned by Endpoint and BindingProvider be of type W3CEndpointReference for SOAP over HTTP bindings. The W3CEndpointReference object does not expose any of its contents, so it has to be converted to an internal type to extract information about the EPR, and this is where the CXF utility classes (EndpointReferenceUtils, VersionTransformer etc.) help. With the information in the EPR (service URL, service name, port name) and the other metadata from the annotations on the example Callback service interface, we can create a service proxy object and use it to invoke the callback service (from the previous post).
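To make the EPR contents concrete, here is a small sketch that uses only the JDK's DOM API, not CXF, to pull the address, service name and port name out of a hand-written endpoint reference. The sample XML, the class name EprMetadataSketch, and the names CallbackImplService and CallbackImplPort are made up for illustration; a real EPR published by CXF may look different.

```java
import java.io.StringReader;
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

class EprMetadataSketch {

    static final String WSA_NS = "http://www.w3.org/2005/08/addressing";
    static final String WSAW_NS = "http://www.w3.org/2006/05/addressing/wsdl";

    // Hypothetical EPR for illustration only; service and port names are assumptions.
    static final String SAMPLE_EPR =
          "<wsa:EndpointReference xmlns:wsa='" + WSA_NS + "'>"
        + "<wsa:Address>http://localhost:9092/callback</wsa:Address>"
        + "<wsa:Metadata xmlns:wsaw='" + WSAW_NS + "'>"
        + "<wsaw:ServiceName xmlns:cb='hello.wsatest.service.aswin.com'"
        + " EndpointName='CallbackImplPort'>cb:CallbackImplService</wsaw:ServiceName>"
        + "</wsa:Metadata>"
        + "</wsa:EndpointReference>";

    // Parses the XML with namespace awareness switched on and returns the root element.
    static Element parse(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder()
                    .parse(new InputSource(new StringReader(xml)));
            return doc.getDocumentElement();
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a parseable EPR", e);
        }
    }

    // The URL the callback should be sent to, or null if there is no Address element.
    static String address(Element epr) {
        NodeList nodes = epr.getElementsByTagNameNS(WSA_NS, "Address");
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    private static Element serviceNameElement(Element epr) {
        NodeList nodes = epr.getElementsByTagNameNS(WSAW_NS, "ServiceName");
        return nodes.getLength() == 0 ? null : (Element) nodes.item(0);
    }

    // Resolves the "prefix:local" text of ServiceName to a QName using in-scope namespaces.
    static QName serviceName(Element epr) {
        Element el = serviceNameElement(epr);
        if (el == null) {
            return null;
        }
        String[] parts = el.getTextContent().trim().split(":");
        if (parts.length != 2) {
            return null;
        }
        String namespace = el.lookupNamespaceURI(parts[0]);
        return namespace == null ? null : new QName(namespace, parts[1]);
    }

    // The port QName reuses the service namespace, as the post's code does.
    static QName portName(Element epr) {
        Element el = serviceNameElement(epr);
        QName service = serviceName(epr);
        if (el == null || service == null || !el.hasAttribute("EndpointName")) {
            return null;
        }
        return new QName(service.getNamespaceURI(), el.getAttribute("EndpointName"));
    }

    public static void main(String[] args) {
        Element epr = parse(SAMPLE_EPR);
        System.out.println("address = " + address(epr));
        System.out.println("service = " + serviceName(epr));
        System.out.println("port    = " + portName(epr));
    }
}
```

These three values are the same pieces that Service.create and addPort need in order to build a client proxy for the callback.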

For dealing with the callbacks asynchronously I have a simple CallbackThread implementation that is used from HelloServiceImpl. The sayHelloOneWay web method simply hands off the message and the ReplyTo endpoint reference to the callback thread and returns immediately to the client. The actual callback happens in a separate thread in the CallbackThread object. The interesting parts are the processCall and createService methods in the code below. The createService method opens up the EndpointReference object and reads the service name and the port name to construct a Service (client proxy) object for the Callback service.

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.xml.namespace.QName;
import javax.xml.ws.Service;
import javax.xml.ws.soap.SOAPBinding;
import javax.xml.ws.wsaddressing.W3CEndpointReference;

import org.apache.cxf.helpers.DOMUtils;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
import org.w3c.dom.Element;
import com.aswin.service.wsatest.hello.client.Callback;

public class CallbackThread extends Thread {

    // volatile so that a stopProcessing() call from another thread is seen by run()
    volatile boolean run = true;
    BlockingQueue<Tuple2> queue = new LinkedBlockingQueue<Tuple2>(5);

    public void scheduleCallback(String message, EndpointReferenceType eprType) {
        try {
            queue.put(new Tuple2(message, eprType));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void run() {
        System.out.println("Callback Thread started.. ");

        while (run) {
            try {
                Tuple2 tuple = queue.poll(1, TimeUnit.SECONDS);
                if (tuple != null) {
                    processCall(tuple);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void processCall(Tuple2 unit) {
        Service service = createService(unit.epr);
        if (service != null) {
            W3CEndpointReference reference = new W3CEndpointReference(EndpointReferenceUtils.convertToXML(unit.epr));
            Callback callback = service.getPort(reference, Callback.class);
            // Deliver the message that was queued for this callback
            callback.callBack(unit.message);
        }
    }

    private static Service createService(EndpointReferenceType referenceType) {
        List<Object> any = referenceType.getMetadata().getAny();
        String namespace = null;
        String serviceName = null;
        String endpointName = null;

        for (Object a : any) {
            Element element = (Element) a;
            if ("ServiceName".equals(element.getLocalName())
                    && "http://www.w3.org/2006/05/addressing/wsdl".equals(element.getNamespaceURI())) {
                endpointName = element.getAttribute("EndpointName");
                String serviceQnameString = element.getTextContent().trim();
                String[] split = null;
                if (serviceQnameString != null && (split = serviceQnameString.split(":")).length == 2) {
                    namespace = DOMUtils.getNamespace(element, split[0]);
                    serviceName = split[1];
                }
            }
        }

        if (serviceName != null && endpointName != null && namespace != null) {
            QName serviceQName = new QName(namespace, serviceName);
            QName portQName = new QName(namespace, endpointName);
            // should cache, but for now create a new one every time
            Service service = Service.create(serviceQName);
            service.addPort(portQName, SOAPBinding.SOAP11HTTP_BINDING, referenceType.getAddress().getValue());
            return service;
        }

        return null;
    }

    public void stopProcessing() {
        this.run = false;
    }

    class Tuple2 {
        String message;
        EndpointReferenceType epr;

        public Tuple2(String message, EndpointReferenceType epr) {
            super();
            this.message = message;
            this.epr = epr;
        }
    }

}
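As an aside, the hand-off itself does not need a hand-rolled thread. The sketch below shows the same pattern with a single-threaded ExecutorService from the JDK. It is not the code used in this post: the class name CallbackDispatcherSketch and the sender function are hypothetical stand-ins for the CXF proxy call, and unlike the bounded queue above the executor's queue is unbounded.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

class CallbackDispatcherSketch {

    // One worker thread, so callbacks go out in the order they were scheduled.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Hypothetical stand-in for the real call: (replyToAddress, message) -> send it.
    private final BiConsumer<String, String> sender;

    CallbackDispatcherSketch(BiConsumer<String, String> sender) {
        this.sender = sender;
    }

    // Returns immediately; the callback is sent later on the worker thread.
    void scheduleCallback(String message, String replyToAddress) {
        worker.submit(() -> {
            try {
                sender.accept(replyToAddress, message);
            } catch (RuntimeException e) {
                // A failed callback is reported here; later callbacks still run.
                System.err.println("Callback to " + replyToAddress + " failed: " + e);
            }
        });
    }

    // Stops accepting new work and waits for queued callbacks to finish.
    boolean shutdownAndWait(long timeout, TimeUnit unit) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new CopyOnWriteArrayList<>();
        CallbackDispatcherSketch dispatcher =
                new CallbackDispatcherSketch((to, msg) -> delivered.add(to + " <- " + msg));

        dispatcher.scheduleCallback("Hello John Doe", "http://localhost:9092/callback");
        dispatcher.shutdownAndWait(5, TimeUnit.SECONDS);
        System.out.println(delivered);
    }
}
```

The executor takes care of the queue, the polling loop and the stop flag, at the cost of losing the back-pressure that a bounded queue gives.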

The dependencies for this are the same as those of the example in the previous post. Let me know if there are any questions on this.


January 5, 2010

WS-Addressing ReplyTo with CXF

Filed under: WebServices, WS-*, XML — aswin @ 6:18 pm

Normally in a web service interaction a client initiates a call to a service and receives the response synchronously. There are situations where you want a different application to process the response than the one that originated the request. One example is a loan search application where the request for a quote is sent to multiple loan providers, and each of them responds to a quote aggregator service at some later point in time when its quote is ready. The quote aggregator can then aggregate the quotes and let the user know, say by email.

The WS-Addressing specification for web services provides the means to achieve this, and Java's web service stack, JAX-WS, supports WSA features rather seamlessly. Using WSA, a web service client can invoke an operation on the service provider and instruct the provider to send the response to a different service that deals with it. WS-Addressing specifies an EndpointReference structure that points to a web service endpoint, and defines the SOAP headers that carry this information between applications. In this post we will see an example of a client sending the EndpointReference of a callback service to a web service provider application. The provider calls the service referred to by the EPR when it is done processing the request, all magically handled by CXF (a JAX-WS implementation).
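To give an idea of what travels on the wire, the following sketch builds the ReplyTo header element with the JDK's DOM API and prints it. This is only an illustration of the header's shape under the WS-Addressing 1.0 namespace; in the example in this post CXF generates the header, and the class name ReplyToHeaderSketch is made up.

```java
import java.io.StringWriter;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

class ReplyToHeaderSketch {

    // Namespace of WS-Addressing 1.0
    static final String WSA_NS = "http://www.w3.org/2005/08/addressing";

    // Builds <wsa:ReplyTo><wsa:Address>url</wsa:Address></wsa:ReplyTo> as a string.
    static String replyToHeader(String callbackUrl) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            Document doc = factory.newDocumentBuilder().newDocument();

            Element replyTo = doc.createElementNS(WSA_NS, "wsa:ReplyTo");
            Element address = doc.createElementNS(WSA_NS, "wsa:Address");
            address.setTextContent(callbackUrl);
            replyTo.appendChild(address);
            doc.appendChild(replyTo);

            // Serialize the DOM tree without the XML declaration.
            Transformer transformer = TransformerFactory.newInstance().newTransformer();
            transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
            StringWriter out = new StringWriter();
            transformer.transform(new DOMSource(doc), new StreamResult(out));
            return out.toString();
        } catch (Exception e) {
            throw new IllegalStateException("Could not build the header", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(replyToHeader("http://localhost:9092/callback"));
    }
}
```

A real request carries this element inside the SOAP header next to the other WSA headers such as Action and MessageID, and the EPR may also include metadata about the service and port.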

Gerard Davison has a post on how to use WS-Addressing to receive the callback from a web service on the service specified by the WSA “ReplyTo” endpoint reference (EPR), which is exactly what I wanted, but I needed it to work with CXF (2.2.5), and this post talks about that. The setup is pretty simple; the only difference is in the client, which is shown below. The setReplyDestination method sets the WS-Addressing ReplyTo header on the message and is the only difference from Gerard's post. All the server and client side code is shown in this post if anyone wants to try it out.

import static org.apache.cxf.ws.addressing.JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES;

import java.util.Map;
import javax.xml.namespace.QName;
import javax.xml.ws.BindingProvider;
import javax.xml.ws.Endpoint;
import javax.xml.ws.Service;
import javax.xml.ws.soap.SOAPBinding;

import org.apache.cxf.jaxws.EndpointImpl;
import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.soap.VersionTransformer;
import com.aswin.service.wsatest.hello.server.HelloService;

public class Client {

    private static final String HELLOSERVICE_EP_URL = "http://localhost:9091/hello";
    private static final String CALLBACK_EP_URL = "http://localhost:9092/callback";

    public static void main(String[] args) {

        EndpointReferenceType callbackEP = startCallBackEndpoint();
        HelloService hello = getHelloService();
        setReplyDestination(hello, callbackEP);
        hello.sayHello("John Doe");
        System.out.println("Done.....");
    }

    private static EndpointReferenceType startCallBackEndpoint() {
        EndpointImpl endpoint = (EndpointImpl) Endpoint.publish(CALLBACK_EP_URL, new CallbackImpl());
        return VersionTransformer.convertToInternal(endpoint.getEndpointReference());
    }

    private static void setReplyDestination(HelloService hello, EndpointReferenceType replyTo) {

        AddressingPropertiesImpl maps = new AddressingPropertiesImpl();
        maps.setReplyTo(replyTo);

        Map<String, Object> requestContext = ((BindingProvider) hello).getRequestContext();
        requestContext.put(CLIENT_ADDRESSING_PROPERTIES, maps);

    }

    private static HelloService getHelloService() {
        QName serviceName = new QName(HelloService.NS, "HelloService");
        QName portName = new QName(HelloService.NS, "HelloServicePorts");

        Service service = Service.create(serviceName);
        service.addPort(portName, SOAPBinding.SOAP11HTTP_BINDING, HELLOSERVICE_EP_URL);

        HelloService hello = service.getPort(portName, HelloService.class);
        return hello;
    }
}

The actual hello service and the callback interface are pretty simple and are as follows

Hello Service SEI

import javax.jws.WebResult;
import javax.jws.WebService;
import javax.xml.ws.soap.Addressing;

@WebService(targetNamespace=HelloService.NS)
@Addressing(enabled=true, required=true)
public interface HelloService {
    String NS = "hello.wsatest.service.aswin.com";

    @WebResult(name="response")
    public abstract String sayHello(String user);
}

The Service Implementation that can be run as a java application

import java.util.Date;
import javax.jws.WebService;
import javax.xml.ws.Endpoint;
import javax.xml.ws.soap.Addressing;

@WebService
@Addressing
public class HelloServiceImpl implements HelloService {

    public String sayHello(String user) {
        System.out.println("Invoking sayHello in " + getClass());
        return "Hello " + user + " @ " + new Date();
    }

    public static void main(String[] args) {
        Endpoint.publish("http://localhost:9091/hello", new HelloServiceImpl());
    }
}

The callback SEI should be able to receive the response from the hello service, so its input message must have the same format as the output message of the hello service. The RequestWrapper and WebParam annotations in the example below make sure that the message structure matches that of the Hello service.

import javax.jws.Oneway;
import javax.jws.WebParam;
import javax.jws.WebService;
import javax.xml.ws.RequestWrapper;

@WebService()
public interface Callback {
    String NS = "hello.wsatest.service.aswin.com";

    @Oneway
    @RequestWrapper(localName="sayHelloResponse", targetNamespace=NS)
    public void callBack(@WebParam(name="response") String callbackMessage);

}

The callback implementation

import javax.jws.Oneway;
import javax.jws.WebParam;
import javax.jws.WebService;

@WebService
@org.apache.cxf.interceptor.InInterceptors (interceptors = {"org.apache.cxf.interceptor.LoggingInInterceptor" })
public class CallbackImpl implements Callback {

    @Oneway
    public void callBack(@WebParam(name="response") String callbackMessage) {
        System.out.println("Received callback message " + callbackMessage);
    }
}

To try this out, run the main method in HelloServiceImpl followed by that of Client.java. You should have CXF on your classpath; if you are using Maven, add the following to your pom.xml:

<properties>
 <cxf.version>2.2.5</cxf.version>
</properties>
<dependencies>
 <dependency>
 <groupId>org.apache.cxf</groupId>
 <artifactId>cxf-rt-frontend-jaxws</artifactId>
 <version>${cxf.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.cxf</groupId>
 <artifactId>cxf-rt-transports-http</artifactId>
 <version>${cxf.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.cxf</groupId>
 <artifactId>cxf-rt-transports-http-jetty</artifactId>
 <version>${cxf.version}</version>
 </dependency>
</dependencies>

CXF Docs:  http://cwiki.apache.org/CXF20DOC/ws-addressing.html

The CXF samples also include an example application that demonstrates some of the WS-Addressing support.

The W3C spec is also a good read for understanding the details and the message structure.
