summaryrefslogtreecommitdiff
path: root/lib/task.rb
blob: 03d5330673f81e52f732d694ef00db90f734243c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# Sets progname on the shared LOGGER to the absolute path of this file.
LOGGER.progname = File.expand_path(__FILE__)

# Default max_duration handed to the task service when a task is created
# through OpenTox::Task.as_task (presumably seconds - confirm with the service).
DEFAULT_TASK_MAX_DURATION = 120
# Number of seconds OpenTox::Task#wait_for_completion keeps polling a task that
# is not hosted by the configured task service (added to the current time).
EXTERNAL_TASK_MAX_DURATION = 60

# Task currently being executed; assigned inside the Spork.spork block of
# OpenTox::Task.as_task.
$self_task=nil

module OpenTox

  # Client-side handle for a task resource.
  #
  # An instance caches the attributes listed in TASK_ATTRIBS. They are refreshed
  # by #reload, which fetches the task representation through RestClientWrapper.
  # State changes (#cancel, #completed, #error, #pid=) are sent to the task
  # through RestClientWrapper.put.
  class Task

    # Attributes mirrored from the task representation.
    # due_to_time is only set in local tasks
    TASK_ATTRIBS = [ :uri, :date, :title, :creator, :description, :hasStatus,
                     :percentageCompleted, :resultURI, :due_to_time ].freeze
    TASK_ATTRIBS.each { |a| attr_accessor(a) }

    # @param uri [#to_s] task uri; stored with surrounding whitespace removed
    #   (nil becomes the empty string)
    def initialize(uri)
      @uri = uri.to_s.strip
    end

    # Registers a new task at the configured task service and loads it.
    #
    # Intended for internal use - use OpenTox::Task.as_task instead. The method
    # is nevertheless publicly callable: a bare `private` has no effect on
    # methods defined with `def self.`, and it is kept public for compatibility.
    #
    # @param max_duration value posted to the task service as :max_duration
    # @return [OpenTox::Task] the newly created task
    def self.create(max_duration)
      task_uri = RestClientWrapper.post(@@config[:services]["opentox-task"], {:max_duration => max_duration}, nil, false).to_s
      Task.find(task_uri.chomp)
    end

    # Builds a task object for +uri+ and loads its attributes.
    #
    # @param uri [#to_s] uri of an existing task
    # @return [OpenTox::Task]
    def self.find(uri)
      task = Task.new(uri)
      task.reload
      task
    end

    # Builds a task from an already fetched representation.
    #
    # test_if_task = true -> error suppressed if data is no task, nil is returned
    #
    # @param data serialized task
    # @param content_type content type of +data+ (see #reload_from_data)
    # @param base_uri passed on to the rdf parser
    # @param test_if_task [Boolean] see above
    # @return [OpenTox::Task, nil]
    def self.from_data(data, content_type, base_uri, test_if_task)
      task = Task.new(nil)
      task.reload_from_data(data, content_type, base_uri, test_if_task)
      return nil if test_if_task && (!task.uri || task.uri.strip.empty?)
      task
    end

    # Fetches the task (requesting application/rdf+xml) and updates all attributes.
    def reload
      result = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'})#'text/x-yaml'})
      reload_from_data(result, result.content_type, uri, false)
    end

    # Updates the attributes of this task from a serialized representation.
    #
    # test_if_task = true -> error suppressed if data is no task, empty task is returned
    #
    # @param data serialized task
    # @param content_type must match text/x-yaml or application/rdf+xml
    # @param base_uri passed on to OpenTox::Owl.from_data
    # @param test_if_task [Boolean] see above
    # @raise [RuntimeError] on unsupported content type, on yaml data with a
    #   missing key, or (unless test_if_task) if no uri is set after loading
    def reload_from_data( data, content_type, base_uri, test_if_task )
      case content_type
      when /text\/x-yaml/
        # NOTE(review): YAML.load on data received from another service may
        # instantiate arbitrary objects; consider YAML.safe_load with Symbol permitted.
        task = YAML.load data
        TASK_ATTRIBS.each do |a|
          raise "task yaml data invalid, key missing: "+a.to_s unless task.has_key?(a)
          send("#{a}=", task[a])
        end
      when /application\/rdf\+xml/
        owl = OpenTox::Owl.from_data(data,base_uri,"Task",test_if_task)
        if owl
          self.uri = owl.uri
          (TASK_ATTRIBS - [:uri]).each { |a| send("#{a}=", owl.get(a.to_s)) }
        end
      else
        raise "content type for tasks not supported: "+content_type.to_s
      end
      # in test mode a missing uri is not an error, the caller checks for it
      return if test_if_task
      raise "uri is null after loading" if !@uri || @uri.to_s.strip.empty?
    end

    # Requests the 'Cancelled' state for this task, then reloads it.
    def cancel
      RestClientWrapper.put(File.join(@uri,'Cancelled'))
      reload
    end

    # Requests the 'Completed' state for this task, then reloads it.
    #
    # @param uri sent as :resultURI
    def completed(uri)
      RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
      reload
    end

    # Requests the 'Error' state for this task, then reloads it.
    #
    # @param description sent as :description
    def error(description)
      RestClientWrapper.put(File.join(@uri,'Error'),{:description => description})
      reload
    end

    # Sends +pid+ to the task's 'pid' sub-resource; nothing is stored locally.
    def pid=(pid)
      RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid})
    end

    # @return [Boolean] true if the cached status is 'Running'
    def running?
      @hasStatus.to_s == 'Running'
    end

    # @return [Boolean] true if the cached status is 'Completed'
    def completed?
      @hasStatus.to_s == 'Completed'
    end

    # @return [Boolean] true if the cached status is 'Error'
    def error?
      @hasStatus.to_s == 'Error'
    end

    # waits for a task, unless time exceeds or state is no longer running
    #
    # @param dur [Numeric] seconds to sleep between two reloads
    # @raise [RuntimeError] if the task is still running after the deadline
    def wait_for_completion(dur=0.3)
      # literal substring test: the service uri must not be interpreted as a regexp
      if @uri.include?(@@config[:services]["opentox-task"])
        deadline = Time.parse(@due_to_time)
      else
        # the date of the external task cannot be trusted, offset to local time might be too big
        deadline = Time.now + EXTERNAL_TASK_MAX_DURATION
      end
      LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.now.to_s+", waiting at least until "+deadline.to_s
      while running?
        sleep dur
        reload
        # a task that finished during the last poll is not stalled, even if the
        # deadline has passed in the meantime
        if running? && Time.now > deadline
          raise "max waiting time exceeded, task seems to be stalled, task: '"+@uri.to_s+"'"
        end
      end
    end

    # Creates a task and runs the given block inside Spork.spork (presumably a
    # separate child process - confirm against the spork gem).
    #
    # returns the task uri
    # catches halts and exceptions, task state is set to error then
    #
    # The block result is reported via #completed, unless the block threw :halt
    # with a two-element array whose first element is greater than 202; then the
    # second element is reported via #error.
    #
    # @param max_duration passed on to OpenTox::Task.create
    # @yieldparam task [OpenTox::Task] the task the block is running as
    # @return the uri of the created task
    def self.as_task(max_duration=DEFAULT_TASK_MAX_DURATION)
      task = OpenTox::Task.create(max_duration)
      task_pid = Spork.spork(:logger => LOGGER) do
        LOGGER.debug "Task #{task.uri} started #{Time.now}"
        $self_task = task

        begin
          result = catch(:halt) do
            yield task
          end
          # catching halt, set task state to error
          if result.is_a?(Array) && result.size==2 && result[0]>202
            LOGGER.error "task was halted: "+result.inspect
            task.error(result[1])
            # leave only this block; `return` would try to return from as_task itself
            next
          end
          LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
          task.completed(result)
        rescue => ex
          LOGGER.error "task failed: "+ex.message
          task.error(ex.message)
        end
      end
      task.pid = task_pid
      LOGGER.debug "Started task: "+task.uri.to_s
      task.uri
    end
  end

end