API Requests Examples
Understand how to execute an API call with the examples outlined in this section.
The examples below are one-off API calls, so each one authenticates before making its request. However, if you run these calls in a loop or as part of a scheduled script, you don't need to authenticate every time; instead, authenticate once, then cache and reuse the session ID.
Note:
If you try to authenticate more than 10 times in one minute, you will receive an error message.
API call using Python
This is an example of a Python API request using the recommended three-step workflow: create the job, poll until complete, then retrieve the results. You can use the MyGeotab Python wrapper to create the API object. For a notebook-based walkthrough, see the Altitude Python API notebook.
import mygeotab
import time
api = mygeotab.API(username=<your username>, password=<your password>, database=<your database>, server='altitudeserver.geotab.com')
api.authenticate()
# Step 1 — create the job
job = api.call("GetAltitudeData",
serviceName="dna-altitude-general",
functionName="createQueryJob",
functionParameters={
"queryType": "getRoadSegments",
"zones": [<your zones>],
"asOfDate": "2025-01-01"
}
)
job_id = job["apiResult"]["results"][0]["id"]
# Step 2 — poll until complete
while True:
status = api.call("GetAltitudeData",
serviceName="dna-altitude-general",
functionName="getJobStatus",
functionParameters={"jobId": job_id}
)
if status["apiResult"]["results"][0]["status"]["state"] == "DONE":
break
time.sleep(5)
# Step 3 — retrieve results
results = api.call("GetAltitudeData",
serviceName="dna-altitude-general",
functionName="getQueryResults",
functionParameters={"jobId": job_id}
)Special Case API call using Javascript
The following is a JavaScript example of how to make an API request in three steps, where createQueryJob, waitForBigQueryJobToComplete, and fetchBigQueryData are functions that make the API requests outlined in steps 1, 2, and 3 above.
// install packages as required
// npm install mg-api-js
// mygeotab js documentation - https://github.com/Geotab/mg-api-js
const GeotabApi = require('mg-api-js');

// Credentials and server used to build the API client.
// Replace every <placeholder> with your own values.
const authentication = {
  credentials: {
    database: "<your database>",
    userName: "<your userName>",
    password: "<your password>",
  },
  path: "altitudeserver.geotab.com",
};

// Shared client used by every request helper in this example.
const api = new GeotabApi(authentication);
/**
 * Sends one "GetAltitudeData" request through the shared `api` client.
 *
 * Declared `async` so the result of `api.call` is returned directly instead
 * of being re-wrapped in `new Promise`; a synchronous throw from `api.call`
 * still surfaces as a rejected promise.
 *
 * @param {string} serviceName - Value sent as `serviceName`.
 * @param {string} functionName - Value sent as `functionName`.
 * @param {object} functionParameters - Value sent as `functionParameters`.
 * @returns {Promise<*>} Settles with whatever `api.call` resolves or rejects with.
 */
async function getMyGeotabBQData(serviceName, functionName, functionParameters) {
  return api.call("GetAltitudeData", {
    serviceName,
    functionName,
    functionParameters,
  });
}
/**
 * Step 1: submits a "createQueryJob" request to the "dna-altitude-general" service.
 *
 * The return shape depends on the outcome:
 *  - the first entry of `apiResult.results` on success (the job id is at `.id`),
 *  - `apiResult.errors` when the API result carries errors,
 *  - `[]` when the response has top-level `errors` (they are logged first),
 *  - `{ error }` when the request itself throws or rejects.
 *
 * @param {object} functionParameters - Parameters forwarded with the request.
 * @returns {Promise<object|Array>} See the outcomes listed above.
 */
async function createQueryJob(functionParameters) {
  try {
    const { apiResult, errors } = await getMyGeotabBQData(
      "dna-altitude-general",
      "createQueryJob",
      functionParameters
    );
    if (errors && errors.length) {
      // Log before returning the empty result so the failure is not lost.
      console.error(errors);
      return [];
    }
    if (apiResult.errors && apiResult.errors.length) {
      return apiResult.errors;
    }
    return apiResult.results[0]; // jobId will be at apiResult.results[0].id
  } catch (err) {
    return { error: err };
  }
}
/**
 * Step 2: polls "getJobStatus" until the job's `status.state` is "DONE".
 *
 * Every poll reuses the same `functionParameters`, with a 5 second pause
 * between polls.
 *
 * @param {object} functionParameters - Parameters forwarded with each status
 *   request; the caller includes the `jobId` here.
 * @returns {Promise<object|Array>} The first entry of `apiResult.results` once
 *   it is no longer pending, `[]` when the response has top-level `errors`
 *   (they are logged first), or `{ error }` when a request throws or rejects.
 */
