diff --git a/README.md b/README.md index 540febc..47899fe 100644 --- a/README.md +++ b/README.md @@ -55,12 +55,5 @@ ### Modules -There is a list of default modules (focused on xml & pdf). - -Those URL NEED to be updated when merge branch will be ok. - -- [FILETYPE](https://github.com/istex/sisyphe/tree/master/src/worker/filetype) Will detect mimetype,extension, corrupted files.. -- [PDF](https://github.com/istex/sisyphe/tree/master/src/worker/pdf) Will get info from PDF (version, author, meta...) -- [XML](https://github.com/istex/sisyphe/tree/master/src/worker/xml) Will check if it's wellformed, valid-dtd's, get elements from balises ... -- [XPATH](https://github.com/istex/sisyphe/tree/master/src/worker/xpath) Will generate a complete list of xpaths from submitted folder -- [OUT](https://github.com/istex/sisyphe/tree/master/src/worker/out) Will export data to json file & ElasticSearch database +- XML +Usage of poppler function (`pdftotext` and `pdfinfo`) diff --git a/main.go b/main.go index a0c14b8..2636122 100644 --- a/main.go +++ b/main.go @@ -9,6 +9,8 @@ "log" "os" "path/filepath" + "regexp" + "runtime" "strconv" "sync" "time" @@ -45,7 +47,7 @@ xml LogMessageXML } -var canal = make(chan struct{}, 12) +var queueForConcurrent = make(chan struct{}, runtime.NumCPU()) var wg sync.WaitGroup var numberFiles int = 0 var corpusPath = flag.String("p", "", "Corpus path") @@ -97,12 +99,15 @@ }).Info("") } numberFiles++ - fmt.Printf("\rFiles processed: %d", numberFiles) - return + if numberFiles%5000 == 0 { + fmt.Printf("\rFiles processed: %d", numberFiles) + } } func getAllFiles(dir string) { defer wg.Done() + queueForConcurrent <- struct{}{} + defer func() { <-queueForConcurrent }() visit := func(path string, file os.FileInfo, err error) error { if err != nil { @@ -133,6 +138,8 @@ wg.Add(1) go processPDF(&fileData) } else if extension == ".xml" { + m1 := regexp.MustCompile(`;.*`) + fileData.mimetype = m1.ReplaceAllString(fileData.mimetype, "") wg.Add(1) go processXML(&fileData) } else { @@ -161,6 +168,7 @@ // init logger sec := time.Now().Unix() logPath := *outputPath + "/" + strconv.Itoa(int(sec)) + "-" + *corpusName + log.Println(color.Green + "Begin of program with " + strconv.Itoa(runtime.NumCPU()) + " CPU" + color.Reset) log.Println("Read corpus in", *corpusPath, "and write out in", logPath) os.MkdirAll(logPath, os.ModePerm|os.ModeDir) outputFile, err := os.OpenFile(logPath+"/analyse-logs.json", os.O_WRONLY|os.O_CREATE, 0755) @@ -183,9 +191,10 @@ wg.Add(1) getAllFiles(*corpusPath) wg.Wait() - close(canal) + close(queueForConcurrent) elapsed := time.Since(start) fmt.Println("") + log.Println(color.Green + "End of program with " + strconv.Itoa(numberFiles) + " files processed" + color.Reset) log.Printf("Total time %s", elapsed) } diff --git a/pdf.go b/pdf.go index 9d745c1..36eab34 100644 --- a/pdf.go +++ b/pdf.go @@ -13,10 +13,10 @@ } func processPDF(message *LogMessage) { - // queue for read pdf - canal <- struct{}{} - defer func() { <-canal }() + // queue for read pdf (limit number of parallel read files) defer wg.Done() + queueForConcurrent <- struct{}{} + defer func() { <-queueForConcurrent }() metadata, err := getMetadata(message.path) diff --git a/xml.go b/xml.go index 4c3408e..7e2c61b 100644 --- a/xml.go +++ b/xml.go @@ -7,10 +7,10 @@ ) func processXML(message *LogMessage) { - // queue for read xml - canal <- struct{}{} - defer func() { <-canal }() + // queue for read xml (limit number of parallel read files) defer wg.Done() + queueForConcurrent <- struct{}{} + defer func() { <-queueForConcurrent }() message.xml.isWellFormed = ParseXml(message.path) writeLog(message)