Add initial async support (Search, Record)
authorJakub Skoczen <jakub@indexdata.dk>
Fri, 28 Aug 2015 15:01:35 +0000 (17:01 +0200)
committerJakub Skoczen <jakub@indexdata.dk>
Fri, 28 Aug 2015 15:01:35 +0000 (17:01 +0200)
src/main/java/org/yaz4j/AsyncConnection.java [new file with mode: 0644]
src/main/java/org/yaz4j/AsyncConnections.java [new file with mode: 0644]
src/main/java/org/yaz4j/Connection.java
src/main/java/org/yaz4j/ResultSet.java
src/main/swig/libyaz4j.i
src/test/java/org/yaz4j/AsyncConnectionsTest.java [new file with mode: 0644]

diff --git a/src/main/java/org/yaz4j/AsyncConnection.java b/src/main/java/org/yaz4j/AsyncConnection.java
new file mode 100644 (file)
index 0000000..107a00f
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright (c) 1995-2015, Index Data
+ * All rights reserved.
+ * See the file LICENSE for details.
+ */
+package org.yaz4j;
+
+import org.yaz4j.exception.ZoomException;
+import org.yaz4j.jni.yaz4jlib;
+import static org.yaz4j.jni.yaz4jlib.*;
+
+/**
+ *
+ * @author jakub
+ */
+public class AsyncConnection extends Connection {
+  private ResultSet lastResultSet;
+  ErrorHandler eh;
+  SearchHandler sh;
+  RecordHandler rh;
+  
+  public interface SearchHandler {
+    public void handle(ResultSet rs);
+  }
+  
+  public interface RecordHandler {
+    public void handle(Record r);
+  }
+  
+  public interface ErrorHandler {
+    public void handle(ZoomException e);
+  }
+
+  public AsyncConnection(String host, int port) {
+    super(host, port);
+    ZOOM_connection_option_set(zoomConnection, "async", "1");
+    //what about piggy back?
+    ZOOM_connection_option_set(zoomConnection, "count", "100");
+    ZOOM_connection_option_set(zoomConnection, "step", "20");
+    closed = false;
+  }
+
+  @Override
+  public ResultSet search(Query query) throws ZoomException {
+    lastResultSet = super.search(query);
+    return null;
+  }
+  
+  public AsyncConnection onSearch(SearchHandler sh) {
+    this.sh = sh;
+    return this;
+  }
+  
+  public AsyncConnection onRecord(RecordHandler rh) {
+    this.rh = rh;
+    return this;
+  }
+  
+  public AsyncConnection onError(ErrorHandler eh) {
+    this.eh = eh;
+    return this;
+  }
+  
+  //actuall handler, pkg-private
+  
+  void handleSearch() {
+    handleError();
+    //handle search
+    if (sh != null) sh.handle(lastResultSet);
+  }
+  
+  void handleRecord() {
+    try {
+      if (rh != null) rh.handle(lastResultSet.getRecord(lastResultSet.asyncRecordOffset));
+    } catch (ZoomException ex) {
+      if (eh != null) eh.handle(ex);
+    } finally {
+      lastResultSet.asyncRecordOffset++;
+    }
+  }
+  
+  void handleError() {
+    //handle error
+    ZoomException err = ExceptionUtil.getError(zoomConnection, host, port);
+    if (err != null) {
+      if (eh != null) eh.handle(err);
+    }
+  }
+  
+}
diff --git a/src/main/java/org/yaz4j/AsyncConnections.java b/src/main/java/org/yaz4j/AsyncConnections.java
new file mode 100644 (file)
index 0000000..a7bdd44
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright (c) 1995-2015, Index Data
+ * All rights reserved.
+ * See the file LICENSE for details.
+ */
+package org.yaz4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.yaz4j.jni.SWIGTYPE_p_p_ZOOM_connection_p;
+import static org.yaz4j.jni.yaz4jlib.*;
+import static java.lang.System.out;
+
+/**
+ *
+ * @author jakub
+ */
+public class AsyncConnections {
+  private List<AsyncConnection> conns = new ArrayList<AsyncConnection>();
+  
+  public void add(AsyncConnection conn) {
+    conns.add(conn);
+  }
+  
+  public void start() {
+    SWIGTYPE_p_p_ZOOM_connection_p c_conns = new_zoomConnectionArray(conns.size());
+    try {
+      for (int i=0; i<conns.size(); i++) {
+        Connection conn = conns.get(i);
+        zoomConnectionArray_setitem(c_conns, i, conn.zoomConnection);
+      }
+      int ret = 0;
+      while ((ret = ZOOM_event(conns.size(), c_conns)) != 0) {
+        int idx = ret - 1;
+        int last = ZOOM_connection_last_event(zoomConnectionArray_getitem(c_conns, idx));
+        AsyncConnection conn = conns.get(idx);
+        String event = ZOOM_get_event_str(last);
+        out.println("Received event " + event + " on connection #"+idx);
+        switch (last) {
+          case ZOOM_EVENT_RECV_SEARCH: conn.handleSearch(); break;
+          case ZOOM_EVENT_RECV_RECORD: conn.handleRecord(); break;
+        }
+      }
+    } finally {
+      delete_zoomConnectionArray(c_conns);
+    }
+  }
+  
+}
index 6a173f9..6f5f617 100644 (file)
@@ -37,8 +37,8 @@ import org.yaz4j.jni.yaz4jlib;
  * @author jakub
  */
 public class Connection implements Closeable {
-  private final String host;
-  private final int port;
+  protected final String host;
+  protected final int port;
   protected SWIGTYPE_p_ZOOM_connection_p zoomConnection;
   //connection is initially closed
   protected boolean closed = true;
@@ -138,6 +138,8 @@ public class Connection implements Closeable {
     }
     return new ResultSet(yazResultSet, this);
   }
