From 354aaa649e9eeed5d81793e09d9714b45063c147 Mon Sep 17 00:00:00 2001 From: Christoph Helma Date: Wed, 8 Feb 2012 13:14:11 +0100 Subject: toxbank-investigation compatible version --- lib/task.rb | 367 ++++-------------------------------------------------------- 1 file changed, 19 insertions(+), 348 deletions(-) (limited to 'lib/task.rb') diff --git a/lib/task.rb b/lib/task.rb index 3815177..0adb7a0 100644 --- a/lib/task.rb +++ b/lib/task.rb @@ -1,401 +1,72 @@ - +DEFAULT_TASK_MAX_DURATION = 36000 module OpenTox # Class for handling asynchronous tasks class Task - include OpenTox - attr_accessor :http_code, :due_to_time - - def initialize(uri=nil) - super uri - @http_code = 202 - @metadata = { - DC.title => "", - DC.date => "", - OT.hasStatus => "Running", - OT.percentageCompleted => 0.0, - OT.resultURI => "", - DC.creator => "", # not mandatory according to API - DC.description => "", # not mandatory according to API - } - end - - # Create a new task for the code in the block. Catches halts and exceptions and sets task state to error if necessary. The block has to return the URI of the created resource. - # @example - # task = OpenTox::Task.create do - # # this code will be executed as a task - # model = OpenTox::Algorithm.run(params) # this can be time consuming - # model.uri # Important: return URI of the created resource - # end - # task.status # returns "Running", because tasks are forked - # @param [String] title Task title - # @param [String] creator Task creator - # @return [OPenTox::Task] Task - def self.create( title=nil, creator=nil, max_duration=DEFAULT_TASK_MAX_DURATION, description=nil ) - - params = {:title=>title, :creator=>creator, :max_duration=>max_duration, :description=>description } - task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, {}, nil, false).to_s - task = Task.new(task_uri.chomp) - - # measure current memory consumption -=begin - memory = `free -m|sed -n '2p'`.split - free_memory = memory[3].to_i + memory[6].to_i # include cache - if free_memory < 20 # require at least 200 M free memory - LOGGER.warn "Cannot start task - not enough memory left (#{free_memory} M free)" - task.cancel - return task - #raise "Insufficient memory to start a new task" - end - - cpu_load = `cat /proc/loadavg`.split(/\s+/)[0..2].collect{|c| c.to_f} - nr_cpu_cores = `cat /proc/cpuinfo |grep "cpu cores"|cut -d ":" -f2|tr -d " "`.split("\n").collect{|c| c.to_i}.inject{|sum,n| sum+n} - nr_cpu_cores = 1 if !nr_cpu_cores - #if cpu_load[0] > nr_cpu_cores and cpu_load[0] > cpu_load[1] and cpu_load[1] > cpu_load[2] # average CPU load of the last minute is high and CPU load is increasing - # LOGGER.warn "Cannot start task - CPU load too high (#{cpu_load.join(", ")})" - # task.cancel - # return task - # #raise "Server too busy to start a new task" - #end -=end - task_pid = Spork.spork(:logger => LOGGER) do - LOGGER.debug "Task #{task.uri} started #{Time.now}" - begin - result = yield task - LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s - task.completed(result) - rescue => error - LOGGER.error "task failed: "+error.class.to_s+": "+error.message - LOGGER.error ":\n"+error.backtrace.join("\n") - task.error(OpenTox::ErrorReport.create(error, creator)) - end - end - task.pid = task_pid - LOGGER.debug "Started task: "+task.uri.to_s - task - end - - # Find a task for querying, status changes - # @param [String] uri Task URI - # @return [OpenTox::Task] Task object - def self.find(uri) - return nil unless uri - task = Task.new(uri) - task.load_metadata - raise "could not load task metadata" if task.metadata==nil or task.metadata.size==0 - task - end - # Find a task for querying, status changes - # @param [String] uri Task URI - # @return [OpenTox::Task] Task object - def self.exist?(uri) - begin - return find(uri) - rescue - end + def self.create service_uri + Task.new RestClient.post(service_uri,{}).chomp + #eval("#{self}.new(\"#{uri}\", #{subjectid})") end - # Get a list of all tasks - # @param [optional, String] uri URI of task service - # @return [text/uri-list] Task URIs - def self.all(uri=CONFIG[:services]["opentox-task"]) - OpenTox.all uri - end - - def self.from_yaml(yaml) - @metadata = YAML.load(yaml) - end - - def self.from_rdfxml(rdfxml) - owl = OpenTox::Parser::Owl.from_rdf(rdfxml, OT.Task) - task = Task.new(owl.uri) - task.add_metadata(owl.metadata) - task - end - - def to_rdfxml - s = Serializer::Owl.new - @metadata[OT.errorReport] = @uri+"/ErrorReport/tmpId" if @error_report - s.add_task(@uri,@metadata) - s.add_resource(@uri+"/ErrorReport/tmpId", OT.errorReport, @error_report.rdf_content) if @error_report - s.to_rdfxml + def http_code + get(@uri).code end def status - @metadata[OT.hasStatus] + metadata[RDF::OT.hasStatus].to_s end def result_uri - @metadata[OT.resultURI] + metadata[RDF::OT.resultURI] end def description - @metadata[DC.description] + metadata[RDF::DC.description] end def errorReport - @metadata[OT.errorReport] + metadata[RDF::OT.errorReport] end def cancel - RestClientWrapper.put(File.join(@uri,'Cancelled'),{:cannot_be => "empty"}) - load_metadata + RestClient.put(File.join(@uri,'Cancelled'),{:cannot_be => "empty"}) end def completed(uri) - RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri}) - load_metadata + RestClient.put(File.join(@uri,'Completed'),{:resultURI => uri}) end def error(error_report) raise "no error report" unless error_report.is_a?(OpenTox::ErrorReport) - RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml}) - load_metadata - end - - # not stored just for to_rdf - def add_error_report( error_report ) - raise "not an error report: "+error_report.class.to_s unless error_report.is_a?(ErrorReport) - @error_report = error_report + RestClient.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml}) end def pid=(pid) - RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid}) + RestClient.put(File.join(@uri,'pid'), {:pid => pid}) end def running? - @metadata[OT.hasStatus] == 'Running' + metadata[RDF::OT.hasStatus] == 'Running' end def completed? - @metadata[OT.hasStatus] == 'Completed' + metadata[RDF::OT.hasStatus] == 'Completed' end def error? - @metadata[OT.hasStatus] == 'Error' + metadata[RDF::OT.hasStatus] == 'Error' end - def load_metadata - if (CONFIG[:yaml_hosts].include?(URI.parse(@uri).host)) - result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, nil, false) - @metadata = YAML.load result.to_s - @http_code = result.code - else - @metadata = Parser::Owl::Generic.new(@uri).load_metadata - @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, nil, false).code - end - raise "could not load task metadata for task "+@uri.to_s if @metadata==nil || @metadata.size==0 - end - - # create is private now, use OpenTox::Task.as_task - #def self.create( params ) - #task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, {}, false).to_s - #Task.find(task_uri.chomp) - #end - -=begin - def self.from_data(data, content_type, code, base_uri) - task = Task.new(nil) - task.http_code = code - task.reload_from_data(data, content_type, base_uri) - return task - end - - def reload( accept_header=nil ) - unless accept_header - if (CONFIG[:yaml_hosts].include?(URI.parse(uri).host)) - accept_header = "application/x-yaml" - else - accept_header = 'application/rdf+xml' - end - end - result = RestClientWrapper.get(uri, {:accept => accept_header}, false)#'application/x-yaml'}) - @http_code = result.code - reload_from_data(result, result.content_type, uri) - end - - def reload_from_data( data, content_type, base_uri ) - case content_type - when /yaml/ - task = YAML.load data - TASK_ATTRIBS.each do |a| - raise "task yaml data invalid, key missing: "+a.to_s unless task.has_key?(a) - send("#{a.to_s}=".to_sym,task[a]) - end - when /application\/rdf\+xml/ - owl = OpenTox::Owl.from_data(data,base_uri,"Task") - self.uri = owl.uri - (TASK_ATTRIBS-[:uri]).each{|a| self.send("#{a.to_s}=".to_sym, owl.get(a.to_s))} - else - raise "content type for tasks not supported: "+content_type.to_s - end - raise "uri is nil after loading" unless @uri and @uri.to_s.strip.size>0 - end -=end - # waits for a task, unless time exceeds or state is no longer running - # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly # @param [optional,Numeric] dur seconds pausing before cheking again for completion - def wait_for_completion( waiting_task=nil, dur=0.3) - - waiting_task.waiting_for(self.uri) if waiting_task + def wait_for_completion(dur=0.3) due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION - LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s - - load_metadata # for extremely fast tasks - check_state while self.running? - sleep dur - load_metadata - # if another (sub)task is waiting for self, set progress accordingly - waiting_task.progress(@metadata[OT.percentageCompleted].to_f) if waiting_task - check_state - if (Time.new > due_to_time) - raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'" - end - end - waiting_task.waiting_for(nil) if waiting_task - LOGGER.debug "Task '"+@metadata[OT.hasStatus].to_s+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s - end - - # updates percentageCompleted value (can only be increased) - # task has to be running - # @param [Numeric] pct value between 0 and 100 - def progress(pct) - #puts "task := "+pct.to_s - raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100 - if (pct > @metadata[OT.percentageCompleted] + 0.0001) - RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct}) - load_metadata - end - end - - def waiting_for(task_uri) - RestClientWrapper.put(File.join(@uri,'Running'),{:waiting_for => task_uri}) - end - - private - VALID_TASK_STATES = ["Cancelled", "Completed", "Running", "Error"] - - def check_state - begin - raise "illegal task state, invalid status: '"+@metadata[OT.hasStatus].to_s+"'" unless - @metadata[OT.hasStatus] unless VALID_TASK_STATES.include?(@metadata[OT.hasStatus]) - raise "illegal task state, task is completed, resultURI is no URI: '"+@metadata[OT.resultURI].to_s+ - "'" unless @metadata[OT.resultURI] and @metadata[OT.resultURI].to_s.uri? if completed? - if @http_code == 202 - raise "#{@uri}: illegal task state, code is 202, but hasStatus is not Running: '"+@metadata[OT.hasStatus]+"'" unless running? - elsif @http_code == 201 - # ignore hasStatus - # raise "#{@uri}: illegal task state, code is 201, but hasStatus is not Completed: '"+@metadata[OT.hasStatus]+"'" unless completed? - raise "#{@uri}: illegal task state, code is 201, resultURI is no task-URI: '"+@metadata[OT.resultURI].to_s+ - "'" unless @metadata[OT.resultURI] and @metadata[OT.resultURI].to_s.uri? - end - rescue => ex - raise OpenTox::BadRequestError.new ex.message+" (task-uri:"+@uri+")" + raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'" if (Time.new > due_to_time) end end end - # Convenience class to split a (sub)task into subtasks - # - # example: - # a crossvalidation is split into creating datasets and performing the validations - # creating the dataset is 1/3 of the work, perform the validations is 2/3: - # Task.as_task do |task| - # create_datasets( SubTask.new(task, 0, 33) ) - # perfom_validations( SubTask.new(task, 33, 100) ) - # end - # inside the create_datasets / perform_validations you can use subtask.progress() - # with vals from 0-100 - # - # note that you can split a subtask into further subtasks - class SubTask - - def initialize(task, min, max) - raise "not a task or subtask" if task!=nil and !(task.is_a?(Task) or task.is_a?(SubTask)) - raise "invalid max ("+max.to_s+"), min ("+min.to_s+") params" unless - min.is_a?(Numeric) and max.is_a?(Numeric) and min >= 0 and max <= 100 and max > min - @task = task - @min = min - @max = max - @delta = max - min - end - - # convenience method to handle null tasks - def self.create(task, min, max) - if task - SubTask.new(task, min, max) - else - nil - end - end - - def waiting_for(task_uri) - @task.waiting_for(task_uri) - end - - def progress(pct) - raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100 - #puts "subtask := "+pct.to_s+" -> task := "+(@min + @delta * pct.to_f * 0.01).to_s - @task.progress( @min + @delta * pct.to_f * 0.01 ) - end - - def running?() - @task.running? - end - end - - - # The David Gallagher feature: - # a fake sub task to keep the progress bar movin for external jobs - # note: param could be a subtask - # - # usage (for a call that is normally finished in under 60 seconds): - # fsk = FakeSubTask.new(task, 60) - # external_lib_call.start - # external_lib_call.wait_until_finished - # fsk.finished - # - # what happens: - # the FakeSubTask updates the task.progress each second until - # runtime is up or the finished mehtod is called - # - # example if the param runtime is too low: - # 25% .. 50% .. 75% .. 100% .. 100% .. 100% .. 100% .. 100% - # example if the param runtime is too high: - # 5% .. 10% .. 15% .. 20% .. 25% .. 30% .. 35% .. 100% - # the latter example is better (keep the bar movin!) - # -> better make a conservative runtime estimate - class FakeSubTask - - def initialize(task, runtime) - @task = task - @thread = Thread.new do - timeleft = runtime - while (timeleft > 0 and @task.running?) - sleep 1 - timeleft -= 1 - @task.progress( (runtime - timeleft) / runtime.to_f * 100 ) - end - end - end - - # convenience method to handle null tasks - def self.create(task, runtime) - if task - FakeSubTask.new(task, runtime) - else - nil - end - end - - def finished - @thread.exit - @task.progress(100) if @task.running? - end - end - end -- cgit v1.2.3