9b1f7b1afac32406f74298ca768ff59b801a0d63
[smartapi.git] / Examples / Python / AdaptDataService / AdaptDataService.py
1 #!/usr/bin/python
2
3 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
4 from io import BytesIO
5 from threading import Thread
6 from httpclient import HttpClient
7 import time
8 import datetime
9 import traceback
10 import random
11
12 from SmartAPI.agents.RegistrationAgent import RegistrationAgent
13 from SmartAPI.agents.SearchAgent import SearchAgent
14
15 from SmartAPI.common.Tools import Tools
16 from SmartAPI.common.RESOURCE import RESOURCE
17
18 from SmartAPI.factory.ResponseFactory import ResponseFactory
19 from SmartAPI.factory.NotificationFactory import NotificationFactory
20
21 from SmartAPI.model.Activity import Activity
22 from SmartAPI.model.Authorization import Authorization
23 from SmartAPI.model.Condition import Condition
24 from SmartAPI.model.Entity import Entity
25 from SmartAPI.model.InterfaceAddress import InterfaceAddress
26 from SmartAPI.model.Offering import Offering
27 from SmartAPI.model.Organization import Organization
28 from SmartAPI.model.PropertyDependentPriceSpecification import PropertyDependentPriceSpecification
29 from SmartAPI.model.Service import Service
30 from SmartAPI.model.TimeSeries import TimeSeries
31 from SmartAPI.model.ValueObject import ValueObject
32
33
# Identities (URIs) used by this demo: the owning organization, the service
# itself, the demo device served by it, and the Adapt service it is served by.
myOrgIdentity = "http://www.asema.com/company/"
myIdentity = "http://adapt.asema.com/demos/python/datasource/"
myDeviceIdentity = myIdentity + "devices/Cdemodevice"
adaptServiceIdentity = "http://adapt.asema.com"

# Base URL of the registration server. Both endpoints below are derived from
# it, so pointing the demo at another server is a one-line change; for a
# local test server use e.g.
# "http://192.168.2.96:8080/smartapifind-core/smart/v1.0e1.0".
registrationServerBase = "http://find.smart-api.io/smart/v1.0e1.0"
registrationServerUri = registrationServerBase + "/access"
registrationServerKeyUri = registrationServerBase + "/key"

# TCP port the demo HTTP server listens on.
PORT = 8111
# Pause, in seconds, between two consecutive notifications to a subscriber.
delay_between_sends_in_seconds = 5
45
46
class Notifier(Thread):
    """Thread that repeatedly sends a notification to one subscriber.

    Every notification carries a single randomly generated reading for the
    demo device and is posted over HTTP to the address described by the
    subscriber's interface.
    """

    def __init__(self, iface):
        """Prepare a notifier that targets the given interface.

        Args:
            iface: interface address object of the subscriber; its scheme,
                host, port and path are combined into the target URL.
        """
        Thread.__init__(self)
        # Daemon thread: it does not keep the process alive on exit.
        self.daemon = True
        self.iface = iface
        self.running = True

        targetPath = "%s://%s:%s%s" % (
            iface.getScheme().asString(),
            iface.getHost().asString(),
            iface.getPort().asString(),
            iface.getPath().asString(),
        )
        print("Start sending notifications to %s" % targetPath)
        self.http_client = HttpClient(targetPath)

    def run(self):
        """Send a notification, sleep, and repeat until `running` is cleared."""
        while self.running:
            self.sendNotification()
            time.sleep(delay_between_sends_in_seconds)

    def stop(self):
        """Clear the `running` flag.

        The loop in run() notices the flag the next time it evaluates its
        condition, i.e. after the current send and sleep have finished.
        """
        self.running = False

    def sendNotification(self):
        """Build one notification with a random reading, send it and print the reply."""
        n = NotificationFactory.create(myIdentity)
        a = Activity()
        a.setMethod(RESOURCE.NOTIFY)
        e = Entity(myDeviceIdentity)

        power = ValueObject(myIdentity + "service/Ppower")
        power.setQuantity(RESOURCE.POWER)
        # NOTE(review): the quantity is POWER but the unit is AMPERE; left
        # unchanged here -- confirm the intended unit.
        power.setUnit(RESOURCE.AMPERE)
        power.setValue(float(random.randint(100, 700)))

        e.addValueObject(power)
        a.addEntity(e)
        n.setActivity(a)

        payload, contentType = Tools.serializeNotification(n)
        response = self.http_client.send_data(payload, contentType)
        print(response)
