TestDatabaseAdvanced.java

// 
// Lapetus Ltd Java Class.  Copyright (c) Lapetus Systems Ltd, 2009, 2010. 
// ----------------------------------------------------------------------- 
// This is the intellectual property of Lapetus Systems Ltd, Artemis, Greece. 
// -------------------------------------------------------------------------- 
// www.lapetus-ltd.com, www.lapetus.com.gr, www.lapetus.eu 
// ------------------------------------------------------- 
// $LastChangedRevision: 1074 $ 
// $LastChangedDate:: 2010-09-01 15:37:06#$ 
// ---------------------------------------- 
// 
 
import com.lapetus_ltd._2009.xml.types.*; 
import com.lapetus_ltd.api.TLptsMainDatabase; 
import com.lapetus_ltd.api.common.TLptsCryptoModule; 
import com.lapetus_ltd.api.common.TLptsFactoryCharSetLocale; 
import com.lapetus_ltd.api.common.TLptsFactoryFile; 
import com.lapetus_ltd.api.common.TLptsFactoryTime; 
import com.lapetus_ltd.api.common.logger.ILptsLogListener; 
import com.lapetus_ltd.api.common.logger.TLptsLog; 
import com.lapetus_ltd.api.common.logger.TLptsLogger; 
import com.lapetus_ltd.api.db.control.*; 
import com.lapetus_ltd.api.db.utils.TLptsDriverLoader; 
import com.lapetus_ltd.api.db.xml.types.TLptsDBConnectionType; 
import com.lapetus_ltd.api.db.xml.types.TLptsXmlRowSetSingleRowType; 
import com.lapetus_ltd.api.db.xml.types.TLptsXmlRowSetType; 
import com.lapetus_ltd.api.xml.types.TLptsDriverType; 
import junit.framework.TestCase; 
 
import java.sql.Date; 
import java.sql.ResultSet; 
import java.text.DecimalFormat; 
import java.util.*; 
 
 
// ###################################################################################################### 
// ####   Test code for the dbJAPI build process. This code will stop the build process on ERROR     #### 
// ###################################################################################################### 
 
// 
// Class Description : Advanced test for Thread Processing. 
// 
// AIM : To test the thread handling capabilities of dbJAPI. 
// 
// The aim of the tests below is to verify the thread-handling capabilities of the connections, statements and rowsets. 
// Only one connection is created, but 10 threads are generated that use that one connection. Each thread generates 
// 3 statements that are dynamically related to eachother and process up to 50 records per level. 
// Each statement is initiated inside its own new thread, with every rowset generated creating its own thread. 
// 
// The listener for the successful statement creation is responsible for the rowset creation. 
// The listener for the rowsets then processes the rows to the monitor and verification mechanism. 
// 
// The TLptsRowSetOutputListener has also been added (in all of it's 3 output types) so that we have data, debug and xml files 
// for later analysis and test verification. At the end of the processing the XML file is checked for the 10 copies of all the data. 
// Maps are used to store sub-level data and counters which are verified at the end of the processing. 
// Various tests are performed with counters and verification on the xml contents. There is also a human examination and verification. 
// When this is completed, the test is passed (provided it did not fail at another point). 
// 
// The database tables are large. Each table has more than one-million records in it. 
// The output is grouped using the lock object (outputGroupingLock) as there are many threads generated and output is jumbled otherwise. 
// The only structured output is the xml file. The other output data files have records that are both not structured and not in order. 
// 
// The memory is monitored with every connection and statement creation, with all readings being kept in the memory module class. 
// A summary is shown at the end of the processing. 
// 
// How it all works : see the Connection Manager for a description of the work process 
// (com.lapetus_ltd.api.db.control.TLptsFactoryConnection in the Javadoc) 
// 
 
 
public class TestDatabaseAdvanced extends TestCase 
{ 
  private MemoryMonitor memoryMonitor = new MemoryMonitor(); 
  private final Object outputGroupingLock = new Object(); 
  private TLptsRowSetOutputListener rowSetOutputXmlListener; 
 
  private final Map<String, Integer> rowSetCountMap1 = Collections.synchronizedMap(new LinkedHashMap<String, Integer>()); 
  private final Map<Integer, Integer> rowSetCountMap2 = Collections.synchronizedMap(new LinkedHashMap<Integer, Integer>()); 
 
  private final int NO_OF_THREADS = 10; 
  private final int NO_OF_RECORDS = 50; 
 
