Improve Batch Bulk Load
Issue-ID: AAF-775
Change-Id: Ib2d6ff3607a30c6e8d74ff24498e5907237d6ff2
Signed-off-by: Instrumental <jonathan.gathman@att.com>
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java
index efd1ec9..738c534 100644
--- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java
+++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatch.java
@@ -63,12 +63,14 @@
}
public ResultSet execute(boolean dryRun) {
+ ResultSet rv = null;
if(dryRun) {
end();
- return null;
} else {
- return execute();
+ rv = execute();
}
+ sb.setLength(0);
+ return rv;
}
public void touch(String table, int begin, int end, boolean dryRun) {
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java
index ca264d1..51a88ef 100644
--- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java
+++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/CQLBatchLoop.java
@@ -27,10 +27,13 @@
private final StringBuilder sb;
private final boolean dryRun;
private int i;
+ private int count;
+ private int batches;
public CQLBatchLoop(CQLBatch cb, int max, boolean dryRun) {
cqlBatch = cb;
i=0;
+ count = 0;
maxBatch = max;
sb = cqlBatch.begin();
this.dryRun = dryRun;
@@ -43,10 +46,11 @@
public void preLoop() {
if(i<0) {
cqlBatch.begin();
- } else if(i>=maxBatch) {
+ } else if(i>=maxBatch || sb.length()>24000) {
cqlBatch.execute(dryRun);
cqlBatch.begin();
i=0;
+ ++batches;
}
}
@@ -56,6 +60,7 @@
*/
public StringBuilder inc() {
++i;
+ ++count;
return sb;
}
@@ -63,7 +68,18 @@
* Close up when done. However, can go back to "preLoop" safely.
*/
public void flush() {
- cqlBatch.execute(dryRun);
+ if(i>0) {
+ cqlBatch.execute(dryRun);
+ ++batches;
+ }
i=-1;
}
+
+ public int total() {
+ return count;
+ }
+
+ public int batches() {
+ return batches;
+ }
}
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java
index 8db2b47..e51fcfd 100644
--- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java
+++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/helpers/Cred.java
@@ -136,7 +136,6 @@
public static void load(Trans trans, Session session, int ... types ) {
load(trans, session,"select id, type, expires, other, writetime(cred), tag from authz.cred;",types);
-
}
public static void loadOneNS(Trans trans, Session session, String ns,int ... types ) {
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java
index 0179593..870dc1e 100644
--- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java
+++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Extend.java
@@ -25,7 +25,9 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.GregorianCalendar;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
import org.onap.aaf.auth.batch.Batch;
import org.onap.aaf.auth.batch.BatchPrincipal;
@@ -78,12 +80,14 @@
gcType = GregorianCalendar.WEEK_OF_YEAR;
int weeks = 4;
+ Set<String> cmd = new HashSet<>();
for(int i=0; i< args().length;++i) {
if("-weeks".equals(args()[i])) {
if(args().length>i+1) {
- weeks = Integer.parseInt(args()[i +1]);
- break;
+ weeks = Integer.parseInt(args()[++i]);
}
+ } else {
+ cmd.add(args()[i]);
}
}
@@ -95,12 +99,12 @@
// Create Intermediate Output
File logDir = logDir();
extFiles = new ArrayList<>();
- if(args().length>0) {
- for(int i=0;i<args().length;++i) {
- extFiles.add(new File(logDir, args()[i]));
- }
- } else {
+ if(cmd.isEmpty()) {
extFiles.add(new File(logDir,PrepExtend.PREP_EXTEND+Chrono.dateOnlyStamp()+".csv"));
+ } else {
+ for(String fn : cmd) {
+ extFiles.add(new File(logDir, fn));
+ }
}
// Load Cred. We don't follow Visitor, because we have to gather up everything into Identity Anyway
@@ -142,7 +146,7 @@
case "ur":
hi.set(++i);
gc = hgc.get();
- gc.setTime(new Date(Long.parseLong(row.get(5))));
+ gc.setTime(new Date(Long.parseLong(row.get(6))));
if(gc.before(now)) {
gc.setTime(now.getTime());
}
diff --git a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java
index 87bdf27..a3d37a0 100644
--- a/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java
+++ b/auth/auth-batch/src/main/java/org/onap/aaf/auth/batch/update/Upload.java
@@ -30,20 +30,18 @@
import java.util.List;
import org.onap.aaf.auth.batch.Batch;
+import org.onap.aaf.auth.batch.helpers.CQLBatch;
+import org.onap.aaf.auth.batch.helpers.CQLBatchLoop;
import org.onap.aaf.auth.env.AuthzTrans;
import org.onap.aaf.auth.org.OrganizationException;
import org.onap.aaf.misc.env.APIException;
import org.onap.aaf.misc.env.Env;
+import org.onap.aaf.misc.env.LogTarget;
import org.onap.aaf.misc.env.TimeTaken;
-import com.datastax.driver.core.ResultSet;
-
public class Upload extends Batch {
- private static final int BATCH_LENGTH = 100;
-
- int count;
- int batchCnt;
+ private CQLBatchLoop cqlBatch;
// APPROVALS
private static final String APPR_INS_FMT=" INSERT INTO authz.approval "
@@ -104,7 +102,8 @@
} finally {
tt.done();
}
-
+
+ cqlBatch = new CQLBatchLoop(new CQLBatch(LogTarget.NULL,session),50,dryRun);
} finally {
tt0.done();
}
@@ -114,35 +113,36 @@
protected void run(AuthzTrans trans) {
String line;
StringBuilder sb = new StringBuilder();
- StringBuilder query = new StringBuilder();
List<String> array = new ArrayList<String>();
for(String feed : args()) {
- File file = new File(feed + ".dat");
+ File file;
+ if(feed.endsWith(".dat")) {
+ file = new File(feed);
+ feed = file.getName();
+ feed = feed.substring(0,feed.length()-4);
+ } else {
+ file = new File(feed+".dat");
+ }
TimeTaken tt = trans.start(file.getAbsolutePath(), Env.SUB);
- System.out.println("#### Running " + feed + ".dat Feed ####");
+ String msg = String.format("#### Running %s.dat Feed ####",feed);
+ trans.info().log(msg);
+ System.out.println(msg);
+ BufferedReader br = null;
try {
-
if(file.exists()) {
- count=batchCnt=0;
- boolean justOne = false;
try {
- BufferedReader br = new BufferedReader(new FileReader(file));
+ br = new BufferedReader(new FileReader(file));
try {
while((line=br.readLine())!=null) {
if(line.length()>5000) {
- if(query.length()>0) {
- applyBatch(query);
- justOne=true;
- }
+ cqlBatch.flush();
}
- if(query.length()==0) {
- query.append("BEGIN BATCH\n");
- }
+ cqlBatch.preLoop();
+
// Split into fields, first turning Escaped values into something we can convert back from
char c=0;
boolean inQuote = false;
int fldcnt = 0;
-
for(int i=0;i<line.length();++i) {
switch(c=line.charAt(i)) {
case '"':
@@ -160,37 +160,27 @@
}
}
addField(feed,fldcnt,array,sb);
- query.append(build(feed, array));
-
- if((++count % BATCH_LENGTH)==0 || justOne) {
- applyBatch(query);
- justOne=false;
- }
+ cqlBatch.inc().append(build(feed, array));
}
- if(query.length()>0) {
- applyBatch(query);
- }
-
+ cqlBatch.flush();
+ } catch (Exception t) {
+ trans.error().log(t);
} finally {
br.close();
- sb.setLength(0);
- query.setLength(0);
}
-
} catch (IOException e) {
trans.error().log(e);
- e.printStackTrace();
}
-
} else {
trans.error().log("No file found: ", file.getAbsolutePath());
}
} finally {
tt.done();
System.err.flush();
- System.out.printf("\n%d applied in %d batches\n",count,batchCnt);
+ msg = String.format("\n%d applied in %d batches\n",cqlBatch.total(), cqlBatch.batches());
+ trans.info().log(msg);
+ System.out.println(msg);
}
-
}
}
@@ -198,32 +188,36 @@
private String build(String feed, List<String> array) {
String rv;
- switch(feed) {
- case "approval":
- rv = String.format(APPR_INS_FMT,array.toArray());
- break;
- case "artifact":
- rv = String.format(ARTI_INS_FMT,array.toArray());
- break;
- case "cred":
- rv = String.format(CRED_INS_FMT,array.toArray());
- break;
- case "ns":
- rv = String.format(NS_INS_FMT,array.toArray());
- break;
- case "role":
- rv = String.format(ROLE_INS_FMT,array.toArray());
- break;
- case "perm":
- rv = String.format(PERM_INS_FMT,array.toArray());
- break;
- case "x509":
- rv = String.format(X509_INS_FMT,array.toArray());
- break;
- default:
- rv = "";
+ if(array.size()>0) {
+ switch(feed) {
+ case "approval":
+ rv = String.format(APPR_INS_FMT,array.toArray());
+ break;
+ case "artifact":
+ rv = String.format(ARTI_INS_FMT,array.toArray());
+ break;
+ case "cred":
+ rv = String.format(CRED_INS_FMT,array.toArray());
+ break;
+ case "ns":
+ rv = String.format(NS_INS_FMT,array.toArray());
+ break;
+ case "role":
+ rv = String.format(ROLE_INS_FMT,array.toArray());
+ break;
+ case "perm":
+ rv = String.format(PERM_INS_FMT,array.toArray());
+ break;
+ case "x509":
+ rv = String.format(X509_INS_FMT,array.toArray());
+ break;
+ default:
+ rv = "";
+ }
+ array.clear();
+ } else {
+ rv = "";
}
- array.clear();
return rv;
}
@@ -290,23 +284,6 @@
}
}
- private void applyBatch(StringBuilder query) {
- try {
- query.append("APPLY BATCH;");
- ResultSet rv = session.execute(query.toString());
- if(rv.wasApplied()) {
- System.out.print('.');
- if((++batchCnt % 60)==0) {
- System.out.println();
- }
- } else {
- System.out.print("Data NOT APPLIED");
- }
- } finally {
- query.setLength(0);
- }
- }
-
@Override
protected void _close(AuthzTrans trans) {
diff --git a/auth/auth-cass/cass_init/cmd.sh b/auth/auth-cass/cass_init/cmd.sh
index ca1c0e8..b26536d 100644
--- a/auth/auth-cass/cass_init/cmd.sh
+++ b/auth/auth-cass/cass_init/cmd.sh
@@ -97,10 +97,10 @@
wait_start cassandra responsive
# Now, make sure data exists
if [ ! -e $INSTALLED_VERSION ] && [ -n "$(/usr/bin/cqlsh -e 'describe keyspaces' | grep authz)" ]; then
- /usr/bin/cqlsh --timeout 60 -e 'DROP KEYSPACE authz'
+ /usr/bin/cqlsh --request-timeout=60 -e 'DROP KEYSPACE authz'
fi
- if [ -z "`/usr/bin/cqlsh --timeout 60 -e 'describe keyspaces' | grep authz`" ]; then
+ if [ -z "`/usr/bin/cqlsh --request-timeout 60 -e 'describe keyspaces' | grep authz`" ]; then
status install
echo "Initializing Cassandra DB"
echo "Docker Installed Basic Cassandra on aaf.cass. Executing the following "
diff --git a/auth/auth-cass/cass_init/push.sh b/auth/auth-cass/cass_init/push.sh
index f3e5d80..f887f07 100644
--- a/auth/auth-cass/cass_init/push.sh
+++ b/auth/auth-cass/cass_init/push.sh
@@ -35,11 +35,7 @@
cd dats
for T in $(ls *.dat); do
if [ -s $T ]; then
- until cqlsh --request-timeout=60 -e "COPY authz.${T/.dat/} FROM '$T' WITH DELIMITER='|';";
- do
- echo "Unexpected failure...sleep for 10 seconds and try again"
- sleep 10
- done
+ cqlsh --request-timeout=100 -e "COPY authz.${T/.dat/} FROM '$T' WITH DELIMITER='|';";
fi
done
cd $DIR
diff --git a/auth/auth-cass/docker/dbash.sh b/auth/auth-cass/docker/dbash.sh
index 1e13d27..737cf10 100644
--- a/auth/auth-cass/docker/dbash.sh
+++ b/auth/auth-cass/docker/dbash.sh
@@ -24,5 +24,5 @@
fi
DOCKER=${DOCKER:-docker}
-$DOCKER exec -it aaf_cass bash
+$DOCKER exec -it aaf-cass bash