85
86
class SampleRegistration(object):
    """Describes the demo service and its data source and registers both."""

    # (greater-than threshold, action value) pairs of the sample price
    # specification, in the order in which the conditions are added.
    _PRICE_TIERS = ((0, 10), (10, 20), (30, 40), (40, 55))

    def __init__(self):
        pass

    def run(self):
        """Build the service and data source entities and register them.

        Returns:
            True when registrate() returned a response without errors.
            False when it returned None or a response with errors, or when an
            exception was raised while building or sending the registration
            (the traceback is printed in that case).
        """
        agent = RegistrationAgent(myIdentity)
        agent.setServerAddress(registrationServerUri)
        agent.setServerKeyAddress(registrationServerKeyUri)
        agent.setDebugMode(True)

        try:
            org = Organization(myOrgIdentity)
            org.setName("Asema Electronics Ltd")

            sampleService = self._buildService(org)
            dataSource = self._buildDataSource(org)

            agent.addEntity(sampleService)
            agent.addEntity(dataSource)

            r = agent.registrate()
            if r is None or r.hasErrors():
                print("Demo data service registration FAILED")
                return False
            return True

        except Exception:
            # Exception (not a bare except) so that KeyboardInterrupt and
            # SystemExit still propagate to the caller.
            print("Error in registration code:")
            traceback.print_exc()
            return False

    def _buildService(self, org):
        """Return the Service entity describing this demo, owned by org."""
        sampleService = Service(myIdentity)
        sampleService.setName("Asema Adapt demo data service")
        sampleService.setDescription("Demo code for a data source that is compatible with Adapt")
        sampleService.setCoordinates(latitude=60.180824, longitude=24.832116)
        sampleService.setOwner(org)
        return sampleService

    def _buildReadActivity(self):
        """Return the READ activity with the interface and authorization of this server."""
        auth = Authorization()
        auth.addMethod(RESOURCE.COOKIE)
        auth.addMethod(RESOURCE.HTTPSTANDARD)

        iface = InterfaceAddress()
        iface.setHost("127.0.0.1")
        iface.setPath("/test/")
        iface.setPort(PORT)
        iface.setScheme("http")

        read = Activity()
        read.setMethod(RESOURCE.READ)
        read.setInterface(iface)
        read.setAuthorization(auth)
        return read

    def _buildPowerValueObject(self):
        """Return the value object describing the readable power value."""
        power = ValueObject(myIdentity + "service/Ppower")
        power.addType(RESOURCE.READABLE)
        power.setQuantity(RESOURCE.POWER)
        # NOTE(review): the quantity is POWER but the unit is AMPERE; left
        # unchanged here -- confirm the intended unit.
        power.setUnit(RESOURCE.AMPERE)
        power.setMaximum(1000.0)
        power.setMinimum(0.0)
        power.setName("Power reading")
        power.setDescription("This is the power reading of the unit.")
        return power

    def _buildOffering(self, power):
        """Return an Offering whose sample price specification depends on power."""
        priceSpec = PropertyDependentPriceSpecification()
        priceSpec.setName("Sample pricing")
        priceSpec.addType(RESOURCE.PRICETYPEDISCOUNT)
        priceSpec.setReferenceObject(myDeviceIdentity)
        priceSpec.setProperty(power)

        for threshold, action in self._PRICE_TIERS:
            condition = Condition()
            condition.addGreater(threshold)
            condition.addAction(action)
            priceSpec.addCondition(condition)

        offering = Offering()
        offering.addPriceSpecification(priceSpec)
        return offering

    def _buildDataSource(self, org):
        """Return the Entity describing the demo data source, owned and created by org."""
        dataSource = Entity()
        dataSource.setIdentifierUri(myDeviceIdentity)
        dataSource.setName("Demo Adapt datasource")
        dataSource.setDescription("This datasource provides random power readings as a demo for attaching a source into Asema Adapt")
        # The read capability is added exactly once.
        dataSource.addCapability(self._buildReadActivity())
        dataSource.setServedBy(adaptServiceIdentity)
        dataSource.setCoordinates(latitude=60.180824, longitude=24.832116)
        dataSource.addOwner(org)
        dataSource.addCreator(org)
        dataSource.setManagedBy(myOrgIdentity)

        power = self._buildPowerValueObject()
        dataSource.addOffering(self._buildOffering(power))
        dataSource.addValueObject(power)
        return dataSource