  private int totalRecordsProcessed = 0; 
  private int totalObjectsProcessed = 0; 
 
  static public void main(String[] args) 
  { 
    TestDatabaseAdvanced tdb = new TestDatabaseAdvanced(); 
    tdb.testConnection(); 
  } 
 
 
  public void testConnection() 
  { 
    try 
    { 
      // every dbJAPI application requires this before using it's capabilities 
      TLptsMainDatabase.initiateSystem(); 
      // we don't want any popups while running tests offline 
      TLptsLogger.setMessageDialog(0); 
 
      // register a listener for the logging system. 
      TLptsLogger.addListener(new ILptsLogListener() 
      { 
        public void newLogGenerated(TLptsLog log) 
        { 
          if (log.getType().equals(TLptsLogger.LOG_TYPE.ERROR)) // this causes a FAIL of the test 
            synchronized (outputGroupingLock) 
            { 
              fail("LOG ERROR :" + log.getMessage() + " : " + log.getSupportingText() + " : Exception : " + log.getExceptionMessage()); 
              System.exit(0); 
            } 
          if (log.getType().equals(TLptsLogger.LOG_TYPE.WARNING)) 
            synchronized (outputGroupingLock) 
            { 
              System.out.println("LOG WARNING :" + log.getMessage() + " : " + log.getSupportingText()); 
            } 
          if (log.getType().equals(TLptsLogger.LOG_TYPE.MESSAGE)) 
            synchronized (outputGroupingLock) 
            { 
              System.out.println("LOG MESSAGE :" + log.getMessage() + " : " + log.getSupportingText()); 
            } 
        } 
      }); 
 
      synchronized (outputGroupingLock) 
      { 
        String charSets = "Available Charsets on this HOST : "; 
        for (String cs : TLptsFactoryCharSetLocale.getCharsetNameList()) 
          charSets += cs + " "; 
        System.out.println(charSets); 
      } 
      startMultiThreadProcessing(); 
    } finally 
    { 
      memoryMonitor.showFinalStats(); 
    } 
  } 
 
 
  private void startMultiThreadProcessing() 
  { 
    synchronized (outputGroupingLock) 
    { 
      for (TLptsLog log : TLptsLogger.getLogList(false)) 
        System.out.println("Log: " + log.getMessage() + " : " + log.getSupportingText() + " : " + log.getExceptionMessage()); 
    } 
 
    // This listener will create threads and statements from the one connection 
    TLptsFactoryConnection.addListener(new GenerateThreadsConnectionListener()); 
 
    // To get the status of the statements we add a listener to the factory. This will also generate the rowsets. 
    TLptsFactoryStatement.addListener(new FactoryStatementListener()); 
 
    // lets take the rowset output and send it to various files, with various types. 
    String path = TLptsFactoryFile.createDirectoryPath(TLptsFactoryFile.getCurrentDirectory() + "test/"); 
    // we keep a variable for the xml for verification at the end 
    TLptsFactoryRowSet.addListener(rowSetOutputXmlListener = new TLptsRowSetOutputListener(path + "TestDatabaseAdvanced.resultset.xml", 
                                                                                           TLptsRowSetOutputListener.OUTPUT_TYPE.OUTPUT_TO_XML)); 
    TLptsFactoryRowSet.addListener(new TLptsRowSetOutputListener(path + "TestDatabaseAdvanced.resultset.data.txt", 
                                                                 TLptsRowSetOutputListener.OUTPUT_TYPE.DATA_OUTPUT)); 
    TLptsFactoryRowSet.addListener(new TLptsRowSetOutputListener(path + "TestDatabaseAdvanced.resultset.debug.txt", 
                                                                 TLptsRowSetOutputListener.OUTPUT_TYPE.DEBUG_OUTPUT)); 
 
    // now we add a local listener for the memory and test verification 
    TLptsFactoryRowSet.addListener(new RowSetListener()); 
 
    synchronized (outputGroupingLock) 
    { 
      System.out.println("*************************************************************************"); 
      System.out.println("Testing many threads generated on the same tables with dynamic processing"); 
      System.out.println("*************************************************************************"); 
    } 
    connectToMySql(); 
  } 
 
 
  // #################################################################################### 
  //            C O N N E C T I O N 
  // #################################################################################### 
 
