Omgo's Blog

April 7, 2011

LVM for partitioning VMs

Filed under: Uncategorized — aswin @ 5:04 am

LVM is very handy in resizing your partitions and especially useful in EBS.

http://allaboutfedora.blogspot.com/2007/01/how-to-resize-or-expand-lvm-partitions.html

October 24, 2010

parallel quicksort with java fork join framework JSR 166y

Filed under: Core Java — aswin @ 9:39 pm
Fork Join (jsr 166y) framework is slated for Java 7 and is absolutely cool and is no surprise coming from Doug Lea.  I had to try something out with it since the packages compiled on java 6 is also availalbe. 

I tried parallizing the quicksort algorithm using this and looks like it compiles and even produces right results!!!  The important part here is the use of Phaser, a resettable CountdownLatch if you would.  So basically you create a Phaser and pass it along to the root (first) RecursiveAction object.
Each of the RecursiveAction object (in this example an instance of ParallelQuckSort) increments the Phaser counter by using the regiter() method and when they are done their work (sorting or spawning children to do
that) they "arrive" and "unregister" at the Phaser bringing down the phaser counter by one. When all the RecursiveAction (spawned by the recursion) are done, the root one waiting on the Phaser lock would resume and return to the client, at which
point the array would have been sorted. 

Any way here it is the source for what I have done, any comments and suggestions are welcome.
package test.aswin.concurrency.forkjoin;

import jsr166y.*;
import jsr166y.Phaser;

import java.util.Arrays;

/**
 * Example of using JSR 166 Y for a simple parallel quick sort.
 */
public class ParallelQuickSort extends RecursiveAction {
    Phaser phaser;
    int[] arr = null;
    int left;
    int right;

    ParallelQuickSort(Phaser phaser, int[] arr) {
        this(phaser, arr, 0, arr.length - 1);
    }

    ParallelQuickSort(Phaser phaser, int[] arr, int left, int right) {
        this.phaser = phaser;
        this.arr = arr;
        this.left = left;
        this.right = right;
        phaser.register();  //important
    }


    private ParallelQuickSort leftSorter(int pivotI) {
        return new ParallelQuickSort(phaser, arr, left, --pivotI);
    }

    private ParallelQuickSort rightSorter(int pivotI) {
        return new ParallelQuickSort(phaser, arr, pivotI, right);
    }

    private void recurSort(int leftI, int rightI) {
        if (rightI - leftI > 7) {
            int pIdx = partition(leftI, rightI, getPivot(arr, leftI, rightI));
            recurSort(leftI, pIdx - 1);
            recurSort(pIdx, rightI);
        } else if (rightI - leftI > 0) {
            insertionSort(leftI, rightI);
        }
    }


    @Override
    protected void compute() {
        if (right - left > 1000) {   // if more than 1000 (totally arbitrary number i chose) try doing it parallelly
            int pIdx = partition(left, right, getPivot(arr, left, right));
            leftSorter(pIdx).fork();
            rightSorter(pIdx).fork();

        } else if (right - left > 7) {  // less than 1000 sort recursively in this thread
            recurSort(left, right);

        } else if (right - left > 0) {  //if less than 7 try simple insertion sort
            insertionSort(left, right);
        }

        if (isRoot()) { //if this instance is the root one (the one that started the sort process), wait for others
                        // to complete.
            phaser.arriveAndAwaitAdvance();
        } else {  // all not root one just arrive and de register not waiting for others.
            phaser.arriveAndDeregister();
        }
    }

    /** Patition the array segment based on the pivot   **/
    private int partition(int startI, int endI, int pivot) {
        for (int si = startI - 1, ei = endI + 1; ; ) {
            for (; arr[++si] < pivot;) ;
            for (; ei > startI && arr[--ei] > pivot ; ) ;
            if (si >= ei) {
                return si;
            }
            swap(si, ei);
        }
    }

    private void insertionSort(int leftI, int rightI) {
        for (int i = leftI; i < rightI + 1; i++)
            for (int j = i; j > leftI && arr[j - 1] > arr[j]; j--)
                swap(j, j - 1);

    }

    private void swap(int startI, int endI) {
        int temp = arr[startI];
        arr[startI] = arr[endI];
        arr[endI] = temp;
    }

    /**
     * Check to see if this instance is the root, i.e the first one used to sort the array.
     * @return
     */
    private boolean isRoot() {
        return arr.length == (right - left) + 1;
    }

