From 97e3942191e1ab8f084ba8da475749a9609c37aa Mon Sep 17 00:00:00 2001 From: mguetlein Date: Fri, 14 Jan 2011 14:54:14 +0100 Subject: add percentage completed support --- lib/algorithm.rb | 5 ++-- lib/model.rb | 12 ++++++-- lib/rest_client_wrapper.rb | 68 ++++++++++++++++++++++++++++++++++------------ lib/task.rb | 33 +++++++++++++--------- 4 files changed, 83 insertions(+), 35 deletions(-) (limited to 'lib') diff --git a/lib/algorithm.rb b/lib/algorithm.rb index 0aa86e6..58a2640 100644 --- a/lib/algorithm.rb +++ b/lib/algorithm.rb @@ -13,9 +13,10 @@ module OpenTox # Execute algorithm with parameters, please consult the OpenTox API and the webservice documentation for acceptable parameters # @param [optional,Hash] params Algorithm parameters + # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly # @return [String] URI of new resource (dataset, model, ...) - def run(params=nil) - RestClientWrapper.post(@uri, {:accept => 'text/uri-list'}, params).to_s + def run(params=nil, waiting_task=nil) + RestClientWrapper.post(@uri, {:accept => 'text/uri-list'}, params, waiting_task).to_s end # Get OWL-DL representation in RDF/XML format diff --git a/lib/model.rb b/lib/model.rb index 1671ba7..e95c78c 100644 --- a/lib/model.rb +++ b/lib/model.rb @@ -6,15 +6,16 @@ module OpenTox # Run a model with parameters # @param [Hash] params Parameters for OpenTox model + # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly # @return [text/uri-list] Task or resource URI - def run(params) + def run( params, waiting_task=nil ) if CONFIG[:yaml_hosts].include?(URI.parse(@uri).host) accept = 'application/x-yaml' else accept = 'application/rdf+xml' end begin - RestClientWrapper.post(@uri,{:accept => accept},params).to_s + RestClientWrapper.post(@uri,{:accept => accept},params,waiting_task).to_s rescue => e LOGGER.error "Failed to run #{@uri} with #{params.inspect} (#{e.inspect})" raise "Failed to run #{@uri} with #{params.inspect}" @@ -121,8 +122,10 @@ module OpenTox # Predict a dataset # @param [String] dataset_uri Dataset URI + # @param [optional,subjectid] + # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly # @return [OpenTox::Dataset] Dataset with predictions - def predict_dataset(dataset_uri, subjectid=nil) + def predict_dataset(dataset_uri, subjectid=nil, waiting_task=nil) @prediction_dataset = Dataset.create(CONFIG[:services]["opentox-dataset"], subjectid) @prediction_dataset.add_metadata({ OT.hasSource => @uri, @@ -132,9 +135,12 @@ module OpenTox }) d = Dataset.new(dataset_uri) d.load_compounds + count = 0 d.compounds.each do |compound_uri| begin predict(compound_uri,false,subjectid) + count += 1 + waiting_task.progress( count/d.compounds.size.to_f*100.0 ) if waiting_task rescue => ex LOGGER.warn "prediction for compound "+compound_uri.to_s+" failed: "+ex.message end diff --git a/lib/rest_client_wrapper.rb b/lib/rest_client_wrapper.rb index 2f0e215..920a828 100644 --- a/lib/rest_client_wrapper.rb +++ b/lib/rest_client_wrapper.rb @@ -34,33 +34,67 @@ module OpenTox class RestClientWrapper - def self.get(uri, headers=nil, wait=true) - execute( "get", uri, headers, nil, wait) + # performs a GET REST call + # raises OpenTox::Error if call fails (rescued in overwrite.rb -> halt 502) + # per default: waits for Task to finish and returns result URI of Task + # @param [String] uri destination URI + # @param [optional,Hash] headers contains params like accept-header + # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly + # @param [wait,Boolean] wait set to false to NOT wait for task if result is task + # @return [OpenTox::WrapperResult] a String containing the result-body of the REST call + def self.get(uri, headers=nil, waiting_task=nil, wait=true ) + execute( "get", uri, headers, nil, waiting_task, wait) end - def self.post(uri, headers, payload=nil, wait=true) - execute( "post", uri, headers, payload, wait ) + # performs a POST REST call + # raises OpenTox::Error if call fails (rescued in overwrite.rb -> halt 502) + # per default: waits for Task to finish and returns result URI of Task + # @param [String] uri destination URI + # @param [optional,Hash] headers contains params like accept-header + # @param [optional,String] payload data posted to the service + # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly + # @param [wait,Boolean] wait set to false to NOT wait for task if result is task + # @return [OpenTox::WrapperResult] a String containing the result-body of the REST call + def self.post(uri, headers, payload=nil, waiting_task=nil, wait=true ) + execute( "post", uri, headers, payload, waiting_task, wait ) end + # performs a PUT REST call + # raises OpenTox::Error if call fails (rescued in overwrite.rb -> halt 502) + # @param [String] uri destination URI + # @param [optional,Hash] headers contains params like accept-header + # @param [optional,String] payload data put to the service + # @return [OpenTox::WrapperResult] a String containing the result-body of the REST call def self.put(uri, headers, payload=nil ) execute( "put", uri, headers, payload ) end - def self.delete(uri, headers=nil) + # performs a DELETE REST call + # raises OpenTox::Error if call fails (rescued in overwrite.rb -> halt 502) + # @param [String] uri destination URI + # @param [optional,Hash] headers contains params like accept-header + # @return [OpenTox::WrapperResult] a String containing the result-body of the REST call + def self.delete(uri, headers=nil ) execute( "delete", uri, headers, nil) end + # raises an Error message (rescued in overwrite.rb -> halt 502) + # usage: if the return value of a call is invalid + # @param [String] error_msg the error message + # @param [String] uri destination URI that is responsible for the error + # @param [optional,Hash] headers sent to the URI + # @param [optional,String] payload data sent to the URI def self.raise_uri_error(error_msg, uri, headers=nil, payload=nil) - do_halt( "-", error_msg, uri, headers, payload ) + raise_ot_error( "-", error_msg, uri, headers, payload ) end private - def self.execute( rest_call, uri, headers, payload=nil, wait=true ) + def self.execute( rest_call, uri, headers, payload=nil, waiting_task=nil, wait=true ) - do_halt 400,"uri is null",uri,headers,payload unless uri - do_halt 400,"not a uri",uri,headers,payload unless uri.to_s.uri? - do_halt 400,"headers are no hash",uri,headers,payload unless headers==nil or headers.is_a?(Hash) - do_halt 400,"nil headers for post not allowed, use {}",uri,headers,payload if rest_call=="post" and headers==nil + raise_ot_error 400,"uri is null",uri,headers,payload unless uri + raise_ot_error 400,"not a uri",uri,headers,payload unless uri.to_s.uri? + raise_ot_error 400,"headers are no hash",uri,headers,payload unless headers==nil or headers.is_a?(Hash) + raise_ot_error 400,"nil headers for post not allowed, use {}",uri,headers,payload if rest_call=="post" and headers==nil headers.each{ |k,v| headers.delete(k) if v==nil } if headers #remove keys with empty values, as this can cause problems begin @@ -84,13 +118,13 @@ module OpenTox return res if res.code==200 || !wait while (res.code==201 || res.code==202) - res = wait_for_task(res, uri) + res = wait_for_task(res, uri, waiting_task) end raise "illegal status code: '"+res.code.to_s+"'" unless res.code==200 return res rescue RestClient::RequestTimeout => ex - do_halt 408,ex.message,uri,headers,payload + raise_ot_error 408,ex.message,uri,headers,payload rescue => ex #raise ex #raise "'"+ex.message+"' uri: "+uri.to_s @@ -101,11 +135,11 @@ module OpenTox code = 500 msg = ex.to_s end - do_halt code,msg,uri,headers,payload + raise_ot_error code,msg,uri,headers,payload end end - def self.wait_for_task( res, base_uri ) + def self.wait_for_task( res, base_uri, waiting_task=nil ) task = nil case res.content_type @@ -121,7 +155,7 @@ module OpenTox end LOGGER.debug "result is a task '"+task.uri.to_s+"', wait for completion" - task.wait_for_completion + task.wait_for_completion waiting_task raise task.description unless task.completed? # maybe task was cancelled / error res = WrapperResult.new task.result_uri @@ -130,7 +164,7 @@ module OpenTox return res end - def self.do_halt( code, body, uri, headers, payload=nil ) + def self.raise_ot_error( code, body, uri, headers, payload=nil ) #build error causing_errors = Error.parse(body) diff --git a/lib/task.rb b/lib/task.rb index dcbff3f..d701c82 100644 --- a/lib/task.rb +++ b/lib/task.rb @@ -12,7 +12,7 @@ module OpenTox DC.title => "", DC.date => "", OT.hasStatus => "Running", - OT.percentageCompleted => "0", + OT.percentageCompleted => 0.0, OT.resultURI => "", DC.creator => "", # not mandatory according to API DC.description => "", # not mandatory according to API @@ -160,12 +160,12 @@ module OpenTox def load_metadata if (CONFIG[:yaml_hosts].include?(URI.parse(uri).host)) - result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, false) + result = RestClientWrapper.get(@uri, {:accept => 'application/x-yaml'}, nil, false) @metadata = YAML.load result.to_s @http_code = result.code else @metadata = Parser::Owl::Generic.new(@uri).load_metadata - @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, false).code + @http_code = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, nil, false).code end end @@ -216,7 +216,9 @@ module OpenTox =end # waits for a task, unless time exceeds or state is no longer running - def wait_for_completion(dur=0.3) + # @param [optional,OpenTox::Task] waiting_task (can be a OpenTox::Subtask as well), progress is updated accordingly + # @param [optional,Numeric] dur seconds pausing before cheking again for completion + def wait_for_completion( waiting_task=nil, dur=0.3) due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s @@ -226,6 +228,8 @@ module OpenTox while self.running? sleep dur load_metadata + # if another (sub)task is waiting for self, set progress accordingly + waiting_task.progress(@metadata[OT.percentageCompleted]) if waiting_task check_state if (Time.new > due_to_time) raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'" @@ -234,6 +238,18 @@ module OpenTox LOGGER.debug "Task '"+@metadata[OT.hasStatus]+"': "+@uri.to_s+", Result: "+@metadata[OT.resultURI].to_s end + + # updates percentageCompleted value (can only be increased) + # task has to be running + # @param [Numeric] pct value between 0 and 100 + def progress(pct) + #puts "task := "+pct.to_s + raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100 + if (pct > @metadata[OT.percentageCompleted] + 0.0001) + RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct}) + load_metadata + end + end private def check_state @@ -252,15 +268,6 @@ module OpenTox RestClientWrapper.raise_uri_error(ex.message, @uri) end end - - public - #hint: do not overwrite percentageCompleted=, this is used in toYaml - def progress(pct) -# #puts "task := "+pct.to_s -# raise "no numeric >= 0 and <= 100 : '"+pct.to_s+"'" unless pct.is_a?(Numeric) and pct>=0 and pct<=100 -# RestClientWrapper.put(File.join(@uri,'Running'),{:percentageCompleted => pct}) -# reload - end end -- cgit v1.2.3