Omgo's Blog

January 6, 2010

WS-Addressing with CXF for Async calls

Filed under: WebServices, WS-*, WS-Addressing, XML — aswin @ 9:55 pm

In my previous post I showed an example of using WS-Addressing with CXF to specify a return address along with your web service call, to which the service would deliver its responses.  That example was not truly asynchronous (though the description sounded so): the client of the service had to wait until the service had actually replied to the specified “ReplyTo” address.  But what if you need the operation to be truly asynchronous, so that the client just calls the web service method as a one-way operation and does not wait for the reply at all?

It turns out that, for now, we need to take a few things into our own hands to do that (until a JSR standardizes the pieces), and this post has the results of such an effort.  I must warn you against using this code in any meaningful application: it is just toy code to help me understand the inner workings of WSA and nothing more. As a matter of fact, all the posts and code in this blog are just that. This example builds on the sample presented in my earlier post. To the service defined there I added a new web service method named “sayHelloOneWay” and made it a one-way operation as follows:

// A one-way operation returns nothing, so no @WebResult is needed
@Oneway
public void sayHelloOneWay(String user);

The implementation of HelloService changed to the following:

import java.util.Date;
import javax.annotation.Resource;
import javax.jws.WebService;
import javax.xml.ws.Endpoint;
import javax.xml.ws.WebServiceContext;
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.soap.Addressing;

import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.EndpointReferenceType;

@WebService
@Addressing
@org.apache.cxf.interceptor.InInterceptors(interceptors = {"org.apache.cxf.interceptor.LoggingInInterceptor"})
public class HelloServiceImpl implements HelloService {

    CallbackThread callbacker;

    public HelloServiceImpl() {
        callbacker = new CallbackThread();
        callbacker.start();
    }

    @Resource
    WebServiceContext webServiceContext;

    public String sayHello(String user) {
        System.out.println("Invoking sayHello in " + getClass());
        return "Hello " + user + " @ " + new Date();
    }

    public void sayHelloOneWay(String user) {
        // The inbound WS-Addressing headers are exposed through the message context
        MessageContext messageContext = webServiceContext.getMessageContext();

        AddressingProperties addressProp = (AddressingProperties) messageContext
                .get(org.apache.cxf.ws.addressing.JAXWSAConstants.SERVER_ADDRESSING_PROPERTIES_INBOUND);

        // Queue the reply together with its destination (the client's ReplyTo EPR)
        // and return without waiting for the callback to be delivered
        EndpointReferenceType eprType = addressProp.getReplyTo();
        String message = "Hello " + user + " @ " + new Date();
        callbacker.scheduleCallback(message, eprType);
    }

    public static void main(String[] args) {
        Endpoint.publish("http://localhost:9091/hello", new HelloServiceImpl());
    }
}

The interesting part is in the sayHelloOneWay method.  It gets the AddressingProperties object from the message context of the service; this object holds all the WS-Addressing header information received from the client, including the “ReplyTo”, the Action URL, the Message ID and so on.  For our example scenario we are interested in the “ReplyTo”, which is an EndpointReference to the callback service sent by the client.  CXF represents an endpoint reference internally using an org.apache.cxf.ws.addressing.EndpointReferenceType object (JAXB capable).  We can convert this object to the JAX-WS specification type javax.xml.ws.wsaddressing.W3CEndpointReference using the utility methods in the CXF package.  In fact, the JAX-WS spec mandates that the endpoint references returned by Endpoint and BindingProvider be of type W3CEndpointReference for SOAP over HTTP bindings. The W3CEndpointReference object does not expose any of its information, so it has to be converted to an internal type to extract details about the EPR; this is where the CXF utility classes (EndpointReferenceUtils, VersionTransformer etc.) come in. With the information in the EPR (service URL, service name, port name) and the rest of the metadata from the annotations on the example Callback service interface, we can create a service proxy object and use it to invoke calls on the callback service (from the previous post).
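
To make the contents of the EPR a bit more concrete, here is a minimal sketch that reads the address, service name and port name out of a ReplyTo EPR using only the JDK's DOM API rather than CXF. The sample EPR, its address and the names CallbackService and CallbackPort are made up for illustration, and the sketch assumes the client puts a wsaw:ServiceName element with an EndpointName attribute into the EPR metadata.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import javax.xml.namespace.QName;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

public class EprMetadataSketch {

    public static final String WSA_NS = "http://www.w3.org/2005/08/addressing";
    public static final String WSAW_NS = "http://www.w3.org/2006/05/addressing/wsdl";

    // Hypothetical ReplyTo EPR: the address and the service/port names are made up.
    public static final String SAMPLE_EPR =
          "<wsa:ReplyTo xmlns:wsa='" + WSA_NS + "'>"
        + "<wsa:Address>http://localhost:9090/callback</wsa:Address>"
        + "<wsa:Metadata>"
        + "<wsaw:ServiceName xmlns:wsaw='" + WSAW_NS + "'"
        + " xmlns:cb='http://example.org/callback' EndpointName='CallbackPort'>"
        + "cb:CallbackService</wsaw:ServiceName>"
        + "</wsa:Metadata>"
        + "</wsa:ReplyTo>";

    /** Parses the EPR XML with namespace support switched on. */
    public static Element parse(String xml) {
        try {
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            factory.setNamespaceAware(true);
            return factory.newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)))
                    .getDocumentElement();
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a parsable EPR", e);
        }
    }

    /** Returns the text of wsa:Address, or null if the element is absent. */
    public static String address(Element epr) {
        NodeList nodes = epr.getElementsByTagNameNS(WSA_NS, "Address");
        return nodes.getLength() == 0 ? null : nodes.item(0).getTextContent().trim();
    }

    /** Returns {serviceQName, portQName}, or null if the metadata is missing or malformed. */
    public static QName[] serviceAndPort(Element epr) {
        NodeList nodes = epr.getElementsByTagNameNS(WSAW_NS, "ServiceName");
        if (nodes.getLength() == 0) {
            return null;
        }
        Element serviceName = (Element) nodes.item(0);
        // The element text is a prefixed name such as "cb:CallbackService"
        String[] parts = serviceName.getTextContent().trim().split(":");
        if (parts.length != 2) {
            return null;
        }
        // Resolve the prefix against the xmlns declarations in scope on the element
        String namespace = serviceName.lookupNamespaceURI(parts[0]);
        String endpointName = serviceName.getAttribute("EndpointName");
        if (namespace == null || endpointName.isEmpty()) {
            return null;
        }
        // Like the service code in this post, assume the port lives in the service's namespace
        return new QName[] { new QName(namespace, parts[1]), new QName(namespace, endpointName) };
    }

    public static void main(String[] args) {
        Element epr = parse(SAMPLE_EPR);
        QName[] names = serviceAndPort(epr);
        System.out.println("address = " + address(epr));
        System.out.println("service = " + names[0]);
        System.out.println("port    = " + names[1]);
    }
}
```

These three values are what a client proxy for the callback service needs. If the client leaves the metadata out of its ReplyTo, the service and port names have to come from somewhere else.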

To deal with the callbacks asynchronously I have a simple CallbackThread implementation that is used from HelloServiceImpl.  The sayHelloOneWay web method simply hands off the message (built from the input arguments) to the callback thread and returns immediately to the client.  The actual callback happens in a separate thread in the CallbackThread object.  The interesting parts are the processCall and createService methods in the code below.  The createService method basically opens up the EndpointReference object and gets the service name and the port name to construct a Service (client proxy) object for the Callback service.

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.xml.namespace.QName;
import javax.xml.ws.Service;
import javax.xml.ws.soap.SOAPBinding;
import javax.xml.ws.wsaddressing.W3CEndpointReference;

import org.apache.cxf.helpers.DOMUtils;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
import org.w3c.dom.Element;
import com.aswin.service.wsatest.hello.client.Callback;

public class CallbackThread extends Thread {

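    // volatile so that a stopProcessing() call from another thread is seen by the loop in run()
    volatile boolean run = true;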
    BlockingQueue<Tuple2> queue = new LinkedBlockingQueue<Tuple2>(5);

    public void scheduleCallback(String message, EndpointReferenceType eprType) {
        try {
            queue.put(new Tuple2(message, eprType));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void run() {
        System.out.println("Callback Thread started.. ");

        while (run) {
            try {
                Tuple2 tuple = queue.poll(1, TimeUnit.SECONDS);
                if (tuple != null) {
                    processCall(tuple);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void processCall(Tuple2 unit) {
        Service service = createService(unit.epr);
        if (service != null) {
            // Convert the CXF-internal EPR to the JAX-WS type that Service.getPort accepts
            W3CEndpointReference reference = new W3CEndpointReference(EndpointReferenceUtils.convertToXML(unit.epr));
            Callback callback = service.getPort(reference, Callback.class);
            // Deliver the message that sayHelloOneWay queued for this client
            callback.callBack(unit.message);
        }
    }

    private static Service createService(EndpointReferenceType referenceType) {
        List<Object> any = referenceType.getMetadata().getAny();
        String namespace = null;
        String serviceName = null;
        String endpointName = null;

        for (Object a : any) {
            Element element = (Element) a;
            if ("ServiceName".equals(element.getLocalName())
                    && "http://www.w3.org/2006/05/addressing/wsdl".equals(element.getNamespaceURI())) {
                endpointName = element.getAttribute("EndpointName");
                String serviceQnameString = element.getTextContent().trim();
                String[] split = null;
                if (serviceQnameString != null && (split = serviceQnameString.split(":")).length == 2) {
                    namespace = DOMUtils.getNamespace(element, split[0]);
                    serviceName = split[1];
                }
            }
        }

        if (serviceName != null && endpointName != null && namespace != null) {
            QName serviceQName = new QName(namespace, serviceName);
            QName portQName = new QName(namespace, endpointName);
            // should cache, but for now create a new one every time
            Service service = Service.create(serviceQName);
            service.addPort(portQName, SOAPBinding.SOAP11HTTP_BINDING, referenceType.getAddress().getValue());
            return service;
        }

        return null;
    }

    public void stopProcessing() {
        this.run = false;
    }

    class Tuple2 {
        String message;
        EndpointReferenceType epr;

        public Tuple2(String message, EndpointReferenceType epr) {
            super();
            this.message = message;
            this.epr = epr;
        }
    }

}
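
The hand-off itself does not depend on CXF at all. As a rough sketch of the same idea, here is a variant that uses a single-threaded ExecutorService from the JDK instead of a hand-written thread and queue. This is not what the service above does. The ReplySink interface is a made-up stand-in for the remote Callback port so the example can run on its own.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class OneWayHandoffSketch {

    /** Hypothetical stand-in for the remote Callback port. */
    public interface ReplySink {
        void deliver(String replyTo, String message);
    }

    // One worker thread, like CallbackThread; its internal queue is unbounded
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final ReplySink sink;

    public OneWayHandoffSketch(ReplySink sink) {
        this.sink = sink;
    }

    /** Plays the role of sayHelloOneWay: queue the callback and return. */
    public void sayHelloOneWay(String user, String replyTo) {
        String message = "Hello " + user;
        worker.submit(() -> sink.deliver(replyTo, message));
    }

    /**
     * Plays the role of stopProcessing, but lets already queued callbacks finish.
     * Returns true if the worker drained its queue within the timeout.
     */
    public boolean shutdown(long timeout, TimeUnit unit) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> delivered = new CopyOnWriteArrayList<>();
        OneWayHandoffSketch service = new OneWayHandoffSketch(
                (replyTo, message) -> delivered.add(replyTo + " <- " + message));

        service.sayHelloOneWay("aswin", "http://localhost:9090/callback");
        service.shutdown(5, TimeUnit.SECONDS);
        System.out.println(delivered);
    }
}
```

One difference worth noting: the queue in CallbackThread is bounded at 5 entries, so scheduleCallback blocks when it is full, whereas the executor used here never pushes back on the caller.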

The dependencies for this are the same as those of the example in the previous post. Let me know if you have any questions.