  private void connectToMySql() 
  { 
    ResourceBundle bundle = ResourceBundle.getBundle("resources"); // we have stored all the database info in here 
    TLptsDBConnectionType connectionType; 
 
    synchronized (outputGroupingLock) 
    { 
      System.out.println(); 
      System.out.println("Connecting to MY SQL DB"); 
      System.out.println(); 
    } 
 
    connectionType = new TLptsDBConnectionType(); 
    connectionType.setTitle("MYSQL_DB"); 
 
    XLptsDriverType driverType = TLptsDriverLoader.getDriverTypeByClassName("com.mysql.jdbc.jdbc2.optional.MysqlDataSource"); 
 
    if (driverType == null) 
      synchronized (outputGroupingLock) 
      { 
        fail("Could not find driver for class : com.microsoft.sqlserver.jdbc.SQLServerDataSource"); 
      } 
 
    TLptsDriverType dt = new TLptsDriverType(driverType); 
 
    // changing values that are default in the driver.loader.xml file. 
    // the DB is now set correctly and will filter through to the Datasource interface execution 
    // in other words the testdb2 will be used as the database for the connection 
    dt.setValueForInterfaceFunctionParameter("setDatabaseName", "Database Name", bundle.getString("Testing.database.mysql.testdb2")); 
    connectionType.setDriverType(dt); 
 
    // the passwords are encrypted when stored and unencrypted when required 
    connectionType.setCredentials(TLptsCryptoModule.defaultEncryptCredentialsRSA(bundle.getString("Testing.database.mysql.user"), 
                                                                                 bundle.getString("Testing.database.mysql.password"))); 
 
    TLptsFactoryConnection.initiateConnection(connectionType); // This generates another thread!! 
  } 
 
 
//  ############################################################################################################### 
//  ####             C O N N E C T I O N      L I S T E N E R 
//  ############################################################################################################### 
 