    /**
     * copied from java.util.Arrays
     */
    private int getPivot(int[] arr, int startI, int endI) {
        int len = (endI - startI) + 1;
        // Choose a partition element, v
        int m = startI + (len >> 1);       // Small arrays, middle element
        if (len > 7) {
            int l = startI;
            int n = startI + len - 1;
            if (len > 40) {        // Big arrays, pseudomedian of 9
                int s = len / 8;
                l = med3(arr, l, l + s, l + 2 * s);
                m = med3(arr, m - s, m, m + s);
                n = med3(arr, n - 2 * s, n - s, n);
            }
            m = med3(arr, l, m, n); // Mid-size, med of 3
        }
        int v = arr[m];
        return v;
    }

    /**
     * copied from java.util.Arrays
     */
    private static int med3(int x[], int a, int b, int c) {
        return (x[a] < x[b] ?
                (x[b] < x[c] ? b : x[a] < x[c] ? c : a) :
                (x[b] > x[c] ? b : x[a] > x[c] ? c : a));
    }

    public static void main(String[] args) throws InterruptedException {
        int[] arr = ArrayUtils.generateRandom(1000000);
        ForkJoinPool pool = new ForkJoinPool();


        for (int i = 0; i < 10; i++) {
            System.out.println("Sarting " + i);
            int[] arrclone1 = arr.clone();
            int[] arrclone2 = arr.clone();

            long l = System.nanoTime();
            Arrays.sort(arrclone1);
            System.out.printf("Times for seq %,8d%n", (System.nanoTime() - l));

/*          // sorted (asc and desc) are faster than unsorted one, while trying parallel sort
            for (int left=0,right=arrclone1.length-1; left<right; left++, right--) {
                // exchange the first and last
                int temp = arrclone1[left]; arrclone1[left]  = arrclone1[right]; arrclone1[right] = temp;
            }
*/


            l = System.nanoTime();
            Phaser phaser = new Phaser();
            pool.invoke(new ParallelQuickSort(phaser, arrclone2));
            System.out.printf("Times %,8d%n", (System.nanoTime() - l));
            System.out.println("Is sorted :>" + ArrayUtils.isSorted(arrclone2, true));
        }
    }
}
One thing I tried and failed was to parallelize the partition operation which is single threaded in this example. The thing I wrote for that worked, but had horrible performance so I am not including it here. What I
did basically was to have two threads parallely swapping even and odd indexed items respectively. I think processor locality of the data and synchronizing the elements across cpu cache may be the issue, but I am
investigating it further and with help of the experts out there, may find out why it is perforing bad. If anyone wants to help me out with this, please let me know :)

October 18, 2010

scala simple number guess application using case pattern guards

Filed under: scala — aswin @ 11:15 pm

There are millions of examples out there on all this, but here is a little program that shows the case pattern guards and function literals

class NumGuessCase(guess: Int) {

  def guessNum(read: () => Int, disp: String => Unit): Unit = _guessNum(read, disp)

  private def _guessNum(read: () => Int, disp: String => Unit, times: Int = 0): Unit = {
    disp("Please enter guess :> ")    
    read() match {
      case `guess` => disp("You guessed right in " + (times+1)); 
      case a if a > guess => disp ("You guessed too high");
                _guessNum(read, disp, times+1)
      case a if a < guess => disp ("You guessed too low");
                _guessNum(read, disp, times+1)
    }
  }

}

The guessNum method takes two arguments, read, and disp. Parameter “read” is a function which when invoked should return the user’s guess. “disp” (display) represents a function which is responsible for the output of the messages by the guessNum function. So the client of the guessNum can pass in as arguments a function that accepts user input and another one that displays the messages back to user and in our case we just use system console for this interaction (see the NumGuesCase companion object shown below).

