1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
# TODO: task seems to run twice, see fminser tests
# TODO: do we need tasks for internal use
DEFAULT_TASK_MAX_DURATION = 36000
module OpenTox
# TODO: fix error reports
# TODO: fix field names and overwrite accessors
# Class for handling asynchronous tasks
class Task
include Mongoid::Document
include Mongoid::Timestamps
field :creator, type: String
field :percentageCompleted, type: Float
field :error_code, type: Integer # workaround name, cannot overwrite accessors in current mongoid version
field :finished, type: Time # workaround name, cannot overwrite accessors in current mongoid version
# TODO
field :result_type, type: String
field :result_id, type: BSON::ObjectId
field :report, type: String
field :pid, type: Integer
field :observer_pid, type: Integer
def self.run(description, creator=nil)
task = Task.new
task[:description] = description.to_s
task[:creator] = creator.to_s
task[:percentageCompleted] = 0
task[:error_code] = 202
task.save
pid = fork do
begin
task.completed yield
rescue => e
# wrap non-opentox-errors first
e = OpenTox::Error.new(500,e.message,nil,e.backtrace) unless e.is_a?(OpenTox::Error)
$logger.error "error in task #{task.id} created by #{creator}" # creator is not logged because error is logged when thrown
task.update(:report => e.metadata, :error_code => e.http_code, :finished => Time.now)
task.kill
end
end
Process.detach(pid)
task[:pid] = pid
# watch if task has been cancelled
observer_pid = fork do
task.wait
begin
Process.kill(9,task[:pid]) if task.cancelled?
rescue
$logger.warn "Could not kill process of task #{task.id}, pid: #{task[:pid]}"
end
end
Process.detach(observer_pid)
task[:observer_pid] = observer_pid
task
end
def kill
Process.kill(9,task[:pid])
Process.kill(9,task[:observer_pid])
rescue # no need to raise an exception if processes are not running
end
def cancel
kill
update_attributes(:error_code => 503, :finished => Time.now)
end
def completed(result)
update_attributes(:error_code => 200, :finished => Time.now, :percentageCompleted => 100, :result_type => result.type, :result_id => result.id)
end
# waits for a task, unless time exceeds or state is no longer running
def wait
start_time = Time.new
due_to_time = start_time + DEFAULT_TASK_MAX_DURATION
dur = 0.2
while running?
sleep dur
dur = [[(Time.new - start_time)/20.0,0.3].max,300.0].min
request_timeout_error "max wait time exceeded ("+DEFAULT_TASK_MAX_DURATION.to_s+"sec), task: '"+id.to_s+"'" if (Time.new > due_to_time)
end
end
end
def error_report
OpenTox::Task.find(id).report
end
def code
OpenTox::Task.find(id).error_code
end
def result
c = OpenTox::Task.find(id).result_type.downcase.to_sym
rid = OpenTox::Task.find(id).result_id
p c, rid
p $mongo[collection].all
$mongo[collection].find(rid).first
end
def finished_at
OpenTox::Task.find(id).finished
end
def running?
code == 202
end
def cancelled?
code == 503
end
def completed?
code == 200
end
def error?
code >= 400 and code != 503
end
# Check status of a task
# @return [String] Status
def status
case code
when 202
"Running"
when 200
"Completed"
when 503
"Cancelled"
else
"Error"
end
end
end
|