Python: Fix URL in Variant parsing and serialization; Notification
[smartapi.git] / Examples / Python / AdaptDataService / AdaptDataService.py
1 #!/usr/bin/python
2
3 from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
4 from io import BytesIO
5 from threading import Thread
6 from httpclient import HttpClient
7 import time
8 import traceback
9 import random
10
11 from SmartAPI.agents.RegistrationAgent import RegistrationAgent
12 from SmartAPI.agents.SearchAgent import SearchAgent
13
14 from SmartAPI.common.Tools import Tools
15 from SmartAPI.common.RESOURCE import RESOURCE
16
17 from SmartAPI.factory.ResponseFactory import ResponseFactory
18 from SmartAPI.factory.NotificationFactory import NotificationFactory
19
20 from SmartAPI.model.Activity import Activity
21 from SmartAPI.model.Authorization import Authorization
22 from SmartAPI.model.Condition import Condition
23 from SmartAPI.model.Entity import Entity
24 from SmartAPI.model.InterfaceAddress import InterfaceAddress
25 from SmartAPI.model.Offering import Offering
26 from SmartAPI.model.Organization import Organization
27 from SmartAPI.model.PropertyDependentPriceSpecification import PropertyDependentPriceSpecification
28 from SmartAPI.model.Service import Service
29 from SmartAPI.model.TimeSeries import TimeSeries
30 from SmartAPI.model.ValueObject import ValueObject
31
32
# Identity URI of this demo service; used for the registration agent, the
# registered Service and as the sender of notifications and responses.
myIdentity = "http://adapt.asema.com/demos/python/datasource/"
# Identity URI of the demo device (the Entity that carries the power reading).
myDeviceIdentity = myIdentity + "devices/Cdemodevice"
# Identity passed to setServedBy() when the device entity is registered.
adaptServiceIdentity = "http://adapt.asema.com"
# Endpoints handed to the RegistrationAgent (server address and key address).
registrationServerUri = "http://find.smart-api.io/smart/v1.0e1.0/access"
registrationServerKeyUri = "http://find.smart-api.io/smart/v1.0e1.0/key"
# Alternative endpoints, disabled; presumably a local test installation -- confirm before enabling.
#registrationServerUri = "http://192.168.2.96:8080/smartapifind-core/smart/v1.0e1.0/access"
#registrationServerKeyUri = "http://192.168.2.96:8080/smartapifind-core/smart/v1.0e1.0/key"

# TCP port the HTTP server listens on; also advertised in the registered interface address.
PORT = 8111
# Pause, in seconds, between two consecutive notifications sent by Notifier.
delay_between_sends_in_seconds = 5
43
44
class Notifier(Thread):
        """Daemon thread that repeatedly posts a power notification to a subscriber.

        The destination URL is assembled from the scheme, host, port and path of
        the interface address passed to the constructor.
        """

        def __init__(self, iface):
                """Remember the interface and create the HTTP client for its URL."""
                Thread.__init__(self)
                self.daemon = True
                self.iface = iface
                self.running = True

                # scheme://host:port/path, every part read from the interface object
                url_parts = [
                        iface.getScheme().asString(),
                        "://",
                        iface.getHost().asString(),
                        ":",
                        iface.getPort().asString(),
                        iface.getPath().asString(),
                ]
                target_url = "".join(url_parts)
                print(" ".join(["Start sending notifications to", target_url]))
                self.http_client = HttpClient(target_url)

        def run(self):
                """Send a notification, pause, and repeat until stop() clears the flag."""
                while True:
                        if not self.running:
                                break
                        self.sendNotification()
                        time.sleep(delay_between_sends_in_seconds)

        def stop(self):
                """Ask the loop in run() to finish after its current iteration."""
                self.running = False

        def sendNotification(self):
                """Build one notification with a random power value and send it."""
                notification = NotificationFactory.create(myIdentity)
                notify_activity = Activity()
                notify_activity.setMethod(RESOURCE.NOTIFY)
                device = Entity(myDeviceIdentity)

                reading = ValueObject(myIdentity + "service/Ppower")
                reading.setQuantity(RESOURCE.POWER)
                reading.setUnit(RESOURCE.AMPERE)
                # Demo data: a whole number between 100 and 700, sent as a float
                sample = random.randint(100, 700)
                reading.setValue(float(sample))

                device.addValueObject(reading)
                notify_activity.addEntity(device)
                notification.setActivity(notify_activity)

                # The content type returned alongside the payload is not used here
                payload, _content_type = Tools.serializeNotification(notification)
                print(payload)
                self.http_client.send_data(payload)
