diff options
Diffstat (limited to 'lib/task.rb')
-rw-r--r-- | lib/task.rb | 164 |
1 files changed, 138 insertions, 26 deletions
diff --git a/lib/task.rb b/lib/task.rb index 9cf909f..3c6aba5 100644 --- a/lib/task.rb +++ b/lib/task.rb @@ -1,4 +1,3 @@ -$self_task=nil module OpenTox @@ -13,7 +12,7 @@ module OpenTox DC.title => "", DC.date => "", OT.hasStatus => "Running", - OT.percentageCompleted => "0", + OT.percentageCompleted => 0.0, OT.resultURI => "", DC.creator => "", # not mandatory according to API DC.description => "", # not mandatory according to API @@ -57,27 +56,17 @@ module OpenTox # #raise "Server too busy to start a new task" #end - task_pid = Spork.spork(:logger => LOGGER) do LOGGER.debug "Task #{task.uri} started #{Time.now}" - $self_task = task - begin - result = catch(:halt) do - yield task - end - # catching halt, set task state to error - if result && result.is_a?(Array) && result.size==2 && result[0]>202 - LOGGER.error "task was halted: "+result.inspect - task.error(result[1]) - return - end + result = yield task LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s task.completed(result) - rescue => ex - LOGGER.error "task failed: "+ex.message - LOGGER.error ": "+ex.backtrace.join("\n") - task.error(ex.message) + rescue => error + LOGGER.error "task failed: "+error.class.to_s+": "+error.message + # log backtrace only if code is 500 -> unwanted (Runtime)Exceptions and internal errors (see error.rb) + LOGGER.error ":\n"+error.backtrace.join("\n") if error.http_code==500 + task.error(OpenTox::ErrorReport.new(error, creator)) end end task.pid = task_pid @@ -113,7 +102,9 @@ module OpenTox def to_rdfxml s = Serializer::Owl.new + @metadata[OT.errorReport] = @uri+"/ErrorReport/tmpId" if @error_report s.add_task(@uri,@metadata) + s.add_resource(@uri+"/ErrorReport/tmpId", OT.errorReport, @error_report.rdf_content) if @error_report s.to_rdfxml end @@ -129,6 +120,10 @@ module OpenTox @metadata[DC.description] end + def errorReport + @metadata[OT.errorReport] + end + def cancel RestClientWrapper.put(File.join(@uri,'Cancelled')) load_metadata @@ -139,11 +134,17 @@ module OpenTox load_metadata end - def error(description) - RestClientWrapper.put(File.join(@uri,'Error'),{:description => description.to_s[0..2000]}) + def error(error_report) + raise "no error report" unless error_report.is_a?(OpenTox::ErrorReport) + RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml}) load_metadata end + # not stored just for to_rdf + def add_error_report( error_report ) + @error_report = error_report + end + def pid=(pid) RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid}) end @@ -162,12 +163,12 @@ module OpenTox def load_metadata if (CONFIG[:yaml_hosts].include?(URI.parse(uri).host)) - result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, false) + result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, nil, false) @metadata = YAML.load result.to_s @http_code = result.code else @metadata = Parser::Owl::Generic.new(@uri).load_metadata - @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, false).code + @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, nil, false).code end end @@ -218,7 +219,9 @@ module OpenTox =end # waits for a task, unless time exceeds or state is no longer running - def wait_for_completion(dur=0.3) + # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly + # @param [optional,Numeric] dur seconds pausing before cheking again for completion + def wait_for_completion( waiting_task=nil, dur=0.3) due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s @@ -228,15 +231,29 @@ module OpenTox while self.running? sleep dur load_metadata + # if another (sub)task is waiting for self, set progress accordingly + waiting_task.progress(@metadata[OT.percentageCompleted]) if waiting_task check_state if (Time.new > due_to_time) raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'" end end - LOGGER.debug "Task '"+@metadata[OT.hasStatus]+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s + LOGGER.debug "Task '"+@metadata[OT.hasStatus].to_s+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s end - + + # updates percentageCompleted value (can only be increased) + # task has to be running + # @param [Numeric] pct value between 0 and 100 + def progress(pct) + #puts "task := "+pct.to_s + raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100 + if (pct > @metadata[OT.percentageCompleted] + 0.0001) + RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct}) + load_metadata + end + end + private def check_state begin @@ -251,10 +268,105 @@ module OpenTox "'" unless @metadata[OT.resultURI] and @metadata[OT.resultURI].to_s.uri? end rescue => ex - RestClientWrapper.raise_uri_error(ex.message, @uri) + raise OpenTox::BadRequestError.new ex.message+" (task-uri:"+@uri+")" + end + end + + end + + # Convenience class to split a (sub)task into subtasks + # + # example: + # a crossvalidation is split into creating datasets and performing the validations + # creating the dataset is 1/3 of the work, perform the validations is 2/3: + # Task.as_task do |task| + # create_datasets( SubTask.new(task, 0, 33) ) + # perfom_validations( SubTask.new(task, 33, 100) ) + # end + # inside the create_datasets / perform_validations you can use subtask.progress(<val>) + # with vals from 0-100 + # + # note that you can split a subtask into further subtasks + class SubTask + + def initialize(task, min, max) + raise "not a task or subtask" unless task.is_a?(Task) or task.is_a?(SubTask) + raise "invalid max ("+max.to_s+"), min ("+min.to_s+") params" unless + min.is_a?(Numeric) and max.is_a?(Numeric) and min >= 0 and max <= 100 and max > min + @task = task + @min = min + @max = max + @delta = max - min + end + + # convenience method to handle null tasks + def self.create(task, min, max) + if task + SubTask.new(task, min, max) + else + nil end end + + def progress(pct) + raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100 + #puts "subtask := "+pct.to_s+" -> task := "+(@min + @delta * pct.to_f * 0.01).to_s + @task.progress( @min + @delta * pct.to_f * 0.01 ) + end + + def running?() + @task.running? + end + end + + # The David Gallagher feature: + # a fake sub task to keep the progress bar movin for external jobs + # note: param could be a subtask + # + # usage (for a call that is normally finished in under 60 seconds): + # fsk = FakeSubTask.new(task, 60) + # external_lib_call.start + # external_lib_call.wait_until_finished + # fsk.finished + # + # what happens: + # the FakeSubTask updates the task.progress each second until + # runtime is up or the finished mehtod is called + # + # example if the param runtime is too low: + # 25% .. 50% .. 75% .. 100% .. 100% .. 100% .. 100% .. 100% + # example if the param runtime is too high: + # 5% .. 10% .. 15% .. 20% .. 25% .. 30% .. 35% .. 100% + # the latter example is better (keep the bar movin!) + # -> better make a conservative runtime estimate + class FakeSubTask + + def initialize(task, runtime) + @task = task + @thread = Thread.new do + timeleft = runtime + while (timeleft > 0 and @task.running?) + sleep 1 + timeleft -= 1 + @task.progress( (runtime - timeleft) / runtime.to_f * 100 ) + end + end + end + + # convenience method to handle null tasks + def self.create(task, runtime) + if task + FakeSubTask.new(task, runtime) + else + nil + end + end + + def finished + @thread.exit + @task.progress(100) if @task.running? + end end end |