# path: root/lib/task.rb
# blob: 52d4a3069cabc57516866aeff1728d6f0f586882 (plain)
require File.join(File.dirname(__FILE__),'error')
# Longest time, in seconds, that Task#wait_for_completion keeps polling (10 hours).
DEFAULT_TASK_MAX_DURATION = 10 * 60 * 60
module OpenTox

  # Class for handling asynchronous tasks
  #
  # A Task pairs a task resource at a task service (addressed through @uri,
  # which is assigned outside this class body) with the PID of the local
  # child process that performs the work.
  class Task

    # PID of the forked worker process; assigned by Task.create in the parent process.
    attr_accessor :pid

    # Registers a task at the task service and runs the given block in a
    # forked child process.
    #
    # The child reports the block's return value as the result URI, or
    # reports the exception when the block fails or returns something that
    # is not a URI. String#uri? is not a core method; it is expected to be
    # provided elsewhere in the project.
    #
    # @param [String] service_uri URI the creation request is POSTed to
    # @param [optional,Hash] params parameters sent with the POST request
    # @yield work to run in the child process; has to return the result URI
    # @return [Task] the new task, with pid set to the child process
    def self.create service_uri, params={}
      # the chomped response body is handed to Task.new (presumably the task URI)
      task = Task.new RestClient.post(service_uri,params).chomp
      pid = fork do
        begin
          result_uri = yield
          raise "#{result_uri} is not a valid URI" unless result_uri.uri?
          task.completed result_uri
        rescue
          # TODO add service URI to Kernel.raise
          # report the failure to the task service, then let the child
          # terminate with the original exception
          task.error $!
          raise
        end
      end
      Process.detach(pid) # reap the child in the background, no zombie is left behind
      task.pid = pid
      task
    end

    # Forcibly terminates the worker process (signal 9, SIGKILL).
    #
    # Best effort: does nothing when no pid is known (e.g. inside the child
    # process itself) and ignores failures such as an already exited process.
    # @return [Integer,nil] result of Process.kill, nil if nothing was killed
    def kill
      return unless pid
      Process.kill(9,pid)
    rescue
      # deliberately ignored, the process may be gone already
      nil
    end

    # @return the object stored under RDF::DC.description in the task metadata
    #   (metadata is not defined in this class body)
    def description
      metadata[RDF::DC.description]
    end

    # Kills the local worker process and PUTs to <@uri>/Cancelled.
    def cancel
      kill
      RestClient.put(File.join(@uri,'Cancelled'),{})
    end

    # PUTs the result URI to <@uri>/Completed.
    # @param [String] uri URI of the result
    def completed(uri)
      RestClient.put(File.join(@uri,'Completed'),{:resultURI => uri})
    end

    # Logs the error, kills the worker process and PUTs an error report to
    # <@uri>/Error.
    # @param [Exception,String] error the failure to report
    def error error
      # log the failure itself, not just the task object
      $logger.error "Task #{@uri} failed: #{error}" if $logger
      kill
      report = ErrorReport.create(error,"http://localhost")
      RestClient.put(File.join(@uri,'Error'),{:errorReport => report})
    end

    # waits for a task, unless time exceeds or state is no longer running
    # @param [optional,Numeric] dur seconds pausing before checking again for completion
    # @param [optional,Numeric] max_duration seconds to wait at most before giving up
    # @raise [RuntimeError] if the task is still running after max_duration seconds
    def wait_for_completion(dur=0.3, max_duration=DEFAULT_TASK_MAX_DURATION)
      due_to_time = Time.now + max_duration
      while running?
        sleep dur
        if Time.now > due_to_time
          raise "max wait time exceeded (#{max_duration}sec), task: '#{@uri}'"
        end
      end
    end

  end

  # Resolves unknown methods against the remote resource at @uri.
  #
  # * +name=+ sends a PUT with an empty body to <@uri>/<name>
  # * +name?+ compares hasStatus with the capitalized name
  #   (e.g. +running?+ is true when hasStatus is "Running")
  # * any other name is looked up as a metadata property, first in the
  #   RDF::OT vocabulary, then in RDF::OT1
  #
  # Every failure inside these branches is logged and ends in the default
  # method_missing handling.
  #
  # @param [Symbol] method name of the missing method
  # @param [Array] args arguments of the original call (only forwarded to super)
  # @return [Boolean,String,nil] status flag for +?+ methods, metadata value
  #   for property lookups, nil after a successful PUT
  # @raise [NoMethodError] if the request fails or no metadata exists for the name
  def method_missing(method,*args)
    # Leave +method+ untouched: a bare +super+ forwards the current parameter
    # values, and the default handler only accepts a Symbol as method name.
    name = method.to_s
    begin
      case name
      when /\A\w+=\z/
        res = RestClient.put(File.join(@uri,name.chomp('=')),{})
        super unless res.code == 200
      when /\A\w+\?\z/
        return hasStatus == name.chomp('?').capitalize
      else
        # metadata may reload from the service on each call, so read it only once
        properties = metadata
        response = properties[RDF::OT[name]].to_s
        response = properties[RDF::OT1[name]].to_s if response.empty?  # API 1.1 compatibility
        if response.empty?
          $logger.error "No #{name} metadata for #{@uri} " if $logger
          raise "No #{name} metadata for #{@uri} "
        end
        return response
      end
    rescue
      $logger.error "Unknown #{self.class} method #{name}" if $logger
      super
    end
  end

  # override to read all error codes
  #
  # Collects the RDF statements whose subject equals @uri into a Hash
  # mapping predicate to object. When several statements share a predicate
  # the last one read wins.
  #
  # @param [optional,Boolean] reload fetch the metadata from @uri (default);
  #   when false the Hash of the previous load is returned as is
  # @return [Hash,nil] predicate => object; nil when reload is false and
  #   nothing has been loaded before
  def metadata reload=true
    if reload
      @metadata = {}
      # ignore error codes from Task services (may contain eg 500 which causes exceptions in RestClient and RDF::Reader
      # (the block form hands over the response instead of raising for it)
      RestClient.get(@uri) do |response, _request, result|
        unless [200, 202].include?(response.code)
          # a missing logger must not prevent reading the response body
          $logger.warn "#{@uri} returned #{result}" if $logger
        end
        RDF::Reader.for(:rdfxml).new(response) do |reader|
          reader.each_statement do |statement|
            @metadata[statement.predicate] = statement.object if statement.subject == @uri
          end
        end
      end
    end
    @metadata
  end

  #TODO: subtasks

end