209 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			209 lines
		
	
	
		
			4.8 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package main
 | |
| 
 | |
| import (
 | |
| 	"encoding/json"
 | |
| 	"io"
 | |
| 	"net/http"
 | |
| 	"sync"
 | |
| 
 | |
| 	"github.com/go-co-op/gocron/v2"
 | |
| 	"github.com/prometheus/client_golang/prometheus"
 | |
| 	"github.com/prometheus/client_golang/prometheus/promhttp"
 | |
| 
 | |
| 	log "github.com/sirupsen/logrus"
 | |
| )
 | |
| 
 | |
| type DomainState struct {
 | |
| 	// Hits per page (file)
 | |
| 	Pages        map[string]int
 | |
| 	PagesMetrics map[string]prometheus.Gauge
 | |
| 
 | |
| 	// Referrals
 | |
| 	Referrals        map[string]int
 | |
| 	ReferralsMetrics map[string]prometheus.Gauge
 | |
| }
 | |
| 
 | |
| type State struct {
 | |
| 	// Maps domain names to the sliding window
 | |
| 	Domains map[string]DomainState
 | |
| 
 | |
| 	// Lock guarding the access to Domains
 | |
| 	Lock sync.Mutex
 | |
| }
 | |
| 
 | |
// increment adds one to the counter stored under key in the map that dict
// points to. A key that is not present yet starts at zero, so its first
// increment yields 1.
//
// A nil dict is ignored. If dict points to a nil map, a fresh map is allocated
// through the pointer so the increment is not lost.
func increment(key string, dict *map[string]int) {
	if dict == nil {
		return
	}
	if *dict == nil {
		*dict = make(map[string]int)
	}
	// Reading a missing key yields the zero value, so no existence check is
	// needed before incrementing.
	(*dict)[key]++
}
 | |
| 
 | |
// Track is the JSON payload accepted by the ingest endpoint. It describes a
// single page hit.
type Track struct {
	Domain  string `json:"domain"`  // domain the hit belongs to
	Referer string `json:"referer"` // referring source; empty when there is none
	Path    string `json:"path"`    // path of the page that was hit
}
 | |
| 
 | |
