From a8e52efbfd977c7ecfc98df10960686a05ed8afd Mon Sep 17 00:00:00 2001 From: Christoph Helma Date: Thu, 16 Jul 2015 19:07:29 +0200 Subject: task tests passed --- lib/error.rb | 2 +- lib/opentox.rb | 7 +++- lib/task.rb | 123 +++++++++++++++++---------------------------------------- 3 files changed, 44 insertions(+), 88 deletions(-) diff --git a/lib/error.rb b/lib/error.rb index 878ed28..12e22ff 100644 --- a/lib/error.rb +++ b/lib/error.rb @@ -2,7 +2,7 @@ require 'open4' # add additional fields to Exception class to format errors according to OT-API module OpenToxError - attr_accessor :http_code, :uri, :error_cause + attr_accessor :http_code, :uri, :error_cause, :metadata def initialize(message=nil, uri=nil, cause=nil) message = message.to_s.gsub(/\A"|"\Z/, '') if message # remove quotes @error_cause = cause ? OpenToxError::cut_backtrace(cause) : short_backtrace diff --git a/lib/opentox.rb b/lib/opentox.rb index b2e9c1b..9be6078 100644 --- a/lib/opentox.rb +++ b/lib/opentox.rb @@ -19,7 +19,7 @@ module OpenTox end def created_at - # TODO from BSON::ObjectId + @data["_id"].generation_time end # Object metadata (lazy loading) @@ -66,6 +66,11 @@ module OpenTox @data["_id"] = $mongo[collection].insert_one(@data).inserted_id end + # partial update + def update metadata + $mongo[collection].find(:_id => @data["_id"]).find_one_and_replace('$set' => metadata) + end + # Save object at webservice (replace or create object) def put #@data.delete("_id") # to enable updates diff --git a/lib/task.rb b/lib/task.rb index 9d03aed..f7e4c6f 100644 --- a/lib/task.rb +++ b/lib/task.rb @@ -2,83 +2,61 @@ DEFAULT_TASK_MAX_DURATION = 36000 module OpenTox # Class for handling asynchronous tasks - class Task + class Task - attr_accessor :pid, :observer_pid + def self.run(description, creator=nil) - def metadata - super true # always update metadata - end - - def self.task_uri - Task.new.uri - end - - def self.run(description, creator=nil, uri=nil) - - task = Task.new uri - task[:created_at] = DateTime.now.to_s - task[:hasStatus] = "Running" + task = Task.new task[:description] = description.to_s task[:creator] = creator.to_s - task[:percentageCompleted] = "0" - task.put + task[:percentageCompleted] = 0 + task[:code] = 202 + task.save pid = fork do begin task.completed yield rescue => e # wrap non-opentox-errors first e = OpenTox::Error.new(500,e.message,nil,e.backtrace) unless e.is_a?(OpenTox::Error) - $logger.error "error in task #{task.uri} created by #{creator}" # creator is not logged because error is logged when thrown - RestClientWrapper.put(File.join(task.uri,'Error'),{:errorReport => e.to_json},{:content_type => 'application/json'}) + $logger.error "error in task #{task.id} created by #{creator}" # creator is not logged because error is logged when thrown + task.update(:errorReport => e.metadata, :code => e.http_code, :finished_at => Time.now) + task.get task.kill end end Process.detach(pid) - task.pid = pid + task[:pid] = pid # watch if task has been cancelled observer_pid = fork do task.wait begin - Process.kill(9,task.pid) if task.cancelled? + Process.kill(9,task[:pid]) if task.cancelled? rescue - $logger.warn "Could not kill process of task #{task.uri}, pid: #{task.pid}" + $logger.warn "Could not kill process of task #{task.id}, pid: #{task[:pid]}" end end Process.detach(observer_pid) - task.observer_pid = observer_pid + task[:observer_pid] = observer_pid task end def kill - Process.kill(9,@pid) - Process.kill(9,@observer_pid) + Process.kill(9,task[:pid]) + Process.kill(9,task[:observer_pid]) rescue # no need to raise an exception if processes are not running end - def description - self.[](:description) - end - - def creator - self.[](:creator) - end - def cancel kill - self.[]=(:hasStatus, "Cancelled") - self.[]=(:finished_at, DateTime.now.to_s) - put + update(:code => 503, :finished_at => Time.now) + get end - def completed(uri) - self.[]=(:resultURI, uri) - self.[]=(:hasStatus, "Completed") - self.[]=(:finished_at, DateTime.now.to_s) - self.[]=(:percentageCompleted, "100") - put + def completed(result) + update(:code => 200, :finished_at => Time.now, :percentageCompleted => 100, :result => result) + get end # waits for a task, unless time exceeds or state is no longer running @@ -89,17 +67,12 @@ module OpenTox while running? sleep dur dur = [[(Time.new - start_time)/20.0,0.3].max,300.0].min - request_timeout_error "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+uri.to_s+"'" if (Time.new > due_to_time) + request_timeout_error "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+id.to_s+"'" if (Time.new > due_to_time) end end end - def code - RestClientWrapper.get(uri).code.to_i - end - - # get only header for status requests def running? code == 202 end @@ -116,51 +89,29 @@ module OpenTox code >= 400 and code != 503 end - [:hasStatus, :resultURI, :created_at, :finished_at, :percentageCompleted].each do |method| - define_method method do - get - self.[](method) - end - end - # Check status of a task # @return [String] Status def status - get - self[:hasStatus] - end - - def error_report - get - self[:errorReport] + case code + when 202 + "Running" + when 200 + "Completed" + when 503 + "Cancelled" + else + "Error" + end end - #TODO: subtasks (only for progress in validation) - class SubTask - - def initialize(task, min, max) - #TODO add subtask code + [:code, :description, :creator, :finished_at, :percentageCompleted, :result, :errorReport].each do |method| + define_method method do + $mongo[:task].find(:_id => self.id).distinct(method).first end + end - def self.create(task, min, max) - if task - SubTask.new(task, min, max) - else - nil - end - end - - def waiting_for(task_uri) - #TODO add subtask code - end - - def progress(pct) - #TODO add subtask code - end - - def running?() - #TODO add subtask code - end + def error_report + self.errorReport end end -- cgit v1.2.3