1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
|
LOGGER.progname = File.expand_path(__FILE__)
$self_task=nil
module OpenTox
class Task
# due_to_time is only set in local tasks
TASK_ATTRIBS = [ :uri, :date, :title, :creator, :description, :hasStatus, :percentageCompleted, :resultURI, :due_to_time ]
TASK_ATTRIBS.each{ |a| attr_accessor(a) }
attr_accessor :http_code
private
def initialize(uri)
@uri = uri.to_s.strip
end
# create is private now, use OpenTox::Task.as_task
def self.create(max_duration)
task_uri = RestClientWrapper.post(@@config[:services]["opentox-task"], {:max_duration => max_duration}, nil, false).to_s
Task.find(task_uri.chomp)
end
public
def self.find(uri)
task = Task.new(uri)
task.reload
return task
end
def self.from_data(data, content_type, code, base_uri)
task = Task.new(nil)
task.http_code = code
task.reload_from_data(data, content_type, base_uri)
return task
end
def reload
result = RestClientWrapper.get(uri, {:accept => 'application/rdf+xml'}, false)#'application/x-yaml'})
@http_code = result.code
reload_from_data(result, result.content_type, uri)
end
def reload_from_data( data, content_type, base_uri )
case content_type
when /yaml/
task = YAML.load data
TASK_ATTRIBS.each do |a|
raise "task yaml data invalid, key missing: "+a.to_s unless task.has_key?(a)
send("#{a.to_s}=".to_sym,task[a])
end
when /application\/rdf\+xml/
owl = OpenTox::Owl.from_data(data,base_uri,"Task")
self.uri = owl.uri
(TASK_ATTRIBS-[:uri]).each{|a| self.send("#{a.to_s}=".to_sym, owl.get(a.to_s))}
else
raise "content type for tasks not supported: "+content_type.to_s
end
raise "uri is null after loading" unless @uri and @uri.to_s.strip.size>0
end
def cancel
RestClientWrapper.put(File.join(@uri,'Cancelled'))
reload
end
def completed(uri)
RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
reload
end
def error(description)
RestClientWrapper.put(File.join(@uri,'Error'),{:description => description.to_s[0..2000]})
reload
end
def pid=(pid)
RestClientWrapper.put(File.join(@uri,'pid'), {:pid => pid})
end
def running?
@hasStatus.to_s == 'Running'
end
def completed?
@hasStatus.to_s == 'Completed'
end
def error?
@hasStatus.to_s == 'Error'
end
# waits for a task, unless time exceeds or state is no longer running
def wait_for_completion(dur=0.3)
if (@uri.match(@@config[:services]["opentox-task"]))
due_to_time = Time.parse(@due_to_time)
running_time = due_to_time - Time.parse(@date)
else
# the date of the external task cannot be trusted, offest to local time might be to big
due_to_time = Time.new + EXTERNAL_TASK_MAX_DURATION
running_time = EXTERNAL_TASK_MAX_DURATION
end
LOGGER.debug "start waiting for task "+@uri.to_s+" at: "+Time.new.to_s+", waiting at least until "+due_to_time.to_s
while self.running?
sleep dur
reload
check_state
if (Time.new > due_to_time)
raise "max wait time exceeded ("+running_time.to_s+"sec), task: '"+@uri.to_s+"'"
end
end
LOGGER.debug "Task '"+@hasStatus+"': "+@uri.to_s+", Result: "+@resultURI.to_s
end
def check_state
begin
raise "illegal task state, task is completed, resultURI is no URI: '"+@resultURI.to_s+
"'" unless @resultURI and Utils.is_uri?(@resultURI) if completed?
if @http_code == 202
raise "illegal task state, code is 202, but hasStatus is not Running: '"+@hasStatus+"'" unless running?
elsif @http_code == 201
raise "illegal task state, code is 201, but hasStatus is not Completed: '"+@hasStatus+"'" unless completed?
raise "illegal task state, code is 201, resultURI is no task-URI: '"+@resultURI.to_s+
"'" unless @resultURI and Utils.task_uri?(@resultURI)
end
rescue => ex
RestClientWrapper.raise_uri_error(ex.message, @uri)
end
end
# returns the task uri
# catches halts and exceptions, task state is set to error then
def self.as_task(max_duration=DEFAULT_TASK_MAX_DURATION)
#return yield nil
task = OpenTox::Task.create(max_duration)
task_pid = Spork.spork(:logger => LOGGER) do
LOGGER.debug "Task #{task.uri} started #{Time.now}"
$self_task = task
begin
result = catch(:halt) do
yield task
end
# catching halt, set task state to error
if result && result.is_a?(Array) && result.size==2 && result[0]>202
LOGGER.error "task was halted: "+result.inspect
task.error(result[1])
return
end
LOGGER.debug "Task #{task.uri} done #{Time.now} -> "+result.to_s
task.completed(result)
rescue => ex
LOGGER.error "task failed: "+ex.message
LOGGER.error ": "+ex.backtrace.join("\n")
task.error(ex.message)
end
end
task.pid = task_pid
LOGGER.debug "Started task: "+task.uri.to_s
task.uri
end
end
end
|