summaryrefslogtreecommitdiff
path: root/lib/task.rb
blob: 286e998262af9178b7516bef1c6b509533a52a83 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
require File.join(File.dirname(__FILE__),'error')
# Upper bound, in seconds, that Task#wait_for_completion keeps polling
# before it raises (36_000 s = 10 h).
DEFAULT_TASK_MAX_DURATION = 36_000
module OpenTox

  # Class for handling asynchronous tasks.
  #
  # A task is registered at a task service; the work itself runs in a forked
  # child process while a second forked process watches for cancellation.
  #
  # NOTE(review): @uri, #uri, #metadata, #running? and #cancelled? are used
  # below but are not defined inside this class body - presumably they are
  # provided elsewhere (mixin / reopened class); confirm.
  class Task

    # pid of the forked worker process / of the forked cancellation observer
    attr_accessor :pid, :observer_pid

    # Registers a task at the task service and runs the given block in a
    # forked, detached child process.
    #
    # The block has to return the URI of the result. If that URI is
    # accessible the task is marked as completed, otherwise (or if the block
    # raises) the error is reported to the task service and re-raised inside
    # the child process.
    #
    # @param service_uri URI the task is POSTed to
    # @param [optional,Hash] params parameters sent along with the POST
    # @yieldreturn URI of the result
    # @return [Task] task with pid and observer_pid set
    def self.create service_uri, params={}
      task = Task.new RestClientWrapper.post(service_uri,params).chomp
      pid = fork do
        begin
          result_uri = yield
          raise "#{result_uri} is not a valid URI" unless URI.accessible?(result_uri)
          task.completed result_uri
        rescue
          # TODO add service URI to Kernel.raise
          # serialize error and send to task service
          task.error $!
          raise
        end
      end
      Process.detach(pid)
      task.pid = pid

      # watch if task has been cancelled
      observer_pid = fork do
        task.wait_for_completion
        begin
          Process.kill(9,task.pid) if task.cancelled?
        rescue
          # guard the logger the same way #error does
          $logger.warn "Could not kill process of task #{task.uri}, pid: #{task.pid}" if $logger
        end
      end
      Process.detach(observer_pid)
      task.observer_pid = observer_pid
      task
    end

    # Sends signal 9 (SIGKILL) to the worker and to the observer process.
    #
    # Best effort: each process is handled on its own, so an unset pid or an
    # already terminated worker does not prevent the observer from being
    # killed. No exception is raised if a process is not running.
    #
    # @return [nil]
    def kill
      [@pid, @observer_pid].each do |process_id|
        next unless process_id # pid was never assigned (e.g. inside the forked child)
        begin
          Process.kill(9, process_id)
        rescue StandardError
          # no need to raise an exception if the process is not running
        end
      end
      nil
    end

    # @return the entry stored under RDF::DC.description in the task metadata
    def description
      metadata[RDF::DC.description]
    end

    # Kills the local processes, then PUTs to the task's 'Cancelled' resource.
    def cancel
      kill
      RestClientWrapper.put(File.join(@uri,'Cancelled'),{})
    end

    # PUTs the result URI to the task's 'Completed' resource.
    #
    # @param uri URI of the result
    # @raise [RuntimeError] if URI.accessible? rejects uri
    def completed(uri)
      #TODO: subjectid?
      #TODO: error code
      raise "\"#{uri}\" does not exist." unless URI.accessible? uri
      RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
    end

    # Builds an error report, PUTs it to the task's 'Error' resource and
    # kills the local processes.
    #
    # @param error the error to report
    def error error
      # NOTE(review): this logs the task object itself, not the error - confirm intended
      $logger.error self if $logger
      report = ErrorReport.create(error,"http://localhost")
      RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => report})
      kill
    end

    # waits for a task, unless time exceeds or state is no longer running
    # @param [optional,Numeric] dur seconds pausing before checking again for completion
    # @raise [RuntimeError] if the task is still running after DEFAULT_TASK_MAX_DURATION seconds
    def wait_for_completion(dur=0.3)
      due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION
      while running?
        sleep dur
        raise "max wait time exceeded (#{DEFAULT_TASK_MAX_DURATION}sec), task: '#{@uri}'" if Time.new > due_to_time
      end
    end

  end

  # get only the header for status requests
  # Issues a HEAD request to @uri and checks the status code.
  # @return [Boolean] true if the status code is 202
  def running?
    RestClient.head(@uri) do |_response, _request, http_result|
      status = http_result.code.to_i
      status == 202
    end
  end

  # Issues a HEAD request to @uri and checks the status code.
  # @return [Boolean] true if the status code is 503
  def cancelled?
    RestClient.head(@uri) do |_response, _request, http_result|
      status = http_result.code.to_i
      status == 503
    end
  end

  # Issues a HEAD request to @uri and checks the status code.
  # @return [Boolean] true if the status code is 200
  def completed?
    RestClient.head(@uri) do |_response, _request, http_result|
      status = http_result.code.to_i
      status == 200
    end
  end

  # Issues a HEAD request to @uri and checks the status code.
  # @return [Boolean] true if the status code is 500
  def error?
    RestClient.head(@uri) do |_response, _request, http_result|
      status = http_result.code.to_i
      status == 500
    end
  end

  # Catch-all for otherwise undefined methods.
  #
  # * names containing "=": PUT to @uri joined with the name minus the "=";
  #   a response code other than 200 is handed to super
  # * any other name: looked up in the metadata, first under RDF::OT[name]
  #   and, only if that yields nothing, under RDF::OT1[name]
  #
  # Errors raised in here (including the one from super) are rescued and
  # logged via $logger instead of being propagated.
  #
  # @param method name of the missing method
  # @param args arguments of the original call (not used by the lookup)
  # @return [String] the metadata value for non-setter names, if one was found
  def method_missing(method,*args)
    method = method.to_s
    begin
      case method
      when /=/
        res = RestClientWrapper.put(File.join(@uri,method.sub(/=/,'')),{})
        super unless res.code == 200
      #when /\?/
        #return hasStatus == method.sub(/\?/,'').capitalize
      else
        response = metadata[RDF::OT[method]].to_s
        # API 1.1 compatibility: fall back to the old vocabulary only when the
        # current one has no value, so an existing value is not overwritten
        response = metadata[RDF::OT1[method]].to_s if response.empty?
        if response.empty?
          $logger.error "No #{method} metadata for #{@uri} "
          raise "No #{method} metadata for #{@uri} "
        end
        return response
      end
    rescue
      $logger.error "Unknown #{self.class} method #{method}"
      #super
    end
  end

  #TODO: subtasks

end