diff --git a/ulocollect/core/collecter.go b/ulocollect/core/collecter.go new file mode 100644 index 0000000000000000000000000000000000000000..4ba86d297ab4969c6a2d0bd6fa5bc6084b4d36dd --- /dev/null +++ b/ulocollect/core/collecter.go @@ -0,0 +1,8 @@ +package core + +// A Collecter is something that can Collect ULO/RDF +// data from some source and pass it to an Importer. +type Collecter interface { + // Find ULO/RDF data and pass it to dest. + Collect(dest Importer) error +} diff --git a/ulocollect/core/collector.go b/ulocollect/core/collector.go deleted file mode 100644 index 19bd6c1d827755ba852eb52e9112b703bbd3636c..0000000000000000000000000000000000000000 --- a/ulocollect/core/collector.go +++ /dev/null @@ -1,73 +0,0 @@ -package core - -import ( - "fmt" -) - -// A Collector schedules the execution of various Jobs. -type Collector struct { - // The Importer we forward the ULO/RDF bytes to. - Destination Importer - - // Collection of all ever sumbmitted jobs. Aquire - // mu before accessing this collection. - jobs JobMap -} - -// Submit j for execution. -func (c *Collector) Submit(j *Job) error { - id := j.Id() - - if _, loaded := c.jobs.LoadOrStore(id, j); loaded { - return fmt.Errorf("Id=%v already submitted", id) - } - - j.mu.Lock() - defer j.mu.Unlock() - - go c.execute(j) - return nil -} - -func (c *Collector) execute(j *Job) { - // Aquire lock as we need to set the status initially. - - j.mu.Lock() - - // Set state to submitted. Later we'll want to do some kind - // of scheduling at which point a job can spend a lot of time - // in this state. But in the current version we don't do any - // clever scheduling and as such the job enters the Active - // state right away. - - j.state = Submitted - - // Yep, we are starting already! - - j.state = Active - - // Now we give up the lock as we run the actual payload. - - j.mu.Unlock() - - // Run the actual payload. - - err := j.Collect(c) - - // Set completion state. - - j.mu.Lock() - - if err != nil { - j.state = Failed - j.failure = err - } else { - j.state = Successful - } - - j.wr.Release() - - // We are done mutating the Job. - - j.mu.Unlock() -} diff --git a/ulocollect/core/concurrent_buffer.go b/ulocollect/core/concurrent_buffer.go deleted file mode 100644 index 4751affb0faeb69d3b87e017ce877961643e2f56..0000000000000000000000000000000000000000 --- a/ulocollect/core/concurrent_buffer.go +++ /dev/null @@ -1,54 +0,0 @@ -package core - -import ( - "bytes" - "sync" -) - -// Implement something very much like bytes.Buffer, but with -// thread-safe methods. -type concurrentbuffer struct { - buf bytes.Buffer - mu sync.RWMutex -} - -// Implement io.Reader. -func (b *concurrentbuffer) Read(p []byte) (n int, err error) { - b.mu.Lock() - defer b.mu.Unlock() - - return b.buf.Read(p) -} - -// Implement io.Writer. -func (b *concurrentbuffer) Write(p []byte) (n int, err error) { - b.mu.Lock() - defer b.mu.Unlock() - - return b.buf.Write(p) -} - -// Truncate the buffer to 0 elements. -func (b *concurrentbuffer) Reset() { - b.mu.Lock() - defer b.mu.Unlock() - - b.buf.Reset() -} - -// Return a deep copy of the underlying bytes. -func (b *concurrentbuffer) Bytes() []byte { - b.mu.RLock() - defer b.mu.RUnlock() - - src := b.buf.Bytes() - return append([]byte(nil), src...) -} - -// Return a printable version of the buffer. -func (b *concurrentbuffer) String() string { - b.mu.RLock() - defer b.mu.RUnlock() - - return b.buf.String() -} diff --git a/ulocollect/core/dummy_job.go b/ulocollect/core/dummy_job.go deleted file mode 100644 index 6da7aa73f99d8e30527719dcb2dcb04eee5b4ab4..0000000000000000000000000000000000000000 --- a/ulocollect/core/dummy_job.go +++ /dev/null @@ -1,13 +0,0 @@ -package core - -// Return a new dummy job. Running it does nothing. -func NewDummyJob() *Job { - return &Job{ - collectfunc: func(self *Job, c *Collector) error { - self.Log("called!") - return nil - }, - id: generateJobId(), - state: Created, - } -} diff --git a/ulocollect/core/file_system_collecter.go b/ulocollect/core/file_system_collecter.go new file mode 100644 index 0000000000000000000000000000000000000000..0bde529e9f67faa8d005bd8c2d8fb438f95cc86a --- /dev/null +++ b/ulocollect/core/file_system_collecter.go @@ -0,0 +1,58 @@ +package core + +import ( + "bufio" + "github.com/pkg/errors" + "log" + "os" + "path/filepath" + "strings" +) + +// A Collecter that crawls the local file system at Path +// for ULO/RDF files. +type FileSystemCollecter struct { + Path string +} + +func (fsc *FileSystemCollecter) Collect(dest Importer) error { + return filepath.Walk(fsc.Path, func(path string, f os.FileInfo, err error) error { + return fsc.visit(dest, path, f, err) + }) +} + +// This method is called on every file found by Collect. +func (fsc *FileSystemCollecter) visit(dest Importer, path string, f os.FileInfo, err error) error { + if err != nil { + log.Print(err) + return nil + } + + if f.IsDir() { + return nil + } + + if strings.HasSuffix(path, ".rdf") { + return fsc.handleRdfFile(dest, path, f) + } + + log.Printf("ignoring path=%v", path) + return nil +} + +// Import ULO/RDF file at path with im. +func (fsc *FileSystemCollecter) handleRdfFile(im Importer, path string, f os.FileInfo) error { + fd, err := os.Open(path) + if err != nil { + return err + } + + defer fd.Close() + + reader := bufio.NewReader(fd) + if err := im.Import(reader); err != nil { + return errors.Wrapf(err, "importing path=%v failed", path) + } + + return nil +} diff --git a/ulocollect/core/fs_job.go b/ulocollect/core/fs_job.go deleted file mode 100644 index f29cc378ebb3f20e8a854966b87abf48c64d4da0..0000000000000000000000000000000000000000 --- a/ulocollect/core/fs_job.go +++ /dev/null @@ -1,58 +0,0 @@ -package core - -import ( - "bufio" - "log" - "os" - "path/filepath" - "strings" -) - -// Return a new Job that will scan path recursively for ULO/RDF files -// and pass them to the executing Importer. -func NewFileSystemJob(path string) *Job { - return &Job{ - collectfunc: makeFsCollectFunc(path), - id: generateJobId(), - state: Created, - } -} - -// Function that is called when collecting ULO/RDF from the local -// file system. -func makeFsCollectFunc(path string) func(*Job, *Collector) error { - return func(j *Job, c *Collector) error { - return filepath.Walk(path, func(path string, f os.FileInfo, err error) error { - if err != nil { - j.Logf("error walking path=%v: %v", path, err) - return nil - } - - log.Println(path) - return importLocalFile(c.Destination, path) - }) - } -} - -// Import file at path with im. Returns no error if the file at -// path is a not supported type. -func importLocalFile(im Importer, path string) error { - if strings.HasSuffix(path, ".rdf") { - return importLocalRdfFile(im, path) - } - - return nil -} - -// Import ULO/RDF file at path with im. -func importLocalRdfFile(im Importer, path string) error { - fd, err := os.Open(path) - if err != nil { - return err - } - - defer fd.Close() - - r := bufio.NewReader(fd) - return im.Import(r) -} diff --git a/ulocollect/core/importer.go b/ulocollect/core/importer.go index e1739df289558a7e4b400bad2513c0b883080cf4..e1daad7fbecc4cc4408b1bb923452897daeb63f1 100644 --- a/ulocollect/core/importer.go +++ b/ulocollect/core/importer.go @@ -2,9 +2,8 @@ package core import "io" -// Represents an Importer either locally or somewhere on the -// network. +// An Importer is something that can import ULO/RDF data +// into some kind of storage, database or processing step. type Importer interface { - // Import the given ULO/RDF byte stream. Import(rdf io.Reader) error } diff --git a/ulocollect/core/job.go b/ulocollect/core/job.go deleted file mode 100644 index bfa0824e1518620740fb64fbfcfa6bb20cfe732d..0000000000000000000000000000000000000000 --- a/ulocollect/core/job.go +++ /dev/null @@ -1,102 +0,0 @@ -package core - -import ( - "fmt" - "github.com/pkg/errors" - "io" - "log" - "sync" -) - -type Job struct { - // Function to call on Collect. This is the function - // that will (1) update the state of this job, (2) - // do the actual importing, (3) write to the Job-internal - // log and eventually (4) finish either with success - // or in failure. - collectfunc func(self *Job, c *Collector) error - - // Waiting room for people to wait on completion of this - // job. - wr waitingroom - - // Global lock for all following members. - mu sync.Mutex - - // Unique identifier of this job. - id JobId - - // The state the job is currently in. - state JobState - - // When state is set to Failed, contains the reason - // for this failure. - failure error - - // Recorded logs for this job. - logged concurrentbuffer -} - -// Return the unique identifier of this job. -func (bj *Job) Id() JobId { - bj.mu.Lock() - defer bj.mu.Unlock() - - return bj.id -} - -// Return the current state of this job. -func (bj *Job) State() JobState { - bj.mu.Lock() - defer bj.mu.Unlock() - - return bj.state -} - -// When State is Failed, return the error that caused -// the failure. When State is not failed, this method -// always returns nil. -func (bj *Job) Error() error { - bj.mu.Lock() - defer bj.mu.Unlock() - - return bj.failure -} - -// This is a synchronous operation that might take a long -// time. Instead of invoking Collect directly, you should -// create a Collector and Submit this job to it. -func (bj *Job) Collect(c *Collector) error { - if bj.collectfunc == nil { - return errors.New("collectfunc not set on Job") - } - - bj.collectfunc(bj, c) - return nil -} - -// Write to the log associated with this Job. The log message -// will be output with the standard logger as well as recorded -// to the buffer accessible with Job.Logs(). -func (bj *Job) Log(v ...interface{}) { - log.Output(2, fmt.Sprint(v...)) -} - -// Write a formatted string to the log associated with this -// Job. The log message will be output with the standard -// logger as well as recorded to the buffer accessible with -// Job.Logs(). -func (bj *Job) Logf(format string, v ...interface{}) { - log.Output(2, fmt.Sprintf(format, v...)) -} - -// Return the log stream of this job. -func (bj *Job) Logs() io.Reader { - panic("not yet implemented") -} - -// Wait for this job to complete. If this job was never submitted -// before a call to Wait, this call to Wait will never return. -func (bj *Job) Wait() { - bj.wr.Wait() -} diff --git a/ulocollect/core/job_info.go b/ulocollect/core/job_info.go new file mode 100644 index 0000000000000000000000000000000000000000..8126e613e384e3dfbb01edbf9ce1f64c8e03eb2a --- /dev/null +++ b/ulocollect/core/job_info.go @@ -0,0 +1,16 @@ +package core + +// Contains information about a submitted job. Each JobInfo +// is only a snapshot of a given state. +type JobInfo struct { + // The unique identifier of this job. + Id JobId + + // The state the job was in when this JobInfo was + // created. + State JobState + + // In case of State == Failed, contains the cause of + // the failure. + Error error +} diff --git a/ulocollect/core/job_map.go b/ulocollect/core/job_map.go deleted file mode 100644 index 732f8d1d3b63aaf140d85ec8f71c7e009d89dd54..0000000000000000000000000000000000000000 --- a/ulocollect/core/job_map.go +++ /dev/null @@ -1,34 +0,0 @@ -package core - -import "sync" - -// A wrapper around sync.Map for *Job. -type JobMap struct { - wrapped sync.Map -} - -func (m *JobMap) Store(key JobId, value *Job) { - m.wrapped.Store(key, value) -} - -func (m *JobMap) Load(key JobId) (value *Job, ok bool) { - if value, ok := m.wrapped.Load(key); !ok { - return nil, false - } else { - return value.(*Job), true - } -} - -func (m *JobMap) LoadOrStore(key JobId, value *Job) (actual *Job, loaded bool) { - if iactual, iloaded := m.wrapped.LoadOrStore(key, value); iactual != nil { - return iactual.(*Job), iloaded - } else { - return nil, iloaded - } -} - -func (m *JobMap) Range(f func(key JobId, value *Job) bool) { - m.wrapped.Range(func(ikey, ivalue interface{}) bool { - return f(ikey.(JobId), ivalue.(*Job)) - }) -} diff --git a/ulocollect/core/local_scheduler.go b/ulocollect/core/local_scheduler.go new file mode 100644 index 0000000000000000000000000000000000000000..58f842455ea82fac45262511ebb6e69b5eface2b --- /dev/null +++ b/ulocollect/core/local_scheduler.go @@ -0,0 +1,140 @@ +package core + +import ( + "fmt" + "sync" +) + +// This struct implements Scheduler. All actions are run +// locally and jobs only stored in memory. Only one submitted +// job is handled at any given time. +type LocalScheduler struct { + // Destination for all collection jobs. + Dest Importer + + // Once for initialization. + on sync.Once + + // History of all jobs ever submitted to this + // scheduler. + jobs sync.Map // JobId -> *localjob + + // Queue of jobs to execute. LocalScheduler only runs + // one task at a given time. + todo chan *localjob +} + +type localjob struct { + // The actual payload of the job. + fun Collecter + + // Synchronization aid for WaitFor. + wr waitingroom + + // Mutex that protects all following members of the struct. + mu sync.Mutex + + // Unique id of the job as assigned by the LocalScheduler. + id JobId + + // State of this job. + state JobState + + // Error message when state = Failure. Otherwise nil. + failure error +} + +func (ls *LocalScheduler) Submit(fun Collecter) (JobId, error) { + ls.ensureInitialized() + + job := &localjob{ + fun: fun, + id: generateJobId(), + state: Created, + } + + if _, loaded := ls.jobs.LoadOrStore(job.id, job); loaded { + // this should never happen, well, stochastically speaking + return "", fmt.Errorf("generated id=%v not unique", job.id) + } + + ls.todo <- job + return job.id, nil +} + +func (ls *LocalScheduler) Info(id JobId) (JobInfo, error) { + ls.ensureInitialized() + + if value, ok := ls.jobs.Load(id); !ok { + return JobInfo{}, fmt.Errorf("not entry for id=%v", id) + } else { + lj := value.(*localjob) + + lj.mu.Lock() + defer lj.mu.Unlock() + + return JobInfo{Id: lj.id, State: lj.state, Error: lj.failure}, nil + } +} + +func (ls *LocalScheduler) WaitFor(id JobId) error { + ls.ensureInitialized() + + if value, ok := ls.jobs.Load(id); !ok { + return fmt.Errorf("not entry for id=%v", id) + } else { + value.(*localjob).wr.Wait() + return nil + } +} + +func (ls *LocalScheduler) loop() { + for { + j, ok := <-ls.todo + + if !ok { + return // done handling jobs + } + + ls.execute(j) + } +} + +func (ls *LocalScheduler) execute(j *localjob) { + // set state to Active + + j.mu.Lock() + j.state = Active + j.mu.Unlock() + + // run the payload + + err := j.fun.Collect(ls.Dest) + + // update state according to return + + j.mu.Lock() + + if err == nil { + j.state = Successful + } else { + j.state = Failed + j.failure = err + } + + j.mu.Unlock() + + // release all routines waiting for this job to finish + + j.wr.Release() +} + +func (ls *LocalScheduler) ensureInitialized() { + ls.on.Do(func() { + // initialize complicated struct members + ls.todo = make(chan *localjob) + + // run concurrent tasks + go ls.loop() + }) +} diff --git a/ulocollect/core/scheduler.go b/ulocollect/core/scheduler.go new file mode 100644 index 0000000000000000000000000000000000000000..19a8bda9f74234b71e2b0ecb0daa9bb19afdc015 --- /dev/null +++ b/ulocollect/core/scheduler.go @@ -0,0 +1,15 @@ +package core + +type Scheduler interface { + // Submit job for execution with this scheduler. Only returns + // an error when enqueuing failed. On success, returns a unique + // id for use with other methods of this scheduler. + Submit(job Collecter) (JobId, error) + + // Return a JobInfo for the job with given id. + Info(id JobId) (JobInfo, error) + + // Wait for completion of job with given id. Returns an error + // if the id is unknown. + WaitFor(id JobId) error +} diff --git a/ulocollect/core/waiting_room.go b/ulocollect/core/waiting_room.go index 678525c890fe243fb411e2813183883117cd3dcc..a62a8a5c37954fa3622c083f1319125e88f2601e 100644 --- a/ulocollect/core/waiting_room.go +++ b/ulocollect/core/waiting_room.go @@ -2,6 +2,8 @@ package core import "sync" +// Synchronization aid that lets goroutines wait for some +// job to finish. Similar to sync.Once. type waitingroom struct { initer sync.Once cond *sync.Cond @@ -30,7 +32,6 @@ func (wr *waitingroom) Release() { defer wr.cond.L.Unlock() wr.released = true - wr.cond.Broadcast() } diff --git a/ulocollect/main.go b/ulocollect/main.go index 6dc1fa37167bfbbaeb194ea562f0b96cc183d322..42ac28f9da9cd6430227db35438688af4341afe1 100644 --- a/ulocollect/main.go +++ b/ulocollect/main.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "gl.kwarc.info/supervision/schaertl_andreas/ulocollect/core" "log" ) @@ -9,16 +8,22 @@ import ( func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) - i := core.DummyImporter{} - j := core.NewFileSystemJob("/mnt/ramdisk") - c := core.Collector{ - Destination: i, + s := core.LocalScheduler{ + Dest: core.DummyImporter{}, } - if err := c.Submit(j); err != nil { - fmt.Println(err) + c := core.FileSystemCollecter{ + Path: "/home/ats/Synchronized/Dokumente", } - fmt.Println("/done") - j.Wait() + id, err := s.Submit(&c) + if err != nil { + log.Fatal(err) + } + + log.Printf("submitted with id=%v", id) + + if err := s.WaitFor(id); err != nil { + log.Fatal(err) + } }