diff --git a/ulocollect/core/base_job.go b/ulocollect/core/base_job.go
new file mode 100644
index 0000000000000000000000000000000000000000..17a67e302537a0e55f4e77577e5440fc96eedc4a
--- /dev/null
+++ b/ulocollect/core/base_job.go
@@ -0,0 +1,119 @@
+package core
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"log"
+	"sync"
+
+	"github.com/pkg/errors"
+)
+
+// basejob holds the state shared by Job implementations: the
+// identifier, the life-cycle state, the failure reason and the
+// recorded log.
+type basejob struct {
+	// Function to call on Collect. When nil, Collect reports
+	// that it is not implemented.
+	collectfunc func(c *Collector) error
+
+	// Global lock for all following members.
+	mu sync.Mutex
+
+	// Unique identifier of this job.
+	id JobId
+
+	// The state the job is currently in.
+	state JobState
+
+	// When state is set to Failed, contains the reason
+	// for this failure.
+	failure error
+
+	// Recorded logs for this job.
+	logged bytes.Buffer
+}
+
+// Id returns the unique identifier of this job.
+func (bj *basejob) Id() JobId {
+	bj.mu.Lock()
+	defer bj.mu.Unlock()
+
+	return bj.id
+}
+
+// State returns the state the job is currently in.
+func (bj *basejob) State() JobState {
+	bj.mu.Lock()
+	defer bj.mu.Unlock()
+
+	return bj.state
+}
+
+// Error returns the reason for the failure while the job is in
+// state Failed. In every other state it returns nil.
+func (bj *basejob) Error() error {
+	bj.mu.Lock()
+	defer bj.mu.Unlock()
+
+	if bj.state != Failed {
+		return nil
+	}
+
+	return bj.failure
+}
+
+// Collect runs the collect function of this job. A job without
+// a collect function returns a "not implemented" error.
+func (bj *basejob) Collect(c *Collector) error {
+	if bj.collectfunc == nil {
+		return errors.New("not implemented")
+	}
+
+	return bj.collectfunc(c)
+}
+
+// Log prints v with the standard logger and appends the same
+// message to the buffer accessible with Logs.
+func (bj *basejob) Log(v ...interface{}) {
+	msg := fmt.Sprint(v...)
+
+	// A failure to write to the standard logger cannot be
+	// reported anywhere useful, so the error is dropped.
+	log.Output(3, msg)
+	bj.record(msg)
+}
+
+// Logf formats according to format, prints the result with the
+// standard logger and appends it to the buffer accessible with Logs.
+func (bj *basejob) Logf(format string, v ...interface{}) {
+	msg := fmt.Sprintf(format, v...)
+
+	// As in Log, the error of Output is dropped on purpose.
+	log.Output(3, msg)
+	bj.record(msg)
+}
+
+// Logs returns a reader over everything logged so far. The reader
+// works on a snapshot: messages logged later do not show up in it.
+func (bj *basejob) Logs() io.Reader {
+	bj.mu.Lock()
+	defer bj.mu.Unlock()
+
+	snapshot := append([]byte(nil), bj.logged.Bytes()...)
+	return bytes.NewReader(snapshot)
+}
+
+// record appends msg to the log buffer of this job. Like the
+// standard logger it ends every message with a newline.
+func (bj *basejob) record(msg string) {
+	bj.mu.Lock()
+	defer bj.mu.Unlock()
+
+	bj.logged.WriteString(msg)
+
+	if len(msg) == 0 || msg[len(msg)-1] != '\n' {
+		bj.logged.WriteByte('\n')
+	}
+}
diff --git a/ulocollect/core/collector.go b/ulocollect/core/collector.go
new file mode 100644
index 0000000000000000000000000000000000000000..26f6b2fc765537a3cba6d0e1239b951a9cf5ac75
--- /dev/null
+++ b/ulocollect/core/collector.go
@@ -0,0 +1,88 @@
+package core
+
+import (
+	"fmt"
+	"sync"
+)
+
+// A Collector schedules the execution of various Jobs.
+type Collector struct {
+	// The Importer we forward the ULO/RDF bytes to.
+	im Importer
+
+	// Global lock. Acquire it before changing any state
+	// in Collector.
+	mu sync.Mutex
+
+	// Whether this Collector is currently processing requests,
+	// i.e. whether Collector.loop is running. Acquire mu
+	// before accessing this member.
+	looping bool
+
+	// Collection of all ever submitted jobs. Acquire
+	// mu before accessing this collection. Submit creates
+	// the map on first use.
+	jobs map[JobId]Job
+
+	// Here we keep duplicate references to the submitted jobs.
+	// We need to keep some kind of order and also want to be
+	// able to report on the current state of all queues. As
+	// such we have to use lists and cannot use channels here.
+
+	// List of submitted, but not currently running, jobs.
+	// Acquire mu before accessing this collection.
+	submitted []Job
+
+	// List of active jobs. Acquire mu before accessing this
+	// collection.
+	active []Job
+
+	// List of completed, i.e. successful or failed, jobs.
+	// Acquire mu before accessing this collection.
+	completed []Job
+}
+
+// Submit enqueues j for execution and starts the processing
+// loop if it is not running yet. It returns an error if j is
+// nil or if a job with the same id was submitted before.
+func (c *Collector) Submit(j Job) error {
+	if j == nil {
+		return fmt.Errorf("cannot submit nil job")
+	}
+
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// check whether j was already submitted previously
+
+	id := j.Id()
+
+	if _, ok := c.jobs[id]; ok {
+		return fmt.Errorf("id=%v already submitted", id)
+	}
+
+	// it's a new job, enqueue it
+
+	if c.jobs == nil {
+		// A zero value Collector has a nil map and writing
+		// to a nil map panics.
+		c.jobs = make(map[JobId]Job)
+	}
+
+	c.jobs[id] = j
+	c.submitted = append(c.submitted, j)
+
+	// if the collector isn't running, start it
+
+	if !c.looping {
+		go c.loop()
+		c.looping = true
+	}
+
+	return nil
+}
+
+// loop is meant to process the submitted jobs. It does not
+// do anything yet.
+func (c *Collector) loop() {
+}
diff --git a/ulocollect/core/concurrent_buffer.go b/ulocollect/core/concurrent_buffer.go
new file mode 100644
index 0000000000000000000000000000000000000000..1fb3b2fcd4a34229144c0b3800d8421679f055de
--- /dev/null
+++ b/ulocollect/core/concurrent_buffer.go
@@ -0,0 +1,34 @@
+package core
+
+import (
+	"bytes"
+	"sync"
+)
+
+// Implement something very much like bytes.Buffer, but with
+// thread-safe methods.
+type concurrentbuffer struct {
+	buf bytes.Buffer // the wrapped buffer; only touch it while holding mu
+	mu  sync.Mutex   // guards buf
+}
+
+func (b *concurrentbuffer) Read(p []byte) (n int, err error) {
+	b.mu.Lock()
+	defer b.mu.Unlock() // keep the lock held until the read below is done
+
+	return b.buf.Read(p)
+}
+
+func (b *concurrentbuffer) Write(p []byte) (n int, err error) {
+	b.mu.Lock()
+	defer b.mu.Unlock() // keep the lock held until the write below is done
+
+	return b.buf.Write(p)
+}
+
+func (b *concurrentbuffer) String() string {
+	b.mu.Lock()
+	defer b.mu.Unlock() // keep the lock held while the contents are copied
+
+	return b.buf.String()
+}
diff --git a/ulocollect/core/importer.go b/ulocollect/core/importer.go
new file mode 100644
index 0000000000000000000000000000000000000000..ab13724ee9165334e0d150accc51493a46c72ec8
--- /dev/null
+++ b/ulocollect/core/importer.go
@@ -0,0 +1,12 @@
+package core
+
+import (
+	"io"
+)
+
+// Represents an Importer either locally or somewhere on the
+// network.
+type Importer interface {
+	// Import the given ULO/RDF byte stream.
+	Import(rdf io.Reader) error
+}
diff --git a/ulocollect/core/job.go b/ulocollect/core/job.go
new file mode 100644
index 0000000000000000000000000000000000000000..d3214334703af7d9eeb6c48393e52d2f4c173023
--- /dev/null
+++ b/ulocollect/core/job.go
@@ -0,0 +1,38 @@
+package core
+
+import "io"
+
+// A single collection job. Defined as an interface as there are all
+// kinds of different collection jobs as the respective source
+// repository is different.
+type Job interface {
+	// Return the unique identifier of this job.
+	Id() JobId
+
+	// Return the current state of this job.
+	State() JobState
+
+	// When State is Failed, return the error that caused
+	// the failure. When State is not failed, this method
+	// always returns nil.
+	Error() error
+
+	// This is a synchronous operation that might take a long
+	// time. Instead of invoking Collect directly, you should
+	// create a Collector and Submit this job to it.
+	Collect(c *Collector) error
+
+	// Write to the log associated with this Job. The log message
+	// will be output with the standard logger as well as recorded
+	// to the buffer accessible with Job.Logs().
+	Log(v ...interface{})
+
+	// Write a formatted string to the log associated with this
+	// Job. The log message will be output with the standard
+	// logger as well as recorded to the buffer accessible with
+	// Job.Logs().
+	Logf(format string, v ...interface{})
+
+	// Return the log stream of this job.
+	Logs() io.Reader
+}
diff --git a/ulocollect/core/job_id.go b/ulocollect/core/job_id.go
new file mode 100644
index 0000000000000000000000000000000000000000..05cfac5b916377ab6da1f675576d8102d6a01b0b
--- /dev/null
+++ b/ulocollect/core/job_id.go
@@ -0,0 +1,19 @@
+package core
+
+import "github.com/google/uuid"
+
+// Unique identifier for a Job.
+//
+// Hint: It's just a UUID.
+type JobId string
+
+// Implement the Stringer interface.
+func (id JobId) String() string {
+	return string(id)
+}
+
+// Generate a new unique job id.
+func generateJobId() JobId {
+	id := uuid.New().String()
+	return JobId(id)
+}
diff --git a/ulocollect/core/job_state.go b/ulocollect/core/job_state.go
new file mode 100644
index 0000000000000000000000000000000000000000..e22a8bf7b93fa9537a07d3794f277ea35d9ecbe4
--- /dev/null
+++ b/ulocollect/core/job_state.go
@@ -0,0 +1,30 @@
+package core
+
+// Enum for the different kinds of states a job can be in.
+//
+// Every constant below spells out the JobState type. A constant
+// declared without it would be an untyped string constant.
+type JobState string
+
+const (
+	// The job has been created but was not yet submitted
+	// to a Collector.
+	Created JobState = "created"
+
+	// The job is currently queued for processing but was
+	// not yet started.
+	Submitted JobState = "submitted"
+
+	// The job is currently running and should be making
+	// progress.
+	Active JobState = "active"
+
+	// The job has successfully finished. No error occurred
+	// during processing.
+	Successful JobState = "successful"
+
+	// The job finished, but during execution errors occurred.
+	// A failed job should not have made any committed changes
+	// to the Importer.
+	Failed JobState = "failed"
+)
diff --git a/ulocollect/core/package.go b/ulocollect/core/package.go
new file mode 100644
index 0000000000000000000000000000000000000000..9b3ebdf478153093103a8369ba8b5546bbc02de9
--- /dev/null
+++ b/ulocollect/core/package.go
@@ -0,0 +1,3 @@
+// This package contains the core of the ULO/RDF Collector
+// as a Go library.
+package core
diff --git a/ulocollect/go.mod b/ulocollect/go.mod
index 58a3793ec3b3ef721398bbf8630d4d3c7a0697fe..0981a5f4f22a9a2a24b9c99604df0a1c8c598be0 100644
--- a/ulocollect/go.mod
+++ b/ulocollect/go.mod
@@ -1,3 +1,9 @@
 module gl.kwarc.info/supervision/schaertl_andreas/uoimport
 
 go 1.13
+
+require (
+	github.com/google/uuid v1.1.1
+	github.com/kissen/stringset v1.0.0
+	github.com/pkg/errors v0.9.1
+)
diff --git a/ulocollect/go.sum b/ulocollect/go.sum
new file mode 100644
index 0000000000000000000000000000000000000000..ab7de5c637a25e61e823b7b55079d663f8fffeee
--- /dev/null
+++ b/ulocollect/go.sum
@@ -0,0 +1,6 @@
+github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
+github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
+github.com/kissen/stringset v1.0.0 h1:HyLlCU/U+XHSJpmVKjhLz4PWdFALhvJfFbRSlwZsJcA=
+github.com/kissen/stringset v1.0.0/go.mod h1:Xsqah6oXc+ZO4GZgFblCNzHn6Pt6Z/wYUCnZfwXxeA0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=