summaryrefslogtreecommitdiff
path: root/lib/task.rb
blob: d642f48653a310227d84cad6c2e86744d30bdbe7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# Upper bound, in seconds, that Task#wait keeps polling a running task
# before it reports a timeout (36000 s = 10 hours).
DEFAULT_TASK_MAX_DURATION = 36_000
module OpenTox

  # Class for handling asynchronous tasks
  #
  # Task.run forks a worker process that executes the caller's block and an
  # observer process that kills the worker once the task has been cancelled.
  class Task

    # pids of the worker and the observer process forked by Task.run
    attr_accessor :pid, :observer_pid

    # Calls the inherited implementation with +true+.
    def metadata
      super true # always update metadata
    end

    # Create a task and execute the given block in a forked worker process.
    #
    # The value returned by the block is stored as the task's result URI
    # (see #completed). If the block raises, an error report is PUT to the
    # task's 'Error' resource instead.
    #
    # @param description [#to_s] stored as dc:description
    # @param creator [#to_s] stored as dc:creator
    # @param subjectid handed to Task.new
    # @return [Task] the new task with pid and observer_pid assigned
    def self.run(description, creator=nil, subjectid=SUBJECTID)

      task = Task.new nil, subjectid
      task[RDF::OT.created_at] = DateTime.now
      task[RDF::OT.hasStatus] = "Running"
      task[RDF::DC.description] = description.to_s
      task[RDF::DC.creator] = creator.to_s
      task.put
      pid = fork do
        begin
          task.completed yield
        rescue => error
          # exceptions that can serialise themselves are reported as they are,
          # everything else gets a generic report
          report = error.respond_to?(:to_ntriples) ? error.to_ntriples : ntriples_error_report(error)
          RestClientWrapper.put(File.join(task.uri,'Error'),{:errorReport => report},{:content_type => 'text/plain', :subjectid => task.subjectid})
          # NOTE(review): inside this forked child neither pid nor observer_pid
          # has been assigned yet (both are set in the parent after the fork),
          # so this call does not signal any process here.
          task.kill
        end
      end
      Process.detach(pid)
      task.pid = pid

      # watch if task has been cancelled
      observer_pid = fork do
        task.wait
        begin
          Process.kill(9,task.pid) if task.cancelled?
        rescue
          $logger.warn "Could not kill process of task #{task.uri}, pid: #{task.pid}"
        end
      end
      Process.detach(observer_pid)
      task.observer_pid = observer_pid
      task

    end

    # Build a generic error report for an exception that has no #to_ntriples:
    # the report is logged as Turtle and returned as N-Triples.
    #
    # @param error [Exception]
    # @return [String] N-Triples serialisation of the report
    def self.ntriples_error_report(error)
      # keep the backtrace up to (and including) the first sinatra frame,
      # or the complete backtrace if there is none
      cut_index = error.backtrace.find_index { |line| line.match(/gems\/sinatra/) } || -1
      graph = RDF::Graph.new
      subject = RDF::Node.new
      graph << [subject, RDF.type, RDF::OT.ErrorReport]
      graph << [subject, RDF::OT.message, error.message]
      graph << [subject, RDF::OT.errorCode, error.class.to_s]
      graph << [subject, RDF::OT.errorCause, error.backtrace[0..cut_index].join("\n")]
      prefixes = {:rdf => "http://www.w3.org/1999/02/22-rdf-syntax-ns#", :ot => RDF::OT.to_s}
      turtle = RDF::Turtle::Writer.for(:turtle).buffer(:prefixes => prefixes) do |writer|
        graph.each { |statement| writer << statement }
      end
      $logger.error turtle
      RDF::Writer.for(:ntriples).buffer do |writer|
        graph.each { |statement| writer << statement }
      end
    end
    private_class_method :ntriples_error_report

    # Kill the worker and the observer process (best effort).
    #
    # Both processes are signalled independently, so a worker that is already
    # gone (or a pid that was never set) does not keep the observer alive.
    #
    # @return [nil]
    def kill
      [@pid, @observer_pid].each { |process_id| kill_process(process_id) }
      nil
    end

    # @return the task's dc:description
    def description
      self[RDF::DC.description]
    end

    # @return the task's dc:creator
    def creator
      self[RDF::DC.creator]
    end

    # Kill the task's processes, then store the status "Cancelled" together
    # with the finishing time.
    def cancel
      kill
      self[RDF::OT.hasStatus] = "Cancelled"
      self[RDF::OT.finished_at] = DateTime.now
      put
    end

    # Store the result URI, the status "Completed" and the finishing time.
    #
    # @param uri result of the task
    def completed(uri)
      self[RDF::OT.resultURI] = uri
      self[RDF::OT.hasStatus] = "Completed"
      self[RDF::OT.finished_at] = DateTime.now
      put
    end

    # waits for a task, unless time exceeds or state is no longer running
    #
    # The polling interval starts at 0.2 s and then follows the elapsed time
    # divided by 20, limited to the range 0.3 s .. 300 s. Once
    # DEFAULT_TASK_MAX_DURATION seconds have passed while the task is still
    # running, request_timeout_error (defined elsewhere) is called.
    def wait
      start_time = Time.now
      due_to_time = start_time + DEFAULT_TASK_MAX_DURATION
      dur = 0.2
      while running?
        sleep dur
        dur = [[(Time.now - start_time)/20.0,0.3].max,300.0].min
        if Time.now > due_to_time
          request_timeout_error "max wait time exceeded (#{DEFAULT_TASK_MAX_DURATION}sec), task: '#{@uri}'"
        end
      end
    end

    private

    # Send SIGKILL to one process; unset pids are skipped and failures are
    # ignored.
    def kill_process(process_id)
      Process.kill(9, process_id) if process_id
    rescue # no need to raise an exception if processes are not running
    end

  end

  # HTTP status code of a HEAD request to the task URI.
  #
  # @return [Integer]
  def code
    response = RestClientWrapper.head(@uri, {}, :subjectid => @subjectid)
    response.code.to_i
  end

  # Status predicates only look at the status code returned by #code.
  #
  # @return [Boolean] true if the status code is 202
  def running?
    status_code = code
    status_code == 202
  end

  # @return [Boolean] true if the status code (see #code) is 503
  def cancelled?
    status_code = code
    status_code == 503
  end

  # @return [Boolean] true if the status code (see #code) is 200
  def completed?
    status_code = code
    status_code == 200
  end

  # True if the status code (see #code) is 400 or above, except for 503,
  # which #cancelled? treats as a cancelled task.
  #
  # The status code is requested only once, so both comparisons see the same
  # value and only a single request is sent.
  #
  # @return [Boolean]
  def error?
    status_code = code
    status_code >= 400 && status_code != 503
  end

  # Generate one reader per task property. Each reader looks the property up
  # in the RDF::OT vocabulary and falls back to RDF::OT1 when nothing is
  # found there.
  [:hasStatus, :resultURI, :created_at, :finished_at].each do |property|
    define_method(property) do
      self[RDF::OT[property]] || self[RDF::OT1[property]] # API 1.1 compatibility
    end
  end

  # Check status of a task
  #
  # Delegates to #hasStatus so that the API 1.1 fallback of that reader also
  # applies here.
  # @return [String] Status
  def status
    hasStatus
  end

  # Collect the properties of the task's error report.
  #
  # Calls get first (presumably to refresh @rdf - confirm), then queries @rdf
  # for all statements whose subject has rdf:type ot:ErrorReport.
  #
  # @return [Hash] property => value converted to String; empty if the query
  #   yields no solutions
  def error_report
    get
    pattern = {
      :report => {
        RDF.type  => RDF::OT.ErrorReport,
        :property => :value,
      }
    }
    solutions = RDF::Query.new(pattern).execute(@rdf)
    properties = {}
    solutions.each { |solution| properties[solution.property] = solution.value.to_s }
    properties
  end

  #TODO: subtasks (only for progress)
  # Placeholder for sub-steps of a task; apart from SubTask.create every
  # method is still an unimplemented stub that returns nil.
  class SubTask

    # @param task the task this sub-step belongs to
    # @param min presumably the lower bound of the progress range - TODO confirm
    # @param max presumably the upper bound of the progress range - TODO confirm
    def initialize(task, min, max)
      #TODO add subtask code
    end

    # Build a SubTask only if a task is given.
    #
    # @return [SubTask, nil] nil if task is nil or false
    def self.create(task, min, max)
      return nil unless task
      SubTask.new(task, min, max)
    end

    # Not implemented yet.
    def waiting_for(task_uri)
      #TODO add subtask code
    end

    # Not implemented yet.
    def progress(pct)
      #TODO add subtask code
    end

    # Not implemented yet.
    def running?
      #TODO add subtask code
    end
  end

end