1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
require File.join(File.dirname(__FILE__),'error')
DEFAULT_TASK_MAX_DURATION = 36000
module OpenTox
# Class for handling asynchronous tasks
class Task
attr_accessor :pid, :observer_pid
def self.create service_uri, params={}
task = Task.new RestClientWrapper.post(service_uri,params).chomp
pid = fork do
begin
result_uri = yield
if URI.accessible?(result_uri)
task.completed result_uri
else
raise "#{result_uri} is not a valid URI"
end
rescue
task.error $!
end
end
Process.detach(pid)
task.pid = pid
# watch if task has been cancelled
observer_pid = fork do
task.wait
begin
Process.kill(9,task.pid) if task.cancelled?
rescue
$logger.warn "Could not kill process of task #{task.uri}, pid: #{task.pid}"
end
end
Process.detach(observer_pid)
task.observer_pid = observer_pid
task
end
def kill
Process.kill(9,@pid)
Process.kill(9,@observer_pid)
rescue # no need to raise an aexeption if processes are not running
end
def description
metadata[RDF::DC.description]
end
def creator
metadata[RDF::DC.creator]
end
def cancel
kill
RestClientWrapper.put(File.join(@uri,'Cancelled'),{})
end
def completed(uri)
#TODO: subjectid?
#TODO: error code
raise "\"#{uri}\" does not exist." unless URI.accessible? uri
RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
end
def error error
$logger.error self
report = ErrorReport.create(error,self.creator)
RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => report})
kill
raise error
end
# waits for a task, unless time exceeds or state is no longer running
# @param [optional,Numeric] dur seconds pausing before checking again for completion
def wait(dur=0.3)
due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION
while running?
sleep dur
raise "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'" if (Time.new > due_to_time)
end
end
end
# get only header for ststus requests
def running?
RestClientWrapper.head(@uri).code == 202
end
def cancelled?
RestClientWrapper.head(@uri).code == 503
end
def completed?
RestClientWrapper.head(@uri).code == 200
end
def error?
RestClientWrapper.head(@uri).code == 500
end
def method_missing(method,*args)
method = method.to_s
begin
case method
when /=/
res = RestClientWrapper.put(File.join(@uri,method.sub(/=/,'')),{})
super unless res.code == 200
else
response = metadata[RDF::OT[method]].to_s
response = metadata[RDF::OT1[method]].to_s #if response.empty? # API 1.1 compatibility
if response.empty?
$logger.error "No #{method} metadata for #{@uri} "
raise "No #{method} metadata for #{@uri} "
end
return response
end
rescue
$logger.error "Unknown #{self.class} method #{method}"
#super
end
end
#TODO: subtasks
end
|