API Requests Examples
Understand how to execute an API call with the examples outlined in this section.
We will provide examples of one off API calls, so they authenticate every time. However, if you are running the following in a loop as part of a scheduled script, you don’t need to authenticate every time and you should cache and use a session ID.
If you try to authenticate more than 10 times in one minute, you will receive an error message.
Simple API call using Python
import json
import mygeotab
api = mygeotab.API(username=<your username>, password=<your password>, database=<your database>, server='altitudeserver.geotab.com')
api.authenticate()
request = {
"method":"GetAltitudeData",
"params": {
"serviceName": "dna-altitude-general",
"functionName": "getData",
"functionParameters":{
"queryType": "getCityBoundaries",
"zones": [<your zones>]
}
}
}
results = api.call(method=request['method'], **request['params'])Or more explicitly:results = api.call(method="GetAltitudeData",
serviceName="dna-altitude-general",
functionName= "getData",
functionParameters={
"queryType": "getCityBoundaries",
"zones": [<your zones>]
}
)Special Case API call using Python
The following notebook is a Python example of how to make an API request in 3 steps, where create_job, wait_for_job_to_complete and get_data are just functions that make the API requests outlined in steps 1, 2, 3 above. Altitude Python API notebook
Special Case API call using Javascript
createQueryJob, waitForBigQueryJobToComplete and fetchBigQueryData are just functions that make the API requests outlined in steps 1, 2, 3 above.// install packages as required
// npm install mg-api-js
// mygeotab js documentation - https://github.com/Geotab/mg-api-js
const GeotabApi = require('mg-api-js');
const authentication = {
credentials: {
database: "<your database>",
userName: "your userName>",
password: "<your password>"
},
path: "altitudeserver.geotab.com"
};
const api = new GeotabApi(authentication);
function getMyGeotabBQData(serviceName, functionName, functionParameters) {
return new Promise(function (resolve, reject) {
api.call("GetAltitudeData", {
serviceName,
functionName,
functionParameters,
})
.then((result) => {
resolve(result);
})
.catch((error) => {
reject(error);
});
});
}
async function createQueryJob(functionParameters) {
try {
const { apiResult, errors } = await getMyGeotabBQData(
"dna-altitude-general",
"createQueryJob",
functionParameters
);
if (errors && errors.length) {
return [];
}
if (apiResult.errors && apiResult.errors.length) {
return apiResult.errors;
}
return apiResult.results[0]; //jobId will be at apiResult.results[0].id
} catch (err) {
return { error: err };
}
}
async function waitForBigQueryJobToComplete(functionParameters) {
try {
const { apiResult, errors } = await getMyGeotabBQData(
"dna-altitude-general",
"getJobStatus",
functionParameters
);
if (errors && errors.length) {
console.error(errors);
return [];
}
const job = apiResult.results[0];
if (
job &&
job.status &&
job.status.state !== "DONE"
) {
await sleep(5000); //poll every 5 seconds to see if job is done
return await waitForBigQueryJobToComplete(bqParams);
}
return job;
} catch (err) {
return { error: err };
}
}
const sleep = async (time) => await new Promise((r) => setTimeout(r, time));
async function* fetchBigQueryData(functionParameters) {
let index = 1;
while(index > 0){
try {
const { apiResult } = await getMyGeotabBQData(
"dna-altitude-general",
"getQueryResults",
functionParameters
);
const { error, rows, pageToken, totalRows } = apiResult.results[0];
if (error && error.length) {
throw new Error(error);
}
functionParameters["pageToken"] = pageToken;
yield { data: [rows, totalRows, index] };
index++;
if (!pageToken) {
break;
}
} catch (err) {
yield { error: err, data: [] };
}
}
}
// How to call the above functions in 3 parts:
const job = await createQueryJob(functionParameters); //create the job
const results = await waitForBigQueryJobToComplete({
...functionParameters,
jobId: job.id,
}); //wait for the job to finish by polling to see if it's done
const jobIterator = fetchBigQueryData({
...functionParameters,
resultsLimit: 50000,
jobId: job.id,
}); //get the results once done
//loop through the results one page at a time and append to results array
let data = [];
for await (const {
data: [page],
error: error,
} of jobIterator) {
if (error) {
return;
}
data = [].concat.apply([], [results, page]);
}
console.log(data);
Generating Shapefiles using API results with Python
Below is an example of generating a Shapefile based on API results. You can call any API (special case or regular) that returns geography to generate a Shapefile. The examples below leverage the getCityBoundaries and getRoadSegments APIs.
import shutil
import mygeotab
import pandas as pd
import geopandas as gpd
from shapely.geometry import shape
import warnings
warnings.filterwarnings('ignore')
# Call API
# ----------------------------------------------------------------------------------------
API_params = {
'username': '<username>',
'password': '<password>',
'database': '<database>',
'server': '<server>'
}
api = mygeotab.API(**API_params)
api.authenticate()
request = {
"method":"GetAltitudeData",
"params": {
"serviceName": "dna-altitude-general",
"functionName": "getData",
"functionParameters":{
"queryType": "getCityBoundaries",
"zones": [<your zones>]
}
}
}
results = api.call(method=request['method'], **request['params'])['apiResult']['results']
shapefile_name = '<shapefile_name>'
# Generate ShapeFile zip
# ----------------------------------------------------------------------------------------
df_results = pd.DataFrame(results)
df_results['Bounds'] = df_results.apply(lambda x: shape(eval(x['Bounds'])), axis=1)
df_results = gpd.GeoDataFrame(df_results[['CentreLongitude', 'CentreLatitude', 'MinLongitude', 'MinLatitude', 'MaxLongitude', 'MaxLatitude']], geometry=gpd.GeoSeries(df_results['Bounds'].values))
_ = df_results.to_file(filename=shapefile_name, driver='ESRI Shapefile')
_ = shutil.make_archive(shapefile_name, 'zip', shapefile_name)getRoadSegments API Example (special case API with geography output)
import json
import time
import shutil
import mygeotab
import pandas as pd
import geopandas as gpd
from shapely.geometry import shape
import warnings
warnings.filterwarnings('ignore')
class API():
def __init__(self, params):
self.username = params['username']
self.password = params['password']
self.database = params['database']
self.server = params['server']
self.jobId = None
_ = self._authenticate()
def _authenticate(self):
self.client = mygeotab.API(username=self.username, password=self.password, database=self.database, server=self.server)
def get_data(self, service_name: str, function_name: str, function_parameters: dict) -> dict:
results = self.client.call("GetAltitudeData",
serviceName=service_name,
functionName=function_name,
functionParameters=function_parameters)
return results
def create_job(self, params: dict) -> dict:
results = self.get_data(service_name="dna-altitude-general",
function_name="createQueryJob",
function_parameters=params)
self.jobId = results["apiResult"]["results"][0]["id"]
return results["apiResult"]["results"][0]
def wait_for_job_to_complete(self, params: dict) -> dict:
results = self.get_data(service_name="dna-altitude-general",
function_name="getJobStatus",
function_parameters=params
)['apiResult']
results = results["results"][0]
if results and results["status"] and results["status"]["state"] != "DONE":
time.sleep(5)
return self.wait_for_job_to_complete(params)
return results
def fetch_data(self, params: dict) -> dict:
index = 1
while index is not None:
results = self.get_data(service_name="dna-altitude-general",
function_name="getQueryResults",
function_parameters=params
)['apiResult']
error = results["results"][0].get("error", None)
rows = results["results"][0].get("rows", None)
page_token = results["results"][0].get("pageToken", None)
total_rows = results["results"][0].get("totalRows", None)
params["pageToken"] = page_token
yield {"data": [rows, total_rows, index], "error": error}
index += 1
if not page_token:
index = None
yield
example_params = {
"queryType": "getRoadSegments",
"zones": [<your zones>],
"asOfDate": "2023-03-31",
"excludeServiceRoads": True,
"roadTypes": ["motorway", "trunk", "primary", "secondary", "tertiary"],
"resultsLimit": 50000
}
client_params = {'username': '<username>',
'password': '<password>',
'database': '<database>',
'server': '<server>'
}
api = API(client_params)
job = api.create_job(example_params)
example_params["jobId"] = api.jobId
results = api.wait_for_job_to_complete(example_params)
results_iterator = api.fetch_data(example_params)
data = []
for data_page in results_iterator:
page = [] if data_page is None else data_page["data"][0]
data.extend(page)
df = pd.DataFrame(data)
df = df.astype(str)
df['Geography'] = df.apply(lambda x: shape(eval(x['Geography'])), axis=1)
gpd_df = gpd.GeoDataFrame(df[[x for x in df.columns if x != 'Geography']], geometry=gpd.GeoSeries(df['Geography'].values))
gpd_df.to_file(filename='ShapeFile', driver='ESRI Shapefile')
_ = shutil.make_archive('test_file', 'zip', 'ShapeFile')
How to Retrieve Time Series Results
For APIs with a "generateTimeSeries" parameter, retrieve time series results by making a separate “getSavedResults” call from the standard results call. Each getSavedResults call creates its own job with a unique jobId, and you must track and use separate jobIds for standard vs. time series results. To get the time series results, call “getSavedResults” and pass the job id of the parent analysis into the “analysisJobId” parameter and the queryType of the parent plus “TimeSeries” to the “analysisType” parameter.
This API call will only retrieve the time series results. In order to retrieve the results of the getOriginDestinationMatrix call, follow the process in the getOriginDestinationMatrix call.
For example, let’s say you run a “getOriginDestinationMatrix” analysis with generateTimeSeries set to true and jobId 1234. In order to get the time series results, you would call the “getSavedResults” API with the following function parameters:
"functionParameters": {
"queryType": "getSavedResults",
"jobId": "<to be determined>",
"analysisJobId": "1234",
"analysisType":"getOriginDestinationMatrixTimeSeries"
}