Commit: Add option to generate summary

Fixes #316

jakopako committed Nov 23, 2024
1 parent 1f43198 commit 7f2fd83

Showing 4 changed files with 78 additions and 17 deletions.
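In short: Scraper.GetItems becomes Scraper.Scrape, which returns a ScrapingResult — the scraped items plus a per-scraper ScrapingStats (name, item count, error count). Each worker forwards its stats over a new channel, a collector goroutine gathers them, and once all writers are done printAllStats prints a summary sorted by scraper name. Given the format string in printAllStats below, the summary output looks roughly like this (the scraper names are made up for illustration):

name: concerts-a, items: 42, errors: 0
name: concerts-b, items: 17, errors: 2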
49 changes: 44 additions & 5 deletions main.go
@@ -7,6 +7,7 @@ import (
     "math"
     "os"
     "runtime/debug"
+    "sort"
     "sync"
 
     "github.com/jakopako/goskyr/autoconfig"
@@ -19,24 +20,47 @@ import (
 
 var version = "dev"
 
-func worker(sc chan scraper.Scraper, ic chan map[string]interface{}, gc *scraper.GlobalConfig, threadNr int) {
+func worker(sc <-chan scraper.Scraper, ic chan<- map[string]interface{}, stc chan<- scraper.ScrapingStats, gc *scraper.GlobalConfig, threadNr int) {
     workerLogger := slog.With(slog.Int("thread", threadNr))
     for s := range sc {
         scraperLogger := workerLogger.With(slog.String("name", s.Name))
         scraperLogger.Info("starting scraping task")
-        items, err := s.GetItems(gc, false)
+        result, err := s.Scrape(gc, false)
         if err != nil {
             scraperLogger.Error(fmt.Sprintf("%s: %s", s.Name, err))
             continue
         }
-        scraperLogger.Info(fmt.Sprintf("fetched %d items", len(items)))
-        for _, item := range items {
+        scraperLogger.Info(fmt.Sprintf("fetched %d items", result.Stats.NrItems))
+        for _, item := range result.Items {
             ic <- item
         }
+        stc <- *result.Stats
     }
     workerLogger.Info("done working")
 }
 
+func collectAllStats(stc <-chan scraper.ScrapingStats) []scraper.ScrapingStats {
+    result := []scraper.ScrapingStats{}
+    for st := range stc {
+        result = append(result, st)
+    }
+    return result
+}
+
+func printAllStats(stats []scraper.ScrapingStats) {
+    // TODO: nicer format/layout, table format and colors depending on nrs, e.g. red for errors
+    statsString := ""
+    // sort by name alphabetically
+    sort.Slice(stats, func(i, j int) bool {
+        return stats[i].Name < stats[j].Name
+    })
+    for _, s := range stats {
+        statsString += fmt.Sprintf("name: %s, items: %d, errors: %d\n", s.Name, s.NrItems, s.NrErrors)
+    }
+    // TODO add total of everything
+    fmt.Println(statsString)
+}
+
 func main() {
     singleScraper := flag.String("s", "", "The name of the scraper to be run.")
     toStdout := flag.Bool("stdout", false, "If set to true the scraped data will be written to stdout despite any other existing writer configurations. In combination with the -generate flag the newly generated config will be written to stdout instead of to a file.")
@@ -143,6 +167,7 @@ func main() {
 
     var workerWg sync.WaitGroup
     var writerWg sync.WaitGroup
+    var statsWg sync.WaitGroup
     ic := make(chan map[string]interface{})
 
     var writer output.Writer
@@ -167,6 +192,7 @@ func main() {
     }
 
     sc := make(chan scraper.Scraper)
+    stc := make(chan scraper.ScrapingStats)
 
     // fill worker queue
     go func() {
@@ -190,7 +216,7 @@ func main() {
     for i := 0; i < nrWorkers; i++ {
         go func(j int) {
             defer workerWg.Done()
-            worker(sc, ic, &config.Global, j)
+            worker(sc, ic, stc, &config.Global, j)
         }(i)
     }
 
@@ -201,7 +227,20 @@ func main() {
         defer writerWg.Done()
         writer.Write(ic)
     }()
+
+    // start stats collection
+    statsWg.Add(1)
+    slog.Debug("starting stats collection")
+    go func() {
+        defer statsWg.Done()
+        allStats := collectAllStats(stc)
+        writerWg.Wait() // only print stats in the end
+        printAllStats(allStats)
+    }()
+
     workerWg.Wait()
     close(ic)
+    close(stc)
     writerWg.Wait()
+    statsWg.Wait()
 }
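The shutdown ordering in this last hunk is load-bearing: workers are the only senders on stc, so close(stc) may only happen after workerWg.Wait(); the collector drains stc until it is closed, then waits on writerWg so the summary prints after all writer output; and statsWg.Wait() keeps main alive until the summary is out. Here is a minimal standalone sketch of the same fan-in-and-summarize shape — toy code, not part of this commit:

// Toy illustration (not goskyr code) of the worker/stats fan-in used above.
package main

import (
    "fmt"
    "sync"
)

type stats struct {
    Name    string
    NrItems int
}

func main() {
    jobs := make(chan string)
    stc := make(chan stats)
    var workerWg, statsWg sync.WaitGroup

    // workers: consume jobs, emit one stats value per job
    for i := 0; i < 3; i++ {
        workerWg.Add(1)
        go func() {
            defer workerWg.Done()
            for name := range jobs {
                stc <- stats{Name: name, NrItems: len(name)}
            }
        }()
    }

    // collector: drain stc until it is closed, then print the summary
    statsWg.Add(1)
    go func() {
        defer statsWg.Done()
        all := []stats{}
        for st := range stc {
            all = append(all, st)
        }
        for _, s := range all {
            fmt.Printf("name: %s, items: %d\n", s.Name, s.NrItems)
        }
    }()

    for _, n := range []string{"a", "bb", "ccc"} {
        jobs <- n
    }
    close(jobs)     // lets workers finish their range loops
    workerWg.Wait() // after this, nobody sends on stc anymore
    close(stc)      // unblocks the collector's range loop
    statsWg.Wait()  // don't exit before the summary is printed
}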
4 changes: 2 additions & 2 deletions ml/ml.go
@@ -138,12 +138,12 @@ func writeFeaturesToFile(filename string, featuresChan <-chan *Features, wg *syn
 func calculateScraperFeatures(s scraper.Scraper, featuresChan chan<- *Features, wordMap map[string]bool, globalConfig *scraper.GlobalConfig, wg *sync.WaitGroup) {
     defer wg.Done()
     log.Printf("calculating features for %s\n", s.Name)
-    items, err := s.GetItems(globalConfig, true)
+    result, err := s.Scrape(globalConfig, true)
     if err != nil {
         log.Printf("%s ERROR: %s", s.Name, err)
         return
     }
-    for _, item := range items {
+    for _, item := range result.Items {
         for fName, fValue := range item {
             fValueString := fValue.(string)
             f := calculateFeatures(fName, fValueString, wordMap)
3 changes: 2 additions & 1 deletion output/api.go
@@ -72,7 +72,8 @@ func (f *APIWriter) Write(items chan map[string]interface{}) {
         batch = append(batch, item)
         if len(batch) == 100 {
             if err := postBatch(client, batch, apiURL, apiUser, apiPassword); err != nil {
-                fmt.Printf("%v\n", err)
+                logger.Error(fmt.Sprintf("%v\n", err))
+                // fmt.Printf("%v\n", err)
             } else {
                 nrItemsWritten = nrItemsWritten + 100
             }
39 changes: 30 additions & 9 deletions scraper/scraper.go
@@ -249,13 +249,24 @@ type Scraper struct {
     fetcher fetch.Fetcher
 }
 
-// GetItems fetches and returns all items from a website according to the
+type ScrapingStats struct {
+    Name     string
+    NrItems  int
+    NrErrors int
+}
+
+type ScrapingResult struct {
+    Items []map[string]interface{}
+    Stats *ScrapingStats
+}
+
+// Scrape fetches and returns all items from a website according to the
 // Scraper's parameters. When rawDyn is set to true the items returned are
 // not processed according to their type but instead the raw values based
 // only on the location are returned (ignore regex_extract??). And only those
 // of dynamic fields, i.e. fields that don't have a predefined value and that are
 // present on the main page (not subpages). This is used by the ML feature generation.
-func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string]interface{}, error) {
+func (c Scraper) Scrape(globalConfig *GlobalConfig, rawDyn bool) (*ScrapingResult, error) {
 
     scrLogger := slog.With(slog.String("name", c.Name))
     // initialize fetcher
@@ -269,11 +280,16 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
         }
     }
 
-    var items []map[string]interface{}
+    result := &ScrapingResult{
+        Items: []map[string]interface{}{},
+        Stats: &ScrapingStats{
+            Name: c.Name,
+        },
+    }
 
     scrLogger.Debug("initializing filters")
     if err := c.initializeFilters(); err != nil {
-        return items, err
+        return result, err
     }
 
     hasNextPage := true
@@ -282,7 +298,7 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
 
     hasNextPage, pageURL, doc, err := c.fetchPage(nil, currentPage, c.URL, globalConfig.UserAgent, c.Interaction)
     if err != nil {
-        return items, err
+        return result, err
     }
 
     for hasNextPage {
@@ -306,6 +322,7 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
             }
             if err != nil {
                 scrLogger.Error(fmt.Sprintf("error while parsing field %s: %v. Skipping item %v.", f.Name, err, currentItem))
+                result.Stats.NrErrors++
                 return
             }
         }
@@ -332,11 +349,13 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
                 subRes, err := c.fetcher.Fetch(subpageURL, fetch.FetchOpts{})
                 if err != nil {
                     scrLogger.Error(fmt.Sprintf("%v. Skipping item %v.", err, currentItem))
+                    result.Stats.NrErrors++
                     return
                 }
                 subDoc, err := goquery.NewDocumentFromReader(strings.NewReader(subRes))
                 if err != nil {
                     scrLogger.Error(fmt.Sprintf("error while reading document: %v. Skipping item %v", err, currentItem))
+                    result.Stats.NrErrors++
                     return
                 }
                 subDocs[subpageURL] = subDoc
@@ -345,6 +364,7 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
             err = extractField(&f, currentItem, subDocs[subpageURL].Selection, baseURLSubpage)
             if err != nil {
                 scrLogger.Error(fmt.Sprintf("error while parsing field %s: %v. Skipping item %v.", f.Name, err, currentItem))
+                result.Stats.NrErrors++
                 return
             }
             // filter fast!
@@ -359,20 +379,21 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
             filter := c.filterItem(currentItem)
             if filter {
                 currentItem = c.removeHiddenFields(currentItem)
-                items = append(items, currentItem)
+                result.Items = append(result.Items, currentItem)
+                result.Stats.NrItems++
             }
         })
 
         currentPage++
         hasNextPage, pageURL, doc, err = c.fetchPage(doc, currentPage, pageURL, globalConfig.UserAgent, nil)
         if err != nil {
-            return items, err
+            return result, err
         }
     }
 
-    c.guessYear(items, time.Now())
+    c.guessYear(result.Items, time.Now())
 
-    return items, nil
+    return result, nil
 }
 
 func (c *Scraper) guessYear(items []map[string]interface{}, ref time.Time) {
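For downstream callers, the migration is mechanical, as the ml.go and main.go hunks above show. A hedged before/after sketch — the consume function and its error handling are illustrative, not part of this commit:

// Illustrative caller, assuming the imports
// "fmt" and "github.com/jakopako/goskyr/scraper".
func consume(s scraper.Scraper, gc *scraper.GlobalConfig) error {
    // before: items, err := s.GetItems(gc, false)
    result, err := s.Scrape(gc, false)
    if err != nil {
        return err
    }
    for _, item := range result.Items { // was: range items
        fmt.Println(item)
    }
    // new: per-scraper stats ride along with the items
    fmt.Printf("name: %s, items: %d, errors: %d\n",
        result.Stats.Name, result.Stats.NrItems, result.Stats.NrErrors)
    return nil
}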
