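Improve Batch Bulk Load

Upload now builds its inserts through CQLBatchLoop (limit of 50
statements per batch) instead of assembling BEGIN/APPLY BATCH text
itself, and accepts feed names given with or without the ".dat" suffix.
CQLBatchLoop also starts a new batch once the buffer exceeds 24000
characters, only executes on flush() when statements are pending, and
exposes total() and batches() counters. CQLBatch.execute(boolean)
clears its buffer after running.

Extend no longer treats "-weeks <n>" as input file names, and reads the
date for "ur" rows from column 6 instead of column 5.

Cassandra scripts: cmd.sh uses cqlsh --request-timeout instead of
--timeout, push.sh drops the retry loop and raises the request timeout
to 100, and dbash.sh targets the "aaf-cass" container.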

Issue-ID: AAF-775
Change-Id: Ib2d6ff3607a30c6e8d74ff24498e5907237d6ff2
Signed-off-by: Instrumental <jonathan.gathman@att.com>
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java
index efd1ec9..738c534 100644
--- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java
+++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java
@@ -63,12 +63,14 @@
 	}
 	
 	public ResultSet execute(boolean dryRun) {
+		ResultSet rv = null;
 		if(dryRun) {
 			end();
-			return null;
 		} else {
-			return execute();
+			rv = execute();
 		}
+		sb.setLength(0);
+		return rv;
 	}
 	
 	public void touch(String table, int begin, int end, boolean dryRun) {
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java
index ca264d1..51a88ef 100644
--- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java
+++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java
@@ -27,10 +27,13 @@
 	private final StringBuilder sb;
 	private final boolean dryRun;
 	private int i;
+	private int count;
+	private int batches;
 	
 	public CQLBatchLoop(CQLBatch cb, int max, boolean dryRun) {
 		cqlBatch = cb;
 		i=0;
+		count = 0;
 		maxBatch = max;
 		sb = cqlBatch.begin();
 		this.dryRun = dryRun;
@@ -43,10 +46,11 @@
 	public void preLoop() {
 		if(i<0) {
 			cqlBatch.begin();
-		} else if(i>=maxBatch) {
+		} else if(i>=maxBatch || sb.length()>24000) {
 			cqlBatch.execute(dryRun);
 			cqlBatch.begin();
 			i=0;
+			++batches;
 		}
 	}
 	
@@ -56,6 +60,7 @@
 	 */
 	public StringBuilder inc() {
 		++i;
+		++count;
 		return sb;
 	}
 	
@@ -63,7 +68,18 @@
 	 * Close up when done.  However, can go back to "preLoop" safely.
 	 */
 	public void flush() {
-		cqlBatch.execute(dryRun);
+		if(i>0) {
+			cqlBatch.execute(dryRun);
+			++batches;
+		}
 		i=-1;
 	}
+
+	public int total() {
+		return count;
+	}
+	
+	public int batches() {
+		return batches;
+	}
 }
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java
index 8db2b47..e51fcfd 100644
--- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java
+++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java
@@ -136,7 +136,6 @@
 
     public static void load(Trans trans, Session session, int ... types ) {
         load(trans, session,"select id, type, expires, other, writetime(cred), tag from authz.cred;",types);
-        
     }
 
     public static void loadOneNS(Trans trans, Session session, String ns,int ... types ) {
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java
index 0179593..870dc1e 100644
--- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java
+++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java
@@ -25,7 +25,9 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.GregorianCalendar;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.onap.aaf.auth.batch.Batch;
 import org.onap.aaf.auth.batch.BatchPrincipal;
@@ -78,12 +80,14 @@
         gcType = GregorianCalendar.WEEK_OF_YEAR;
         int weeks = 4;
 		
+        Set<String> cmd = new HashSet<>();
 		for(int i=0; i< args().length;++i) {
 			if("-weeks".equals(args()[i])) {
 				if(args().length>i+1) {
-					weeks = Integer.parseInt(args()[i +1]);
-					break;
+					weeks = Integer.parseInt(args()[++i]);
 				}
+			} else {
+				cmd.add(args()[i]);
 			}
 		}
 		
@@ -95,12 +99,12 @@
         // Create Intermediate Output 
         File logDir = logDir();
         extFiles = new ArrayList<>();
-        if(args().length>0) {
-        	for(int i=0;i<args().length;++i) {
-        		extFiles.add(new File(logDir, args()[i]));
-        	}
-        } else {
+        if(cmd.isEmpty()) {
         	extFiles.add(new File(logDir,PrepExtend.PREP_EXTEND+Chrono.dateOnlyStamp()+".csv"));
+        } else {
+        	for(String fn : cmd) {
+        		extFiles.add(new File(logDir, fn));
+        	}
         }
         
         // Load Cred.  We don't follow Visitor, because we have to gather up everything into Identity Anyway
@@ -142,7 +146,7 @@
 							case "ur":
 								hi.set(++i);
 								gc = hgc.get();
-								gc.setTime(new Date(Long.parseLong(row.get(5))));
+								gc.setTime(new Date(Long.parseLong(row.get(6))));
 								if(gc.before(now)) {
 									gc.setTime(now.getTime());
 								}
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java
index 87bdf27..a3d37a0 100644
--- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java
+++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java
@@ -30,20 +30,18 @@
 import java.util.List;
 
 import org.onap.aaf.auth.batch.Batch;
+import org.onap.aaf.auth.batch.helpers.CQLBatch;
+import org.onap.aaf.auth.batch.helpers.CQLBatchLoop;
 import org.onap.aaf.auth.env.AuthzTrans;
 import org.onap.aaf.auth.org.OrganizationException;
 import org.onap.aaf.misc.env.APIException;
 import org.onap.aaf.misc.env.Env;
+import org.onap.aaf.misc.env.LogTarget;
 import org.onap.aaf.misc.env.TimeTaken;
 
-import com.datastax.driver.core.ResultSet;
-
 public class Upload extends Batch {
 
-	private static final int BATCH_LENGTH = 100;
-
-	int count;
-	int batchCnt;
+	private CQLBatchLoop cqlBatch;
 
 	// APPROVALS
 	private static final String APPR_INS_FMT="  INSERT INTO authz.approval "
@@ -104,7 +102,8 @@
 			} finally {
 				tt.done();
 			}
-
+			
+			cqlBatch = new CQLBatchLoop(new CQLBatch(LogTarget.NULL,session),50,dryRun);
 	    } finally {
 	    	tt0.done();
 	    }
@@ -114,35 +113,36 @@
 	protected void run(AuthzTrans trans) {
 		String line;
 		StringBuilder sb = new StringBuilder();
-		StringBuilder query = new StringBuilder();
 		List<String> array = new ArrayList<String>();
 		for(String feed : args()) {
-			File file = new File(feed + ".dat");
+			File file;
+			if(feed.endsWith(".dat")) {
+				file = new File(feed);
+				feed = file.getName();
+				feed = feed.substring(0,feed.length()-4);
+			} else {
+				file = new File(feed+".dat");
+			}
 			TimeTaken tt = trans.start(file.getAbsolutePath(), Env.SUB);
-			System.out.println("#### Running " + feed + ".dat Feed ####");
+			String msg = String.format("#### Running %s.dat Feed ####",feed);
+			trans.info().log(msg);
+			System.out.println(msg);
+	    	BufferedReader br = null;
 		    try {
-
 				if(file.exists()) {
-					count=batchCnt=0;
-					boolean justOne = false;
 					try {
-						BufferedReader br = new BufferedReader(new FileReader(file));
+						br = new BufferedReader(new FileReader(file));
 						try {
 							while((line=br.readLine())!=null) {
 								if(line.length()>5000) {
-									if(query.length()>0) {
-										applyBatch(query);
-										justOne=true;
-									}
+									cqlBatch.flush();
 								}
-								if(query.length()==0) {
-									query.append("BEGIN BATCH\n");
-								}
+								cqlBatch.preLoop();
+
 								// Split into fields, first turning Escaped values into something we can convert back from
 								char c=0;
 								boolean inQuote = false;
 								int fldcnt = 0;
-								
 								for(int i=0;i<line.length();++i) {
 									switch(c=line.charAt(i)) {
 										case '"':
@@ -160,37 +160,27 @@
 									}
 								}
 								addField(feed,fldcnt,array,sb);
-								query.append(build(feed, array));
-								
-								if((++count % BATCH_LENGTH)==0 || justOne) {
-									applyBatch(query);
-									justOne=false;
-								}
+								cqlBatch.inc().append(build(feed, array));
 							}
-							if(query.length()>0) {
-								applyBatch(query);
-							}
-							
+							cqlBatch.flush();
+						} catch (Exception t) {
+							trans.error().log(t);
 						} finally {
 							br.close();
-							sb.setLength(0);
-							query.setLength(0);
 						}
-						
 					} catch (IOException e) {
 						trans.error().log(e);
-						e.printStackTrace();
 					}
-
 				} else {
 					trans.error().log("No file found: ", file.getAbsolutePath());
 				}
 			} finally {
 				tt.done();
 				System.err.flush();
-				System.out.printf("\n%d applied in %d batches\n",count,batchCnt);
+				msg = String.format("\n%d applied in %d batches\n",cqlBatch.total(), cqlBatch.batches());
+				trans.info().log(msg);
+				System.out.println(msg);
 			}
-
 		}
 
 	}
@@ -198,32 +188,36 @@
 
 	private String build(String feed, List<String> array) {
 		String rv;
-		switch(feed) {
-			case "approval":
-				rv = String.format(APPR_INS_FMT,array.toArray());
-				break;
-			case "artifact":
-				rv = String.format(ARTI_INS_FMT,array.toArray());
-				break;
-			case "cred":
-				rv = String.format(CRED_INS_FMT,array.toArray());
-				break;
-			case "ns":
-				rv = String.format(NS_INS_FMT,array.toArray());
-				break;
-			case "role":
-				rv = String.format(ROLE_INS_FMT,array.toArray());
-				break;
-			case "perm":
-				rv = String.format(PERM_INS_FMT,array.toArray());
-				break;
-			case "x509":
-				rv = String.format(X509_INS_FMT,array.toArray());
-				break;
-			default:
-				rv = "";
+		if(array.size()>0) {
+			switch(feed) {
+				case "approval":
+					rv = String.format(APPR_INS_FMT,array.toArray());
+					break;
+				case "artifact":
+					rv = String.format(ARTI_INS_FMT,array.toArray());
+					break;
+				case "cred":
+					rv = String.format(CRED_INS_FMT,array.toArray());
+					break;
+				case "ns":
+					rv = String.format(NS_INS_FMT,array.toArray());
+					break;
+				case "role":
+					rv = String.format(ROLE_INS_FMT,array.toArray());
+					break;
+				case "perm":
+					rv = String.format(PERM_INS_FMT,array.toArray());
+					break;
+				case "x509":
+					rv = String.format(X509_INS_FMT,array.toArray());
+					break;
+				default:
+					rv = "";
+			}
+			array.clear();
+		} else {
+			rv = ""; 
 		}
-		array.clear();
 		return rv;
 	}
 	
@@ -290,23 +284,6 @@
 		}
 	}
 
-	private void applyBatch(StringBuilder query) {
-		try {
-			query.append("APPLY BATCH;");
-			ResultSet rv = session.execute(query.toString());
-			if(rv.wasApplied()) {
-				System.out.print('.');
-				if((++batchCnt % 60)==0) {
-					System.out.println();
-				}
-			} else {
-				System.out.print("Data NOT APPLIED");
-			}
-		} finally {
-			query.setLength(0);
-		}
-	}
-
 
 	@Override
 	protected void _close(AuthzTrans trans) {
diff --git a/auth/auth-cass/cass_init/cmd.sh b/auth/auth-cass/cass_init/cmd.sh
index ca1c0e8..b26536d 100644
--- a/auth/auth-cass/cass_init/cmd.sh
+++ b/auth/auth-cass/cass_init/cmd.sh
@@ -97,10 +97,10 @@
     wait_start cassandra responsive   
     # Now, make sure data exists
     if [ ! -e $INSTALLED_VERSION ] && [ -n "$(/usr/bin/cqlsh -e 'describe keyspaces' | grep authz)" ]; then
-      /usr/bin/cqlsh --timeout 60 -e 'DROP KEYSPACE authz' 
+      /usr/bin/cqlsh --request-timeout=60 -e 'DROP KEYSPACE authz' 
     fi
 
-    if [ -z "`/usr/bin/cqlsh --timeout 60 -e 'describe keyspaces' | grep authz`" ]; then
+    if [ -z "`/usr/bin/cqlsh --request-timeout 60 -e 'describe keyspaces' | grep authz`" ]; then
         status install 
         echo "Initializing Cassandra DB" 
         echo "Docker Installed Basic Cassandra on aaf.cass.  Executing the following "
diff --git a/auth/auth-cass/cass_init/push.sh b/auth/auth-cass/cass_init/push.sh
index f3e5d80..f887f07 100644
--- a/auth/auth-cass/cass_init/push.sh
+++ b/auth/auth-cass/cass_init/push.sh
@@ -35,11 +35,7 @@
 cd dats
 for T in $(ls *.dat); do
   if [ -s $T ]; then
-    until cqlsh --request-timeout=60 -e "COPY authz.${T/.dat/} FROM '$T' WITH DELIMITER='|';";
-    do
-      echo "Unexpected failure...sleep for 10 seconds and try again"
-      sleep 10
-    done
+    cqlsh --request-timeout=100 -e "COPY authz.${T/.dat/} FROM '$T' WITH DELIMITER='|';";
   fi
 done
 cd $DIR
diff --git a/auth/auth-cass/docker/dbash.sh b/auth/auth-cass/docker/dbash.sh
index 1e13d27..737cf10 100644
--- a/auth/auth-cass/docker/dbash.sh
+++ b/auth/auth-cass/docker/dbash.sh
@@ -24,5 +24,5 @@
 fi
 DOCKER=${DOCKER:-docker}
 
-$DOCKER exec -it aaf_cass bash
+$DOCKER exec -it aaf-cass bash