83
84
class SampleRegistration(object):
        """Registers the demo service and its data source entity.

        run() is the entry point; the _build* helpers each assemble one part of
        the description that is sent to the registration server.
        """

        # Coordinates set on both the service and the data source entity.
        _LATITUDE = 60.180824
        _LONGITUDE = 24.832116

        # (threshold passed to addGreater, value passed to addAction) per condition.
        _PRICE_CONDITIONS = ((0, 10), (10, 20), (30, 40), (40, 55))

        def __init__(self):
                pass

        def run(self):
                """Build the service and data source descriptions and register them.

                Returns:
                        True when the registration response exists and has no errors.
                        False when there is no response, the response has errors, or
                        an exception was raised while building or sending the request.
                """
                agent = RegistrationAgent(myIdentity)
                agent.setServerAddress(registrationServerUri)
                agent.setServerKeyAddress(registrationServerKeyUri)
                agent.setDebugMode(True)

                try:
                        sampleService = self._buildService()
                        dataSource = self._buildDataSource()

                        agent.addEntity(sampleService)
                        agent.addEntity(dataSource)

                        r = agent.registrate()
                        if r is None or r.hasErrors():
                                print("Demo data service registration FAILED")
                                return False
                        return True

                # Exception rather than a bare except, so that KeyboardInterrupt
                # and SystemExit are not swallowed and reported as a failure.
                except Exception:
                        print("Error in registration code:")
                        traceback.print_exc()
                        return False

        def _buildService(self):
                """Return the Service entity describing this demo service and its owner."""
                org = Organization()
                org.setName("Asema Electronics Ltd")

                sampleService = Service(myIdentity)
                sampleService.setName("Asema Adapt demo data service")
                sampleService.setDescription("Demo code for a data source that is compatible with Adapt")
                sampleService.setCoordinates(latitude=self._LATITUDE, longitude=self._LONGITUDE)
                sampleService.setOwner(org)
                return sampleService

        def _buildReadActivity(self):
                """Return the READ activity with its authorization methods and interface address."""
                auth = Authorization()
                auth.addMethod(RESOURCE.COOKIE)
                auth.addMethod(RESOURCE.HTTPSTANDARD)

                iface = InterfaceAddress()
                iface.setHost("127.0.0.1")
                iface.setPath("/test/")
                iface.setPort(PORT)
                iface.setScheme("http")

                read = Activity()
                read.setMethod(RESOURCE.READ)
                read.setInterface(iface)
                read.setAuthorization(auth)
                return read

        def _buildPowerValueObject(self):
                """Return the ValueObject describing the readable power value."""
                power = ValueObject(myIdentity + "service/Ppower")
                power.addType(RESOURCE.READABLE)
                power.setQuantity(RESOURCE.POWER)
                power.setUnit(RESOURCE.AMPERE)
                power.setMaximum(1000.0)
                power.setMinimum(0.0)
                power.setName("Power reading")
                power.setDescription("This is the power reading of the unit.")
                return power

        def _buildOffering(self):
                """Return an Offering carrying the sample price specification."""
                offering = Offering()
                priceSpec = PropertyDependentPriceSpecification()
                priceSpec.setName("Sample pricing")
                priceSpec.addType(RESOURCE.PRICETYPEDISCOUNT)

                for threshold, action in self._PRICE_CONDITIONS:
                        condition = Condition()
                        condition.addGreater(threshold)
                        condition.addAction(action)
                        priceSpec.addCondition(condition)

                offering.addPriceSpecification(priceSpec)
                return offering

        def _buildDataSource(self):
                """Return the Entity describing the demo device."""
                read = self._buildReadActivity()

                dataSource = Entity()
                dataSource.setIdentifierUri(myDeviceIdentity)
                dataSource.setName("Demo Adapt datasource")
                # The read capability is added exactly once; the earlier code
                # added the same activity a second time further down.
                dataSource.addCapability(read)
                dataSource.setManagedBy(myIdentity)
                dataSource.setServedBy(adaptServiceIdentity)
                dataSource.setCoordinates(latitude=self._LATITUDE, longitude=self._LONGITUDE)

                power = self._buildPowerValueObject()
                dataSource.addOffering(self._buildOffering())
                dataSource.addValueObject(power)
                return dataSource