| func ingestHandler(registry *prometheus.Registry, state *State) http.HandlerFunc {
 | |
| 	return func(w http.ResponseWriter, req *http.Request) {
 | |
| 		if req.Method != http.MethodPost {
 | |
| 			log.Debugf("Unexpected %s request", req.Method)
 | |
| 			w.WriteHeader(http.StatusBadRequest)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		body, err := io.ReadAll(req.Body)
 | |
| 		if err != nil {
 | |
| 			log.Debugf("Failed to read body: %v", err)
 | |
| 			w.WriteHeader(http.StatusBadRequest)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		var track Track
 | |
| 		err = json.Unmarshal(body, &track)
 | |
| 		if err != nil {
 | |
| 			log.Debugf("Failed to unmarshal request body: %v", err)
 | |
| 			w.WriteHeader(http.StatusBadRequest)
 | |
| 			return
 | |
| 		}
 | |
| 
 | |
| 		// Acquire lock
 | |
| 		state.Lock.Lock()
 | |
| 		defer state.Lock.Unlock()
 | |
| 
 | |
| 		// Update everything
 | |
| 		log.Debugf("%v", state.Domains)
 | |
| 		log.Debugf("Got Domain: %s", track.Domain)
 | |
| 		domainState, found := state.Domains[track.Domain]
 | |
| 		if !found {
 | |
| 			log.Debugf("Domain %s not found. Creating state", track.Domain)
 | |
| 			domainState = DomainState{
 | |
| 				Pages:            make(map[string]int),
 | |
| 				PagesMetrics:     make(map[string]prometheus.Gauge),
 | |
| 				Referrals:        make(map[string]int),
 | |
| 				ReferralsMetrics: make(map[string]prometheus.Gauge),
 | |
| 			}
 | |
| 			state.Domains[track.Domain] = domainState
 | |
| 		}
 | |
| 
 | |
| 		// Record the referrer
 | |
| 		referrer := track.Referer
 | |
| 		if referrer != "" {
 | |
| 			v, found := domainState.Referrals[referrer]
 | |
| 			if found {
 | |
| 				value := v + 1
 | |
| 				domainState.Referrals[referrer] = value
 | |
| 
 | |
| 				metric, _ := domainState.ReferralsMetrics[referrer]
 | |
| 				metric.Set(float64(value))
 | |
| 			} else {
 | |
| 				metric := prometheus.NewGauge(
 | |
| 					prometheus.GaugeOpts{
 | |
| 						Name:        "referals",
 | |
| 						ConstLabels: prometheus.Labels{"domain": track.Domain, "referer": referrer},
 | |
| 					},
 | |
| 				)
 | |
| 				metric.Set(1)
 | |
| 				domainState.Referrals[referrer] = 1
 | |
| 				domainState.ReferralsMetrics[referrer] = metric
 | |
| 				log.Debugf("Registering gauge \"referals\" with referer=%s", referrer)
 | |
| 				registry.MustRegister(metric)
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		// Record the file
 | |
| 		path := track.Path
 | |
| 		v, found := domainState.Pages[path]
 | |
| 		log.Debugf("v=%d, found=%v", v, found)
 | |
| 		if found {
 | |
| 			value := v + 1
 | |
| 			domainState.Pages[path] = value
 | |
| 
 | |
| 			metric, _ := domainState.PagesMetrics[path]
 | |
| 			metric.Set(float64(value))
 | |
| 		} else {
 | |
| 			metric := prometheus.NewGauge(
 | |
| 				prometheus.GaugeOpts{
 | |
| 					Name:        "pages",
 | |
| 					ConstLabels: prometheus.Labels{"domain": track.Domain, "path": path},
 | |
| 				},
 | |
| 			)
 | |
| 			metric.Set(1)
 | |
| 			domainState.Pages[path] = 1
 | |
| 			domainState.PagesMetrics[path] = metric
 | |
| 			log.Debugf("Registering gauge \"path\" with path=%s", path)
 | |
| 			registry.MustRegister(metric)
 | |
| 		}
 | |
| 		state.Domains[track.Domain] = domainState
 | |
| 
 | |
| 		w.WriteHeader(http.StatusAccepted)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func main() {
 | |
| 	log.SetLevel(log.DebugLevel)
 | |
| 
 | |
| 	// Setup metrics
 | |
| 	registry := prometheus.NewRegistry()
 | |
| 	state := State{
 | |
| 		Domains: make(map[string]DomainState),
 | |
| 	}
 | |
| 
 | |
| 	// Setup the scheduler
 | |
| 	scheduler, err := gocron.NewScheduler()
 | |
| 	if err != nil {
 | |
| 		log.Fatalf("Failed to create scheduler: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	_, err = scheduler.NewJob(
 | |
| 		gocron.DailyJob(1, gocron.NewAtTimes(gocron.NewAtTime(0, 0, 0))),
 | |
| 		gocron.NewTask(
 | |
| 			func() {
 | |
| 				// Acquire the lock
 | |
| 				log.Info("Starting reset procedure...")
 | |
| 				state.Lock.Lock()
 | |
| 				defer state.Lock.Unlock()
 | |
| 
 | |
| 				for domainName, domain := range state.Domains {
 | |
| 					log.Infof("Resetting counters for %s", domainName)
 | |
| 
 | |
| 					// Reset pages
 | |
| 					for page := range domain.Pages {
 | |
| 						domain.Pages[page] = 0
 | |
| 						domain.PagesMetrics[page].Set(0)
 | |
| 					}
 | |
| 
 | |
| 					// Reset referals
 | |
| 					for referer := range domain.Referrals {
 | |
| 						domain.Referrals[referer] = 0
 | |
| 						domain.ReferralsMetrics[referer].Set(0)
 | |
| 					}
 | |
| 				}
 | |
| 			},
 | |
| 		),
 | |
| 	)
 | |
| 	if err != nil {
 | |
| 		log.Fatalf("Failed to create task: %v", err)
 | |
| 		return
 | |
| 	}
 | |
| 	scheduler.Start()
 | |
| 
 | |
| 	// Set up the HTTP server
 | |
| 	http.Handle(
 | |
| 		"/metrics", promhttp.HandlerFor(
 | |
| 			registry,
 | |
| 			promhttp.HandlerOpts{
 | |
| 				EnableOpenMetrics: true,
 | |
| 			},
 | |
| 		),
 | |
| 	)
 | |
| 	http.Handle(
 | |
| 		"/track", ingestHandler(registry, &state),
 | |
| 	)
 | |
| 	addr := "127.0.0.1:9999"
 | |
| 	log.Infof("Starting server at %s", addr)
 | |
| 	http.ListenAndServe(addr, nil)
 | |
| }
 |