Merge pull request #317 from jakopako/jakopako/issue316
Add option to generate summary
jakopako authored Nov 24, 2024
2 parents 1f43198 + 423c582 commit 4bd49c3
Showing 5 changed files with 105 additions and 18 deletions.
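The new behavior is opt-in via a -summary flag (see main.go below). A hypothetical invocation might look like the following, assuming the scraper config is passed via a -c flag, which is not part of this diff:

    goskyr -c config.yml -summary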
2 changes: 1 addition & 1 deletion go.mod
@@ -16,6 +16,7 @@ require (

require (
github.com/antchfx/jsonquery v1.3.6
github.com/olekukonko/tablewriter v0.0.5
github.com/sjwhitworth/golearn v0.0.0-20221228163002-74ae077eafb2
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848
)
@@ -28,7 +29,6 @@ require (
github.com/gonum/matrix v0.0.0-20181209220409-c518dec07be9 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/guptarohit/asciigraph v0.5.6 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/rocketlaunchr/dataframe-go v0.0.0-20211025052708-a1030444159b // indirect
golang.org/x/sync v0.9.0 // indirect
gonum.org/v1/gonum v0.14.0 // indirect
75 changes: 70 additions & 5 deletions main.go
@@ -7,36 +7,80 @@ import (
"math"
"os"
"runtime/debug"
"sort"
"strconv"
"sync"

"github.com/jakopako/goskyr/autoconfig"
"github.com/jakopako/goskyr/config"
"github.com/jakopako/goskyr/ml"
"github.com/jakopako/goskyr/output"
"github.com/jakopako/goskyr/scraper"
"github.com/olekukonko/tablewriter"
"gopkg.in/yaml.v3"
)

var version = "dev"

func worker(sc chan scraper.Scraper, ic chan map[string]interface{}, gc *scraper.GlobalConfig, threadNr int) {
func worker(sc <-chan scraper.Scraper, ic chan<- map[string]interface{}, stc chan<- scraper.ScrapingStats, gc *scraper.GlobalConfig, threadNr int) {
workerLogger := slog.With(slog.Int("thread", threadNr))
for s := range sc {
scraperLogger := workerLogger.With(slog.String("name", s.Name))
scraperLogger.Info("starting scraping task")
items, err := s.GetItems(gc, false)
result, err := s.Scrape(gc, false)
if err != nil {
scraperLogger.Error(fmt.Sprintf("%s: %s", s.Name, err))
continue
}
scraperLogger.Info(fmt.Sprintf("fetched %d items", len(items)))
for _, item := range items {
scraperLogger.Info(fmt.Sprintf("fetched %d items", result.Stats.NrItems))
for _, item := range result.Items {
ic <- item
}
stc <- *result.Stats
}
workerLogger.Info("done working")
}

func collectAllStats(stc <-chan scraper.ScrapingStats) []scraper.ScrapingStats {
result := []scraper.ScrapingStats{}
for st := range stc {
result = append(result, st)
}
return result
}

func printAllStats(stats []scraper.ScrapingStats) {
slog.Info("printing scraper summary")
// sort by name alphabetically
sort.Slice(stats, func(i, j int) bool {
return stats[i].Name < stats[j].Name
})

total := scraper.ScrapingStats{
Name: "total",
}

table := tablewriter.NewWriter(os.Stdout)
table.SetHeader([]string{"Name", "Items", "Errors"})

for _, s := range stats {
row := []string{s.Name, strconv.Itoa(s.NrItems), strconv.Itoa(s.NrErrors)}
if s.NrErrors > 0 {
table.Rich(row, []tablewriter.Colors{{tablewriter.Normal, tablewriter.FgRedColor}, {tablewriter.Normal, tablewriter.FgRedColor}, {tablewriter.Normal, tablewriter.FgRedColor}})
} else if s.NrErrors == 0 && s.NrItems == 0 {
table.Rich(row, []tablewriter.Colors{{tablewriter.Normal, tablewriter.FgYellowColor}, {tablewriter.Normal, tablewriter.FgYellowColor}, {tablewriter.Normal, tablewriter.FgYellowColor}})
} else {
table.Append(row)
}
total.NrErrors += s.NrErrors
total.NrItems += s.NrItems
}
table.SetFooter([]string{total.Name, strconv.Itoa(total.NrItems), strconv.Itoa(total.NrErrors)})
table.SetColumnAlignment([]int{tablewriter.ALIGN_LEFT, tablewriter.ALIGN_RIGHT, tablewriter.ALIGN_RIGHT})
table.SetBorder(false)
table.Render()
}
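For illustration, with one healthy scraper and one that produced only errors, the rendered summary might look roughly like this (colors omitted; tablewriter uppercases headers and footers, and exact spacing may differ):

      NAME  | ITEMS | ERRORS
    --------+-------+--------
      bar   |     3 |      0
      foo   |     0 |      2
    --------+-------+--------
      TOTAL |     3 |      2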

func main() {
singleScraper := flag.String("s", "", "The name of the scraper to be run.")
toStdout := flag.Bool("stdout", false, "If set to true the scraped data will be written to stdout despite any other existing writer configurations. In combination with the -generate flag the newly generated config will be written to stdout instead of to a file.")
@@ -51,6 +95,7 @@ func main() {
buildModel := flag.String("t", "", "Train a ML model based on the given csv features file. This will generate 2 files, goskyr.model and goskyr.class")
modelPath := flag.String("model", "", "Use a pre-trained ML model to infer names of extracted fields. Works in combination with the -g flag.")
debugFlag := flag.Bool("debug", false, "Prints debug logs and writes scraped html's to files.")
summaryFlag := flag.Bool("summary", false, "Print scraper summary at the end.")

flag.Parse()

@@ -143,6 +188,7 @@

var workerWg sync.WaitGroup
var writerWg sync.WaitGroup
var statsWg sync.WaitGroup
ic := make(chan map[string]interface{})

var writer output.Writer
@@ -167,6 +213,7 @@ func main() {
}

sc := make(chan scraper.Scraper)
stc := make(chan scraper.ScrapingStats)

// fill worker queue
go func() {
@@ -190,7 +237,7 @@ func main() {
for i := 0; i < nrWorkers; i++ {
go func(j int) {
defer workerWg.Done()
worker(sc, ic, &config.Global, j)
worker(sc, ic, stc, &config.Global, j)
}(i)
}

Expand All @@ -201,7 +248,25 @@ func main() {
defer writerWg.Done()
writer.Write(ic)
}()

// start stats collection
statsWg.Add(1)
slog.Debug("starting stats collection")
go func() {
defer statsWg.Done()
allStats := collectAllStats(stc)
writerWg.Wait() // only print stats in the end
// a bit ugly to just check here and do the collection
// of the stats even though they might not be needed.
// But this is easier for now, coding-wise.
if *summaryFlag {
printAllStats(allStats)
}
}()

workerWg.Wait()
close(ic)
close(stc)
writerWg.Wait()
statsWg.Wait()
}
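The shutdown choreography above is the crux of the change: all workers must finish before either channel is closed, the stats collector drains stc concurrently with the writer draining ic (so no send blocks forever), and the collector waits for the writer so the summary prints last. A minimal self-contained sketch of the same fan-in pattern, using simplified hypothetical types rather than goskyr's own:

    package main

    import (
    	"fmt"
    	"sync"
    )

    type stats struct {
    	Name    string
    	NrItems int
    }

    func main() {
    	items := make(chan string)
    	statsCh := make(chan stats)
    	var workerWg, writerWg, statsWg sync.WaitGroup

    	// Workers: emit items plus exactly one stats record per task.
    	for i := 0; i < 2; i++ {
    		workerWg.Add(1)
    		go func(id int) {
    			defer workerWg.Done()
    			name := fmt.Sprintf("scraper-%d", id)
    			for j := 0; j < 3; j++ {
    				items <- fmt.Sprintf("%s item %d", name, j)
    			}
    			statsCh <- stats{Name: name, NrItems: 3}
    		}(i)
    	}

    	// Writer: drains items until the channel is closed.
    	writerWg.Add(1)
    	go func() {
    		defer writerWg.Done()
    		for it := range items {
    			fmt.Println("write:", it)
    		}
    	}()

    	// Collector: drains stats, then waits for the writer so the
    	// summary is printed only after all items are written.
    	statsWg.Add(1)
    	go func() {
    		defer statsWg.Done()
    		var all []stats
    		for st := range statsCh {
    			all = append(all, st)
    		}
    		writerWg.Wait()
    		for _, st := range all {
    			fmt.Printf("%s: %d items\n", st.Name, st.NrItems)
    		}
    	}()

    	workerWg.Wait() // all sends are done...
    	close(items)    // ...so both channels can be closed safely
    	close(statsCh)
    	writerWg.Wait()
    	statsWg.Wait()
    }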
4 changes: 2 additions & 2 deletions ml/ml.go
@@ -138,12 +138,12 @@ func writeFeaturesToFile(filename string, featuresChan <-chan *Features, wg *syn
func calculateScraperFeatures(s scraper.Scraper, featuresChan chan<- *Features, wordMap map[string]bool, globalConfig *scraper.GlobalConfig, wg *sync.WaitGroup) {
defer wg.Done()
log.Printf("calculating features for %s\n", s.Name)
items, err := s.GetItems(globalConfig, true)
result, err := s.Scrape(globalConfig, true)
if err != nil {
log.Printf("%s ERROR: %s", s.Name, err)
return
}
for _, item := range items {
for _, item := range result.Items {
for fName, fValue := range item {
fValueString := fValue.(string)
f := calculateFeatures(fName, fValueString, wordMap)
3 changes: 2 additions & 1 deletion output/api.go
@@ -72,7 +72,8 @@ func (f *APIWriter) Write(items chan map[string]interface{}) {
batch = append(batch, item)
if len(batch) == 100 {
if err := postBatch(client, batch, apiURL, apiUser, apiPassword); err != nil {
fmt.Printf("%v\n", err)
logger.Error(fmt.Sprintf("%v\n", err))
// fmt.Printf("%v\n", err)
} else {
nrItemsWritten = nrItemsWritten + 100
}
39 changes: 30 additions & 9 deletions scraper/scraper.go
@@ -249,13 +249,24 @@ type Scraper struct {
fetcher fetch.Fetcher
}

// GetItems fetches and returns all items from a website according to the
type ScrapingStats struct {
Name string
NrItems int
NrErrors int
}

type ScrapingResult struct {
Items []map[string]interface{}
Stats *ScrapingStats
}

// Scrape fetches and returns all items from a website according to the
// Scraper's parameters. When rawDyn is set to true the items returned are
// not processed according to their type but instead the raw values based
// only on the location are returned (ignore regex_extract??). And only those
// of dynamic fields, ie fields that don't have a predefined value and that are
// present on the main page (not subpages). This is used by the ML feature generation.
func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string]interface{}, error) {
func (c Scraper) Scrape(globalConfig *GlobalConfig, rawDyn bool) (*ScrapingResult, error) {

scrLogger := slog.With(slog.String("name", c.Name))
// initialize fetcher
@@ -269,11 +280,16 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
}
}

var items []map[string]interface{}
result := &ScrapingResult{
Items: []map[string]interface{}{},
Stats: &ScrapingStats{
Name: c.Name,
},
}

scrLogger.Debug("initializing filters")
if err := c.initializeFilters(); err != nil {
return items, err
return result, err
}

hasNextPage := true
@@ -282,7 +298,7 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string

hasNextPage, pageURL, doc, err := c.fetchPage(nil, currentPage, c.URL, globalConfig.UserAgent, c.Interaction)
if err != nil {
return items, err
return result, err
}

for hasNextPage {
@@ -306,6 +322,7 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
}
if err != nil {
scrLogger.Error(fmt.Sprintf("error while parsing field %s: %v. Skipping item %v.", f.Name, err, currentItem))
result.Stats.NrErrors++
return
}
}
@@ -332,11 +349,13 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
subRes, err := c.fetcher.Fetch(subpageURL, fetch.FetchOpts{})
if err != nil {
scrLogger.Error(fmt.Sprintf("%v. Skipping item %v.", err, currentItem))
result.Stats.NrErrors++
return
}
subDoc, err := goquery.NewDocumentFromReader(strings.NewReader(subRes))
if err != nil {
scrLogger.Error(fmt.Sprintf("error while reading document: %v. Skipping item %v", err, currentItem))
result.Stats.NrErrors++
return
}
subDocs[subpageURL] = subDoc
@@ -345,6 +364,7 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
err = extractField(&f, currentItem, subDocs[subpageURL].Selection, baseURLSubpage)
if err != nil {
scrLogger.Error(fmt.Sprintf("error while parsing field %s: %v. Skipping item %v.", f.Name, err, currentItem))
result.Stats.NrErrors++
return
}
// filter fast!
@@ -359,20 +379,21 @@ func (c Scraper) GetItems(globalConfig *GlobalConfig, rawDyn bool) ([]map[string
filter := c.filterItem(currentItem)
if filter {
currentItem = c.removeHiddenFields(currentItem)
items = append(items, currentItem)
result.Items = append(result.Items, currentItem)
result.Stats.NrItems++
}
})

currentPage++
hasNextPage, pageURL, doc, err = c.fetchPage(doc, currentPage, pageURL, globalConfig.UserAgent, nil)
if err != nil {
return items, err
return result, err
}
}

c.guessYear(items, time.Now())
c.guessYear(result.Items, time.Now())

return items, nil
return result, nil
}

func (c *Scraper) guessYear(items []map[string]interface{}, ref time.Time) {
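For callers, the migration from GetItems mirrors the ml.go change above: items and stats now arrive bundled in a ScrapingResult. A hypothetical usage sketch, not part of this PR (note that Scrape returns a non-nil result even on error, so stats collected before a failure remain readable):

    package example

    import (
    	"fmt"
    	"log/slog"

    	"github.com/jakopako/goskyr/scraper"
    )

    // RunOne runs a single scraper and prints its items and stats.
    func RunOne(s scraper.Scraper, gc *scraper.GlobalConfig) {
    	result, err := s.Scrape(gc, false)
    	if err != nil {
    		slog.Error(err.Error())
    	}
    	for _, item := range result.Items {
    		fmt.Println(item)
    	}
    	fmt.Printf("%s: %d items, %d errors\n",
    		result.Stats.Name, result.Stats.NrItems, result.Stats.NrErrors)
    }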