183
184
class SampleDataService(BaseHTTPRequestHandler):
        """HTTP handler of the demo data service.

        GET serves the paths ending in /identify and /authorize; POST serves
        /authorize and /access. An /access request may subscribe to notifications
        or read a time series. All replies use status 200.
        """

        def __init__(self, request, client_address, server):
                BaseHTTPRequestHandler.__init__(self, request, client_address, server)

        # HANDLE GET REQUESTS
        def do_GET(self):
                """Reply with the service identity or an empty activity, depending on the path."""
                self.send_response(200)
                self.end_headers()

                print("GET " + self.path)

                req_path = self.path.rstrip('/')

                if req_path.endswith("/identify"):
                        self.wfile.write(str.encode(myIdentity))
                elif req_path.endswith("/authorize"):
                        response_activity = Activity()
                        response = response_activity.toString()
                        print("GET " + response)
                        self.wfile.write(str.encode(response))
                else:
                        print("unhandled request received!")

        # HANDLE POST REQUESTS
        def do_POST(self):
                """Handle authorization and access requests and write the collected reply."""
                # A request without a Content-Length header is read as an empty body
                # instead of failing before any reply has been sent.
                content_length = int(self.headers.get('Content-Length') or 0)
                body = self.rfile.read(content_length)
                self.send_response(200)
                self.end_headers()
                response_bytes = BytesIO()

                print("POST" + self.path)
                print("POST DATA: " + body)

                req_path = self.path.rstrip('/')

                if req_path.endswith("/authorize"):
                        # Reply with empty activity to authorize
                        response_activity = Activity()
                        response_text = response_activity.toString()
                        print("POST AUTHORIZE" + response_text)
                        response_bytes.write(response_text)

                elif req_path.endswith("/access"):
                        request = Tools.parseRequest(body)

                        for a in request.getActivities():
                                method = a.method.asString()
                                if method == RESOURCE.SUBSCRIBE:
                                        print("Subscribing to notifications")
                                        self.handleSubscription(a)

                                elif method == RESOURCE.READ:
                                        print("Reading data")

                                        # Handle timeseries request
                                        if a.hasTemporalContext():
                                                response_bytes.write(self.handleTemporalContext(a, request))
                                        else:
                                                print("unhandled request received!")

                else:
                        print("unhandled request received!")

                # Write output to client
                self.wfile.write(response_bytes.getvalue())

        def handleSubscription(self, activity):
                """Start a background Notifier for the first interface of the activity.

                Prints an error and does nothing when the activity has no interface.
                """
                if activity.hasInterface():
                        iface = activity.getInterfaces()[0]
                        print("Will send data to %s" % iface.getHost().asString())
                        n = Notifier(iface)
                        # start(), not run(): run() would execute the endless send
                        # loop in this request handler, so the request would never
                        # complete and the server could not accept another one.
                        n.start()

                else:
                        print("Error: cannot subscribe without inteface data")

        def handleTemporalContext(self, activity, request):
                """Build and serialize the time series response for a read activity.

                Args:
                        activity: the request activity; its temporal context is
                                printed and its identifier is reused in the response.
                        request: the parsed request the response is created for.

                Returns:
                        The first element of Tools.serializeResponse() for the response.
                """
                print("Timeseries request: ")
                print("start")
                print(activity.temporalContext.start)
                print("end")
                print(activity.temporalContext.end)

                # For the client to recognize from which activity this result is, the response
                # activity will carry the same identifier that was in the request
                response = ResponseFactory.create(myIdentity, request)
                response_activity = Activity(activity.getIdentifierUri())

                # The way results are obtained is up to the server. This could
                # for instance be a result from a database query.
                results = [
                        {'time': '2019-06-10T15:18:54.842', 'power': 25},
                        {'time': '2019-06-01T02:11:54.842', 'power': 21},
                        {'time': '2019-05-25T11:11:54.842', 'power': 32}]

                time_series = TimeSeries()

                # Add data to response timeseries
                for r in results:
                        value_object = ValueObject()
                        value_object.setInstant(r["time"])
                        value_object.setValue(r["power"])

                        time_series.addListItem(value_object)

                # The steps below run once, after the loop, so that the reply holds
                # every sample and a payload is returned even for an empty result list.

                # Add timeseries to response activity
                response_activity.addTimeSerie(time_series)

                response.addActivity(response_activity)

                # Serialize once and reuse the payload for logging and for the reply
                payload = Tools.serializeResponse(response)[0]
                print("Reply:")
                print(payload)
                return payload
299                         print(Tools.serializeResponse(response)[0])
300                         return Tools.serializeResponse(response)[0]
301
302
def main():
        """Register the demo service, then serve HTTP requests until interrupted."""
        registrator = SampleRegistration()
        # The result is not checked: the server is started even when registration
        # fails, and run() prints the failure itself.
        registrator.run()

        httpd = HTTPServer(('', PORT), SampleDataService)
        try:
                httpd.serve_forever()
        except KeyboardInterrupt:
                # Ctrl-C is the expected way to stop the demo; leave without a traceback.
                pass
        finally:
                # Release the listening socket however the serve loop ended.
                httpd.server_close()
307         httpd = HTTPServer(('', PORT), SampleDataService)
308         httpd.serve_forever()
309
310
# Start the demo only when this file is executed as a script, not when it is imported.
if __name__ == '__main__':
        main()
313