summary refs log tree commit diff
path: root/lib/task.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/task.rb')
-rw-r--r-- lib/task.rb 367
1 files changed, 19 insertions, 348 deletions
diff --git a/lib/task.rb b/lib/task.rb
index 3815177..0adb7a0 100644
--- a/lib/task.rb
+++ b/lib/task.rb
@@ -1,401 +1,72 @@
-
+DEFAULT_TASK_MAX_DURATION = 36000
module OpenTox
# Class for handling asynchronous tasks
class Task
- include OpenTox
- attr_accessor :http_code, :due_to_time
-
- def initialize(uri=nil)
- super uri
- @http_code = 202
- @metadata = {
- DC.title => "",
- DC.date => "",
- OT.hasStatus => "Running",
- OT.percentageCompleted => 0.0,
- OT.resultURI => "",
- DC.creator => "", # not mandatory according to API
- DC.description => "", # not mandatory according to API
- }
- end
-
- # Create a new task for the code in the block. Catches halts and exceptions and sets task state to error if necessary. The block has to return the URI of the created resource.
- # @example
- # task = OpenTox::Task.create do
- # # this code will be executed as a task
- # model = OpenTox::Algorithm.run(params) # this can be time consuming
- # model.uri # Important: return URI of the created resource
- # end
- # task.status # returns "Running", because tasks are forked
- # @param [String] title Task title
- # @param [String] creator Task creator
- # @return [OPenTox::Task] Task
- def self.create( title=nil, creator=nil, max_duration=DEFAULT_TASK_MAX_DURATION, description=nil )
-
- params = {:title=>title, :creator=>creator, :max_duration=>max_duration, :description=>description }
- task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, {}, nil, false).to_s
- task = Task.new(task_uri.chomp)
-
- # measure current memory consumption
-=begin
- memory = `free -m|sed -n '2p'`.split
- free_memory = memory[3].to_i + memory[6].to_i # include cache
- if free_memory < 20 # require at least 200 M free memory
- LOGGER.warn "Cannot start task - not enough memory left (#{free_memory} M free)"
- task.cancel
- return task
- #raise "Insufficient memory to start a new task"
- end
-
- cpu_load = `cat /proc/loadavg`.split(/\s+/)[0..2].collect{|c| c.to_f}
- nr_cpu_cores = `cat /proc/cpuinfo |grep "cpu cores"|cut -d ":" -f2|tr -d " "`.split("\n").collect{|c| c.to_i}.inject{|sum,n| sum+n}
- nr_cpu_cores = 1 if !nr_cpu_cores
- #if cpu_load[0] > nr_cpu_cores and cpu_load[0] > cpu_load[1] and cpu_load[1] > cpu_load[2] # average CPU load of the last minute is high and CPU load is increasing
- # LOGGER.warn "Cannot start task - CPU load too high (#{cpu_load.join(", ")})"
- # task.cancel
- # return task
- # #raise "Server too busy to start a new task"
- #end
-=end
- task_pid = Spork.spork(:logger => LOGGER) do
- LOGGER.debug "Task #{task.uri} started #{Time.now}"
- begin
- result = yield task
- LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
- task.completed(result)
- rescue => error
- LOGGER.error "task failed: "+error.class.to_s+": "+error.message
- LOGGER.error ":\n"+error.backtrace.join("\n")
- task.error(OpenTox::ErrorReport.create(error, creator))
- end
- end
- task.pid = task_pid
- LOGGER.debug "Started task: "+task.uri.to_s
- task
- end
-
- # Find a task for querying, status changes
- # @param [String] uri Task URI
- # @return [OpenTox::Task] Task object
- def self.find(uri)
- return nil unless uri
- task = Task.new(uri)
- task.load_metadata
- raise "could not load task metadata" if task.metadata==nil or task.metadata.size==0
- task
- end
- # Find a task for querying, status changes
- # @param [String] uri Task URI
- # @return [OpenTox::Task] Task object
- def self.exist?(uri)
- begin
- return find(uri)
- rescue
- end
+ def self.create service_uri
+ Task.new RestClient.post(service_uri,{}).chomp
+ #eval("#{self}.new(\"#{uri}\", #{subjectid})")
end
- # Get a list of all tasks
- # @param [optional, String] uri URI of task service
- # @return [text/uri-list] Task URIs
- def self.all(uri=CONFIG[:services]["opentox-task"])
- OpenTox.all uri
- end
-
- def self.from_yaml(yaml)
- @metadata = YAML.load(yaml)
- end
-
- def self.from_rdfxml(rdfxml)
- owl = OpenTox::Parser::Owl.from_rdf(rdfxml, OT.Task)
- task = Task.new(owl.uri)
- task.add_metadata(owl.metadata)
- task
- end
-
- def to_rdfxml
- s = Serializer::Owl.new
- @metadata[OT.errorReport] = @uri+"/ErrorReport/tmpId" if @error_report
- s.add_task(@uri,@metadata)
- s.add_resource(@uri+"/ErrorReport/tmpId", OT.errorReport, @error_report.rdf_content) if @error_report
- s.to_rdfxml
+ def http_code
+ get(@uri).code
end
def status
- @metadata[OT.hasStatus]
+ metadata[RDF::OT.hasStatus].to_s
end
def result_uri
- @metadata[OT.resultURI]
+ metadata[RDF::OT.resultURI]
end
def description
- @metadata[DC.description]
+ metadata[RDF::DC.description]
end
def errorReport
- @metadata[OT.errorReport]
+ metadata[RDF::OT.errorReport]
end
def cancel
- RestClientWrapper.put(File.join(@uri,'Cancelled'),{:cannot_be => "empty"})
- load_metadata
+ RestClient.put(File.join(@uri,'Cancelled'),{:cannot_be => "empty"})
end
def completed(uri)
- RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
- load_metadata
+ RestClient.put(File.join(@uri,'Completed'),{:resultURI => uri})
end
def error(error_report)
raise "no error report" unless error_report.is_a?(OpenTox::ErrorReport)
- RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml})
- load_metadata
- end
-
- # not stored just for to_rdf
- def add_error_report( error_report )
- raise "not an error report: "+error_report.class.to_s unless error_report.is_a?(ErrorReport)
- @error_report = error_report
+ RestClient.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml})
end
def pid=(pid)
- RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid})
+ RestClient.put(File.join(@uri,'pid'), {:pid => pid})
end
def running?
- @metadata[OT.hasStatus] == 'Running'
+ metadata[RDF::OT.hasStatus] == 'Running'
end
def completed?
- @metadata[OT.hasStatus] == 'Completed'
+ metadata[RDF::OT.hasStatus] == 'Completed'
end
def error?
- @metadata[OT.hasStatus] == 'Error'
+ metadata[RDF::OT.hasStatus] == 'Error'
end
- def load_metadata
- if (CONFIG[:yaml_hosts].include?(URI.parse(@uri).host))
- result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, nil, false)
- @metadata = YAML.load result.to_s
- @http_code = result.code
- else
- @metadata = Parser::Owl::Generic.new(@uri).load_metadata
- @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, nil, false).code
- end
- raise "could not load task metadata for task "+@uri.to_s if @metadata==nil || @metadata.size==0
- end
-
- # create is private now, use OpenTox::Task.as_task
- #def self.create( params )
- #task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, {}, false).to_s
- #Task.find(task_uri.chomp)
- #end
-
-=begin
- def self.from_data(data, content_type, code, base_uri)
- task = Task.new(nil)
- task.http_code = code
- task.reload_from_data(data, content_type, base_uri)
- return task
- end
-
- def reload( accept_header=nil )
- unless accept_header
- if (CONFIG[:yaml_hosts].include?(URI.parse(uri).host))
- accept_header = "application/x-yaml"
- else
- accept_header = 'application/rdf+xml'
- end
- end
- result = RestClientWrapper.get(uri, {:accept => accept_header}, false)#'application/x-yaml'})
- @http_code = result.code
- reload_from_data(result, result.content_type, uri)
- end
-
- def reload_from_data( data, content_type, base_uri )
- case content_type
- when /yaml/
- task = YAML.load data
- TASK_ATTRIBS.each do |a|
- raise "task yaml data invalid, key missing: "+a.to_s unless task.has_key?(a)
- send("#{a.to_s}=".to_sym,task[a])
- end
- when /application\/rdf\+xml/
- owl = OpenTox::Owl.from_data(data,base_uri,"Task")
- self.uri = owl.uri
- (TASK_ATTRIBS-[:uri]).each{|a| self.send("#{a.to_s}=".to_sym, owl.get(a.to_s))}
- else
- raise "content type for tasks not supported: "+content_type.to_s
- end
- raise "uri is nil after loading" unless @uri and @uri.to_s.strip.size>0
- end
-=end
-
# waits for a task, unless time exceeds or state is no longer running
- # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly
# @param [optional,Numeric] dur seconds pausing before cheking again for completion
- def wait_for_completion( waiting_task=nil, dur=0.3)
-
- waiting_task.waiting_for(self.uri) if waiting_task
+ def wait_for_completion(dur=0.3)
due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION
- LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s
-
- load_metadata # for extremely fast tasks
- check_state
while self.running?
- sleep dur
- load_metadata
- # if another (sub)task is waiting for self, set progress accordingly
- waiting_task.progress(@metadata[OT.percentageCompleted].to_f) if waiting_task
- check_state
- if (Time.new > due_to_time)
- raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'"
- end
- end
- waiting_task.waiting_for(nil) if waiting_task
- LOGGER.debug "Task '"+@metadata[OT.hasStatus].to_s+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s
- end
-
- # updates percentageCompleted value (can only be increased)
- # task has to be running
- # @param [Numeric] pct value between 0 and 100
- def progress(pct)
- #puts "task := "+pct.to_s
- raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100
- if (pct > @metadata[OT.percentageCompleted] + 0.0001)
- RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct})
- load_metadata
- end
- end
-
- def waiting_for(task_uri)
- RestClientWrapper.put(File.join(@uri,'Running'),{:waiting_for => task_uri})
- end
-
- private
- VALID_TASK_STATES = ["Cancelled", "Completed", "Running", "Error"]
-
- def check_state
- begin
- raise "illegal task state, invalid status: '"+@metadata[OT.hasStatus].to_s+"'" unless
- @metadata[OT.hasStatus] unless VALID_TASK_STATES.include?(@metadata[OT.hasStatus])
- raise "illegal task state, task is completed, resultURI is no URI: '"+@metadata[OT.resultURI].to_s+
- "'" unless @metadata[OT.resultURI] and @metadata[OT.resultURI].to_s.uri? if completed?
- if @http_code == 202
- raise "#{@uri}: illegal task state, code is 202, but hasStatus is not Running: '"+@metadata[OT.hasStatus]+"'" unless running?
- elsif @http_code == 201
- # ignore hasStatus
- # raise "#{@uri}: illegal task state, code is 201, but hasStatus is not Completed: '"+@metadata[OT.hasStatus]+"'" unless completed?
- raise "#{@uri}: illegal task state, code is 201, resultURI is no task-URI: '"+@metadata[OT.resultURI].to_s+
- "'" unless @metadata[OT.resultURI] and @metadata[OT.resultURI].to_s.uri?
- end
- rescue => ex
- raise OpenTox::BadRequestError.new ex.message+" (task-uri:"+@uri+")"
+ raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'" if (Time.new > due_to_time)
end
end
end
- # Convenience class to split a (sub)task into subtasks
- #
- # example:
- # a crossvalidation is split into creating datasets and performing the validations
- # creating the dataset is 1/3 of the work, perform the validations is 2/3:
- # Task.as_task do |task|
- # create_datasets( SubTask.new(task, 0, 33) )
- # perfom_validations( SubTask.new(task, 33, 100) )
- # end
- # inside the create_datasets / perform_validations you can use subtask.progress(<val>)
- # with vals from 0-100
- #
- # note that you can split a subtask into further subtasks
- class SubTask
-
- def initialize(task, min, max)
- raise "not a task or subtask" if task!=nil and !(task.is_a?(Task) or task.is_a?(SubTask))
- raise "invalid max ("+max.to_s+"), min ("+min.to_s+") params" unless
- min.is_a?(Numeric) and max.is_a?(Numeric) and min >= 0 and max <= 100 and max > min
- @task = task
- @min = min
- @max = max
- @delta = max - min
- end
-
- # convenience method to handle null tasks
- def self.create(task, min, max)
- if task
- SubTask.new(task, min, max)
- else
- nil
- end
- end
-
- def waiting_for(task_uri)
- @task.waiting_for(task_uri)
- end
-
- def progress(pct)
- raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100
- #puts "subtask := "+pct.to_s+" -> task := "+(@min + @delta * pct.to_f * 0.01).to_s
- @task.progress( @min + @delta * pct.to_f * 0.01 )
- end
-
- def running?()
- @task.running?
- end
- end
-
-
- # The David Gallagher feature:
- # a fake sub task to keep the progress bar movin for external jobs
- # note: param could be a subtask
- #
- # usage (for a call that is normally finished in under 60 seconds):
- # fsk = FakeSubTask.new(task, 60)
- # external_lib_call.start
- # external_lib_call.wait_until_finished
- # fsk.finished
- #
- # what happens:
- # the FakeSubTask updates the task.progress each second until
- # runtime is up or the finished mehtod is called
- #
- # example if the param runtime is too low:
- # 25% .. 50% .. 75% .. 100% .. 100% .. 100% .. 100% .. 100%
- # example if the param runtime is too high:
- # 5% .. 10% .. 15% .. 20% .. 25% .. 30% .. 35% .. 100%
- # the latter example is better (keep the bar movin!)
- # -> better make a conservative runtime estimate
- class FakeSubTask
-
- def initialize(task, runtime)
- @task = task
- @thread = Thread.new do
- timeleft = runtime
- while (timeleft > 0 and @task.running?)
- sleep 1
- timeleft -= 1
- @task.progress( (runtime - timeleft) / runtime.to_f * 100 )
- end
- end
- end
-
- # convenience method to handle null tasks
- def self.create(task, runtime)
- if task
- FakeSubTask.new(task, runtime)
- else
- nil
- end
- end
-
- def finished
- @thread.exit
- @task.progress(100) if @task.running?
- end
- end
-
end