diff options
author | Martin Gütlein <martin.guetlein@gmail.com> | 2010-04-27 12:00:39 +0200 |
---|---|---|
committer | Martin Gütlein <martin.guetlein@gmail.com> | 2010-04-27 12:00:39 +0200 |
commit | ac5a07c294bba699d6517d5be1548a33455b87c8 (patch) | |
tree | 5ad284a26e4ae0772945e2400c9d10079f317882 /lib | |
parent | afa31803afd167c19c9ba95e73d311c9e2b8b43a (diff) |
change tasks: remove tree structure, add max wait time
Diffstat (limited to 'lib')
-rw-r--r-- | lib/environment.rb | 3 | ||||
-rw-r--r-- | lib/rest_client_wrapper.rb | 58 | ||||
-rw-r--r-- | lib/task.rb | 83 |
3 files changed, 59 insertions, 85 deletions
diff --git a/lib/environment.rb b/lib/environment.rb index 27235c3..c86d2be 100644 --- a/lib/environment.rb +++ b/lib/environment.rb @@ -40,6 +40,9 @@ if @@config[:database] end end +# hack: store sinatra in global var to make url_for and halt methods accessible +before {$sinatra = self unless $sinatra} + class Sinatra::Base # overwriting halt to log halts (!= 202) def halt(status,msg) diff --git a/lib/rest_client_wrapper.rb b/lib/rest_client_wrapper.rb index 0219f51..194a742 100644 --- a/lib/rest_client_wrapper.rb +++ b/lib/rest_client_wrapper.rb @@ -36,29 +36,6 @@ module OpenTox class RestClientWrapper - # PENDING: remove as soon as redirect tasks are remove from partner webservices - def self.redirect_task( uri ) - raise "no redirect task uri: "+uri.to_s unless uri.to_s =~ /194.141.0.136|ambit.*task|tu-muenchen.*task/ - - while (uri.to_s =~ /194.141.0.136|ambit.*task|tu-muenchen.*task/) - #HACK handle redirect - LOGGER.debug "REDIRECT TASK: "+uri.to_s - redirect = "" - while (redirect.size == 0) - IO.popen("bin/redirect.sh "+uri.to_s) do |f| - while line = f.gets - redirect += line.chomp - end - end - raise redirect!=nil && redirect.size>0 ? redirect : "TASK ERROR" if $?!=0 - sleep 0.3 - end - uri = redirect - LOGGER.debug "REDIRECT TO: "+uri.to_s - end - return uri - end - def self.get(uri, headers=nil) execute( "get", uri, headers) end @@ -81,7 +58,7 @@ module OpenTox private def self.execute( rest_call, uri, headers, payload=nil, wait=true ) - + do_halt 400,"uri is null",uri,headers,payload unless uri do_halt 400,"not a uri",uri,headers,payload unless Utils.is_uri?(uri) do_halt 400,"headers are no hash",uri,headers,payload unless headers==nil or headers.is_a?(Hash) @@ -112,15 +89,9 @@ module OpenTox case res.content_type when /application\/rdf\+xml|text\/x-yaml/ task = OpenTox::Task.from_data(res, res.content_type, uri, true) - when /text\// + when /text\// return res if res.content_type=~/text\/uri-list/ and res.split("\n").size > 1 #if uri list contains more then one uri, its not a task - # HACK for redirect tasks - if res =~ /ambit.*task|tu-muenchen.*task/ - res = WrapperResult.new(redirect_task(res)) - res.content_type = "text/uri-list" - return res - end task = OpenTox::Task.find(res.to_s) if Utils.task_uri?(res) else raise "unknown content-type when checking for task: "+res.content_type+" content: "+res[0..200] @@ -130,7 +101,7 @@ module OpenTox if task LOGGER.debug "result is a task '"+task.uri.to_s+"', wait for completion" task.wait_for_completion - raise task.description if task.error? + raise task.description unless task.completed? res = WrapperResult.new(task.resultURI) LOGGER.debug "task resultURI "+res.to_s res.content_type = "text/uri-list" @@ -175,18 +146,17 @@ module OpenTox # count+=1 while File.exist?(File.join(error_dir,file_name+"_"+time+"_"+count.to_s)) # File.new(File.join(error_dir,file_name+"_"+time+"_"+count.to_s),"w").puts(body) - # return error (by halting, halts should be logged) - # PENDING always return yaml for now - begin - if defined?(halt) - halt(502,error.to_yaml) - elsif defined?($sinatra) - $sinatra.halt(502,error.to_yaml) - else - raise "" - end - rescue - raise error.to_yaml + # handle error + # we are either in a task, or in sinatra + # PENDING: always return yaml for now + + + if $self_task #this global var in Task.as_task to mark that the current process is running in a task + raise error.to_yaml # the error is caught, logged, and task state is set to error in Task.as_task + elsif $sinatra #else halt sinatra + $sinatra.halt(502,error.to_yaml) + else + raise "internal error" end end end diff --git a/lib/task.rb b/lib/task.rb index 4b147d7..178f744 100644 --- a/lib/task.rb +++ b/lib/task.rb @@ -1,10 +1,16 @@ LOGGER.progname = File.expand_path(__FILE__) +DEFAULT_TASK_MAX_DURATION = 120 +EXTERNAL_TASK_MAX_DURATION = 60 + +$self_task=nil + module OpenTox class Task - TASK_ATTRIBS = [ :uri, :date, :title, :creator, :title, :description, :hasStatus, :percentageCompleted, :resultURI ] + # due_to_time is only set in local tasks + TASK_ATTRIBS = [ :uri, :date, :title, :creator, :title, :description, :hasStatus, :percentageCompleted, :resultURI, :due_to_time ] TASK_ATTRIBS.each{ |a| attr_accessor(a) } private @@ -12,12 +18,13 @@ module OpenTox @uri = uri.to_s.strip end - public - def self.create - task_uri = RestClientWrapper.post(@@config[:services]["opentox-task"], {}, nil, false).to_s + # create is private now, use OpenTox::Task.as_task + def self.create(max_duration) + task_uri = RestClientWrapper.post(@@config[:services]["opentox-task"], {:max_duration => max_duration}, nil, false).to_s Task.find(task_uri.chomp) end + public def self.find(uri) task = Task.new(uri) task.reload @@ -36,7 +43,7 @@ module OpenTox end def reload - result = RestClientWrapper.get(uri) + result = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'})#'text/x-yaml'}) reload_from_data(result, result.content_type, uri, false) end @@ -45,10 +52,9 @@ module OpenTox case content_type when /text\/x-yaml/ task = YAML.load data - if task.is_a?(Task) - TASK_ATTRIBS.each{ |a| send("#{a.to_s}=".to_sym,task[a]) } - else - raise "yaml data is no task: "+task.class.to_s unless test_if_task + TASK_ATTRIBS.each do |a| + raise "task yaml data invalid, key missing: "+a.to_s unless task.has_key?(a) + send("#{a.to_s}=".to_sym,task[a]) end when /application\/rdf\+xml/ owl = OpenTox::Owl.from_data(data,base_uri,"Task",test_if_task) @@ -62,16 +68,6 @@ module OpenTox raise "uri is null after loading" unless @uri and @uri.to_s.strip.size>0 unless test_if_task end - # invalid: getters in task.rb should work for non-internal tasks as well - # - #def self.base_uri - # @@config[:services]["opentox-task"] - #end - #def self.all - # task_uris = RestClientWrapper.get(@@config[:services]["opentox-task"]).chomp.split(/\n/) - # task_uris.collect{|uri| Task.new(uri)} - #end - def cancel RestClientWrapper.put(File.join(@uri,'Cancelled')) reload @@ -87,15 +83,9 @@ module OpenTox reload end - def parent=(task) - RestClientWrapper.put(File.join(@uri,'parent'), {:uri => task.uri}) - reload - end - - def pid=(pid) - RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid}) - reload - end + def running? + @hasStatus.to_s == 'Running' + end def completed? @hasStatus.to_s == 'Completed' @@ -105,44 +95,55 @@ module OpenTox @hasStatus.to_s == 'Error' end - def wait_for_completion(dur=0.1) - until self.completed? or self.error? + # waits for a task, unless time exceeds or state is no longer running + def wait_for_completion(dur=0.3) + + if (@uri.match(@@config[:services]["opentox-task"])) + due_to_time = Time.parse(@due_to_time) + else + # the date of the external task cannot be trusted, offest to local time might be to big + due_to_time = Time.new + EXTERNAL_TASK_MAX_DURATION + end + LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s + while self.running? sleep dur reload + if (Time.new > due_to_time) + raise "max waiting time exceeded, task seems to be stalled, task: '"+@uri.to_s+"'" + end end end - def self.as_task(parent_task=nil) + # returns the task uri + # catches halts and exceptions, task state is set to error then + def self.as_task(max_duration=DEFAULT_TASK_MAX_DURATION) #return yield nil - task = OpenTox::Task.create - task.parent = parent_task if parent_task - pid = Spork.spork(:logger => LOGGER) do + task = OpenTox::Task.create(max_duration) + Spork.spork(:logger => LOGGER) do LOGGER.debug "Task #{task.uri} started #{Time.now}" + $self_task = task + begin result = catch(:halt) do yield task end + # catching halt, set task state to error if result && result.is_a?(Array) && result.size==2 && result[0]>202 - # halted while executing task LOGGER.error "task was halted: "+result.inspect task.error(result[1]) - throw :halt,result + return end LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s task.completed(result) rescue => ex - #raise ex LOGGER.error "task failed: "+ex.message task.error(ex.message) end - raise "Invalid task state" unless task.completed? || task.error? end - LOGGER.debug "Started task with PID: " + pid.to_s - task.pid = pid + LOGGER.debug "Started task: "+task.uri.to_s task.uri end - end end |