1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
|
# Upper bound, in seconds, that Task#wait keeps polling a running task
# before it calls time_out_error (36000 s = 10 h).
DEFAULT_TASK_MAX_DURATION = 36_000
module OpenTox
# Class for handling asynchronous tasks
#
# NOTE(review): this class calls +uri+, +put+, +[]+, +[]=+, +running?+,
# +cancelled?+ and +time_out_error+, none of which are defined in the class
# body itself; presumably they are mixed in or defined elsewhere -- confirm.
class Task
  attr_accessor :pid, :observer_pid

  # Registers a new task below +service_uri+ and runs the given block in a
  # forked child process. A second forked process observes the task and
  # kills the worker process once the task is reported as cancelled.
  #
  # @param service_uri [String] base URI; the task URI is this URI joined with a fresh UUID
  # @param subjectid [String, nil] passed through to Task.new
  # @param params [Hash] additional metadata, copied onto the task via []=
  # @yield work to perform in the child process
  # @yieldreturn the result URI that is handed to #completed
  # @return [Task] the task, with #pid and #observer_pid set
  def self.create(service_uri, subjectid=nil, params={})
    uri = File.join(service_uri, SecureRandom.uuid)
    task = Task.new uri, subjectid
    task[RDF::OT.created_at] = DateTime.now
    task[RDF::OT.hasStatus] = "Running"
    params.each { |key, value| task[key] = value }
    task.put false

    pid = fork do
      begin
        result_uri = yield
        task.completed result_uri
      rescue => error
        # PENDING: only runtime Errors are logged when raised
        $logger.error task_error_message(task, params, error) unless error.is_a?(RuntimeError)
        # Report the failure to the task resource, with an error report if the
        # exception can serialize itself.
        if error.respond_to? :to_ntriples
          RestClientWrapper.put(File.join(task.uri,'Error'),:errorReport => error.to_ntriples,:content_type => 'text/plain')
        else
          RestClientWrapper.put(File.join(task.uri,'Error'))
        end
        task.kill
      end
    end
    Process.detach(pid)
    task.pid = pid

    # watch if task has been cancelled
    observer_pid = fork do
      task.wait
      begin
        Process.kill(9,task.pid) if task.cancelled?
      rescue
        $logger.warn "Could not kill process of task #{task.uri}, pid: #{task.pid}"
      end
    end
    Process.detach(observer_pid)
    task.observer_pid = observer_pid
    task
  end

  # Builds the log entry for an exception raised inside a task block.
  # The backtrace is cut after the first line matching gems/sinatra; when no
  # line matches (or there is no backtrace) it is used as is.
  def self.task_error_message(task, params, error)
    backtrace = error.backtrace || []
    cut_index = backtrace.find_index { |line| line =~ /gems\/sinatra/ }
    backtrace = backtrace[0..cut_index] if cut_index
    "\nTask ERROR\n" +
      "task description: #{params[RDF::DC.description]}\n" +
      "task uri: #{task.uri}\n" +
      "error class: #{error.class}\n" +
      "error msg: #{error.message}\n" +
      "error backtrace:\n#{backtrace.join("\n")}\n"
  end
  private_class_method :task_error_message

  # Sends SIGKILL to the worker and the observer process.
  # Each process is signalled independently, so a worker that has already
  # exited does not prevent the observer from being killed. Never raises.
  # @return [nil]
  def kill
    [@pid, @observer_pid].each do |process_id|
      next unless process_id
      begin
        Process.kill(9, process_id)
      rescue
        # no need to raise an exception if processes are not running
      end
    end
    nil
  end

  # @return the value stored under RDF::DC.description
  def description
    self[RDF::DC.description]
  end

  # @return the value stored under RDF::DC.creator
  def creator
    self[RDF::DC.creator]
  end

  # Kills the task processes, marks the task as "Cancelled" with the current
  # time as finishing time and calls +put false+.
  def cancel
    kill
    self[RDF::OT.hasStatus] = "Cancelled"
    self[RDF::OT.finished_at] = DateTime.now
    put false
  end

  # Stores the result URI, marks the task as "Completed" with the current
  # time as finishing time and calls +put false+.
  # @param uri result URI of the finished task
  def completed(uri)
    self[RDF::OT.resultURI] = uri
    self[RDF::OT.hasStatus] = "Completed"
    self[RDF::OT.finished_at] = DateTime.now
    put false
  end

  # waits for a task, unless time exceeds or state is no longer running
  #
  # Polls running? in a loop. The pause between two polls starts at 0.3
  # seconds and then grows with the elapsed time (elapsed / 20), limited to
  # the range 0.3..300 seconds. Once DEFAULT_TASK_MAX_DURATION seconds have
  # passed, time_out_error is called (presumably it raises -- confirm).
  # TODO: add waiting task
  def wait
    start_time = Time.new
    due_to_time = start_time + DEFAULT_TASK_MAX_DURATION
    dur = 0.3
    while running?
      sleep dur
      dur = [[(Time.new - start_time) / 20.0, 0.3].max, 300.0].min
      if Time.new > due_to_time
        time_out_error "max wait time exceeded (#{DEFAULT_TASK_MAX_DURATION}sec), task: '#{@uri}'"
      end
    end
  end
end
# get only header for status requests
# @return [Boolean] true when a HEAD request on the task URI answers with code 202
def running?
  response = RestClientWrapper.head(@uri)
  response.code == 202
end
# @return [Boolean] true when a HEAD request on the task URI answers with code 503
def cancelled?
  response = RestClientWrapper.head(@uri)
  response.code == 503
end
# @return [Boolean] true when a HEAD request on the task URI answers with code 200
def completed?
  response = RestClientWrapper.head(@uri)
  response.code == 200
end
# @return [Boolean] true when a HEAD request on the task URI answers with a
#   code of 400 or above, except 503 (which cancelled? checks for)
def error?
  status = RestClientWrapper.head(@uri).code
  status >= 400 && status != 503
end
# Defines the readers hasStatus, resultURI, created_at and finished_at.
# Each reader calls +get+ first (presumably to refresh the metadata -- confirm)
# and then returns the value stored under the RDF::OT property of the same
# name, falling back to the RDF::OT1 property when that value is nil or false.
[:hasStatus, :resultURI, :created_at, :finished_at].each do |property|
  define_method(property) do
    get
    # API 1.1 compatibility
    self[RDF::OT[property]] || self[RDF::OT1[property]]
  end
end
# Collects the error report stored in @rdf.
# Queries @rdf for resources of type RDF::OT.ErrorReport and gathers every
# property/value pair of the solutions; a property that occurs more than once
# keeps the value of its last solution.
# @return [Hash] property => value converted with to_s
def error_report
  pattern = {
    :report => {
      RDF.type => RDF::OT.ErrorReport,
      :property => :value,
    }
  }
  solutions = RDF::Query.new(pattern).execute(@rdf)
  solutions.each_with_object({}) do |solution, report|
    report[solution.property] = solution.value.to_s
  end
end
#TODO: subtasks (only for progress)
end
|