  private class GenerateThreadsConnectionListener implements ILptsFactoryConnectionListener 
  { 
    public void newConnectionProcessStarted() 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Connection process has started..."); 
      } 
    } 
 
    public synchronized void newConnectionCreated(TLptsConnection connection) 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("New Connection created successfully. Statements can be processed."); 
        System.out.println("Process Simple SQL SELECT for all columns of all tables"); 
      } 
 
      // create threads which couple the dynamic tables. 
      for (int i = 0; i < NO_OF_THREADS; i++) 
        createThreadWithDynamicSql(connection); 
 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Number of Overall Connections : " + TLptsFactoryConnection.getConnectionList().size()); 
        assertEquals("--> Number of Connections should be 1", 1, TLptsFactoryConnection.getConnectionList().size()); 
        System.out.println("Number of Statements on this Connection : " + connection.getNumberOfConnectedStatements()); 
        memoryMonitor.recordMemoryDynamics(false); 
      } 
    } 
 
    public void newConnectionFailed(TLptsLog log) 
    { 
      synchronized (outputGroupingLock) 
      { 
        fail("New Connection failed! Reason :" + log.getMessage() + " : " + log.getSupportingText() + " : Exception : " + log.getExceptionMessage()); 
      } 
    } 
 
    public void removedAndClosedConnection(TLptsConnection connection) 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Connection removed from Connection Manager and closed successfully ... " + connection.getTitle()); 
      } 
      memoryMonitor.showFinalStats(); 
    } 
  } 
 
 
  private void createThreadWithDynamicSql(TLptsConnection connection) 
  { 
    List<Integer> columnSqlTypes; 
    List<String> columnNames; 
 
    // PRINTOUT OF ALL COLUMNS OF THE TABLES 
    synchronized (outputGroupingLock) 
    { 
      System.out.println("Table CUSTOMERS COLUMNS :"); 
    } 
    columnSqlTypes = connection.getColumnTypes("customers"); 
    columnNames = connection.getColumnNames("customers"); 
 
    for (int i = 0; i < columnNames.size(); i++) 
    { 
      int type = columnSqlTypes.get(i); 
      String typeStr = TLptsFactoryStatement.getSqlTypeString(type); 
 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Column " + columnNames.get(i) + " has SQL type " + type + "(" + typeStr + ")"); 
      } 
    } 
    synchronized (outputGroupingLock) 
    { 
      System.out.println("Table TRANSACTIONS COLUMNS :"); 
    } 
 
    columnSqlTypes = connection.getColumnTypes("transactions"); 
    columnNames = connection.getColumnNames("transactions"); 
 
    for (int i = 0; i < columnNames.size(); i++) 
    { 
      int type = columnSqlTypes.get(i); 
      String typeStr = TLptsFactoryStatement.getSqlTypeString(type); 
 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Column " + columnNames.get(i) + " has SQL type " + type + "(" + typeStr + ")"); 
      } 
    } 
    synchronized (outputGroupingLock) 
    { 
      System.out.println("Table IDITEMS COLUMNS :"); 
    } 
 
    columnSqlTypes = connection.getColumnTypes("iditems"); 
    columnNames = connection.getColumnNames("iditems"); 
 
    for (int i = 0; i < columnNames.size(); i++) 
    { 
      int type = columnSqlTypes.get(i); 
      String typeStr = TLptsFactoryStatement.getSqlTypeString(type); 
 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Column " + columnNames.get(i) + " has SQL type " + type + "(" + typeStr + ")"); 
      } 
    } 
 
    // first we create the client, transaction and item statements. Then we dynamically link them. 
    TLptsStatement customerStatement, transStatement, itemStatement; 
 
    //////////////////////////////// CUSTOMER TABLE  //////////////////////////////////// 
 
    // contains all the tables and columns of the connection. We need to select columns or input a SQL string manually 
    customerStatement = new TLptsStatement(connection, XLptsDBTypeOfStatementType.SELECT); 
    customerStatement.setTitle("CUSTOMERS"); 
    customerStatement.setSqlDatabaseFormat(XLptsDBSqlDatabaseFormat.DEFAULT_MYSQL); 
    customerStatement.setXResultSetType(ResultSet.TYPE_FORWARD_ONLY); 
    customerStatement.setXResultSetConcurrency(ResultSet.CONCUR_READ_ONLY); 
 
    XLptsDBStatementTableType table = customerStatement.getTableItemByName("CUSTOMERS"); 
    table.setSelected(true); // selecting one table only, with all its columns 
    customerStatement.setSelectOnAllColumns(table.getTableName(), true); 
 
    // make sure this is not processed as we are waiting for the dynamic statements below to be created. 
    customerStatement.setExecutable(false); 
    TLptsFactoryStatement.createNewStatement(connection, customerStatement); 
 
    /////////////////////////////// TRANSACTIONS TABLE  //////////////////////////////////// 
 
    transStatement = new TLptsStatement(connection, XLptsDBTypeOfStatementType.SELECT); 
    transStatement.setTitle("TRANSACTIONS"); 
    transStatement.setSqlDatabaseFormat(XLptsDBSqlDatabaseFormat.DEFAULT_MYSQL); 
    transStatement.setXResultSetType(ResultSet.TYPE_FORWARD_ONLY); 
    transStatement.setXResultSetConcurrency(ResultSet.CONCUR_READ_ONLY); 
 
    table = transStatement.getTableItemByName("TRANSACTIONS"); 
    table.setSelected(true); // selecting one table only, with all its columns 
    // select all the columns and the select statement SQL string will be created automatically. 
    transStatement.setSelectOnAllColumns(table.getTableName(), true); 
 
    // relate this statement to the customer statement 
    // this means that 'customer' will feed the data to this statement for every record of 'customer' to 
    // regenerate this statement every time 
 
    // false means that we do not want static string substitution (uses the setObject of Statement) 
    transStatement.setPrimaryStatement(customerStatement, false); 
    // column 2 in this statement is related to column 1 in customer 
    // certain operations allow for many indexes (namely IN and BETWEEN) 
    transStatement.addPrimaryRelationship(2, new int[]{1}, TLptsFactoryStatement.OPERATION_EQUAL); 
 
    // make sure this is not processed as we are waiting for the dynamic statement below to be created. 
    transStatement.setExecutable(false); 
    TLptsFactoryStatement.createNewStatement(connection, transStatement); 
 
    /////////////////////////////// ITEMS  TABLE //////////////////////////////////// 
 
    itemStatement = new TLptsStatement(connection, XLptsDBTypeOfStatementType.SELECT); 
    itemStatement.setTitle("ITEMS"); 
    itemStatement.setSqlDatabaseFormat(XLptsDBSqlDatabaseFormat.DEFAULT_MYSQL); 
    itemStatement.setXResultSetType(ResultSet.TYPE_FORWARD_ONLY); 
    itemStatement.setXResultSetConcurrency(ResultSet.CONCUR_READ_ONLY); 
 
    table = itemStatement.getTableItemByName("ITEMS"); 
    table.setSelected(true); // selecting one table only, with all its columns 
    itemStatement.setSelectOnAllColumns(table.getTableName(), true); 
 
    // relate this statement to the TRANSACTIONS statement 
    itemStatement.setPrimaryStatement(transStatement, false); 
    itemStatement.addPrimaryRelationship(1, new int[]{7}, TLptsFactoryStatement.OPERATION_EQUAL); 
 
    // this will execute, along with all its related statements. 
    transStatement.setExecutable(true); 
    TLptsFactoryStatement.createNewStatement(connection, itemStatement); 
  } 
 
 