191
192
class SampleDataService(BaseHTTPRequestHandler):
    """HTTP request handler of the demo data service.

    GET answers paths ending in /identify and /authorize. POST answers paths
    ending in /authorize and /access; an access request may contain
    subscribe activities and read activities (single value or time series).
    """

    def __init__(self, request, client_address, server):
        BaseHTTPRequestHandler.__init__(self, request, client_address, server)

    @staticmethod
    def _to_bytes(data):
        """Return data unchanged if it is bytes, otherwise UTF-8 encode it."""
        if isinstance(data, bytes):
            return data
        return data.encode("utf-8")

    # HANDLE GET REQUESTS
    def do_GET(self):
        """Answer a GET request; the status line (200) is sent before dispatching."""
        self.send_response(200)
        self.end_headers()

        print("GET " + self.path)

        req_path = self.path.rstrip('/')

        if req_path.endswith("/identify"):
            self.wfile.write(self._to_bytes(myIdentity))
        elif req_path.endswith("/authorize"):
            # Reply with an empty activity
            response = Activity().toString()
            print("GET " + response)
            self.wfile.write(self._to_bytes(response))
        else:
            print("unhandled request received!")

    # HANDLE POST REQUESTS
    def do_POST(self):
        """Answer a POST request to the authorize or access endpoint.

        The request body is read in full, a 200 status is sent, and whatever
        the matching handlers produce is written back as the response body.
        """
        # A request without a Content-Length header is read as an empty body.
        content_length = int(self.headers.get('Content-Length', 0))
        body = self.rfile.read(content_length)
        self.send_response(200)
        self.end_headers()
        response_bytes = BytesIO()

        print("POST " + self.path)
        print("POST DATA: " + body)

        req_path = self.path.rstrip('/')

        if req_path.endswith("/authorize"):
            # Reply with empty activity to authorize
            response_text = Activity().toString()
            print("POST AUTHORIZE " + response_text)
            response_bytes.write(self._to_bytes(response_text))

        elif req_path.endswith("/access"):
            request = Tools.parseRequest(body)

            for a in request.getActivities():
                method = a.method.asString()
                if method == RESOURCE.SUBSCRIBE:
                    print("Subscribing to notifications")
                    self.handleSubscription(a)

                # Read values, either a timeseries or single value
                elif method == RESOURCE.READ:
                    if a.hasTemporalContext():
                        # Handle timeseries request
                        serialized = self.handleTimeseriesRequest(a, request)
                        response_bytes.write(self._to_bytes(serialized))
                    elif a.hasEntity():
                        e = a.getFirstEntity()
                        serialized = self.handleSingleValueRequest(e, a, request)
                        response_bytes.write(self._to_bytes(serialized))
                    else:
                        print("Unknown request received")

        else:
            print("Unknown path received")

        # Write output to client
        self.wfile.write(response_bytes.getvalue())

    def handleSubscription(self, activity):
        """Start a Notifier for the first interface of a subscribe activity.

        Args:
            activity: the subscribe activity; without an interface nothing is
                started and an error is printed.
        """
        if activity.hasInterface():
            iface = activity.getInterfaces()[0]
            print("Will send data to %s" % iface.getHost().asString())
            n = Notifier(iface)
            # start() runs the send loop in the notifier's own thread.
            # Calling run() directly would execute that endless loop right
            # here and never let this request handler return.
            n.start()

        else:
            print("Error: cannot subscribe without interface data")

    def handleSingleValueRequest(self, entity, activity, request):
        """Answer a read of a single value with one random reading.

        Args:
            entity: the requested entity; its identifier is reused in the reply.
            activity: the read activity; its identifier is reused in the reply.
            request: the parsed request the response is created for.

        Returns:
            The first element of what Tools.serializeResponse returns for the
            response.
        """
        response = ResponseFactory.create(myIdentity, request)
        response_activity = Activity(activity.getIdentifierUri())

        e = Entity(entity.getIdentifierUri())
        value_object = ValueObject()
        value_object.setValue(random.randint(100, 400))
        value_object.setQuantity(RESOURCE.POWER)
        value_object.setUnit(RESOURCE.AMPERE)
        e.addValueObject(value_object)
        response_activity.addEntity(e)
        # Attach the activity, otherwise the reading never reaches the reply.
        response.addActivity(response_activity)

        return Tools.serializeResponse(response)[0]

    def handleTimeseriesRequest(self, activity, request):
        """Answer a time series read with one random value per hour.

        Values are generated for every full hour step from the start of the
        activity's temporal context up to, but not including, its end.

        Args:
            activity: the read activity carrying the temporal context.
            request: the parsed request the response is created for.

        Returns:
            The first element of what Tools.serializeResponse returns for the
            response.
        """
        start = activity.temporalContext.start.asDateTime()
        end = activity.temporalContext.end.asDateTime()
        print("Timeseries request from  %s to %s" % (start, end))

        # For the client to recognize from which activity this result is, the response
        # activity will carry the same identifier that was in the request
        response = ResponseFactory.create(myIdentity, request)
        response_activity = Activity(activity.getIdentifierUri())

        # The way results are obtained is up to the server. This could
        # for instance be a result from a database query.
        time_series = TimeSeries()
        while start < end:
            value_object = ValueObject()
            value_object.setInstant(start)
            value_object.setValue(random.randint(100, 400))

            time_series.addListItem(value_object)
            start = start + datetime.timedelta(hours=1)

        # Add timeseries to response activity
        response_activity.addTimeSerie(time_series)
        response.addActivity(response_activity)

        return Tools.serializeResponse(response)[0]
314
315
def main():
    """Register the demo service, then serve HTTP requests until interrupted."""
    registrator = SampleRegistration()
    # The server is started regardless of the registration result; run()
    # reports failures itself.
    registrator.run()

    httpd = HTTPServer(('', PORT), SampleDataService)
    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the demo; exit without a traceback.
        print("Interrupted, shutting down")
    finally:
        # Release the listening socket.
        httpd.server_close()


if __name__ == '__main__':
    main()
326