async function waitForBigQueryJobToComplete(functionParameters) {
  try {
    // Loop rather than recurse so a long-running job does not build up a
    // chain of nested pending calls.
    while (true) {
      const { apiResult, errors } = await getMyGeotabBQData(
        "dna-altitude-general",
        "getJobStatus",
        functionParameters
      );
      if (errors && errors.length) {
        console.error(errors);
        return [];
      }
      const job = apiResult.results[0];
      const isPending = job && job.status && job.status.state !== "DONE";
      if (!isPending) {
        return job;
      }
      await sleep(5000); // poll every 5 seconds to see if job is done
    }
  } catch (err) {
    return { error: err };
  }
}
/**
 * Returns a promise that resolves (with no value) after `time` milliseconds.
 *
 * @param {number} time - Delay passed to `setTimeout`.
 * @returns {Promise<undefined>}
 */
const sleep = (time) =>
  new Promise((resolve) => {
    setTimeout(resolve, time);
  });
/**
 * Step 3: async generator that pages through "getQueryResults".
 *
 * Each successful page is yielded as `{ data: [rows, totalRows, index] }`,
 * where `index` is the 1-based page number. When a page comes back with a
 * `pageToken`, that token is sent with the next request; iteration ends after
 * the first page without one.
 *
 * If a request fails or a page reports an `error`, a single
 * `{ error, data: [] }` item is yielded and the generator stops, so it cannot
 * keep retrying the same failing request forever.
 *
 * @param {object} functionParameters - Parameters forwarded with each request.
 *   The object is copied, so the caller's object is never modified.
 * @yields {{data: Array, error?: Error}}
 */