//  ############################################################################################################### 
//  ####                   S T A T E M E N T       L I S T E N E R 
//  ############################################################################################################### 
 
  private class FactoryStatementListener implements ILptsFactoryStatementListener 
  { 
    public void newStatementProcessStarted(TLptsConnection connection) 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Creating a new Statement for the connection : " + connection.getTitle()); 
      } 
    } 
 
    public void newStatementCreated(TLptsConnection connection, TLptsStatement statement) 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Successful creation of a statement called : " + statement.getTitle()); 
      } 
      // Both the customer, transaction and item tables have 1 million records in them. 
      // Note: We have 10 threads running, so lets not get greedy ! 
      // 50 primary records (with all their transactions and items) should be enough. (times 10 threads) 
      TLptsFactoryRowSet.executeSelectStatement(statement, 1, NO_OF_RECORDS, false);  // spawns another thread. 
      memoryMonitor.recordMemoryDynamics(false); 
    } 
 
    public void newStatementFailed(TLptsConnection connection) 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Could not create a Statement for the connection : " + connection.getTitle()); 
        System.out.println("The logs should be checked for an error or exception."); 
      } 
    } 
 
    public void removedAndClosedStatement(TLptsConnection connection, TLptsStatement statement) 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Statement removed from Connection Manager and closed successfully ... " + statement.getTitle()); 
        System.out.println("Connection title : " + connection.getTitle()); 
        System.out.println("Number of Statements left on this connection : " + connection.getNumberOfConnectedStatements()); 
      } 
    } 
  } 
 
 
