summaryrefslogtreecommitdiff
path: root/lib/task.rb
diff options
context:
space:
mode:
authorChristoph Helma <helma@in-silico.ch>2010-11-19 16:53:21 +0100
committerChristoph Helma <helma@in-silico.ch>2010-11-19 16:53:21 +0100
commitf8552611c2dbe25d76474f51e4e895bf9c2b5c5e (patch)
treeda145cd1d69adc4cdb8d299f0cea2e0810b88eaf /lib/task.rb
parent91c95f8dc8f60a8f0029b970ef881eecee28401b (diff)
lazar predictions for toxcreate working
Diffstat (limited to 'lib/task.rb')
-rw-r--r--lib/task.rb278
1 files changed, 177 insertions, 101 deletions
diff --git a/lib/task.rb b/lib/task.rb
index 96ee719..5b2b5d9 100644
--- a/lib/task.rb
+++ b/lib/task.rb
@@ -2,39 +2,183 @@ $self_task=nil
module OpenTox
+ # Class for handling asynchronous tasks
class Task
- attr_accessor :uri, :date, :title, :creator, :description, :hasStatus, :percentageCompleted, :resultURI, :due_to_time, :http_code
-
- # due_to_time is only set in local tasks
- TASK_ATTRIBS = [ :uri, :date, :title, :creator, :description, :hasStatus, :percentageCompleted, :resultURI, :due_to_time ]
- TASK_ATTRIBS.each{ |a| attr_accessor(a) }
- attr_accessor :http_code
+ include OpenTox
+ attr_accessor :http_code, :due_to_time
- private
- def initialize(uri)
- @uri = uri.to_s.strip
+ def initialize(uri=nil)
+ super uri
+ @metadata = {
+ DC.title => "",
+ DC.date => "",
+ OT.hasStatus => "Running",
+ OT.percentageCompleted => "0",
+ OT.resultURI => "",
+ DC.creator => "", # not mandatory according to API
+ DC.description => "", # not mandatory according to API
+ }
end
-
- # create is private now, use OpenTox::Task.as_task
- def self.create( params )
+
+ # Create a new task for the code in the block. Catches halts and exceptions and sets task state to error if necessary. The block has to return the URI of the created resource.
+ # @example
+ # task = OpenTox::Task.create do
+ # # this code will be executed as a task
+ # model = OpenTox::Algorithm.run(params) # this can be time consuming
+ # model.uri # Important: return URI of the created resource
+ # end
+ # task.status # returns "Running", because tasks are forked
+ # @param [String] title Task title
+ # @param [String] creator Task creator
+ # @return [OPenTox::Task] Task
+ def self.create( title=nil, creator=nil, max_duration=DEFAULT_TASK_MAX_DURATION, description=nil )
+
+ # measure current memory consumption
+ memory = `free -m|sed -n '2p'`.split
+ free_memory = memory[3].to_i + memory[6].to_i # include cache
+ if free_memory < 20 # require at least 200 M free memory
+ LOGGER.warn "Cannot start task - not enough memory left (#{free_memory} M free)"
+ raise "Insufficient memory to start a new task"
+ end
+
+ cpu_load = `cat /proc/loadavg`.split(/\s+/)[0..2].collect{|c| c.to_f}
+ nr_cpu_cores = `cat /proc/cpuinfo |grep "cpu cores"|cut -d ":" -f2|tr -d " "`.split("\n").collect{|c| c.to_i}.inject{|sum,n| sum+n}
+ if cpu_load[0] > nr_cpu_cores and cpu_load[0] > cpu_load[1] and cpu_load[1] > cpu_load[2] # average CPU load of the last minute is high and CPU load is increasing
+ LOGGER.warn "Cannot start task - CPU load too high (#{cpu_load.join(", ")})"
+ raise "Server too busy to start a new task"
+ end
+
+ params = {:title=>title, :creator=>creator, :max_duration=>max_duration, :description=>description }
task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, nil, false).to_s
- Task.find(task_uri.chomp)
- end
+ task = Task.new(task_uri.chomp)
+
+ task_pid = Spork.spork(:logger => LOGGER) do
+ LOGGER.debug "Task #{task.uri} started #{Time.now}"
+ $self_task = task
+
+ begin
+ result = catch(:halt) do
+ yield task
+ end
+ # catching halt, set task state to error
+ if result && result.is_a?(Array) && result.size==2 && result[0]>202
+ LOGGER.error "task was halted: "+result.inspect
+ task.error(result[1])
+ return
+ end
+ LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
+ task.completed(result)
+ rescue => ex
+ LOGGER.error "task failed: "+ex.message
+ LOGGER.error ": "+ex.backtrace.join("\n")
+ task.error(ex.message)
+ end
+ end
+ task.pid = task_pid
+ LOGGER.debug "Started task: "+task.uri.to_s
+ task
+ end
- public
- def self.find( uri, accept_header=nil )
+ # Find a task for querying, status changes
+ # @param [String] uri Task URI
+ # @return [OpenTox::Task] Task object
+ def self.find(uri)
task = Task.new(uri)
- task.reload( accept_header )
- return task
+ task.load_metadata
+ task
+ end
+
+ # Get a list of all tasks
+ # @param [optional, String] uri URI of task service
+ # @return [text/uri-list] Task URIs
+ def self.all(uri=CONFIG[:services]["opentox-task"])
+ OpenTox.all uri
+ end
+
+ def self.from_yaml(yaml)
+ @metadata = YAML.load(yaml)
+ end
+
+ def self.from_rdfxml(rdfxml)
+ file = Tempfile.open("ot-rdfxml"){|f| f.write(rdfxml)}.path
+ parser = Parser::Owl::Generic.new file
+ @metadata = parser.load_metadata
+ end
+
+ def to_rdfxml
+ s = Serializer::Owl.new
+ s.add_task(@uri,@metadata)
+ s.to_rdfxml
+ end
+
+ def status
+ @metadata[OT.hasStatus]
+ end
+
+ def result_uri
+ @metadata[OT.resultURI]
+ end
+
+ def description
+ @metadata[DC.description]
+ end
+
+ def cancel
+ RestClientWrapper.put(File.join(@uri,'Cancelled'))
+ load_metadata
+ end
+
+ def completed(uri)
+ RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
+ load_metadata
+ end
+
+ def error(description)
+ RestClientWrapper.put(File.join(@uri,'Error'),{:description => description.to_s[0..2000]})
+ load_metadata
+ end
+
+ def pid=(pid)
+ RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid})
+ end
+
+ def running?
+ @metadata[OT.hasStatus] == 'Running'
+ end
+
+ def completed?
+ @metadata[OT.hasStatus] == 'Completed'
+ end
+
+ def error?
+ @metadata[OT.hasStatus] == 'Error'
+ end
+
+ def load_metadata
+ if (CONFIG[:yaml_hosts].include?(URI.parse(uri).host))
+ result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, false)
+ @metadata = YAML.load result.to_s
+ @http_code = result.code
+ else
+ @metadata = Parser::Owl::Generic.new(@uri).load_metadata
+ @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, false).code
+ end
end
+ # create is private now, use OpenTox::Task.as_task
+ #def self.create( params )
+ #task_uri = RestClientWrapper.post(CONFIG[:services]["opentox-task"], params, nil, false).to_s
+ #Task.find(task_uri.chomp)
+ #end
+
+=begin
def self.from_data(data, content_type, code, base_uri)
task = Task.new(nil)
task.http_code = code
task.reload_from_data(data, content_type, base_uri)
return task
end
-
+
def reload( accept_header=nil )
unless accept_header
if (CONFIG[:yaml_hosts].include?(URI.parse(uri).host))
@@ -65,113 +209,45 @@ module OpenTox
end
raise "uri is null after loading" unless @uri and @uri.to_s.strip.size>0
end
-
- def cancel
- RestClientWrapper.put(File.join(@uri,'Cancelled'))
- reload
- end
-
- def completed(uri)
- RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
- reload
- end
-
- def error(description)
- RestClientWrapper.put(File.join(@uri,'Error'),{:description => description.to_s[0..2000]})
- reload
- end
-
- def pid=(pid)
- RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid})
- end
-
- def running?
- @hasStatus.to_s == 'Running'
- end
-
- def completed?
- @hasStatus.to_s == 'Completed'
- end
-
- def error?
- @hasStatus.to_s == 'Error'
- end
+=end
# waits for a task, unless time exceeds or state is no longer running
def wait_for_completion(dur=0.3)
- if (@uri.match(CONFIG[:services]["opentox-task"]))
- due_to_time = (@due_to_time.is_a?(Time) ? @due_to_time : Time.parse(@due_to_time))
- running_time = due_to_time - (@date.is_a?(Time) ? @date : Time.parse(@date))
- else
- # the date of the external task cannot be trusted, offest to local time might be to big
- due_to_time = Time.new + EXTERNAL_TASK_MAX_DURATION
- running_time = EXTERNAL_TASK_MAX_DURATION
- end
+ due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION
LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s
+ load_metadata # for extremely fast tasks
+ check_state
while self.running?
sleep dur
- reload
+ load_metadata
check_state
if (Time.new > due_to_time)
- raise "max wait time exceeded ("+running_time.to_s+"sec), task: '"+@uri.to_s+"'"
+ raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'"
end
end
- LOGGER.debug "Task '"+@hasStatus+"': "+@uri.to_s+", Result: "+@resultURI.to_s
+ LOGGER.debug "Task '"+@metadata[OT.hasStatus]+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s
end
+ private
def check_state
begin
- raise "illegal task state, task is completed, resultURI is no URI: '"+@resultURI.to_s+
- "'" unless @resultURI and @resultURI.to_s.uri? if completed?
+ raise "illegal task state, task is completed, resultURI is no URI: '"+@metadata[OT.resultURI].to_s+
+ "'" unless @metadata[OT.resultURI] and @metadata[OT.resultURI].to_s.uri? if completed?
if @http_code == 202
- raise "illegal task state, code is 202, but hasStatus is not Running: '"+@hasStatus+"'" unless running?
+ raise "illegal task state, code is 202, but hasStatus is not Running: '"+@metadata[OT.hasStatus]+"'" unless running?
elsif @http_code == 201
- raise "illegal task state, code is 201, but hasStatus is not Completed: '"+@hasStatus+"'" unless completed?
- raise "illegal task state, code is 201, resultURI is no task-URI: '"+@resultURI.to_s+
- "'" unless @resultURI and @resultURI.to_s.uri?
+ raise "illegal task state, code is 201, but hasStatus is not Completed: '"+@metadata[OT.hasStatus]+"'" unless completed?
+ raise "illegal task state, code is 201, resultURI is no task-URI: '"+@metadata[OT.resultURI].to_s+
+ "'" unless @metadata[OT.resultURI] and @metadata[OT.resultURI].to_s.uri?
end
rescue => ex
RestClientWrapper.raise_uri_error(ex.message, @uri)
end
end
-
- # returns the task uri
- # catches halts and exceptions, task state is set to error then
- def self.as_task( title, creator, max_duration=DEFAULT_TASK_MAX_DURATION, description=nil )
- #return yield nil
-
- params = {:title=>title, :creator=>creator, :max_duration=>max_duration, :description=>description }
- task = ::OpenTox::Task.create(params)
- task_pid = Spork.spork(:logger => LOGGER) do
- LOGGER.debug "Task #{task.uri} started #{Time.now}"
- $self_task = task
-
- begin
- result = catch(:halt) do
- yield task
- end
- # catching halt, set task state to error
- if result && result.is_a?(Array) && result.size==2 && result[0]>202
- LOGGER.error "task was halted: "+result.inspect
- task.error(result[1])
- return
- end
- LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
- task.completed(result)
- rescue => ex
- LOGGER.error "task failed: "+ex.message
- LOGGER.error ": "+ex.backtrace.join("\n")
- task.error(ex.message)
- end
- end
- task.pid = task_pid
- LOGGER.debug "Started task: "+task.uri.to_s
- task.uri
- end
end