async function* fetchBigQueryData(functionParameters) {
  // Work on a copy so the page token never leaks into the caller's object.
  const request = { ...functionParameters };
  for (let index = 1; ; index++) {
    let page;
    try {
      const { apiResult } = await getMyGeotabBQData(
        "dna-altitude-general",
        "getQueryResults",
        request
      );
      page = apiResult.results[0];
      if (page.error && page.error.length) {
        throw new Error(page.error);
      }
    } catch (err) {
      yield { error: err, data: [] };
      return;
    }
    const { rows, pageToken, totalRows } = page;
    yield { data: [rows, totalRows, index] };
    if (!pageToken) {
      return;
    }
    request.pageToken = pageToken;
  }
}
// How to call the above functions in 3 parts:
// NOTE: top-level `await` is only valid in an ES module. In a CommonJS script
// (the client above is loaded with `require`), wrap these statements in an
// async function and call it.
const job = await createQueryJob(functionParameters); //create the job
if (!job || !job.id) {
  // Without a job id there is nothing to poll or fetch.
  throw new Error("createQueryJob did not return a job id");
}
const results = await waitForBigQueryJobToComplete({
  ...functionParameters,
  jobId: job.id,
}); //wait for the job to finish by polling to see if it's done
const jobIterator = fetchBigQueryData({
  ...functionParameters,
  resultsLimit: 50000,
  jobId: job.id,
}); //get the results once done
//loop through the results one page at a time and append to the data array
let data = [];
for await (const {
  data: [page],
  error,
} of jobIterator) {
  if (error) {
    // Stop reading pages; `return` is not allowed outside a function.
    console.error(error);
    break;
  }
  if (Array.isArray(page)) {
    // Append to what has been collected so far instead of overwriting it.
    data = data.concat(page);
  }
}
console.log(data);
Generating Shapefiles using API results with Python
Below is an example of generating a Shapefile based on API results. You can call any API that returns geography to generate a Shapefile. The example below uses the getRoadSegments API.
getRoadSegments API Example (special case API with geography output)
import json
import time
import shutil
import mygeotab
import pandas as pd
import geopandas as gpd
from shapely.geometry import shape
import warnings
warnings.filterwarnings('ignore')
class API():
    """Small client for running `GetAltitudeData` query jobs through the MyGeotab SDK.

    Typical flow: `create_job` -> `wait_for_job_to_complete` -> `fetch_data`.
    """

    def __init__(self, params):
        """Store the connection settings and build the SDK client.

        `params` must contain the keys 'username', 'password', 'database'
        and 'server'.
        """
        self.username = params['username']
        self.password = params['password']
        self.database = params['database']
        self.server = params['server']
        # Set by create_job once a job has been created.
        self.jobId = None
        self._authenticate()

    def _authenticate(self):
        """Create the `mygeotab.API` client and keep it on `self.client`.

        No explicit `authenticate()` call is made here.
        """
        self.client = mygeotab.API(username=self.username, password=self.password, database=self.database, server=self.server)

    def get_data(self, service_name: str, function_name: str, function_parameters: dict) -> dict:
        """Send one `GetAltitudeData` call and return the raw response."""
        return self.client.call("GetAltitudeData",
                                serviceName=service_name,
                                functionName=function_name,
                                functionParameters=function_parameters)

    def create_job(self, params: dict) -> dict:
        """Create a query job, remember its id on `self.jobId`, and return the job entry."""
        results = self.get_data(service_name="dna-altitude-general",
                                function_name="createQueryJob",
                                function_parameters=params)
        job = results["apiResult"]["results"][0]
        self.jobId = job["id"]
        return job

    def wait_for_job_to_complete(self, params: dict, poll_interval: float = 5) -> dict:
        """Poll `getJobStatus` until the job's status state is "DONE".

        `params` is sent unchanged with every poll. `poll_interval` is the
        pause in seconds between polls. Returns the first entry of the
        response's results once it is no longer pending.
        """
        # Iterate rather than recurse so a long-running job cannot exhaust
        # the interpreter's recursion limit.
        while True:
            results = self.get_data(service_name="dna-altitude-general",
                                    function_name="getJobStatus",
                                    function_parameters=params
                                    )['apiResult']
            results = results["results"][0]
            if results and results["status"] and results["status"]["state"] != "DONE":
                time.sleep(poll_interval)
                continue
            return results

    def fetch_data(self, params: dict):
        """Generator over the pages returned by `getQueryResults`.

        Yields one dict per page:
        `{"data": [rows, total_rows, page_index], "error": error}`, where
        `page_index` starts at 1. When a page carries a `pageToken`, it is
        sent with the next request; iteration ends after the first page
        without one. `params` is copied, so the caller's dict is not modified.
        """
        request = dict(params)
        index = 1
        while True:
            results = self.get_data(service_name="dna-altitude-general",
                                    function_name="getQueryResults",
                                    function_parameters=request
                                    )['apiResult']
            page = results["results"][0]
            page_token = page.get("pageToken", None)
            yield {"data": [page.get("rows", None), page.get("totalRows", None), index],
                   "error": page.get("error", None)}
            if not page_token:
                return
            request["pageToken"] = page_token
            index += 1
example_params = {
"queryType": "getRoadSegments",
"zones": [<your zones>],
"asOfDate": "2023-03-31",
"excludeServiceRoads": True,
"roadTypes": ["motorway", "trunk", "primary", "secondary", "tertiary"],
"resultsLimit": 50000
}
client_params = {'username': '<username>',
'password': '<password>',
'database': '<database>',
'server': '<server>'
}
api = API(client_params)
job = api.create_job(example_params)
example_params["jobId"] = api.jobId
results = api.wait_for_job_to_complete(example_params)
results_iterator = api.fetch_data(example_params)
data = []
for data_page in results_iterator:
page = [] if data_page is None else data_page["data"][0]
data.extend(page)
df = pd.DataFrame(data)
df = df.astype(str)
df['Geography'] = df.apply(lambda x: shape(eval(x['Geography'])), axis=1)
gpd_df = gpd.GeoDataFrame(df[[x for x in df.columns if x != 'Geography']], geometry=gpd.GeoSeries(df['Geography'].values))
gpd_df.to_file(filename='ShapeFile', driver='ESRI Shapefile')
_ = shutil.make_archive('test_file', 'zip', 'ShapeFile')