summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorMartin Gütlein <martin.guetlein@gmail.com>2010-04-27 12:00:39 +0200
committerMartin Gütlein <martin.guetlein@gmail.com>2010-04-27 12:00:39 +0200
commitac5a07c294bba699d6517d5be1548a33455b87c8 (patch)
tree5ad284a26e4ae0772945e2400c9d10079f317882 /lib
parentafa31803afd167c19c9ba95e73d311c9e2b8b43a (diff)
change tasks: remove tree structure, add max wait time
Diffstat (limited to 'lib')
-rw-r--r--lib/environment.rb3
-rw-r--r--lib/rest_client_wrapper.rb58
-rw-r--r--lib/task.rb83
3 files changed, 59 insertions, 85 deletions
diff --git a/lib/environment.rb b/lib/environment.rb
index 27235c3..c86d2be 100644
--- a/lib/environment.rb
+++ b/lib/environment.rb
@@ -40,6 +40,9 @@ if @@config[:database]
end
end
+# hack: store sinatra in global var to make url_for and halt methods accessible
+before {$sinatra = self unless $sinatra}
+
class Sinatra::Base
# overwriting halt to log halts (!= 202)
def halt(status,msg)
diff --git a/lib/rest_client_wrapper.rb b/lib/rest_client_wrapper.rb
index 0219f51..194a742 100644
--- a/lib/rest_client_wrapper.rb
+++ b/lib/rest_client_wrapper.rb
@@ -36,29 +36,6 @@ module OpenTox
class RestClientWrapper
- # PENDING: remove as soon as redirect tasks are remove from partner webservices
- def self.redirect_task( uri )
- raise "no redirect task uri: "+uri.to_s unless uri.to_s =~ /194.141.0.136|ambit.*task|tu-muenchen.*task/
-
- while (uri.to_s =~ /194.141.0.136|ambit.*task|tu-muenchen.*task/)
- #HACK handle redirect
- LOGGER.debug "REDIRECT TASK: "+uri.to_s
- redirect = ""
- while (redirect.size == 0)
- IO.popen("bin/redirect.sh "+uri.to_s) do |f|
- while line = f.gets
- redirect += line.chomp
- end
- end
- raise redirect!=nil && redirect.size>0 ? redirect : "TASK ERROR" if $?!=0
- sleep 0.3
- end
- uri = redirect
- LOGGER.debug "REDIRECT TO: "+uri.to_s
- end
- return uri
- end
-
def self.get(uri, headers=nil)
execute( "get", uri, headers)
end
@@ -81,7 +58,7 @@ module OpenTox
private
def self.execute( rest_call, uri, headers, payload=nil, wait=true )
-
+
do_halt 400,"uri is null",uri,headers,payload unless uri
do_halt 400,"not a uri",uri,headers,payload unless Utils.is_uri?(uri)
do_halt 400,"headers are no hash",uri,headers,payload unless headers==nil or headers.is_a?(Hash)
@@ -112,15 +89,9 @@ module OpenTox
case res.content_type
when /application\/rdf\+xml|text\/x-yaml/
task = OpenTox::Task.from_data(res, res.content_type, uri, true)
- when /text\//
+ when /text\//
return res if res.content_type=~/text\/uri-list/ and
res.split("\n").size > 1 #if uri list contains more then one uri, its not a task
- # HACK for redirect tasks
- if res =~ /ambit.*task|tu-muenchen.*task/
- res = WrapperResult.new(redirect_task(res))
- res.content_type = "text/uri-list"
- return res
- end
task = OpenTox::Task.find(res.to_s) if Utils.task_uri?(res)
else
raise "unknown content-type when checking for task: "+res.content_type+" content: "+res[0..200]
@@ -130,7 +101,7 @@ module OpenTox
if task
LOGGER.debug "result is a task '"+task.uri.to_s+"', wait for completion"
task.wait_for_completion
- raise task.description if task.error?
+ raise task.description unless task.completed?
res = WrapperResult.new(task.resultURI)
LOGGER.debug "task resultURI "+res.to_s
res.content_type = "text/uri-list"
@@ -175,18 +146,17 @@ module OpenTox
# count+=1 while File.exist?(File.join(error_dir,file_name+"_"+time+"_"+count.to_s))
# File.new(File.join(error_dir,file_name+"_"+time+"_"+count.to_s),"w").puts(body)
- # return error (by halting, halts should be logged)
- # PENDING always return yaml for now
- begin
- if defined?(halt)
- halt(502,error.to_yaml)
- elsif defined?($sinatra)
- $sinatra.halt(502,error.to_yaml)
- else
- raise ""
- end
- rescue
- raise error.to_yaml
+ # handle error
+ # we are either in a task, or in sinatra
+ # PENDING: always return yaml for now
+
+
+ if $self_task #this global var in Task.as_task to mark that the current process is running in a task
+ raise error.to_yaml # the error is caught, logged, and task state is set to error in Task.as_task
+ elsif $sinatra #else halt sinatra
+ $sinatra.halt(502,error.to_yaml)
+ else
+ raise "internal error"
end
end
end
diff --git a/lib/task.rb b/lib/task.rb
index 4b147d7..178f744 100644
--- a/lib/task.rb
+++ b/lib/task.rb
@@ -1,10 +1,16 @@
LOGGER.progname = File.expand_path(__FILE__)
+DEFAULT_TASK_MAX_DURATION = 120
+EXTERNAL_TASK_MAX_DURATION = 60
+
+$self_task=nil
+
module OpenTox
class Task
- TASK_ATTRIBS = [ :uri, :date, :title, :creator, :title, :description, :hasStatus, :percentageCompleted, :resultURI ]
+ # due_to_time is only set in local tasks
+ TASK_ATTRIBS = [ :uri, :date, :title, :creator, :title, :description, :hasStatus, :percentageCompleted, :resultURI, :due_to_time ]
TASK_ATTRIBS.each{ |a| attr_accessor(a) }
private
@@ -12,12 +18,13 @@ module OpenTox
@uri = uri.to_s.strip
end
- public
- def self.create
- task_uri = RestClientWrapper.post(@@config[:services]["opentox-task"], {}, nil, false).to_s
+ # create is private now, use OpenTox::Task.as_task
+ def self.create(max_duration)
+ task_uri = RestClientWrapper.post(@@config[:services]["opentox-task"], {:max_duration => max_duration}, nil, false).to_s
Task.find(task_uri.chomp)
end
+ public
def self.find(uri)
task = Task.new(uri)
task.reload
@@ -36,7 +43,7 @@ module OpenTox
end
def reload
- result = RestClientWrapper.get(uri)
+ result = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'})#'text/x-yaml'})
reload_from_data(result, result.content_type, uri, false)
end
@@ -45,10 +52,9 @@ module OpenTox
case content_type
when /text\/x-yaml/
task = YAML.load data
- if task.is_a?(Task)
- TASK_ATTRIBS.each{ |a| send("#{a.to_s}=".to_sym,task[a]) }
- else
- raise "yaml data is no task: "+task.class.to_s unless test_if_task
+ TASK_ATTRIBS.each do |a|
+ raise "task yaml data invalid, key missing: "+a.to_s unless task.has_key?(a)
+ send("#{a.to_s}=".to_sym,task[a])
end
when /application\/rdf\+xml/
owl = OpenTox::Owl.from_data(data,base_uri,"Task",test_if_task)
@@ -62,16 +68,6 @@ module OpenTox
raise "uri is null after loading" unless @uri and @uri.to_s.strip.size>0 unless test_if_task
end
- # invalid: getters in task.rb should work for non-internal tasks as well
- #
- #def self.base_uri
- # @@config[:services]["opentox-task"]
- #end
- #def self.all
- # task_uris = RestClientWrapper.get(@@config[:services]["opentox-task"]).chomp.split(/\n/)
- # task_uris.collect{|uri| Task.new(uri)}
- #end
-
def cancel
RestClientWrapper.put(File.join(@uri,'Cancelled'))
reload
@@ -87,15 +83,9 @@ module OpenTox
reload
end
- def parent=(task)
- RestClientWrapper.put(File.join(@uri,'parent'), {:uri => task.uri})
- reload
- end
-
- def pid=(pid)
- RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid})
- reload
- end
+ def running?
+ @hasStatus.to_s == 'Running'
+ end
def completed?
@hasStatus.to_s == 'Completed'
@@ -105,44 +95,55 @@ module OpenTox
@hasStatus.to_s == 'Error'
end
- def wait_for_completion(dur=0.1)
- until self.completed? or self.error?
+ # waits for a task, unless time exceeds or state is no longer running
+ def wait_for_completion(dur=0.3)
+
+ if (@uri.match(@@config[:services]["opentox-task"]))
+ due_to_time = Time.parse(@due_to_time)
+ else
+ # the date of the external task cannot be trusted, offest to local time might be to big
+ due_to_time = Time.new + EXTERNAL_TASK_MAX_DURATION
+ end
+ LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s
+ while self.running?
sleep dur
reload
+ if (Time.new > due_to_time)
+ raise "max waiting time exceeded, task seems to be stalled, task: '"+@uri.to_s+"'"
+ end
end
end
- def self.as_task(parent_task=nil)
+ # returns the task uri
+ # catches halts and exceptions, task state is set to error then
+ def self.as_task(max_duration=DEFAULT_TASK_MAX_DURATION)
#return yield nil
- task = OpenTox::Task.create
- task.parent = parent_task if parent_task
- pid = Spork.spork(:logger => LOGGER) do
+ task = OpenTox::Task.create(max_duration)
+ Spork.spork(:logger => LOGGER) do
LOGGER.debug "Task #{task.uri} started #{Time.now}"
+ $self_task = task
+
begin
result = catch(:halt) do
yield task
end
+ # catching halt, set task state to error
if result && result.is_a?(Array) && result.size==2 && result[0]>202
- # halted while executing task
LOGGER.error "task was halted: "+result.inspect
task.error(result[1])
- throw :halt,result
+ return
end
LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
task.completed(result)
rescue => ex
- #raise ex
LOGGER.error "task failed: "+ex.message
task.error(ex.message)
end
- raise "Invalid task state" unless task.completed? || task.error?
end
- LOGGER.debug "Started task with PID: " + pid.to_s
- task.pid = pid
+ LOGGER.debug "Started task: "+task.uri.to_s
task.uri
end
-
end
end