From 76e2c84c766fd37d702c45e76dc666230afcd136 Mon Sep 17 00:00:00 2001 From: Christoph Helma Date: Mon, 13 Feb 2012 16:30:05 +0000 Subject: task.rb modified (untested) --- lib/spork.rb | 84 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ lib/task.rb | 68 ++++++++++++++++++++---------------------------- 2 files changed, 112 insertions(+), 40 deletions(-) create mode 100644 lib/spork.rb diff --git a/lib/spork.rb b/lib/spork.rb new file mode 100644 index 0000000..40c458b --- /dev/null +++ b/lib/spork.rb @@ -0,0 +1,84 @@ +# A way to cleanly handle process forking in Sinatra when using Passenger, aka "sporking some code". +# This will allow you to properly execute some code asynchronously, which otherwise does not work correctly. +# +# Written by Ron Evans +# More info at http://deadprogrammersociety.com +# +# Mostly lifted from the Spawn plugin for Rails (http://github.com/tra/spawn) +# but with all of the Rails stuff removed.... cause you are using Sinatra. If you are using Rails, Spawn is +# what you need. If you are using something else besides Sinatra that is Rack-based under Passenger, and you are having trouble with +# asynch processing, let me know if spork helped you. +# +module Spork + # things to close in child process + @@resources = [] + def self.resources + @@resources + end + + # set the resource to disconnect from in the child process (when forking) + def self.resource_to_close(resource) + @@resources << resource + end + + # close all the resources added by calls to resource_to_close + def self.close_resources + @@resources.each do |resource| + resource.close if resource && resource.respond_to?(:close) && !resource.closed? + end + @@resources = [] + end + + # actually perform the fork... er, spork + # valid options are: + # :priority => to set the process priority of the child + # :logger => a logger object to use from the child + # :no_detach => true if you want to keep the child process under the parent control. usually you do NOT want this + def self.spork(options={}) + logger = options[:logger] + logger.debug "spork> parent PID = #{Process.pid}" if logger + + child = fork do + begin + start = Time.now + logger.debug "spork> child PID = #{Process.pid}" if logger + + # set the nice priority if needed + Process.setpriority(Process::PRIO_PROCESS, 0, options[:priority]) if options[:priority] + + # disconnect from the rack + Spork.close_resources + + # run the block of code that takes so long + yield + + rescue => ex + #raise ex + logger.error "spork> Exception in child[#{Process.pid}] - #{ex.class}: #{ex.message}" if logger + ensure + logger.info "spork> child[#{Process.pid}] took #{Time.now - start} sec" if logger + # this form of exit doesn't call at_exit handlers + exit!(0) + end + end + + # detach from child process (parent may still wait for detached process if they wish) + Process.detach(child) unless options[:no_detach] + + return child + end + +end + +# Patch to work with passenger +if defined? Passenger::Rack::RequestHandler + class Passenger::Rack::RequestHandler + alias_method :orig_process_request, :process_request + def process_request(env, input, output) + Spork.resource_to_close(input) + Spork.resource_to_close(output) + orig_process_request(env, input, output) + end + end +end + diff --git a/lib/task.rb b/lib/task.rb index 0adb7a0..cd5aa79 100644 --- a/lib/task.rb +++ b/lib/task.rb @@ -1,62 +1,31 @@ +require File.join(File.dirname(__FILE__),'spork') DEFAULT_TASK_MAX_DURATION = 36000 module OpenTox # Class for handling asynchronous tasks class Task - - def self.create service_uri - Task.new RestClient.post(service_uri,{}).chomp - #eval("#{self}.new(\"#{uri}\", #{subjectid})") - end - - def http_code - get(@uri).code - end - - def status - metadata[RDF::OT.hasStatus].to_s - end - - def result_uri - metadata[RDF::OT.resultURI] + def self.create service_uri, params + task = Task.new RestClient.post(service_uri,params).chomp + pid = Spork.spork { yield } + task.pid = pid + task end def description metadata[RDF::DC.description] end - def errorReport - metadata[RDF::OT.errorReport] - end - def cancel - RestClient.put(File.join(@uri,'Cancelled'),{:cannot_be => "empty"}) + RestClient.put(File.join(@uri,'Cancelled'),{}) end def completed(uri) RestClient.put(File.join(@uri,'Completed'),{:resultURI => uri}) end - def error(error_report) - raise "no error report" unless error_report.is_a?(OpenTox::ErrorReport) - RestClient.put(File.join(@uri,'Error'),{:errorReport => error_report.to_yaml}) - end - - def pid=(pid) - RestClient.put(File.join(@uri,'pid'), {:pid => pid}) - end - - def running? - metadata[RDF::OT.hasStatus] == 'Running' - end - - def completed? - metadata[RDF::OT.hasStatus] == 'Completed' - end - - def error? - metadata[RDF::OT.hasStatus] == 'Error' + def error(error) + RestClient.put(File.join(@uri,'Error'),{:errorReport => OpenTox::Error.new(error)}) end # waits for a task, unless time exceeds or state is no longer running @@ -69,4 +38,23 @@ module OpenTox end end + def method_missing(method,*args) + method = method.to_s + begin + case method + when /=/ + res = RestClient.put(File.join(@uri,method.sub(/=/,'')),{}) + super unless res.code == 200 + when /?/ + return metadata[RDF::OT.hasStatus] == method.sub(/\?/,'').capitalize + else + return metadata[RDF::OT[method]] + end + rescue + super + end + end + + #TODO: subtasks + end -- cgit v1.2.3