summaryrefslogtreecommitdiff
path: root/lib/task.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/task.rb')
-rw-r--r--lib/task.rb164
1 files changed, 138 insertions, 26 deletions
diff --git a/lib/task.rb b/lib/task.rb
index 9cf909f..3c6aba5 100644
--- a/lib/task.rb
+++ b/lib/task.rb
@@ -1,4 +1,3 @@
-$self_task=nil
module OpenTox
@@ -13,7 +12,7 @@ module OpenTox
DC.title => "",
DC.date => "",
OT.hasStatus => "Running",
- OT.percentageCompleted => "0",
+ OT.percentageCompleted => 0.0,
OT.resultURI => "",
DC.creator => "", # not mandatory according to API
DC.description => "", # not mandatory according to API
@@ -57,27 +56,17 @@ module OpenTox
# #raise "Server too busy to start a new task"
#end
-
task_pid = Spork.spork(:logger => LOGGER) do
LOGGER.debug "Task #{task.uri} started #{Time.now}"
- $self_task = task
-
begin
- result = catch(:halt) do
- yield task
- end
- # catching halt, set task state to error
- if result && result.is_a?(Array) && result.size==2 && result[0]>202
- LOGGER.error "task was halted: "+result.inspect
- task.error(result[1])
- return
- end
+ result = yield task
LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
task.completed(result)
- rescue => ex
- LOGGER.error "task failed: "+ex.message
- LOGGER.error ": "+ex.backtrace.join("\n")
- task.error(ex.message)
+ rescue => error
+ LOGGER.error "task failed: "+error.class.to_s+": "+error.message
+ # log backtrace only if code is 500 -> unwanted (Runtime)Exceptions and internal errors (see error.rb)
+ LOGGER.error ":\n"+error.backtrace.join("\n") if error.http_code==500
+ task.error(OpenTox::ErrorReport.new(error, creator))
end
end
task.pid = task_pid
@@ -113,7 +102,9 @@ module OpenTox
def to_rdfxml
s = Serializer::Owl.new
+ @metadata[OT.errorReport] = @uri+"/ErrorReport/tmpId" if @error_report
s.add_task(@uri,@metadata)
+ s.add_resource(@uri+"/ErrorReport/tmpId", OT.errorReport, @error_report.rdf_content) if @error_report
s.to_rdfxml
end
@@ -129,6 +120,10 @@ module OpenTox
@metadata[DC.description]
end
+ def errorReport
+ @metadata[OT.errorReport]
+ end
+
def cancel
RestClientWrapper.put(File.join(@uri,'Cancelled'))
load_metadata
@@ -139,11 +134,17 @@ module OpenTox
load_metadata
end
- def error(description)
- RestClientWrapper.put(File.join(@uri,'Error'),{:description => description.to_s[0..2000]})
+ def error(error_report)
+ raise "no error report" unless error_report.is_a?(OpenTox::ErrorReport)
+ RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml})
load_metadata
end
+ # not stored just for to_rdf
+ def add_error_report( error_report )
+ @error_report = error_report
+ end
+
def pid=(pid)
RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid})
end
@@ -162,12 +163,12 @@ module OpenTox
def load_metadata
if (CONFIG[:yaml_hosts].include?(URI.parse(uri).host))
- result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, false)
+ result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, nil, false)
@metadata = YAML.load result.to_s
@http_code = result.code
else
@metadata = Parser::Owl::Generic.new(@uri).load_metadata
- @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, false).code
+ @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, nil, false).code
end
end
@@ -218,7 +219,9 @@ module OpenTox
=end
# waits for a task, unless time exceeds or state is no longer running
- def wait_for_completion(dur=0.3)
+ # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly
+ # @param [optional,Numeric] dur seconds pausing before cheking again for completion
+ def wait_for_completion( waiting_task=nil, dur=0.3)
due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION
LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s
@@ -228,15 +231,29 @@ module OpenTox
while self.running?
sleep dur
load_metadata
+ # if another (sub)task is waiting for self, set progress accordingly
+ waiting_task.progress(@metadata[OT.percentageCompleted]) if waiting_task
check_state
if (Time.new > due_to_time)
raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'"
end
end
- LOGGER.debug "Task '"+@metadata[OT.hasStatus]+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s
+ LOGGER.debug "Task '"+@metadata[OT.hasStatus].to_s+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s
end
-
+
+ # updates percentageCompleted value (can only be increased)
+ # task has to be running
+ # @param [Numeric] pct value between 0 and 100
+ def progress(pct)
+ #puts "task := "+pct.to_s
+ raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100
+ if (pct > @metadata[OT.percentageCompleted] + 0.0001)
+ RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct})
+ load_metadata
+ end
+ end
+
private
def check_state
begin
@@ -251,10 +268,105 @@ module OpenTox
"'" unless @metadata[OT.resultURI] and @metadata[OT.resultURI].to_s.uri?
end
rescue => ex
- RestClientWrapper.raise_uri_error(ex.message, @uri)
+ raise OpenTox::BadRequestError.new ex.message+" (task-uri:"+@uri+")"
+ end
+ end
+
+ end
+
+ # Convenience class to split a (sub)task into subtasks
+ #
+ # example:
+ # a crossvalidation is split into creating datasets and performing the validations
+ # creating the dataset is 1/3 of the work, perform the validations is 2/3:
+ # Task.as_task do |task|
+ # create_datasets( SubTask.new(task, 0, 33) )
+ # perfom_validations( SubTask.new(task, 33, 100) )
+ # end
+ # inside the create_datasets / perform_validations you can use subtask.progress(<val>)
+ # with vals from 0-100
+ #
+ # note that you can split a subtask into further subtasks
+ class SubTask
+
+ def initialize(task, min, max)
+ raise "not a task or subtask" unless task.is_a?(Task) or task.is_a?(SubTask)
+ raise "invalid max ("+max.to_s+"), min ("+min.to_s+") params" unless
+ min.is_a?(Numeric) and max.is_a?(Numeric) and min >= 0 and max <= 100 and max > min
+ @task = task
+ @min = min
+ @max = max
+ @delta = max - min
+ end
+
+ # convenience method to handle null tasks
+ def self.create(task, min, max)
+ if task
+ SubTask.new(task, min, max)
+ else
+ nil
end
end
+
+ def progress(pct)
+ raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100
+ #puts "subtask := "+pct.to_s+" -> task := "+(@min + @delta * pct.to_f * 0.01).to_s
+ @task.progress( @min + @delta * pct.to_f * 0.01 )
+ end
+
+ def running?()
+ @task.running?
+ end
+ end
+
+ # The David Gallagher feature:
+ # a fake sub task to keep the progress bar movin for external jobs
+ # note: param could be a subtask
+ #
+ # usage (for a call that is normally finished in under 60 seconds):
+ # fsk = FakeSubTask.new(task, 60)
+ # external_lib_call.start
+ # external_lib_call.wait_until_finished
+ # fsk.finished
+ #
+ # what happens:
+ # the FakeSubTask updates the task.progress each second until
+ # runtime is up or the finished mehtod is called
+ #
+ # example if the param runtime is too low:
+ # 25% .. 50% .. 75% .. 100% .. 100% .. 100% .. 100% .. 100%
+ # example if the param runtime is too high:
+ # 5% .. 10% .. 15% .. 20% .. 25% .. 30% .. 35% .. 100%
+ # the latter example is better (keep the bar movin!)
+ # -> better make a conservative runtime estimate
+ class FakeSubTask
+
+ def initialize(task, runtime)
+ @task = task
+ @thread = Thread.new do
+ timeleft = runtime
+ while (timeleft > 0 and @task.running?)
+ sleep 1
+ timeleft -= 1
+ @task.progress( (runtime - timeleft) / runtime.to_f * 100 )
+ end
+ end
+ end
+
+ # convenience method to handle null tasks
+ def self.create(task, runtime)
+ if task
+ FakeSubTask.new(task, runtime)
+ else
+ nil
+ end
+ end
+
+ def finished
+ @thread.exit
+ @task.progress(100) if @task.running?
+ end
end
end