// // Lapetus Ltd Java Class. Copyright (c) Lapetus Systems Ltd, 2009, 2010. // ----------------------------------------------------------------------- // This is the intellectual property of Lapetus Systems Ltd, Artemis, Greece. // -------------------------------------------------------------------------- // www.lapetus-ltd.com, www.lapetus.com.gr, www.lapetus.eu // ------------------------------------------------------- // $LastChangedRevision: 1210 $ // $LastChangedDate:: 2010-11-29 15:22:58#$ // ---------------------------------------- // import com.lapetus_ltd._2009.xml.types.*; import com.lapetus_ltd.api.TLptsMainDatabase; import com.lapetus_ltd.api.common.TLptsCharSetLocaleUtil; import com.lapetus_ltd.api.common.TLptsCryptoUtil; import com.lapetus_ltd.api.common.TLptsFileUtil; import com.lapetus_ltd.api.common.TLptsTimeUtil; import com.lapetus_ltd.api.common.logger.ILptsLogListener; import com.lapetus_ltd.api.common.logger.TLptsLog; import com.lapetus_ltd.api.common.logger.TLptsLogger; import com.lapetus_ltd.api.db.control.*; import com.lapetus_ltd.api.db.utils.TLptsDriverLoader; import com.lapetus_ltd.api.db.xml.types.TLptsDBConnectionType; import com.lapetus_ltd.api.db.xml.types.TLptsDriverType; import com.lapetus_ltd.api.db.xml.types.TLptsXmlRowSetSingleRowType; import com.lapetus_ltd.api.db.xml.types.TLptsXmlRowSetType; import junit.framework.TestCase; import java.sql.Date; import java.sql.ResultSet; import java.text.DecimalFormat; import java.util.*; // ###################################################################################################### // #### Test code for the dbJAPI build process. This code will stop the build process on ERROR #### // ###################################################################################################### // // Class Description : Advanced test for Thread Processing. // // AIM : To test the thread handling capabilities of dbJAPI. // // The aim of the tests below is to verify the thread-handling capabilities of the connections, statements and rowsets. // Only one connection is created, but 10 threads are generated that use that one connection. Each thread generates // 3 statements that are dynamically related to eachother and process up to 50 records per level. // Each statement is initiated inside its own new thread, with every rowset generated creating its own thread. // // The listener for the successful statement creation is responsible for the rowset creation. // The listener for the rowsets then processes the rows to the monitor and verification mechanism. // // The TLptsRowSetOutputListener has also been added (in all of it's 3 output types) so that we have data, debug and xml files // for later analysis and test verification. At the end of the processing the XML file is checked for the 10 copies of all the data. // Maps are used to store sub-level data and counters which are verified at the end of the processing. // Various tests are performed with counters and verification on the xml contents. There is also a human examination and verification. // When this is completed, the test is passed (provided it did not fail at another point). // // The database tables are large. Each table has more than one-million records in it. // The output is grouped using the lock object (outputGroupingLock) as there are many threads generated and output is jumbled otherwise. // The only structured output is the xml file. The other output data files have records that are both not structured and not in order. // // The memory is monitored with every connection and statement creation, with all readings being kept in the memory module class. // A summary is shown at the end of the processing. // // How it all works : see the Connection Manager for a description of the work process // (com.lapetus_ltd.api.db.control.TLptsFactoryConnection in the Javadoc) // // Note: There is a sister test called TestDatabaseAdvancedOracle which is exactly the same. It is run against an Oracle DB and // the results are compared using comparison tools like WinMerge. public class TestDatabaseAdvancedMySql extends TestCase { private MemoryMonitor memoryMonitor = new MemoryMonitor(); private final Object outputGroupingLock = new Object(); private TLptsRowSetOutputListener rowSetOutputXmlListener; private final Map<String, Integer> rowSetCountMap1 = Collections.synchronizedMap(new LinkedHashMap<String, Integer>()); private final Map<Integer, Integer> rowSetCountMap2 = Collections.synchronizedMap(new LinkedHashMap<Integer, Integer>()); private final int NO_OF_THREADS = 10; private final int NO_OF_RECORDS = 50; private int totalRecordsProcessed = 0; private int totalObjectsProcessed = 0; static public void main(String[] args) { TestDatabaseAdvancedMySql tdb = new TestDatabaseAdvancedMySql(); tdb.testConnection(); } public void testConnection() { try { // every dbJAPI application requires this before using it's capabilities TLptsMainDatabase.init(); // register a listener for the logging system. TLptsLogger.addListener(new ILptsLogListener() { public void newLogGenerated(TLptsLog log) { if (log.getType().equals(TLptsLogger.LOG_TYPE.ERROR)) // this causes a FAIL of the test synchronized (outputGroupingLock) { fail("LOG ERROR :" + log.getMessage() + " : " + log.getSupportingText() + " : Exception : " + log.getExceptionMessage()); System.exit(0); } if (log.getType().equals(TLptsLogger.LOG_TYPE.WARNING)) synchronized (outputGroupingLock) { System.out.println("LOG WARNING :" + log.getMessage() + " : " + log.getSupportingText()); } if (log.getType().equals(TLptsLogger.LOG_TYPE.MESSAGE)) synchronized (outputGroupingLock) { System.out.println("LOG MESSAGE :" + log.getMessage() + " : " + log.getSupportingText()); } } }); synchronized (outputGroupingLock) { String charSets = "Available Charsets on this HOST : "; for (String cs : TLptsCharSetLocaleUtil.getCharsetNameList()) charSets += cs + " "; System.out.println(charSets); } startMultiThreadProcessing(); } finally { memoryMonitor.showFinalStats(); } } private void startMultiThreadProcessing() { synchronized (outputGroupingLock) { for (TLptsLog log : TLptsLogger.getLogList(false)) System.out.println("Log: " + log.getMessage() + " : " + log.getSupportingText() + " : " + log.getExceptionMessage()); } // This listener will create threads and statements from the one connection TLptsFactoryConnection.addListener(new GenerateThreadsConnectionListener()); // To get the status of the statements we add a listener to the factory. This will also generate the rowsets. TLptsFactoryStatement.addListener(new FactoryStatementListener()); // lets take the rowset output and send it to various files, with various types. String path = TLptsFileUtil.createDirectoryPath(TLptsFileUtil.getCurrentDirectory() + "test/"); // we keep a variable for the xml for verification at the end TLptsFactoryRowSet.addListener(rowSetOutputXmlListener = new TLptsRowSetOutputListener(path + "TestDatabaseAdvanced.resultset.xml", TLptsRowSetOutputListener.OUTPUT_TYPE.OUTPUT_TO_XML, "UTF-8")); TLptsFactoryRowSet.addListener(new TLptsRowSetOutputListener(path + "TestDatabaseAdvanced.resultset.data.txt", TLptsRowSetOutputListener.OUTPUT_TYPE.DATA_OUTPUT,"UTF-8")); TLptsFactoryRowSet.addListener(new TLptsRowSetOutputListener(path + "TestDatabaseAdvanced.resultset.debug.txt", TLptsRowSetOutputListener.OUTPUT_TYPE.DEBUG_OUTPUT,"UTF-8")); // now we add a local listener for the memory and test verification TLptsFactoryRowSet.addListener(new RowSetListener()); synchronized (outputGroupingLock) { System.out.println("*************************************************************************"); System.out.println("Testing many threads generated on the same tables with dynamic processing"); System.out.println("*************************************************************************"); } connectToMySql(); } // #################################################################################### // C O N N E C T I O N // #################################################################################### private void connectToMySql() { ResourceBundle bundle = ResourceBundle.getBundle("resources"); // we have stored all the database info in here TLptsDBConnectionType connectionType; synchronized (outputGroupingLock) { System.out.println(); System.out.println("Connecting to MY SQL DB"); System.out.println(); } connectionType = new TLptsDBConnectionType(); connectionType.setTitle("MYSQL_DB"); XLptsDriverType driverType = TLptsDriverLoader.getDriverTypeByClassName("com.mysql.jdbc.jdbc2.optional.MysqlDataSource"); if (driverType == null) synchronized (outputGroupingLock) { fail("Could not find driver for class : com.microsoft.sqlserver.jdbc.SQLServerDataSource"); } TLptsDriverType dt = new TLptsDriverType(driverType); // changing values that are default in the driver.loader.xml file. // the DB is now set correctly and will filter through to the Datasource interface execution // in other words the testdb2 will be used as the database for the connection dt.setValueForInterfaceFunctionParameter("setDatabaseName", "Database Name", bundle.getString("Testing.database.mysql.testdb2")); connectionType.setDriverType(dt); // the passwords are encrypted when stored and unencrypted when required connectionType.setCredentials(TLptsCryptoUtil.defaultEncryptCredentialsRSA(bundle.getString("Testing.database.mysql.user"), bundle.getString("Testing.database.mysql.password"))); TLptsFactoryConnection.initiateConnection(connectionType); // This generates another thread!! } // ############################################################################################################### // #### C O N N E C T I O N L I S T E N E R // ############################################################################################################### private class GenerateThreadsConnectionListener implements ILptsFactoryConnectionListener { public void newConnectionProcessStarted() { synchronized (outputGroupingLock) { System.out.println("Connection process has started..."); } } public synchronized void newConnectionCreated(TLptsConnection connection) { synchronized (outputGroupingLock) { System.out.println("New Connection created successfully. Statements can be processed."); System.out.println("Process Simple SQL SELECT for all columns of all tables"); } // create threads which couple the dynamic tables. for (int i = 0; i < NO_OF_THREADS; i++) createThreadWithDynamicSql(connection); synchronized (outputGroupingLock) { System.out.println("Number of Overall Connections : " + TLptsFactoryConnection.getConnectionList().size()); assertEquals("--> Number of Connections should be 1", 1, TLptsFactoryConnection.getConnectionList().size()); System.out.println("Number of Statements on this Connection : " + connection.getNumberOfConnectedStatements()); memoryMonitor.recordMemoryDynamics(false); } } public void newConnectionFailed(TLptsLog log) { synchronized (outputGroupingLock) { fail("New Connection failed! Reason :" + log.getMessage() + " : " + log.getSupportingText() + " : Exception : " + log.getExceptionMessage()); } } public void removedAndClosedConnection(TLptsConnection connection) { synchronized (outputGroupingLock) { System.out.println("Connection removed from Connection Manager and closed successfully ... " + connection.getTitle()); } memoryMonitor.showFinalStats(); } } private void createThreadWithDynamicSql(TLptsConnection connection) { List<Integer> columnSqlTypes; List<String> columnNames; // PRINTOUT OF ALL COLUMNS OF THE TABLES synchronized (outputGroupingLock) { System.out.println("Table CUSTOMERS COLUMNS :"); } columnSqlTypes = connection.getColumnTypes("customers"); columnNames = connection.getColumnNames("customers"); for (int i = 0; i < columnNames.size(); i++) { int type = columnSqlTypes.get(i); String typeStr = TLptsFactoryStatement.getSqlTypeString(type); synchronized (outputGroupingLock) { System.out.println("Column " + columnNames.get(i) + " has SQL type " + type + "(" + typeStr + ")"); } } synchronized (outputGroupingLock) { System.out.println("Table TRANSACTIONS COLUMNS :"); } columnSqlTypes = connection.getColumnTypes("transactions"); columnNames = connection.getColumnNames("transactions"); for (int i = 0; i < columnNames.size(); i++) { int type = columnSqlTypes.get(i); String typeStr = TLptsFactoryStatement.getSqlTypeString(type); synchronized (outputGroupingLock) { System.out.println("Column " + columnNames.get(i) + " has SQL type " + type + "(" + typeStr + ")"); } } synchronized (outputGroupingLock) { System.out.println("Table IDITEMS COLUMNS :"); } columnSqlTypes = connection.getColumnTypes("iditems"); columnNames = connection.getColumnNames("iditems"); for (int i = 0; i < columnNames.size(); i++) { int type = columnSqlTypes.get(i); String typeStr = TLptsFactoryStatement.getSqlTypeString(type); synchronized (outputGroupingLock) { System.out.println("Column " + columnNames.get(i) + " has SQL type " + type + "(" + typeStr + ")"); } } // first we create the client, transaction and item statements. Then we dynamically link them. TLptsStatement customerStatement, transStatement, itemStatement; //////////////////////////////// CUSTOMER TABLE //////////////////////////////////// // contains all the tables and columns of the connection. We need to select columns or input a SQL string manually customerStatement = new TLptsStatement(connection, XLptsDBTypeOfStatementType.SELECT); customerStatement.setTitle("CUSTOMERS"); // dont actually need to set this for MySQL, Oracle, PostgreSql and ODBC. Left here for demonstration purposes. customerStatement.setSqlDatabaseFormat(XLptsDBSqlDatabaseFormat.DEFAULT_MYSQL); customerStatement.setXResultSetType(ResultSet.TYPE_FORWARD_ONLY); customerStatement.setXResultSetConcurrency(ResultSet.CONCUR_READ_ONLY); XLptsDBStatementTableType table = customerStatement.getTableItemByName("CUSTOMERS"); table.setSelected(true); // selecting one table only, with all its columns customerStatement.setSelectOnAllColumns(table.getTableName(), true); customerStatement.setSqlStatementExtension("ORDER BY `IDCUSTOMERS`"); // make sure this is not processed as we are waiting for the dynamic statements below to be created. customerStatement.setExecutable(false); TLptsFactoryStatement.createNewStatement(connection, customerStatement); /////////////////////////////// TRANSACTIONS TABLE //////////////////////////////////// transStatement = new TLptsStatement(connection, XLptsDBTypeOfStatementType.SELECT); transStatement.setTitle("TRANSACTIONS"); transStatement.setXResultSetType(ResultSet.TYPE_FORWARD_ONLY); transStatement.setXResultSetConcurrency(ResultSet.CONCUR_READ_ONLY); table = transStatement.getTableItemByName("TRANSACTIONS"); table.setSelected(true); // selecting one table only, with all its columns // select all the columns and the select statement SQL string will be created automatically. transStatement.setSelectOnAllColumns(table.getTableName(), true); // relate this statement to the customer statement // this means that 'customer' will feed the data to this statement for every record of 'customer' to // regenerate this statement every time // false means that we do not want static string substitution (uses the setObject of Statement) transStatement.setPrimaryStatement(customerStatement.getId(), false); // column 2 in this statement is related to column 1 in customer // certain operations allow for many indexes (namely IN and BETWEEN) transStatement.addPrimaryRelationship(2, new int[]{1}, TLptsFactoryStatement.OPERATION_EQUAL); transStatement.setSqlStatementExtension("ORDER BY `IDTRANSACTIONS`"); // make sure this is not processed as we are waiting for the dynamic statement below to be created. transStatement.setExecutable(false); TLptsFactoryStatement.createNewStatement(connection, transStatement); /////////////////////////////// ITEMS TABLE //////////////////////////////////// itemStatement = new TLptsStatement(connection, XLptsDBTypeOfStatementType.SELECT); itemStatement.setTitle("ITEMS"); itemStatement.setXResultSetType(ResultSet.TYPE_FORWARD_ONLY); itemStatement.setXResultSetConcurrency(ResultSet.CONCUR_READ_ONLY); table = itemStatement.getTableItemByName("ITEMS"); table.setSelected(true); // selecting one table only, with all its columns itemStatement.setSelectOnAllColumns(table.getTableName(), true); // relate this statement to the TRANSACTIONS statement itemStatement.setPrimaryStatement(transStatement.getId(), false); itemStatement.addPrimaryRelationship(1, new int[]{7}, TLptsFactoryStatement.OPERATION_EQUAL); // this will execute, along with all its related statements. transStatement.setExecutable(true); TLptsFactoryStatement.createNewStatement(connection, itemStatement); } // ############################################################################################################### // #### S T A T E M E N T L I S T E N E R // ############################################################################################################### private class FactoryStatementListener implements ILptsFactoryStatementListener { public void newStatementProcessStarted(TLptsConnection connection) { synchronized (outputGroupingLock) { System.out.println("Creating a new Statement for the connection : " + connection.getTitle()); } } public void newStatementCreated(TLptsConnection connection, TLptsStatement statement) { synchronized (outputGroupingLock) { System.out.println("Successful creation of a statement called : " + statement.getTitle()); } // Both the customer, transaction and item tables have 1 million records in them. // Note: We have 10 threads running, so lets not get greedy ! // 50 primary records (with all their transactions and items) should be enough. (times 10 threads) TLptsFactoryRowSet.executeSelectStatement(statement, 1, NO_OF_RECORDS, false); // spawns another thread. memoryMonitor.recordMemoryDynamics(false); } public void newStatementFailed(TLptsConnection connection) { synchronized (outputGroupingLock) { System.out.println("Could not create a Statement for the connection : " + connection.getTitle()); System.out.println("The logs should be checked for an error or exception."); } } public void removedAndClosedStatement(TLptsConnection connection, TLptsStatement statement) { synchronized (outputGroupingLock) { System.out.println("Statement removed from Connection Manager and closed successfully ... " + statement.getTitle()); System.out.println("Connection title : " + connection.getTitle()); System.out.println("Number of Statements left on this connection : " + connection.getNumberOfConnectedStatements()); } } } // ############################################################################################################### // #### R O W S E T L I S T E N E R // ############################################################################################################### // note: this listener is called 10 times for every row. These functions are executed in 10 different threads. private class RowSetListener implements ILptsFactoryRowSetListener { private int endCounter1 = 0, endCounter2 = 0, endCounter3 = 0, totalCompleteCounter = 0, totalPrimaryCounter = 0, totalPrimaryTopCounter = 0; public boolean processNewRowSetRows(TLptsRowSetEvent rowSetEvent) { return true; } public void rowEvent(final TLptsRowEvent rowEvent) { if (rowEvent.getStatement() == null) { synchronized (outputGroupingLock) { TLptsLogger.logWarning("New Row created, but statement not found.", null); return; } } if (rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.MOVED_TO_NEXT_PRIMARY_RECORD || rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.MOVED_TO_NEXT_RECORD) { if (rowEvent.getStatement() == null) return; totalRecordsProcessed++; if (rowEvent.getRowObjectList() != null) totalObjectsProcessed += rowEvent.getRowObjectList().size(); else return; if (rowEvent.getRowObjectList().size() == 0) return; Integer rowNo; if (rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.MOVED_TO_NEXT_PRIMARY_RECORD) { totalPrimaryCounter++; if (!rowEvent.getStatement().hasPrimaryStatement()) { rowSetCountMap1.put(rowEvent.getStatement().getId(), rowEvent.getCurrentRowNo()); totalPrimaryTopCounter++; endCounter3++; if (endCounter3 >= NO_OF_THREADS) { synchronized (outputGroupingLock) { System.out.println("Processing row " + totalPrimaryTopCounter); memoryMonitor.recordMemoryDynamics(false); } endCounter3 = 0; } } else if ((rowNo = rowSetCountMap1.get(rowEvent.getPrimaryStatementId())) != null) { int subRows = 0; synchronized (rowSetCountMap2) { if (rowSetCountMap2.get(rowNo) != null) { subRows = rowSetCountMap2.get(rowNo); } subRows++; rowSetCountMap2.put(rowNo, subRows); } } } if (rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.MOVED_TO_NEXT_RECORD) { synchronized (rowSetCountMap1) { if (rowEvent.getStatement().hasPrimaryStatement()) if ((rowNo = rowSetCountMap1.get(rowEvent.getPrimaryStatementId())) != null) { int subRows = 0; synchronized (rowSetCountMap2) { if (rowSetCountMap2.get(rowNo) != null) { subRows = rowSetCountMap2.get(rowNo); } subRows++; rowSetCountMap2.put(rowNo, subRows); } } } } } if (rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.EXECUTION_INTERRUPTED || rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.ERROR_OCCURRED) { TLptsFactoryStatement.removeStatement(rowEvent.getStatement()); TLptsFactoryConnection.removeConnection((TLptsConnection) (rowEvent.getStatement()).getConnectionWE(), false); memoryMonitor.showFinalStats(); fail("Process interrupted. Could not verify contents of output file."); System.exit(1); // this is commented out if run together with all the other tests } if (rowEvent.getEventType() == TLptsRowEvent.EVENT_TYPE.PROCESSING_COMPLETE) { totalCompleteCounter++; // we check to see if this is the parent of all the statements. // If it is not, we cannot close it if (!rowEvent.getStatement().hasPrimaryStatement()) endCounter2++; if (endCounter2 >= NO_OF_THREADS) { TLptsFactoryStatement.removeStatement(rowEvent.getStatement()); TLptsFactoryConnection.removeConnection((TLptsConnection) (rowEvent.getStatement()).getConnectionWE(), false); memoryMonitor.showFinalStats(); synchronized (outputGroupingLock) { System.out.println("Total Top Primary Records received : " + totalPrimaryTopCounter); System.out.println("Total Primary Records received : " + totalPrimaryCounter); System.out.println("Total 'processing complete' received : " + totalCompleteCounter); } if (totalPrimaryTopCounter != NO_OF_RECORDS * NO_OF_THREADS) fail("Top level primary record number is correct. Expecting " + NO_OF_RECORDS * NO_OF_THREADS + ", but got " + totalPrimaryTopCounter); if (!checkXmlForConsistency(rowSetOutputXmlListener.getXmlRowSet())) fail("The XML output is not consistent with what is expected"); else synchronized (outputGroupingLock) { System.out.println("XML output is consistent with expected result"); } rowSetOutputXmlListener.closeOutputFile(); // write the output to the XML file System.exit(1); // this is commented out if run together with all the other tests } } } } // ############################################################################################################### // #### V E R I F I C A T I O N F U N C T I O N S // ############################################################################################################### private boolean checkXmlForConsistency(TLptsXmlRowSetType xmlRowSetType) { List<String> rowObjects = new LinkedList<String>(); int totalNoOfRows = NO_OF_RECORDS * NO_OF_THREADS; if (xmlRowSetType.getPrimaryTable().getRowListItem().size() != totalNoOfRows) { synchronized (outputGroupingLock) { System.out.println("Fault: Total number of primary rows incorrect : Expecting " + totalNoOfRows + ", but got " + xmlRowSetType.getPrimaryTable().getRowListItem().size()); } return false; } for (int rowNo = 1; rowNo <= NO_OF_RECORDS; rowNo++) { synchronized (outputGroupingLock) { System.out.println("Processing and checking row number : " + rowNo); } // first we check that all the first level rows have 10 copies. int rowCounter = 0; for (XLptsXmlRowSetSingleRowType rssrt : xmlRowSetType.getPrimaryTable().getRowListItem()) { if (rssrt.getRowNumber() == rowNo) { // store the objects for comparison with the others lines if (rowCounter == 0) { rowObjects.clear(); for (XLptsXmlRowSetRowObjectType object : rssrt.getRowObjectListItem()) rowObjects.add(object.getString()); } else { // compare with the other lines int i = 0; for (XLptsXmlRowSetRowObjectType object : rssrt.getRowObjectListItem()) { try { if (!rowObjects.get(i).equals(object.getString())) { synchronized (outputGroupingLock) { System.out.println("For row number " + rowNo + ": Row object should be " + rowObjects.get(i) + " but is " + object.getString()); } return false; } } catch (Exception e) { synchronized (outputGroupingLock) { System.out.println("For row number " + rowNo + ": Exception " + e.getLocalizedMessage()); } return false; } i++; } } rowCounter++; } } if (rowCounter != 10) { synchronized (outputGroupingLock) { System.out.println("For row number " + rowNo + ": Row counter should be 10, but is " + rowCounter); } return false; } } int internalTotalRecordsProcessed = 0; for (XLptsXmlRowSetSingleRowType rowSetSingleRowType : xmlRowSetType.getPrimaryTable().getRowListItem()) { internalTotalRecordsProcessed = getTotalRowsAndObjectsInXML(rowSetSingleRowType, internalTotalRecordsProcessed, true); internalTotalRecordsProcessed++; } int internalTotalObjectsProcessed = 0; for (XLptsXmlRowSetSingleRowType rowSetSingleRowType : xmlRowSetType.getPrimaryTable().getRowListItem()) { internalTotalObjectsProcessed = getTotalRowsAndObjectsInXML(rowSetSingleRowType, internalTotalObjectsProcessed, false); } if (totalRecordsProcessed == internalTotalRecordsProcessed) synchronized (outputGroupingLock) { System.out.println("Total record count verified : " + totalRecordsProcessed); } else { synchronized (outputGroupingLock) { System.out.println("Incorrect Total record count : Expecting " + totalRecordsProcessed + ", but got " + internalTotalRecordsProcessed); } return false; } if (totalObjectsProcessed == internalTotalObjectsProcessed) synchronized (outputGroupingLock) { System.out.println("Total object count verified : " + totalObjectsProcessed); } else { synchronized (outputGroupingLock) { System.out.println("Incorrect Total object count : Expecting " + totalObjectsProcessed + ", but got " + internalTotalObjectsProcessed); } return false; } if (!verifySubRowCounts(xmlRowSetType)) return false; return true; } private int getTotalRowsAndObjectsInXML(XLptsXmlRowSetSingleRowType rowSetSingleRowType, int counter, boolean isRow) { if (!isRow) counter += ((TLptsXmlRowSetSingleRowType) rowSetSingleRowType).getRowObjectListItemSize(); for (XLptsXmlRowSetTableType tableItem : rowSetSingleRowType.getSubTableListItem()) { if (isRow) counter += tableItem.getRowListItem().size(); for (XLptsXmlRowSetSingleRowType rst : tableItem.getRowListItem()) counter = getTotalRowsAndObjectsInXML(rst, counter, isRow); } return counter; } private boolean verifySubRowCounts(TLptsXmlRowSetType xmlRowSetType) { synchronized (outputGroupingLock) { System.out.println(); System.out.print("Sub row counts : "); for (Integer cnt : rowSetCountMap2.values()) System.out.print(" " + cnt / NO_OF_THREADS); System.out.println(); } // now lets verify each of the 10 copies of the primary top level rows for their sub-row counts. // We do this by using the double map created during data consumption, which counted the numbers for all threads. for (int rowCounter = 1; rowCounter <= NO_OF_RECORDS; rowCounter++) for (XLptsXmlRowSetSingleRowType rowSetSingleRowType : xmlRowSetType.getPrimaryTable().getRowListItem()) { if (rowSetSingleRowType.getRowNumber() == rowCounter) { synchronized (rowSetCountMap2) { if (rowSetCountMap2.get(rowCounter) == null) { synchronized (outputGroupingLock) { System.out.println("Error in sub row verification: Row not found for counting sub-rows"); } return false; } int rows; if ((rows = (rowSetCountMap2.get(rowCounter) / NO_OF_THREADS)) != getTotalSubRowsForRow(rowSetSingleRowType)) { synchronized (outputGroupingLock) { System.out.println("Error in sub row verification : (row " + rowCounter + "). Sub-rows count should be " + getTotalSubRowsForRow(rowSetSingleRowType) + ", but is " + rows); } return false; } } } } return true; } private int getTotalSubRowsForRow(XLptsXmlRowSetSingleRowType row) { int counter = 0; for (XLptsXmlRowSetTableType tableItem : row.getSubTableListItem()) { counter += tableItem.getRowListItem().size(); } return counter; } private class MemoryMonitor { private List<MemoryUnit> memoryList = new LinkedList<MemoryUnit>(); private Runtime runtime; private MemoryMonitor() { runtime = Runtime.getRuntime(); } private class MemoryUnit { private double total; private double max; private double free; private int id; private long time; private MemoryUnit(boolean showMemory, int id) { this.id = id; time = TLptsTimeUtil.getCurrentTime().getTime(); total = runtime.totalMemory(); max = runtime.maxMemory(); free = runtime.freeMemory(); total /= (1024 * 1024); // convert to Mb max /= (1024 * 1024); // convert to Mb free /= (1024 * 1024); // convert to Mb synchronized (outputGroupingLock) { System.out.println("######## "); System.out.println("######## MEMORY MONITOR POINT CREATED : ID = " + id); System.out.println("######## "); } if (showMemory) { synchronized (outputGroupingLock) { System.out.println("/////////// MEMORY DYNAMICS \\\\\\\\\\\\\""); System.out.println("Total memory in Mb is :" + total); System.out.println("Maximum memory in Mb is :" + max); System.out.println("Free memory in Mb is :" + free); System.out.println("/////////// --------------- \\\\\\\\\\\\\""); } } } private void printDebugString() { DecimalFormat df = new DecimalFormat("0000.0000"); // 1234 1234.1234 1234.1234 1234.1234 System.out.println(String.format(" %04d " + df.format(max) + " " + df.format(total) + " " + df.format(free) + " " + TLptsTimeUtil.getTimeString(new Date(time)), id)); } } public synchronized void recordMemoryDynamics(boolean showMemory) { memoryList.add(new MemoryUnit(showMemory, memoryList.size() + 1)); if (runtime.freeMemory() < 512 * 1024) fail("Free Memory is less than 512 kb"); } public synchronized void showFinalStats() { synchronized (outputGroupingLock) { System.out.println(""); System.out.println("######## MEMORY STATS PRINTOUT ######### (in Mb) ########"); System.out.println(" ID MAX TOTAL FREE Timestamp"); System.out.println(" -- --------- --------- --------- ---------"); // 1234 1234.1234 1234.1234 1234.1234 for (MemoryUnit mu : memoryList) mu.printDebugString(); System.out.println("###########################################################"); } } } }