//  ############################################################################################################### 
//  ####                    R O W S E T      L I S T E N E R 
//  ############################################################################################################### 
  // note: this listener is called 10 times for every row. These functions are executed in 10 different threads. 
 
  private class RowSetListener implements ILptsFactoryRowSetListener 
  { 
    private int endCounter1 = 0, 
      endCounter2 = 0, 
      endCounter3 = 0, 
      totalCompleteCounter = 0, 
      totalPrimaryCounter = 0, 
      totalPrimaryTopCounter = 0; 
 
    public boolean processNewRowSetRows(TLptsRowSetEvent rowSetEvent) 
    { 
      return true; 
    } 
 
    public void rowEvent(final TLptsRowEvent rowEvent) 
    { 
      if (rowEvent.getStatement() == null) 
      { 
        synchronized (outputGroupingLock) 
        { 
          TLptsLogger.logWarning("New Row created, but statement not found.", null); 
          return; 
        } 
      } 
 
      if (rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.MOVED_TO_NEXT_PRIMARY_RECORD || 
          rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.MOVED_TO_NEXT_RECORD) 
      { 
        if (rowEvent.getStatement() == null) 
          return; 
 
        totalRecordsProcessed++; 
 
        if (rowEvent.getRowObjectList() != null) 
          totalObjectsProcessed += rowEvent.getRowObjectList().size(); 
        else 
          return; 
 
        if (rowEvent.getRowObjectList().size() == 0) 
          return; 
 
        Integer rowNo; 
 
        if (rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.MOVED_TO_NEXT_PRIMARY_RECORD) 
        { 
          totalPrimaryCounter++; 
 
          if (!rowEvent.getStatement().hasPrimaryStatement()) 
          { 
            rowSetCountMap1.put(rowEvent.getStatement().getId(), rowEvent.getCurrentRowNo()); 
 
            totalPrimaryTopCounter++; 
 
            endCounter3++; 
 
            if (endCounter3 >= NO_OF_THREADS) 
            { 
              synchronized (outputGroupingLock) 
              { 
                System.out.println("Processing row " + totalPrimaryTopCounter); 
                memoryMonitor.recordMemoryDynamics(false); 
              } 
              endCounter3 = 0; 
            } 
          } else 
            if ((rowNo = rowSetCountMap1.get(rowEvent.getPrimaryStatementId())) != null) 
            { 
              int subRows = 0; 
              synchronized (rowSetCountMap2) 
              { 
                if (rowSetCountMap2.get(rowNo) != null) 
                { 
                  subRows = rowSetCountMap2.get(rowNo); 
                } 
                subRows++; 
                rowSetCountMap2.put(rowNo, subRows); 
              } 
            } 
        } 
 
        if (rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.MOVED_TO_NEXT_RECORD) 
        { 
          synchronized (rowSetCountMap1) 
          { 
            if (rowEvent.getStatement().hasPrimaryStatement()) 
              if ((rowNo = rowSetCountMap1.get(rowEvent.getPrimaryStatementId())) != null) 
              { 
                int subRows = 0; 
                synchronized (rowSetCountMap2) 
                { 
                  if (rowSetCountMap2.get(rowNo) != null) 
                  { 
                    subRows = rowSetCountMap2.get(rowNo); 
                  } 
                  subRows++; 
                  rowSetCountMap2.put(rowNo, subRows); 
                } 
              } 
          } 
        } 
      } 
 
      if (rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.EXECUTION_INTERRUPTED || 
          rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.ERROR_OCCURRED) 
      { 
        TLptsFactoryStatement.removeStatement(rowEvent.getStatement()); 
        TLptsFactoryConnection.removeConnection((TLptsConnection) (rowEvent.getStatement()).getConnectionWE(), false); 
        memoryMonitor.showFinalStats(); 
 
        fail("Process interrupted. Could not verify contents of output file."); 
        System.exit(1);  // this is commented out if run together with all the other tests 
      } 
 
      if (rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.PROCESSING_COMPLETE) 
      { 
        totalCompleteCounter++; 
        // we check to see if this is the parent of all the statements. 
        // If it is not, we cannot close it 
        if (!rowEvent.getStatement().hasPrimaryStatement()) 
          endCounter2++; 
 
        if (endCounter2 >= NO_OF_THREADS) 
        { 
          TLptsFactoryStatement.removeStatement(rowEvent.getStatement()); 
          TLptsFactoryConnection.removeConnection((TLptsConnection) (rowEvent.getStatement()).getConnectionWE(), false); 
          memoryMonitor.showFinalStats(); 
 
          synchronized (outputGroupingLock) 
          { 
            System.out.println("Total Top Primary Records received : " + totalPrimaryTopCounter); 
            System.out.println("Total Primary Records received : " + totalPrimaryCounter); 
            System.out.println("Total 'processing complete' received : " + totalCompleteCounter); 
          } 
 
          if (totalPrimaryTopCounter != NO_OF_RECORDS * NO_OF_THREADS) 
            fail("Top level primary record number is correct. Expecting " + NO_OF_RECORDS * NO_OF_THREADS + ", but got " + totalPrimaryTopCounter); 
 
          if (!checkXmlForConsistency(rowSetOutputXmlListener.getXmlRowSet())) 
            fail("The XML output is not consistent with what is expected"); 
          else 
            synchronized (outputGroupingLock) 
            { 
              System.out.println("XML output is consistent with expected result"); 
            } 
          rowSetOutputXmlListener.closeOutputFile(); // write the output to the XML file 
          System.exit(1); // this is commented out if run together with all the other tests 
        } 
      } 
    } 
  } 
 
 
