summaryrefslogtreecommitdiff
path: root/lib/task.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/task.rb')
-rw-r--r--lib/task.rb430
1 files changed, 325 insertions, 105 deletions
diff --git a/lib/task.rb b/lib/task.rb
index 1ab3893..0ee3a11 100644
--- a/lib/task.rb
+++ b/lib/task.rb
@@ -1,42 +1,207 @@
-$self_task=nil
module OpenTox
+ # Class for handling asynchronous tasks
class Task
+ include OpenTox
+ attr_accessor :http_code, :due_to_time
+
+ def initialize(uri=nil)
+ super uri
+ @metadata = {
+ DC.title => "",
+ DC.date => "",
+ OT.hasStatus => "Running",
+ OT.percentageCompleted => 0.0,
+ OT.resultURI => "",
+ DC.creator => "", # not mandatory according to API
+ DC.description => "", # not mandatory according to API
+ }
+ end
+
+ # Create a new task for the code in the block. Catches halts and exceptions and sets task state to error if necessary. The block has to return the URI of the created resource.
+ # @example
+ # task = OpenTox::Task.create do
+ # # this code will be executed as a task
+ # model = OpenTox::Algorithm.run(params) # this can be time consuming
+ # model.uri # Important: return URI of the created resource
+ # end
+ # task.status # returns "Running", because tasks are forked
+ # @param [String] title Task title
+ # @param [String] creator Task creator
+ # @return [OPenTox::Task] Task
+ def self.create( title=nil, creator=nil, max_duration=DEFAULT_TASK_MAX_DURATION, description=nil )
+
+ params = {:title=>title, :creator=>creator, :max_duration=>max_duration, :description=>description }
+ task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, {}, nil, false).to_s
+ task = Task.new(task_uri.chomp)
+
+ # measure current memory consumption
+ memory = `free -m|sed -n '2p'`.split
+ free_memory = memory[3].to_i + memory[6].to_i # include cache
+ if free_memory < 20 # require at least 200 M free memory
+ LOGGER.warn "Cannot start task - not enough memory left (#{free_memory} M free)"
+ task.cancel
+ return task
+ #raise "Insufficient memory to start a new task"
+ end
+
+ cpu_load = `cat /proc/loadavg`.split(/\s+/)[0..2].collect{|c| c.to_f}
+ nr_cpu_cores = `cat /proc/cpuinfo |grep "cpu cores"|cut -d ":" -f2|tr -d " "`.split("\n").collect{|c| c.to_i}.inject{|sum,n| sum+n}
+ nr_cpu_cores = 1 if !nr_cpu_cores
+ #if cpu_load[0] > nr_cpu_cores and cpu_load[0] > cpu_load[1] and cpu_load[1] > cpu_load[2] # average CPU load of the last minute is high and CPU load is increasing
+ # LOGGER.warn "Cannot start task - CPU load too high (#{cpu_load.join(", ")})"
+ # task.cancel
+ # return task
+ # #raise "Server too busy to start a new task"
+ #end
+
+ task_pid = Spork.spork(:logger => LOGGER) do
+ LOGGER.debug "Task #{task.uri} started #{Time.now}"
+ begin
+ result = yield task
+ LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
+ task.completed(result)
+ rescue => error
+ LOGGER.error "task failed: "+error.class.to_s+": "+error.message
+ LOGGER.error ":\n"+error.backtrace.join("\n")
+ task.error(OpenTox::ErrorReport.create(error, creator))
+ end
+ end
+ task.pid = task_pid
+ LOGGER.debug "Started task: "+task.uri.to_s
+ task
+ end
+
+ # Find a task for querying, status changes
+ # @param [String] uri Task URI
+ # @return [OpenTox::Task] Task object
+ def self.find(uri)
+ return nil unless uri
+ task = Task.new(uri)
+ task.load_metadata
+ raise "could not load task metadata" if task.metadata==nil or task.metadata.size==0
+ task
+ end
+
+ # Find a task for querying, status changes
+ # @param [String] uri Task URI
+ # @return [OpenTox::Task] Task object
+ def self.exist?(uri)
+ begin
+ return find(uri)
+ rescue
+ end
+ end
+
+ # Get a list of all tasks
+ # @param [optional, String] uri URI of task service
+ # @return [text/uri-list] Task URIs
+ def self.all(uri=CONFIG[:services]["opentox-task"])
+ OpenTox.all uri
+ end
- # due_to_time is only set in local tasks
- TASK_ATTRIBS = [ :uri, :date, :title, :creator, :description, :hasStatus, :percentageCompleted, :resultURI, :due_to_time ]
- TASK_ATTRIBS.each{ |a| attr_accessor(a) }
- attr_accessor :http_code
+ def self.from_yaml(yaml)
+ @metadata = YAML.load(yaml)
+ end
- private
- def initialize(uri)
- @uri = uri.to_s.strip
+ def self.from_rdfxml(rdfxml)
+ owl = OpenTox::Parser::Owl.from_rdf(rdfxml, OT.Task)
+ task = Task.new(owl.uri)
+ task.add_metadata(owl.metadata)
+ task
+ end
+
+ def to_rdfxml
+ s = Serializer::Owl.new
+ @metadata[OT.errorReport] = @uri+"/ErrorReport/tmpId" if @error_report
+ s.add_task(@uri,@metadata)
+ s.add_resource(@uri+"/ErrorReport/tmpId", OT.errorReport, @error_report.rdf_content) if @error_report
+ s.to_rdfxml
+ end
+
+ def status
+ @metadata[OT.hasStatus]
+ end
+
+ def result_uri
+ @metadata[OT.resultURI]
+ end
+
+ def description
+ @metadata[DC.description]
end
- # create is private now, use OpenTox::Task.as_task
- def self.create( params )
- task_uri = RestClientWrapper.post(@@config[:services]["opentox-task"], params, nil, false).to_s
- Task.find(task_uri.chomp)
+ def errorReport
+ @metadata[OT.errorReport]
end
-
- public
- def self.find( uri, accept_header=nil )
- task = Task.new(uri)
- task.reload( accept_header )
- return task
+
+ def cancel
+ RestClientWrapper.put(File.join(@uri,'Cancelled'),{:cannot_be => "empty"})
+ load_metadata
+ end
+
+ def completed(uri)
+ RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
+ load_metadata
+ end
+
+ def error(error_report)
+ raise "no error report" unless error_report.is_a?(OpenTox::ErrorReport)
+ RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml})
+ load_metadata
end
+ # not stored just for to_rdf
+ def add_error_report( error_report )
+ @error_report = error_report
+ end
+
+ def pid=(pid)
+ RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid})
+ end
+
+ def running?
+ @metadata[OT.hasStatus] == 'Running'
+ end
+
+ def completed?
+ @metadata[OT.hasStatus] == 'Completed'
+ end
+
+ def error?
+ @metadata[OT.hasStatus] == 'Error'
+ end
+
+ def load_metadata
+ if (CONFIG[:yaml_hosts].include?(URI.parse(@uri).host))
+ result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, nil, false)
+ @metadata = YAML.load result.to_s
+ @http_code = result.code
+ else
+ @metadata = Parser::Owl::Generic.new(@uri).load_metadata
+ @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, nil, false).code
+ end
+ raise "could not load task metadata for task "+@uri.to_s if @metadata==nil || @metadata.size==0
+ end
+
+ # create is private now, use OpenTox::Task.as_task
+ #def self.create( params )
+ #task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, {}, false).to_s
+ #Task.find(task_uri.chomp)
+ #end
+
+=begin
def self.from_data(data, content_type, code, base_uri)
task = Task.new(nil)
task.http_code = code
task.reload_from_data(data, content_type, base_uri)
return task
end
-
+
def reload( accept_header=nil )
unless accept_header
- if (@@config[:yaml_hosts].include?(URI.parse(uri).host))
+ if (CONFIG[:yaml_hosts].include?(URI.parse(uri).host))
accept_header = "application/x-yaml"
else
accept_header = 'application/rdf+xml'
@@ -64,113 +229,168 @@ module OpenTox
end
raise "uri is null after loading" unless @uri and @uri.to_s.strip.size>0
end
-
- def cancel
- RestClientWrapper.put(File.join(@uri,'Cancelled'))
- reload
- end
-
- def completed(uri)
- RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
- reload
- end
-
- def error(description)
- RestClientWrapper.put(File.join(@uri,'Error'),{:description => description.to_s[0..2000]})
- reload
- end
-
- def pid=(pid)
- RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid})
- end
-
- def running?
- @hasStatus.to_s == 'Running'
- end
-
- def completed?
- @hasStatus.to_s == 'Completed'
- end
-
- def error?
- @hasStatus.to_s == 'Error'
- end
+=end
# waits for a task, unless time exceeds or state is no longer running
- def wait_for_completion(dur=0.3)
+ # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly
+ # @param [optional,Numeric] dur seconds pausing before cheking again for completion
+ def wait_for_completion( waiting_task=nil, dur=0.3)
- if (@uri.match(@@config[:services]["opentox-task"]))
- due_to_time = (@due_to_time.is_a?(Time) ? @due_to_time : Time.parse(@due_to_time))
- running_time = due_to_time - (@date.is_a?(Time) ? @date : Time.parse(@date))
- else
- # the date of the external task cannot be trusted, offest to local time might be to big
- due_to_time = Time.new + EXTERNAL_TASK_MAX_DURATION
- running_time = EXTERNAL_TASK_MAX_DURATION
- end
+ waiting_task.waiting_for(self.uri) if waiting_task
+ due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION
LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s
+ load_metadata # for extremely fast tasks
+ check_state
while self.running?
sleep dur
- reload
+ load_metadata
+ # if another (sub)task is waiting for self, set progress accordingly
+ waiting_task.progress(@metadata[OT.percentageCompleted].to_f) if waiting_task
check_state
if (Time.new > due_to_time)
- raise "max wait time exceeded ("+running_time.to_s+"sec), task: '"+@uri.to_s+"'"
+ raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'"
end
end
-
- LOGGER.debug "Task '"+@hasStatus+"': "+@uri.to_s+", Result: "+@resultURI.to_s
+ waiting_task.waiting_for(nil) if waiting_task
+ LOGGER.debug "Task '"+@metadata[OT.hasStatus].to_s+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s
end
-
+
+ # updates percentageCompleted value (can only be increased)
+ # task has to be running
+ # @param [Numeric] pct value between 0 and 100
+ def progress(pct)
+ #puts "task := "+pct.to_s
+ raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100
+ if (pct > @metadata[OT.percentageCompleted] + 0.0001)
+ RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct})
+ load_metadata
+ end
+ end
+
+ def waiting_for(task_uri)
+ RestClientWrapper.put(File.join(@uri,'Running'),{:waiting_for => task_uri})
+ end
+
+ private
+ VALID_TASK_STATES = ["Cancelled", "Completed", "Running", "Error"]
+
def check_state
begin
- raise "illegal task state, task is completed, resultURI is no URI: '"+@resultURI.to_s+
- "'" unless @resultURI and Utils.is_uri?(@resultURI) if completed?
-
+ raise "illegal task state, invalid status: '"+@metadata[OT.hasStatus].to_s+"'" unless
+ @metadata[OT.hasStatus] unless VALID_TASK_STATES.include?(@metadata[OT.hasStatus])
+ raise "illegal task state, task is completed, resultURI is no URI: '"+@metadata[OT.resultURI].to_s+
+ "'" unless @metadata[OT.resultURI] and @metadata[OT.resultURI].to_s.uri? if completed?
if @http_code == 202
- raise "illegal task state, code is 202, but hasStatus is not Running: '"+@hasStatus+"'" unless running?
+ raise "#{@uri}: illegal task state, code is 202, but hasStatus is not Running: '"+@metadata[OT.hasStatus]+"'" unless running?
elsif @http_code == 201
- raise "illegal task state, code is 201, but hasStatus is not Completed: '"+@hasStatus+"'" unless completed?
- raise "illegal task state, code is 201, resultURI is no task-URI: '"+@resultURI.to_s+
- "'" unless @resultURI and Utils.task_uri?(@resultURI)
+ raise "#{@uri}: illegal task state, code is 201, but hasStatus is not Completed: '"+@metadata[OT.hasStatus]+"'" unless completed?
+ raise "#{@uri}: illegal task state, code is 201, resultURI is no task-URI: '"+@metadata[OT.resultURI].to_s+
+ "'" unless @metadata[OT.resultURI] and @metadata[OT.resultURI].to_s.uri?
end
rescue => ex
- RestClientWrapper.raise_uri_error(ex.message, @uri)
+ raise OpenTox::BadRequestError.new ex.message+" (task-uri:"+@uri+")"
end
end
+ end
+
+ # Convenience class to split a (sub)task into subtasks
+ #
+ # example:
+ # a crossvalidation is split into creating datasets and performing the validations
+ # creating the dataset is 1/3 of the work, perform the validations is 2/3:
+ # Task.as_task do |task|
+ # create_datasets( SubTask.new(task, 0, 33) )
+ # perfom_validations( SubTask.new(task, 33, 100) )
+ # end
+ # inside the create_datasets / perform_validations you can use subtask.progress(<val>)
+ # with vals from 0-100
+ #
+ # note that you can split a subtask into further subtasks
+ class SubTask
+
+ def initialize(task, min, max)
+ raise "not a task or subtask" if task!=nil and !(task.is_a?(Task) or task.is_a?(SubTask))
+ raise "invalid max ("+max.to_s+"), min ("+min.to_s+") params" unless
+ min.is_a?(Numeric) and max.is_a?(Numeric) and min >= 0 and max <= 100 and max > min
+ @task = task
+ @min = min
+ @max = max
+ @delta = max - min
+ end
+
+ # convenience method to handle null tasks
+ def self.create(task, min, max)
+ if task
+ SubTask.new(task, min, max)
+ else
+ nil
+ end
+ end
+
+ def waiting_for(task_uri)
+ @task.waiting_for(task_uri)
+ end
+
+ def progress(pct)
+ raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100
+ #puts "subtask := "+pct.to_s+" -> task := "+(@min + @delta * pct.to_f * 0.01).to_s
+ @task.progress( @min + @delta * pct.to_f * 0.01 )
+ end
+
+ def running?()
+ @task.running?
+ end
+ end
+
- # returns the task uri
- # catches halts and exceptions, task state is set to error then
- def self.as_task( title, creator, max_duration=DEFAULT_TASK_MAX_DURATION, description=nil )
- #return yield nil
-
- params = {:title=>title, :creator=>creator, :max_duration=>max_duration, :description=>description }
- task = OpenTox::Task.create(params)
- task_pid = Spork.spork(:logger => LOGGER) do
- LOGGER.debug "Task #{task.uri} started #{Time.now}"
- $self_task = task
-
- begin
- result = catch(:halt) do
- yield task
- end
- # catching halt, set task state to error
- if result && result.is_a?(Array) && result.size==2 && result[0]>202
- LOGGER.error "task was halted: "+result.inspect
- task.error(result[1])
- return
- end
- LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
- task.completed(result)
- rescue => ex
- LOGGER.error "task failed: "+ex.message
- LOGGER.error ": "+ex.backtrace.join("\n")
- task.error(ex.message)
+ # The David Gallagher feature:
+ # a fake sub task to keep the progress bar movin for external jobs
+ # note: param could be a subtask
+ #
+ # usage (for a call that is normally finished in under 60 seconds):
+ # fsk = FakeSubTask.new(task, 60)
+ # external_lib_call.start
+ # external_lib_call.wait_until_finished
+ # fsk.finished
+ #
+ # what happens:
+ # the FakeSubTask updates the task.progress each second until
+ # runtime is up or the finished mehtod is called
+ #
+ # example if the param runtime is too low:
+ # 25% .. 50% .. 75% .. 100% .. 100% .. 100% .. 100% .. 100%
+ # example if the param runtime is too high:
+ # 5% .. 10% .. 15% .. 20% .. 25% .. 30% .. 35% .. 100%
+ # the latter example is better (keep the bar movin!)
+ # -> better make a conservative runtime estimate
+ class FakeSubTask
+
+ def initialize(task, runtime)
+ @task = task
+ @thread = Thread.new do
+ timeleft = runtime
+ while (timeleft > 0 and @task.running?)
+ sleep 1
+ timeleft -= 1
+ @task.progress( (runtime - timeleft) / runtime.to_f * 100 )
end
- end
- task.pid = task_pid
- LOGGER.debug "Started task: "+task.uri.to_s
- task.uri
- end
+ end
+ end
+
+ # convenience method to handle null tasks
+ def self.create(task, runtime)
+ if task
+ FakeSubTask.new(task, runtime)
+ else
+ nil
+ end
+ end
+
+ def finished
+ @thread.exit
+ @task.progress(100) if @task.running?
+ end
end
end