+  
+  
 
   /**
    * Performs a scan operation (obtains a list of candidate search terms against
index 1bae33f..00bfe49 100644 (file)
@@ -31,15 +31,15 @@ import org.yaz4j.jni.yaz4jlib;
  */
 public class ResultSet implements Iterable<Record> {
   //for GC refcount
-
   private Connection conn;
-  private SWIGTYPE_p_ZOOM_resultset_p resultSet;
-  private long size = 0;
+  SWIGTYPE_p_ZOOM_resultset_p resultSet;
   private boolean disposed = false;
+  int asyncRecordOffset = 0;
 
   ResultSet(SWIGTYPE_p_ZOOM_resultset_p resultSet, Connection conn) {
+    //do not copy anything to the java side at this point, it won't be valid
+    //in the async mode
     this.resultSet = resultSet;
-    size = yaz4jlib.ZOOM_resultset_size(this.resultSet);
     this.conn = conn;
   }
 
@@ -121,7 +121,7 @@ public class ResultSet implements Iterable<Record> {
       private long cur;
       @Override
       public boolean hasNext() {
-        return cur < size;
+        return cur < getHitCount();
       }
 
       @Override
@@ -158,7 +158,7 @@ public class ResultSet implements Iterable<Record> {
   }
 
   public long getHitCount() {
-    return size;
+    return yaz4jlib.ZOOM_resultset_size(this.resultSet);
   }
 
   void _dispose() {
index 97a74cb..aad35ec 100644 (file)
@@ -11,6 +11,7 @@
        %pointer_functions(size_t, size_tp);
        %include "carrays.i"
        %array_functions(ZOOM_record, zoomRecordArray);
+       %array_functions(ZOOM_connection, zoomConnectionArray);
        %typemap(jni) CharStarByteArray "jbyteArray"
        %typemap(jtype) CharStarByteArray "byte[]"
        %typemap(jstype) CharStarByteArray "byte[]"
diff --git a/src/test/java/org/yaz4j/AsyncConnectionsTest.java b/src/test/java/org/yaz4j/AsyncConnectionsTest.java
new file mode 100644 (file)
index 0000000..4053e81
--- /dev/null
@@ -0,0 +1,165 @@
+/*
+ * Copyright (c) 1995-2015, Index Data
+ * All rights reserved.
+ * See the file LICENSE for details.
+ */
+package org.yaz4j;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.*;
+import org.yaz4j.exception.ZoomException;
+
+import static java.lang.System.out;
+
+/**
+ *
+ * @author jakub
+ */
+public class AsyncConnectionsTest {
+  
+  class Box<T> {
+    T item;
+
+    public Box() {
+    }
+
+    public Box(T item) {
+      this.item = item;
+    }
+    
+    T getItem() {
+      return item;
+    }
+    
+    void setItem(T item) {
+      this.item = item;
+    }
+  } 
+  
+  public AsyncConnectionsTest() {
+  }
+  
+  @BeforeClass
+  public static void setUpClass() {
+  }
+  
+  @AfterClass
+  public static void tearDownClass() {
+  }
+  
+  @Before
+  public void setUp() {
+  }
+  
+  @After
+  public void tearDown() {
+  }
+
+  /**
+   * Test async ZOOM operation.
+   */
+  @Test
+  public void testSingleTarget() {
+    out.println("Trying async connection...");
+    AsyncConnection conn = new AsyncConnection("z3950.indexdata.dk:210/gils", 0);
+    AsyncConnections conns = new AsyncConnections();
+    conns.add(conn);
+    int expectedHitCount = 9;
+    final Box<Long> actualHitCount = new Box<Long>();
+    final Box<Integer> actualRecordCounter =  new Box<Integer>(0);
+    try {
+      conn.setSyntax("sutrs");
+      conn.connect();
+      conn.search(new PrefixQuery("@attr 1=4 utah"));
+      conn
+        .onSearch(new AsyncConnection.SearchHandler() {
+        public void handle(ResultSet rs) {
+          out.println("Received search, hit count "+rs.getHitCount());
+          actualHitCount.setItem(rs.getHitCount());
+        }
+      })
+        .onRecord(new AsyncConnection.RecordHandler() {
+        public void handle(Record r) {
+          out.println("Received a record of type "+r.getSyntax());
+          actualRecordCounter.setItem(actualRecordCounter.getItem()+1);
+        }
+      });
+      
+    } catch (ZoomException ex) {
+      fail(ex.getMessage());
+    }
+    conns.start();
+    assertEquals(expectedHitCount, actualHitCount.item);
+    assertEquals(expectedHitCount, actualRecordCounter.item);
+    
+  }
+  
+  
+  /**
+   * Test async ZOOM operation.
+   */
+  @Test
+  public void testMulitTarget() {
+    out.println("Trying async with multile connections...");
+    AsyncConnections conns = new AsyncConnections();
+    AsyncConnection conn = new AsyncConnection("z3950.indexdata.dk:210/gils", 0);
+    conns.add(conn);
+    AsyncConnection conn2 = new AsyncConnection("z3950.indexdata.dk:210/marc", 0);
+    conns.add(conn2);
+    int expectedHitCount = 19; //for both
+    final Box<Long> actualHitCount = new Box<Long>(0L);
+    final Box<Integer> actualRecordCounter =  new Box<Integer>(0);
+    try {
+      //we need to simplify the API for multiple
+      conn.setSyntax("sutrs");
+      conn.connect();
+      conn.search(new PrefixQuery("@attr 1=4 utah"));
+      conn
+        .onSearch(new AsyncConnection.SearchHandler() {
+        public void handle(ResultSet rs) {
+          out.println("Received search, hit count "+rs.getHitCount());
+          actualHitCount.setItem(actualHitCount.getItem() + rs.getHitCount());
+        }
+      })
+        .onRecord(new AsyncConnection.RecordHandler() {
+        public void handle(Record r) {
+          out.println("Received a record of type "+r.getSyntax());
+          actualRecordCounter.setItem(actualRecordCounter.getItem()+1);
+        }
+      });
+      conn2.setSyntax("marc21");
+      conn2.connect();
+      conn2.search(new PrefixQuery("@attr 1=4 computer"));
+      conn2
+        .onSearch(new AsyncConnection.SearchHandler() {
+        public void handle(ResultSet rs) {
+          out.println("Received search, hit count "+rs.getHitCount());
+          actualHitCount.setItem(actualHitCount.getItem() + rs.getHitCount());
+        }
+      })
+        .onRecord(new AsyncConnection.RecordHandler() {
+        public void handle(Record r) {
+          out.println("Received a record of type "+r.getSyntax());
+          actualRecordCounter.setItem(actualRecordCounter.getItem()+1);
+        }
+      })
+        .onError(new AsyncConnection.ErrorHandler() {
+
+        public void handle(ZoomException e) {
+          out.println("Caught error: "+e.getMessage());
+        }
+      });
+      
+    } catch (ZoomException ex) {
+      fail(ex.getMessage());
+    }
+    conns.start();
+    assertEquals(expectedHitCount, actualHitCount.item);
+    assertEquals(expectedHitCount, actualRecordCounter.item);
+    
+  }
+}