//  ############################################################################################################### 
//  ####                    V E R I F I C A T I O N    F U N C T I O N S 
//  ############################################################################################################### 
 
  private boolean checkXmlForConsistency(TLptsXmlRowSetType xmlRowSetType) 
  { 
    List<String> rowObjects = new LinkedList<String>(); 
    int totalNoOfRows = NO_OF_RECORDS * NO_OF_THREADS; 
 
    if (xmlRowSetType.getPrimaryTable().getRowListItem().size() != totalNoOfRows) 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Fault: Total number of primary rows incorrect : Expecting " + totalNoOfRows + 
                           ", but got " + xmlRowSetType.getPrimaryTable().getRowListItem().size()); 
      } 
      return false; 
    } 
 
    for (int rowNo = 1; rowNo <= NO_OF_RECORDS; rowNo++) 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Processing and checking row number : " + rowNo); 
      } 
 
      // first we check that all the first level rows have 10 copies. 
      int rowCounter = 0; 
      for (XLptsXmlRowSetSingleRowType rssrt : xmlRowSetType.getPrimaryTable().getRowListItem()) 
      { 
        if (rssrt.getRowNumber() == rowNo) 
        { 
          // store the objects for comparison with the others lines 
          if (rowCounter == 0) 
          { 
            rowObjects.clear(); 
            for (XLptsXmlRowSetRowObjectType object : rssrt.getRowObjectListItem()) 
              rowObjects.add(object.getString()); 
          } else 
          { 
            // compare with the other lines 
            int i = 0; 
            for (XLptsXmlRowSetRowObjectType object : rssrt.getRowObjectListItem()) 
            { 
              try 
              { 
                if (!rowObjects.get(i).equals(object.getString())) 
                { 
                  synchronized (outputGroupingLock) 
                  { 
                    System.out.println("For row number " + rowNo + ": Row object should be " + rowObjects.get(i) + " but is " + object.getString()); 
                  } 
                  return false; 
                } 
              } catch (Exception e) 
              { 
                synchronized (outputGroupingLock) 
                { 
                  System.out.println("For row number " + rowNo + ": Exception " + e.getLocalizedMessage()); 
                } 
                return false; 
              } 
              i++; 
            } 
          } 
          rowCounter++; 
        } 
      } 
 
      if (rowCounter != 10) 
      { 
        synchronized (outputGroupingLock) 
        { 
          System.out.println("For row number " + rowNo + ": Row counter should be 10, but is " + rowCounter); 
        } 
        return false; 
      } 
    } 
 
    int internalTotalRecordsProcessed = 0; 
    for (XLptsXmlRowSetSingleRowType rowSetSingleRowType : xmlRowSetType.getPrimaryTable().getRowListItem()) 
    { 
      internalTotalRecordsProcessed = getTotalRowsAndObjectsInXML(rowSetSingleRowType, internalTotalRecordsProcessed, true); 
      internalTotalRecordsProcessed++; 
    } 
 
    int internalTotalObjectsProcessed = 0; 
    for (XLptsXmlRowSetSingleRowType rowSetSingleRowType : xmlRowSetType.getPrimaryTable().getRowListItem()) 
    { 
      internalTotalObjectsProcessed = getTotalRowsAndObjectsInXML(rowSetSingleRowType, internalTotalObjectsProcessed, false); 
    } 
 
    if (totalRecordsProcessed == internalTotalRecordsProcessed) 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Total record count verified : " + totalRecordsProcessed); 
      } 
    else 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Incorrect Total record count : Expecting " + totalRecordsProcessed + ", but got " + internalTotalRecordsProcessed); 
      } 
      return false; 
    } 
 
    if (totalObjectsProcessed == internalTotalObjectsProcessed) 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Total object count verified : " + totalObjectsProcessed); 
      } 
    else 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println("Incorrect Total object count : Expecting " + totalObjectsProcessed + ", but got " + internalTotalObjectsProcessed); 
      } 
      return false; 
    } 
 
    if (!verifySubRowCounts(xmlRowSetType)) 
      return false; 
 
    return true; 
  } 
 
 
  private int getTotalRowsAndObjectsInXML(XLptsXmlRowSetSingleRowType rowSetSingleRowType, int counter, boolean isRow) 
  { 
    if (!isRow) 
      counter += ((TLptsXmlRowSetSingleRowType) rowSetSingleRowType).getRowObjectListItemSize(); 
 
    for (XLptsXmlRowSetTableType tableItem : rowSetSingleRowType.getSubTableListItem()) 
    { 
      if (isRow) 
        counter += tableItem.getRowListItem().size(); 
 
      for (XLptsXmlRowSetSingleRowType rst : tableItem.getRowListItem()) 
        counter = getTotalRowsAndObjectsInXML(rst, counter, isRow); 
    } 
    return counter; 
  } 
 
 
  private boolean verifySubRowCounts(TLptsXmlRowSetType xmlRowSetType) 
  { 
    synchronized (outputGroupingLock) 
    { 
      System.out.println(); 
      System.out.print("Sub row counts : "); 
      for (Integer cnt : rowSetCountMap2.values()) 
        System.out.print(" " + cnt / NO_OF_THREADS); 
      System.out.println(); 
    } 
 
    // now lets verify each of the 10 copies of the primary top level rows for their sub-row counts. 
    // We do this by using the double map created during data consumption, which counted the numbers for all threads. 
    for (int rowCounter = 1; rowCounter <= NO_OF_RECORDS; rowCounter++) 
      for (XLptsXmlRowSetSingleRowType rowSetSingleRowType : xmlRowSetType.getPrimaryTable().getRowListItem()) 
      { 
        if (rowSetSingleRowType.getRowNumber() == rowCounter) 
        { 
          synchronized (rowSetCountMap2) 
          { 
            if (rowSetCountMap2.get(rowCounter) == null) 
            { 
              synchronized (outputGroupingLock) 
              { 
                System.out.println("Error in sub row verification: Row not found for counting sub-rows"); 
              } 
              return false; 
            } 
            int rows; 
            if ((rows = (rowSetCountMap2.get(rowCounter) / NO_OF_THREADS)) != getTotalSubRowsForRow(rowSetSingleRowType)) 
            { 
              synchronized (outputGroupingLock) 
              { 
                System.out.println("Error in sub row verification : (row " + rowCounter + "). Sub-rows count should be " + 
                                   getTotalSubRowsForRow(rowSetSingleRowType) + ", but is " + rows); 
              } 
              return false; 
            } 
          } 
        } 
      } 
 
    return true; 
  } 
 
  private int getTotalSubRowsForRow(XLptsXmlRowSetSingleRowType row) 
  { 
    int counter = 0; 
 
    for (XLptsXmlRowSetTableType tableItem : row.getSubTableListItem()) 
    { 
      counter += tableItem.getRowListItem().size(); 
    } 
    return counter; 
  } 
 
  private class MemoryMonitor 
  { 
    private List<MemoryUnit> memoryList = new LinkedList<MemoryUnit>(); 
    private Runtime runtime; 
 
    private MemoryMonitor() 
    { 
      runtime = Runtime.getRuntime(); 
    } 
 
    private class MemoryUnit 
    { 
      private double total; 
      private double max; 
      private double free; 
      private int id; 
      private long time; 
 
      private MemoryUnit(boolean showMemory, int id) 
      { 
        this.id = id; 
        time = TLptsFactoryTime.getCurrentTime().getTime(); 
 
        total = runtime.totalMemory(); 
        max = runtime.maxMemory(); 
        free = runtime.freeMemory(); 
 
        total /= (1024 * 1024); // convert to Mb 
        max /= (1024 * 1024);   // convert to Mb 
        free /= (1024 * 1024);  // convert to Mb 
 
        synchronized (outputGroupingLock) 
        { 
          System.out.println("######## "); 
          System.out.println("######## MEMORY MONITOR POINT CREATED : ID = " + id); 
          System.out.println("######## "); 
        } 
 
        if (showMemory) 
        { 
          synchronized (outputGroupingLock) 
          { 
            System.out.println("///////////    MEMORY DYNAMICS    \\\\\\\\\\\\\""); 
            System.out.println("Total memory in Mb is :" + total); 
            System.out.println("Maximum memory in Mb is :" + max); 
            System.out.println("Free memory in Mb is :" + free); 
            System.out.println("///////////    ---------------    \\\\\\\\\\\\\""); 
          } 
        } 
      } 
 
      private void printDebugString() 
      { 
        DecimalFormat df = new DecimalFormat("0000.0000"); 
        //                                 1234    1234.1234    1234.1234    1234.1234 
        System.out.println(String.format(" %04d    " + df.format(max) + "    " + df.format(total) + "    " + 
                                         df.format(free) + "   " + TLptsFactoryTime.getTimeString(new Date(time)), id)); 
      } 
    } 
 
    public synchronized void recordMemoryDynamics(boolean showMemory) 
    { 
      memoryList.add(new MemoryUnit(showMemory, memoryList.size() + 1)); 
 
      if (runtime.freeMemory() < 512 * 1024) 
        fail("Free Memory is less than 512 kb"); 
    } 
 
    public synchronized void showFinalStats() 
    { 
      synchronized (outputGroupingLock) 
      { 
        System.out.println(""); 
        System.out.println("######## MEMORY STATS PRINTOUT #########  (in Mb)  ########"); 
        System.out.println(" ID        MAX         TOTAL         FREE        Timestamp"); 
        System.out.println(" --     ---------    ---------    ---------      ---------"); 
        //                  1234    1234.1234    1234.1234    1234.1234 
 
        for (MemoryUnit mu : memoryList) 
          mu.printDebugString(); 
 
        System.out.println("###########################################################"); 
      } 
    } 
  } 
}