Commit 476c0852 authored by Andreas Schärtl

src/ulocollect: make it a Git submodule

parent f98a9efa
Showing with 4 additions and 566 deletions
@@ -10,3 +10,6 @@
[submodule "ulo/ulo"]
path = experimental/ulo/ulo-owl
url = git@gl.mathhub.info:ulo/ulo.git
[submodule "src/ulocollect"]
path = src/ulocollect
url = git@gitlab.cs.fau.de:kissen/ulocollect.git
ulocollect @ 31cf3875
Subproject commit 31cf3875bcc4d4a0ec7fe47cdc8e4819579e61dd
package core
// A Collecter is something that can Collect ULO/RDF
// data from some source and pass it to an Importer.
type Collecter interface {
// Find ULO/RDF data and pass it to dest.
Collect(dest Importer) error
}
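
As a hypothetical illustration of this interface (not part of the original code base), a Collecter serving a single in-memory RDF document could look like this:

package core

import "strings"

// StaticCollecter is a hypothetical Collecter that hands a single
// in-memory RDF document to the given Importer.
type StaticCollecter struct {
	Document string
}

func (sc StaticCollecter) Collect(dest Importer) error {
	return dest.Import(strings.NewReader(sc.Document))
}
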
package core
import (
"io"
"log"
)
type DummyImporter struct{}
func (im DummyImporter) Import(rdf io.Reader) error {
log.Print("Import()")
return nil
}
package core
import (
"bufio"
"github.com/pkg/errors"
"log"
"os"
"path/filepath"
"strings"
)
// A Collecter that crawls the local file system at Path
// for ULO/RDF files.
type FileSystemCollecter struct {
Path string
}
func (fsc *FileSystemCollecter) Collect(dest Importer) error {
return filepath.Walk(fsc.Path, func(path string, f os.FileInfo, err error) error {
return fsc.visit(dest, path, f, err)
})
}
// This method is called on every file found by Collect.
func (fsc *FileSystemCollecter) visit(dest Importer, path string, f os.FileInfo, err error) error {
if err != nil {
log.Print(err)
return nil
}
if f.IsDir() {
return nil
}
if strings.HasSuffix(path, ".rdf") {
return fsc.handleRdfFile(dest, path, f)
}
log.Printf("ignoring path=%v", path)
return nil
}
// Import ULO/RDF file at path with im.
func (fsc *FileSystemCollecter) handleRdfFile(im Importer, path string, f os.FileInfo) error {
fd, err := os.Open(path)
if err != nil {
return err
}
defer fd.Close()
reader := bufio.NewReader(fd)
if err := im.Import(reader); err != nil {
return errors.Wrapf(err, "importing path=%v failed", path)
}
return nil
}
module gl.kwarc.info/supervision/schaertl_andreas/ulocollect
go 1.13
require (
github.com/google/uuid v1.1.1
github.com/kissen/stringset v1.0.0
github.com/pkg/errors v0.9.1
)
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kissen/stringset v1.0.0 h1:HyLlCU/U+XHSJpmVKjhLz4PWdFALhvJfFbRSlwZsJcA=
github.com/kissen/stringset v1.0.0/go.mod h1:Xsqah6oXc+ZO4GZgFblCNzHn6Pt6Z/wYUCnZfwXxeA0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
package core
import "io"
// An Importer is something that can import ULO/RDF data
// into some kind of storage, database or processing step.
type Importer interface {
Import(rdf io.Reader) error
}
package core
import (
"bufio"
"fmt"
"io"
"log"
)
// Middleware Importer that (1) fixes all IRIs by escaping them quite
// aggressively and then (2) forwards them all to another Importer.
type IriFixImporter struct {
Next Importer
}
// Implements io.Reader for use in IriFixImporter.
type irifixreader struct {
// We read bytes from this Reader.
source io.Reader
// We put converted bytes into this ch.
ch chan byte
}
func (ifi IriFixImporter) Import(rdf io.Reader) error {
proxy := &irifixreader{
source: rdf,
ch: make(chan byte, 1024),
}
go proxy.writeToChan()
return ifi.Next.Import(proxy)
}
// Implement io.Reader
func (ifr *irifixreader) Read(p []byte) (nbytes int, err error) {
for nbytes = 0; nbytes < len(p); nbytes += 1 {
if b, ok := <-ifr.ch; !ok {
break
} else {
p[nbytes] = b
}
}
if nbytes == 0 {
return 0, io.EOF
}
return nbytes, nil
}
// Fill ifr.ch with a fixed version of ifr.source.
func (ifr *irifixreader) writeToChan() {
br := bufio.NewReader(ifr.source)
insideQuotes := false
for {
r, _, err := br.ReadRune()
// error handling
if err == io.EOF {
close(ifr.ch)
break
}
if err != nil {
close(ifr.ch)
log.Print(err)
break
}
// state machine
if r == '"' {
insideQuotes = !insideQuotes
}
var bs []byte
if insideQuotes {
bs = ifr.fixed(r)
} else {
bs = []byte(string(r))
}
// Write out to channel (where it will eventually
// be consumed by Read).
for _, b := range bs {
ifr.ch <- b
}
}
}
func (ifr *irifixreader) fixed(r rune) []byte {
bads := []rune{
'|', '\n', ' ', '^', '\\',
}
for _, bad := range bads {
if r == bad {
return []byte(fmt.Sprintf("%%%X", r))
}
}
return []byte(string(r))
}
package core
import (
"bytes"
"io"
"io/ioutil"
"testing"
)
// Importer that stores all imported bytes to buffer sink.
type stringimporter struct {
sink []byte
}
func (sim *stringimporter) Import(rdf io.Reader) error {
if bs, err := ioutil.ReadAll(rdf); err != nil {
return err
} else {
sim.sink = bs
return nil
}
}
func (sim *stringimporter) Sink() string {
return string(sim.sink)
}
func TestZeroQuotes(t *testing.T) {
iri := `abcdefghijklmnopqrstuvwxzy`
start := bytes.NewBufferString(iri)
sim := stringimporter{}
ifim := IriFixImporter{
Next: &sim,
}
if err := ifim.Import(start); err != nil {
t.Errorf("import failed: %v", err)
}
imported := sim.Sink()
if iri != imported {
t.Errorf("iri=%v and imported=%v do not match", iri, imported)
}
}
func TestZeroChange(t *testing.T) {
iri := `abcdefghijklm"nopqrs"tuvwxzy`
start := bytes.NewBufferString(iri)
sim := stringimporter{}
ifim := IriFixImporter{
Next: &sim,
}
if err := ifim.Import(start); err != nil {
t.Errorf("import failed: %v", err)
}
imported := sim.Sink()
if iri != imported {
t.Errorf("iri=%v and imported=%v do not match", iri, imported)
}
}
func TestPipe(t *testing.T) {
iri := `abcdefghijklm"no|pqrs"tuvwxzy`
escaped := `abcdefghijklm"no%7Cpqrs"tuvwxzy`
start := bytes.NewBufferString(iri)
sim := stringimporter{}
ifim := IriFixImporter{
Next: &sim,
}
if err := ifim.Import(start); err != nil {
t.Errorf("import failed: %v", err)
}
imported := sim.Sink()
if escaped != imported {
t.Errorf("escaped=%v and imported=%v do not match", escaped, imported)
}
}
package core
import "github.com/google/uuid"
// Unique identifier for a Job.
//
// Hint: It's just a UUID.
type JobId string
// Implement the Stringer interface.
func (id JobId) String() string {
return string(id)
}
// Generate a new unique job id.
func generateJobId() JobId {
id := uuid.New().String()
return JobId(id)
}
package core
// Contains information about a submitted job. Each JobInfo
// is only a snapshot of a given state.
type JobInfo struct {
// The unique identifier of this job.
Id JobId
// The state the job was in when this JobInfo was
// created.
State JobState
// In case of State == Failed, contains the cause of
// the failure.
Error error
}
package core
// Enum for the different kinds of states a job can be in.
type JobState string
const (
// The job has been created but was not yet submitted
// to a Scheduler.
Created JobState = "created"
// The job is currently queued for processing but was
// not yet started.
Submitted = "submitted"
// The job is currently running and should be making
// progress.
Active = "active"
// The job has successfully finished. No error occurred
// during processing.
Successful = "successful"
// The job finished, but errors occurred during execution.
// A failed job should not have committed any changes
// through the Importer.
Failed = "failed"
)
package core
import (
"fmt"
"sync"
)
// This struct implements Scheduler. All actions are run
// locally and jobs are only stored in memory. Only one
// submitted job is handled at any given time.
type LocalScheduler struct {
// Destination for all collection jobs.
Dest Importer
// Once for initialization.
on sync.Once
// History of all jobs ever submitted to this
// scheduler.
jobs sync.Map // JobId -> *localjob
// Queue of jobs to execute. LocalScheduler only runs
// one task at a given time.
todo chan *localjob
}
type localjob struct {
// The actual payload of the job.
fun Collecter
// Synchronization aid for WaitFor.
wr waitingroom
// Mutex that protects all following members of the struct.
mu sync.Mutex
// Unique id of the job as assigned by the LocalScheduler.
id JobId
// State of this job.
state JobState
// Error message when state == Failed. Otherwise nil.
failure error
}
func (ls *LocalScheduler) Submit(fun Collecter) (JobId, error) {
ls.ensureInitialized()
job := &localjob{
fun: fun,
id: generateJobId(),
state: Created,
}
if _, loaded := ls.jobs.LoadOrStore(job.id, job); loaded {
// this should never happen, well, stochastically speaking
return "", fmt.Errorf("generated id=%v not unique", job.id)
}
ls.todo <- job
return job.id, nil
}
func (ls *LocalScheduler) Info(id JobId) (JobInfo, error) {
ls.ensureInitialized()
if value, ok := ls.jobs.Load(id); !ok {
return JobInfo{}, fmt.Errorf("not entry for id=%v", id)
} else {
lj := value.(*localjob)
lj.mu.Lock()
defer lj.mu.Unlock()
return JobInfo{Id: lj.id, State: lj.state, Error: lj.failure}, nil
}
}
func (ls *LocalScheduler) WaitFor(id JobId) error {
ls.ensureInitialized()
if value, ok := ls.jobs.Load(id); !ok {
return fmt.Errorf("not entry for id=%v", id)
} else {
value.(*localjob).wr.Wait()
return nil
}
}
func (ls *LocalScheduler) loop() {
for {
j, ok := <-ls.todo
if !ok {
return // done handling jobs
}
ls.execute(j)
}
}
func (ls *LocalScheduler) execute(j *localjob) {
// set state to Active
j.mu.Lock()
j.state = Active
j.mu.Unlock()
// run the payload
err := j.fun.Collect(ls.Dest)
// update state according to return
j.mu.Lock()
if err == nil {
j.state = Successful
} else {
j.state = Failed
j.failure = err
}
j.mu.Unlock()
// release all routines waiting for this job to finish
j.wr.Release()
}
func (ls *LocalScheduler) ensureInitialized() {
ls.on.Do(func() {
// initialize complicated struct members
ls.todo = make(chan *localjob)
// run concurrent tasks
go ls.loop()
})
}
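
A minimal sketch of how these pieces might be wired together; the import path (derived from go.mod) and the ./rdfdata directory are assumptions, not taken from the repository:

package main

import (
	"log"

	// Assumed import path; the actual directory layout may differ.
	"gl.kwarc.info/supervision/schaertl_andreas/ulocollect/core"
)

func main() {
	// Imported RDF passes through the IRI fixer before reaching the
	// (here: dummy) storage backend.
	scheduler := &core.LocalScheduler{
		Dest: core.IriFixImporter{Next: core.DummyImporter{}},
	}

	// Submit a crawl of the hypothetical ./rdfdata directory as a job.
	id, err := scheduler.Submit(&core.FileSystemCollecter{Path: "./rdfdata"})
	if err != nil {
		log.Fatal(err)
	}

	// Block until the job has finished, then report its final state.
	if err := scheduler.WaitFor(id); err != nil {
		log.Fatal(err)
	}
	if info, err := scheduler.Info(id); err == nil {
		log.Printf("job %v finished in state %v", info.Id, info.State)
	}
}
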
// This package contains the core of the ULO/RDF Collector
// as a Go library.
package core
package core
type Scheduler interface {
// Submit job for execution with this scheduler. Only returns
// an error if enqueuing fails. On success, returns a unique
// id for use with other methods of this scheduler.
Submit(job Collecter) (JobId, error)
// Return a JobInfo for the job with given id.
Info(id JobId) (JobInfo, error)
// Wait for completion of job with given id. Returns an error
// if the id is unknown.
WaitFor(id JobId) error
}
package core
import "sync"
// Synchronization aid that lets goroutines wait for some
// job to finish. Similar to sync.Once.
type waitingroom struct {
initer sync.Once
cond *sync.Cond
released bool
}
// Wait for anyone to call Release.
func (wr *waitingroom) Wait() {
wr.initialize()
wr.cond.L.Lock()
defer wr.cond.L.Unlock()
for !wr.released {
wr.cond.Wait()
}
}
// Release all goroutines currently stuck in Wait. Also
// causes all later calls to Wait to return immediately.
func (wr *waitingroom) Release() {
wr.initialize()
wr.cond.L.Lock()
defer wr.cond.L.Unlock()
wr.released = true
wr.cond.Broadcast()
}
// Initialize if necessary.
func (wr *waitingroom) initialize() {
wr.initer.Do(func() {
wr.cond = sync.NewCond(&sync.Mutex{})
wr.released = false
})
}
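
The intended Wait/Release semantics, sketched as a hypothetical helper inside package core (not part of the original code base):

package core

import (
	"log"
	"time"
)

// exampleWaitingroom is a hypothetical illustration: several goroutines
// block in Wait until a single Release lets all of them through.
func exampleWaitingroom() {
	var wr waitingroom

	for i := 0; i < 3; i++ {
		go func(n int) {
			wr.Wait()
			log.Printf("goroutine %d released", n)
		}(i)
	}

	time.Sleep(10 * time.Millisecond) // give the goroutines time to start
	wr.Release()                      // wakes all current and future waiters
}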