1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
|
require File.join(File.dirname(__FILE__),'error')
DEFAULT_TASK_MAX_DURATION = 36000
module OpenTox
# Class for handling asynchronous tasks
class Task
attr_accessor :pid, :observer_pid
def self.create service_uri, params={}
# TODO set/enforce request uri
task = Task.new RestClientWrapper.post(service_uri,params).chomp
pid = fork do
begin
result_uri = yield
if URI.accessible?(result_uri)
task.completed result_uri
else
raise NotFoundError.new "\"#{result_uri}\" is not a valid result URI"
#task.error OpenTox::RestError.new :http_code => 404, :cause => "#{result_uri} is not a valid URI", :actor => params[:creator]
end
rescue
task.error $!
end
end
Process.detach(pid)
task.pid = pid
# watch if task has been cancelled
observer_pid = fork do
task.wait
begin
Process.kill(9,task.pid) if task.cancelled?
rescue
$logger.warn "Could not kill process of task #{task.uri}, pid: #{task.pid}"
end
end
Process.detach(observer_pid)
task.observer_pid = observer_pid
task
end
def kill
Process.kill(9,@pid)
Process.kill(9,@observer_pid)
rescue # no need to raise an exeption if processes are not running
end
def description
metadata[RDF::DC.description]
end
def creator
metadata[RDF::DC.creator]
end
def cancel
kill
RestClientWrapper.put(File.join(@uri,'Cancelled'),{})
end
def completed(uri)
#error OpenTox::RestError.new :http_code => 404, :cause => "\"#{uri}\" does not exist.", :actor => creator unless URI.accessible? uri
raise NotFoundError.new "Result URI \"#{uri}\" does not exist." unless URI.accessible? uri
RestClientWrapper.put(File.join(@uri,'Completed'),{:resultURI => uri})
end
def error error
# TODO: switch task service to rdf
#RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => error.report.to_rdfxml})
# create report for non-runtime errors
error.respond_to?(:reporti) ? report = error.report : report = OpenTox::ErrorReport.create(error)
RestClientWrapper.put(File.join(@uri,'Error'),{:errorReport => report.to_yaml})
kill
raise error
end
# waits for a task, unless time exceeds or state is no longer running
# @param [optional,Numeric] dur seconds pausing before checking again for completion
def wait(dur=0.3)
due_to_time = Time.new + DEFAULT_TASK_MAX_DURATION
while running?
sleep dur
raise TimeOutError.new "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+@uri.to_s+"'" if (Time.new > due_to_time)
end
end
end
# get only header for ststus requests
def running?
RestClientWrapper.head(@uri).code == 202
end
def cancelled?
RestClientWrapper.head(@uri).code == 503
end
def completed?
RestClientWrapper.head(@uri).code == 200
end
def error?
code = RestClientWrapper.head(@uri).code
code >= 400 and code != 503
end
def method_missing(method,*args)
method = method.to_s
begin
case method
when /=/
res = RestClientWrapper.put(File.join(@uri,method.sub(/=/,'')),{})
super unless res.code == 200
else
response = metadata[RDF::OT[method]].to_s
response = metadata[RDF::OT1[method]].to_s #if response.empty? # API 1.1 compatibility
if response.empty?
raise NotFoundError.new "No #{method} metadata for #{@uri} "
end
return response
end
rescue OpenTox::Error
raise $!
rescue
$logger.error "Unknown #{self.class} method #{method}"
super
end
end
#TODO: subtasks
end
|