The part I was testing out was the case pattern guards, the second case statement would match only if the pattern guard (“a > guess”) returns true. We could have used a default catch all pattern (“_”) as the last case statement. The first statement uses a variable pattern and is why you have to use the ` (backquote) symbol.

object NumGuesCase extends Application {
  new NumGuessCase(new java.util.Random().nextInt(100)).guessNum(readInt,println)
}

This is how you would use the guessNum method, just pass in a readInt and println method. Scala compiler would capture these methods as Fuction objects and pass it to our guessNum method and since they both Type check with the Function signature it works.

If anyone is trying this out , please be aware of the missing main method bug (pre 2.8.0) as detailed out here

October 17, 2010

Something really big is cooking at JBOSS

Filed under: rumour — aswin @ 1:27 am

Its been quite some time since I have seen any activity from Gavin King (Hibernate, JBOSS). He has blogged that he is working on something really big, important and cool. I have a feeling that they are collaborating with some company on this and my guess is JREBEL. Lets see if its true.

June 25, 2010

scala gotcha

Filed under: scala — aswin @ 3:54 am

I was bitten by this one and it was painful:) . http://daily-scala.blogspot.com/2010/05/return-value-of-block.html

June 7, 2010

scala call-by-name (=>) vs call-by-type

Filed under: scala — aswin @ 9:43 pm

Scala has lot of small surprises and this is one such thing

Given the following

  def byName(a: => Unit) = {
    for (i <- 0 until 10) {println(a)}

  }

  def byValue(a: Unit) = {
    for (i <- 0 until 10) {println(a)}

  }

  var i = 1;

  byValue(i = i + 1)
  println(i);

  byName(i = i + 1)
  println(i)

This would print 2 followed by 12. What is happening here is that in case of byName the evaluation of the arguments happens after the the byName method itself is invoked and the evaluation happens everytime the method parameter “a” is used within the body on the byName method. Since the parameter “a” is refereed in the loop of 10, it executes the statement “i = i +1” 10 times yeielding the result of 12.
In case of byValue (the default invocation mechanism in scala and also in java) the statements in parameter position is evaluated before the actual method call and only the result is passed to the method and hence the evaluation happens only once.
Pretty important difference as this could lead to unintended behavior if not careful. BTW the way create a byName call is having the “=>” between the parameter and the returntype in the argument list “byName(a: => Unit)”

References
http://scala.sygneca.com/faqs/language#what-s-the-difference-between-a-lazy-argument-a-no-arg-function-argument-and-a-lazy-value
http://www.scala-lang.org/sites/default/files/sids/rytz/Mon,%202009-11-09,%2017:29/named-args.pdf (named args and such)

The named arg feature introduced in 2.8 : http://www.scala-lang.org/sites/default/files/sids/rytz/Mon,%202009-11-09,%2017:29/named-args.pdf

May 24, 2010

postgres xml indexes

Filed under: postgres, sql — Tags: — aswin @ 11:48 pm

Just a simple example of the postgres xml indexing using functional indexes on xpath functions. If you have xml data that is used for read only and for searching, indexing the key search fields like this would get you good search performance and would avoid having to “shred” the data into individual columns.


-- create a table with a xml type column
CREATE TABLE testxml
(
  doc xml,
  id serial NOT NULL,
  CONSTRAINT testxml_pkey PRIMARY KEY (id)
)
WITH (
  OIDS=FALSE
);

-- insert some data
insert into testxml (doc) 
 select XMLPARSE (DOCUMENT '<?xml version="1.0"?><book><id>'|| a || '</id><title>Manual</title><chapter>simple</chapter></book>') from generate_series(1, 1000) as s(a);

-- test if xpath is working fine
SELECT (xpath('/book/id/text()', doc))[1]::text from testxml;

-- check the explain plan , you should see a full table scan. not very efficient
explain SELECT * from testxml 
  where (xpath('/book/id/text()', doc))[1]::text = '100';

-- Add an index (function index) to the xpath. The result of the xpath would be indexed and the next time if this is used in predicate postgres 
-- would use this index instead of a full table scan
--drop index doc_idx
create  index doc_idx on testxml using btree (((xpath('/book/id/text()', doc))[1]::text)); 

--check it out in action
explain SELECT * from testxml 
  where (xpath('/book/id/text()', doc))[1]::text = '100';

References
http://developer.postgresql.org/pgdocs/postgres/functions-xml.html
http://developer.postgresql.org/pgdocs/postgres/datatype-xml.html
http://www.postgres.cz/index.php/PostgreSQL_SQL_Tricks

May 7, 2010

Apache rewrite rules for multi tenant applications

Filed under: General — aswin @ 3:22 am

We have a multi-tenant web application that serves customizable page content based on the subdomain value or the attribute set for the currently logged in user. We have a generic context root for this particular application that appears in the browser bar and looks something like https://client.example.com/online where “online” is the context root. The internally available servlet container (Tomcat) has the web application configured under this context root and we use Apache HTTP server to reverse proxy the requests to this Tomcat instance. Each client had a virtual host mapping based on the subdomain, and apache would rightfully pass in this subdomain value as part of the query string so that the backend application can customize contents if need be.

One of the clients we were onboarding had the hard requirement that the context root itself be changed to a custom value, say “custom” in addition to having a sub domain. i.e they wanted the url of the form https://client.example.com/custom and not the generic one https://client.example.com/online. The application was already in production and we could not make any changes to it at all, leaving us with the only option of making the changes at the webserver level (Apache HTTP server). The web application appends the context root as part of the URLs (we are using grails and the urls are generated with the context root) and in form post action. So we had to come up with some rewrite rules for handling this situation and used it along with the the mod_proxy and mod_proxy_ajp modules. We wanted the url in the address bar to have the “/custom” context root and was pretty easy with the rewrite rules. The mod_rewrite would just redirect a request for “/online” to “/custom”, but for POST requests this was an issue as a redirect would cause the POST value to be lost. So we had to just allow ‘POST’ alone to proxy through to the backend without any rewriting and was fine as the application logic followed PRG pattern for avoiding duplicate form submission. Since these redirect request were ‘GET’ request it would be matched by the rewrite rules and redirected again to the correct value (‘/custom’), that is redirected twice; once by the application and then by the Apache server.

Here are the relevant part of the configuration in case if any one feel the need to do it. This whole configuration goes inside a element in our configuration .

    ProxyRequests Off
      <Proxy *>
         Order deny,allow
         Allow from all
      </Proxy>

      RewriteEngine on

      #if url ends in client.example.com , redirect to client.example.com/custom
      RewriteRule ^(/)?$ /custom/ [R,L]

      # if the request is anything but post and is for client.example.com/online change it to /custom and redirect. 
	  # For post to /online we cannot redirect, as it would cause post data to be lost. (Remember the application has all the links and post form  urls coded
      # for /online, so we are trying to transparently handle these).
      #  This also means that there is a possiblity of the browser displaying online as result of a http post, but in most places its
      #  a redirect after post giving this rule a chance to redirect again to the correct url (/custom)
      RewriteCond %{REQUEST_METHOD}  !^POST$
      RewriteRule ^/online/(.*)$ /custom/$1 [R]

      # if this is here, its a post request, so allow it to proxy through to backend webapp at /online
      RewriteRule ^/online(?:/(.*)$)? ajp://localhost:8102/online/$1 [P,L,QSA]
    
	  # all request aimed specifically at /custom should be passed through 
      RewriteRule ^/custom(?:/(.*)$)? ajp://localhost:8102/online/$1 [P,L,QSA]
	        
      # ProxyPassReverse rewrites the redirect headers created by the backend webserver to the correct form, otherwise there would be an unnecessary hop
	  # The url here is what the ProxyPassReverse sees, so put it as it is. Most of the articles in the net gives this value wrongly and suggests using 
	  # the url ajp://localhost:8102..... and it does not work. Check the logs after setting the LogLevel to debug to see what the ajp connector is 
	  # returning for the Location header, which is what this function (ProxyPassReverse) rewrites. 
      ProxyPassReverse /custom/ https://client.example.com/online/

      #make sure the cookie domain is set to /, as we want interoperatbility between subdomains (custom and online). If not present there could be 
	  # funny side effects such as having to relogin or missing data etc.
      ProxyPassReverseCookiePath /online /

      #If stuff goes wrong, uncomment to enable detailed logging
      #LogLevel debug
      #RewriteLog "/etc/httpd/logs/rewrite.log"
      #RewriteLogLevel 5

January 6, 2010

WS-Addresssing with CXF for Async calls

Filed under: WebServices, WS-*, WS-Addressing, XML — aswin @ 9:55 pm

In my previous post I had an example of using WS-Addressing with CXF  for a specifying a return address along with your webservice call, to where the responses would be delivered by the service.  That example was not truly asynchronous (though the description sounded so) and the client of the service had to wait until the service had actually replied back to the specified “ReplyTO” address.  But what if you need that operation to be truly asynchronous in that want the client to just call the webservice method as a one way operation and not wait for the reply at all.

Turns out that we need to take a bit of stuff to our on hands for now for doing that (until the JSR standardized the pieces) and this post has the results of such an effort.  I must warn you on using this code in any meaningful application as it is just toy stuff to help me understand the inner workings of WSA and nothing more, as a matter of fact all the posts/code in this blog are just that. This example builds on the sample presented in my earlier post. To the service defined in the earlier post I added a new web service method named “sayHelloOneWay” and made it an oneway operation as follows

@WebResult(name="response")
@Oneway
public void sayHelloOneWay(String user);

The implementation of the HelloService changed to the following

import java.util.Date;
import javax.annotation.Resource;
import javax.jws.WebService;
import javax.xml.ws.Endpoint;
import javax.xml.ws.WebServiceContext;
import javax.xml.ws.handler.MessageContext;
import javax.xml.ws.soap.Addressing;

import org.apache.cxf.ws.addressing.AddressingProperties;
import org.apache.cxf.ws.addressing.EndpointReferenceType;

@WebService
@Addressing
@org.apache.cxf.interceptor.InInterceptors(interceptors = {"org.apache.cxf.interceptor.LoggingInInterceptor"})
public class HelloServiceImpl implements HelloService {

    CallbackThread callbacker;

    public HelloServiceImpl() {
        callbacker = new CallbackThread();
        callbacker.start();
    }

    @Resource
    WebServiceContext webServiceContext;

    public String sayHello(String user) {
        System.out.println("Invoking sayHello in " + getClass());
        return "Hello " + user + " @ " + new Date();
    }

    public void sayHelloOneWay(String user) {

        MessageContext messageContext = webServiceContext.getMessageContext();

        AddressingProperties addressProp = (AddressingProperties) messageContext
                .get(org.apache.cxf.ws.addressing.JAXWSAConstants.SERVER_ADDRESSING_PROPERTIES_INBOUND);

        EndpointReferenceType eprType = addressProp.getReplyTo();
        String message = "Hello " + user + " @ " + new Date();
        callbacker.scheduleCallback(message, eprType);
    }

    public static void main(String[] args) {
        Endpoint.publish("http://localhost:9091/hello", new HelloServiceImpl());
    }
}

The interesting part is in the sayHelloOneWay method.   It gets the AddressingProperties object from the messagecontext of the service and holds all the WS-Addressing header information received from the client including the “ReplyTo”,  Action url ,  Message ID etc.  For our example scenario we are interested in the “ReplyTo” which is an EndpointReference  to the callback service send by the client.  CXF represents an endpoint reference internally using a org.apache.cxf.ws.addressing.EndpointReferenceType object (jaxb capable).  We can convert this object to the JaxWS specification type  javax.xml.ws.wsaddressing.W3CEndpointReference using the utility methods in the CXF package.  Actually the jaxws spec mandates that the endpoint references returned by Endpoint and BindingProvider to be of type W3CEndpointReference for SOAP over Http bindings. The W3CEndpointReference object does not expose any information and hence have to be converted to a internal type to extract information about the EPR and is where the CXF utility classes comes to help (EndpointReferenceUtils, VersionTransformer etc) .With the information in the EPR (service URL , service name, portname)  and all the other meta data from the example Callback service interface annotations, we can create a service proxy object and use it to actually invoke calls on the callback service (from the previous post).

For dealing with the callbacks asynchronously I have a simple CallbackThread implementation that is used from the HelloServiceImpl.  The sayHelloOneWay web method would simply handoff the message (the input arguments) the callback thread and return immediately to the client.  The actual callback happens in a separate thread in the CallbackThread object.  The interesting parts are the processCall and the createService methods  in the code below.  The createService method basically opens up the EndpointReference object and gets the service name and the portname to construct a Service (client proxy) object to the Callback service.

import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.xml.namespace.QName;
import javax.xml.ws.Service;
import javax.xml.ws.soap.SOAPBinding;
import javax.xml.ws.wsaddressing.W3CEndpointReference;

import org.apache.cxf.helpers.DOMUtils;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.wsdl.EndpointReferenceUtils;
import org.w3c.dom.Element;
import com.aswin.service.wsatest.hello.client.Callback;

public class CallbackThread extends Thread {

    boolean run = true;
    BlockingQueue<Tuple2> queue = new LinkedBlockingQueue<Tuple2>(5);

    public void scheduleCallback(String message, EndpointReferenceType eprType) {
        try {
            queue.put(new Tuple2(message, eprType));
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public void run() {
        System.out.println("Callback Thread started.. ");

        while (run) {
            try {
                Tuple2 tuple = queue.poll(1, TimeUnit.SECONDS);
                if (tuple != null) {
                    processCall(tuple);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    private void processCall(Tuple2 unit) {
        Service service = createService(unit.epr);
        if (service != null) {
            W3CEndpointReference reference = new W3CEndpointReference(EndpointReferenceUtils.convertToXML(unit.epr));
            Callback callback = service.getPort(reference, Callback.class);
            callback.callBack("bubbaa");
        }
    }

    private static Service createService(EndpointReferenceType referenceType) {
        List<Object> any = referenceType.getMetadata().getAny();
        String namespace = null;
        String serviceName = null;
        String endpointName = null;

        for (Object a : any) {
            Element element = (Element) a;
            if ("ServiceName".equals(element.getLocalName())
                    && "http://www.w3.org/2006/05/addressing/wsdl".equals(element.getNamespaceURI())) {
                endpointName = element.getAttribute("EndpointName");
                String serviceQnameString = element.getTextContent().trim();
                String[] split = null;
                if (serviceQnameString != null && (split = serviceQnameString.split(":")).length == 2) {
                    namespace = DOMUtils.getNamespace(element, split[0]);
                    serviceName = split[1];
                }
            }
        }

        if (serviceName != null && endpointName != null && namespace != null) {
            QName serviceQName = new QName(namespace, serviceName);
            QName portQName = new QName(namespace, endpointName);
//should cache, but for now create a new one everytime
            Service service = Service.create(serviceQName);
            service.addPort(portQName, SOAPBinding.SOAP11HTTP_BINDING, referenceType.getAddress().getValue());
            return service;
        }

        return null;
    }

    public void stopProcessing() {
        this.run = false;
    }

    class Tuple2 {
        String message;
        EndpointReferenceType epr;

        public Tuple2(String message, EndpointReferenceType epr) {
            super();
            this.message = message;
            this.epr = epr;
        }
    }

}

The dependencies for this are the same as that of the example in the previous post. Let me know if there are any questions on this.

January 5, 2010

WS-Addresssing ReplyTo with CXF

Filed under: WebServices, WS-*, XML — aswin @ 6:18 pm

Normally in a webservice interaction scenario a client would initiate a call to a service and receive the response back synchronously. There are situations where you would want a different application to process the response than the one originating the request. One example could be a loan search application where the request for quote is sent to multiple loan providers and they all can respond back to a quote aggregator service at some later point in time when they have the individual quotes ready. The quote aggregator app can then aggregate the quote and let the user know about it by say an email or something. WS-Addresssing specification for web services provides the necessary means by which this can be achieved and java’s webservice stack JAX-WS has very good support for enabling WSA features rather seamlessly. Using WSA a webservice client could invoke an operation on the service provider and instruct the provider to send the response to a different service that could deal with the response. WS-Addressing specifies a EndpointReference structure that could be used to point to a webservice endpoint and defines the soap headers to transfer this information between the applications. In this post, we will see an example of the client sending the EndpointReference of a callback service to a webservice provider application. The provider application would initiate a call to the service referred by the EPR when it is done processing the request, all magically handled by CXF (JAX-WS impl).

Gerad Davison has a post on how to use WS-Addressing to receive the callback from a webservice on the service specified by the WSA “replyto” endpoint reference (EPR) and which is exactly what I wanted to, but needed it to work with CXF (2.2.5)  and this post talks about that.  The setup is pretty simple with only difference in the client part and is as follows.   The setReplyDestination method actually sets the WS-Addressing reply to headers to the message and is the only difference from Gerad’s post.  I have all the server and client side code shown in this post  if anyone wants to try it out.

import static org.apache.cxf.ws.addressing.JAXWSAConstants.CLIENT_ADDRESSING_PROPERTIES;

import java.util.Map;
import javax.xml.namespace.QName;
import javax.xml.ws.BindingProvider;
import javax.xml.ws.Endpoint;
import javax.xml.ws.Service;
import javax.xml.ws.soap.SOAPBinding;

import org.apache.cxf.jaxws.EndpointImpl;
import org.apache.cxf.ws.addressing.AddressingPropertiesImpl;
import org.apache.cxf.ws.addressing.EndpointReferenceType;
import org.apache.cxf.ws.addressing.soap.VersionTransformer;
import com.aswin.service.wsatest.hello.server.HelloService;

public class Client {

    private static final String HELLOSERVICE_EP_URL = "http://localhost:9091/hello";
    private static final String CALLBACK_EP_URL = "http://localhost:9092/callback";

    public static void main(String[] args) {

        EndpointReferenceType callbackEP = startCallBackEndpoint();
        HelloService hello = getHelloService();
        setReplyDestination(hello, callbackEP);
        hello.sayHello("John Doe");
        System.out.println("Done.....");
    }

    private static EndpointReferenceType startCallBackEndpoint() {
        EndpointImpl endpoint = (EndpointImpl) Endpoint.publish(CALLBACK_EP_URL, new CallbackImpl());
        return VersionTransformer.convertToInternal(endpoint.getEndpointReference());
    }

    private static void setReplyDestination(HelloService hello, EndpointReferenceType replyTo) {

        AddressingPropertiesImpl maps = new AddressingPropertiesImpl();
        maps.setReplyTo(replyTo);

        Map requestContext = ((BindingProvider) hello).getRequestContext();
        requestContext.put(CLIENT_ADDRESSING_PROPERTIES, maps);

    }

    private static HelloService getHelloService() {
        QName serviceName = new QName(HelloService.NS, "HelloService");
        QName portName = new QName(HelloService.NS, "HelloServicePorts");

        Service service = Service.create(serviceName);
        service.addPort(portName, SOAPBinding.SOAP11HTTP_BINDING, HELLOSERVICE_EP_URL);

        HelloService hello = service.getPort(portName, HelloService.class);
        return hello;
    }
}

The actual hello service and the callback interface are pretty simple and are as follows

Hello Service SEI

import javax.jws.WebResult;
import javax.jws.WebService;
import javax.xml.ws.soap.Addressing;

@WebService(targetNamespace=HelloService.NS)
@Addressing(enabled=true, required=true)

public interface HelloService {
String NS = "hello.wsatest.service.aswin.com";

@WebResult(name="response")
public abstract String sayHello(String user);
}

The Service Implementation that can be run as a java application

import java.util.Date;
import javax.jws.WebService;
import javax.xml.ws.Endpoint;
import javax.xml.ws.soap.Addressing;

@WebService
@Addressing
public class HelloServiceImpl implements HelloService {

public String sayHello(String user) {
System.out.println("Invoking sayHello in " + getClass());
return "Hello " + user + " @ " + new Date();
}

public static void main(String[] args) {
Endpoint.publish("http://localhost:9091/hello", new HelloServiceImpl());
}
}

The callback sei should be able to recieve the resopnse from the hello service and should define compatible input messages  and must be same as the output message format of the hello service.  The Requestparameter and the Webparam annotations in the example below makes sure that  and the message structure matches that of the Hello service.

import javax.jws.Oneway;
import javax.jws.WebParam;
import javax.jws.WebService;
import javax.xml.ws.RequestWrapper;

@WebService()
public interface Callback {
String NS = "hello.wsatest.service.aswin.com";

@Oneway
@RequestWrapper(localName="sayHelloResponse", targetNamespace=NS)
public void callBack(@WebParam(name="response") String callbackMessage);

}
import javax.jws.Oneway;
import javax.jws.WebParam;
import javax.jws.WebService;

@WebService
@org.apache.cxf.interceptor.InInterceptors (interceptors = {"org.apache.cxf.interceptor.LoggingInInterceptor" })
public class CallbackImpl implements Callback {

@Oneway
public void callBack(@WebParam(name="response")String callbackMessage) {
System.out.println("Recieved callback message " + callbackMessage);
}
}

To try this out, you should can run the main method in HelloServiceImpl followed by that of Client.java. You should have cxf in your classpath and if you are using maven , have the following in your pom.xml

<properties>
 <cxf.version>2.2.5</cxf.version>
</properties>
<dependencies>
 <dependency>
 <groupId>org.apache.cxf</groupId>
 <artifactId>cxf-rt-frontend-jaxws</artifactId>
 <version>${cxf.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.cxf</groupId>
 <artifactId>cxf-rt-transports-http</artifactId>
 <version>${cxf.version}</version>
 </dependency>
 <dependency>
 <groupId>org.apache.cxf</groupId>
 <artifactId>cxf-rt-transports-http-jetty</artifactId>
 <version>${cxf.version}</version>
 </dependency>
</dependencies>

CXF Docs:  http://cwiki.apache.org/CXF20DOC/ws-addressing.html

CXF samples  also includes an example application that show some ws addressing feature support.

The w3c spec is also a good read in understanding the details and message structure.

Older Posts »

Create a free website or blog at WordPress.com.

Follow

Get every new post delivered to your Inbox.